最近遇到的业务场景:
如果用户输入时上一次的初始化还没结束,就会出现多次初始化的并行。我目前的处理方式是初始化开始时记下当前状态,每结束一个串行的异步任务,都进行一次状态比对。简化后的代码如下:
/**
* @type {number | undefined}
*/
let state = undefined
/**
* @param {number} ms
* @returns {Promise<void>}
* @description sleep for ms milliseconds to simulate async task
*/
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
/**
* @returns {Promise<void>}
* @description init async task
*/
async function init() {
const current = Date.now()
state = current
await sleep(500)
if (state !== current) {
console.warn(`state ${current} init: canceled`)
return
}
await sleep(500)
if (state !== current) {
console.warn(`state ${current} init: canceled`)
return
}
console.log(`state ${current} init: done`)
state = undefined
}
但我感觉这种做法不太合理,虽然上面用当前时间戳模拟了状态,但实际上业务场景中可能会出现某几次用户输入一致,所以状态一致,进而导致这几次输入执行的初始化过程无法正常中断。像 C#中有 CancellationToken
可以直接调用 Cancel
取消异步任务。不知道 JS 是否拥有类似的设计,或者对于这类业务场景有更好的处理方式?
1
chenliangngng 127 天前
为什么要用时间戳,直接存储状态不就可以了
不论是 promise 和 ajax 都可以取消 |
2
Pencillll 127 天前 via Android
我一般把整个流程写成一个 task ,并提供 cancel 函数,需要取消时就主动调用 task.cancel()
|
3
pursuer 127 天前
js 现在还没有自动取消异步任务的方法。可以用 AbortSignal ,但要每次 await 前 throwIfAbort 。或实现一个类似效果的东西。
|
4
rabbbit 127 天前
const sleep = (time) => {
return new Promise((resolve) => { setTimeout(() => resolve(), time); }); }; const task = (id) => { let _reject; const p = new Promise(async (resolve, reject) => { _reject = reject; const time = Math.random() * 10; await sleep(time); resolve({ id, time }); }).then(({ id, time }) => console.log(id, time)); return () => _reject(); }; const main = () => { let cancelFn; for (let i = 0; i < 5; i++) { cancelFn?.(); cancelFn = task(i); } }; main(); |
5
DOLLOR 127 天前
不要用时间戳,可以用 symbol 来标记状态。Symbol()创建的每个 symbol 都是唯一的。
|
6
dvsilch OP @rabbbit 这个方式我之前实现过,但是存在麻烦的地方是:业务场景需要外部和内部都拥有 cancel 的能力,结果就是需要暴露一个额外的方法同时内部持有这个 reject ,担心会出现因为持有 reject 导致 promise 无法从内存中释放的情况
|
7
dvsilch OP @chenliangngng
@DOLLOR @pursuer 不好意思主贴里没说清楚,代码里的状态只是随便写了一个东西,实际业务并不是这么实现的。而且,与其说会因为状态出现 bug ,更不如说是我感觉这种「每次异步任务后需要主动判断状态」的做法太傻逼了... |
8
doommm 127 天前
我在业务中也遇到了这种问题,有一连串的异步调用链需要取消(可能执行到任意一步,不需要撤销,只要停止执行后续就行),没啥头绪。
有一些比较简单的场景,像异步拉取一些列表数据之类的,我尝试过用 Rxjs 来处理。switchMap 还是挺好用的 |
9
rabbbit 127 天前
脑洞,不推荐这么写
const sleep = (time) => { return new Promise((resolve) => { setTimeout(() => resolve(), time); }); }; async function* task(id) { yield sleep(Math.random() * 100); console.log('id: ', id, 'num: ', 1); yield sleep(Math.random() * 100); console.log('id: ', id, 'num: ', 2); yield sleep(Math.random() * 100); console.log('id: ', id, 'num: ', 3); } const main = async () => { for (let i = 0; i < 5; i++) { const start = Date.now(); for await (const t of task(i)) { const diff = Date.now() - start; if (diff > 100) { break; } else { console.log("time: ", diff); } } console.log('end ---') } }; main(); |
10
rabbbit 127 天前 1
排版不好,放 github 了,另外还有一些例子
https://gist.github.com/Aaron-Bird/42359bf78fe0e868946cb5897f6ca7cd |
11
chnwillliu 127 天前 via Android 1
rxjs switchMap ?
|
12
Projection 127 天前
用 RxJS 可以方便地实现,跟 AI 交流了一会后给出了如下代码:
import { Subject, from } from 'rxjs'; import { switchMap } from 'rxjs/operators'; // 模拟异步任务 A 、B 、C const taskA = (input) => { console.log(`Starting task A with input: ${input}`); return new Promise((resolve) => { setTimeout(() => { console.log(`Finishing task A with input: ${input}`); resolve(input + 'A'); }, 1000); // 假设任务 A 需要 1 秒钟 }); }; const taskB = (input) => { console.log(`Starting task B with input: ${input}`); return new Promise((resolve) => { setTimeout(() => { console.log(`Finishing task B with input: ${input}`); resolve(input + 'B'); }, 1000); // 假设任务 B 需要 1 秒钟 }); }; const taskC = (input) => { console.log(`Starting task C with input: ${input}`); return new Promise((resolve) => { setTimeout(() => { console.log(`Finishing task C with input: ${input}`); resolve(input + 'C'); }, 1000); // 假设任务 C 需要 1 秒钟 }); }; const subject = new Subject(); subject .pipe( switchMap((value) => from(taskA(value)).pipe( switchMap((resultA) => from(taskB(resultA))), switchMap((resultB) => from(taskC(resultB))), ), ), ) .subscribe((result) => { console.log('Final result:', result); }); // 发出一些值 subject.next('1'); // 发出第一个值 setTimeout(() => subject.next('2'), 500); // 在 0.5 秒后发出第二个值,中止第一个任务并开始新的任务 setTimeout(() => subject.next('3'), 2000); // 在 2 秒后发出第三个值 输出结果: Starting task A with input: 1 Starting task A with input: 2 Finishing task A with input: 1 Finishing task A with input: 2 Starting task B with input: 2A Starting task A with input: 3 Finishing task B with input: 2A Finishing task A with input: 3 Starting task B with input: 3A Finishing task B with input: 3A Starting task C with input: 3AB Finishing task C with input: 3AB Final result: 3ABC 实际上用全套 RxJS API 更简单,无奈 API 忘了好多 |
13
Projection 127 天前
给输出结果加个时间线:
[1] Starting task A with input: 1 [514] Starting task A with input: 2 [1010] Finishing task A with input: 1 [1537] Finishing task A with input: 2 [1539] Starting task B with input: 2A [2019] Starting task A with input: 3 [2542] Finishing task B with input: 2A [3023] Finishing task A with input: 3 [3024] Starting task B with input: 3A [4027] Finishing task B with input: 3A [4028] Starting task C with input: 3AB [5038] Finishing task C with input: 3AB [5040] Final result: 3ABC |
14
Projection 127 天前
@rabbbit 用生成器确实是一个好办法,在可被取消的时间点添加 yield 。暂停后只要不调用 next() 方法就不会继续执行,这样就需要自己写一个执行器,调度的时机可以自己灵活控制,像 co 那种执行器是自动执行的,async 函数也是自动执行的。
|
15
Projection 127 天前 1
我用生成器简单实现了一下,代码 https://pastebin.com/hi04KbPd
function delay(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } const then = Date.now(); async function task(name, input) { console.log(`[${Date.now() - then}] ${name}: start, input: ${input}`); await delay(1_000); console.log(`[${Date.now() - then}] ${name}: done, input: ${input}`); return input + name; } let token = 0; async function onUpdate(input) { let current = ++token; async function* gen() { let result = await task('a', input); yield; result = await task('b', result); yield; result = await task('c', result); console.log(`[${Date.now() - then}] final result: ${result}`); return result; } for await (const _ of gen()) { if (token !== current) break; } } onUpdate('1'); await delay(500); onUpdate('2'); await delay(2_000); onUpdate('3'); 输出结果: [0] a: start, input: 1 [510] a: start, input: 2 [1006] a: done, input: 1 [1526] a: done, input: 2 [1527] b: start, input: 2a [2516] a: start, input: 3 [2533] b: done, input: 2a [3520] a: done, input: 3 [3520] b: start, input: 3a [4528] b: done, input: 3a [4530] c: start, input: 3ab [5536] c: done, input: 3ab [5537] final result: 3abc |
16
paopjian 127 天前
这好像是个面试题,串行异步任务链的中止与继续,你看是这个么
function processTasks(...tasks) { let isRunning = false; // 是否正在执行,初始化 false,不然第一次启动不了 const results = []; // 保存执行结果 let i = 0; // 当前任务索引 let prom = null; return { start() { return new Promise(async (resolve, reject) => { if (prom) { // 结束了 prom.then(resolve, reject); return; } if (isRunning) return; isRunning = true; while (i < tasks.length) { try { results.push(await tasks[i]()); } catch (err) { isRunning = false; reject(err); prom = Promise.reject(err); return; } i++; if (!isRunning && i < tasks.length) return; //中断 } // 全部执行完毕 isRunning = false; resolve(results); prom = Promise.resolve(results); }); }, pause() { console.log('暂停任务'); isRunning = false; }, } } |
17
amlee 126 天前
https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
有一个 AbortSignal ,似乎可以做到,看下上面那个链接里面的 implementing_an_abortable_api 这一节 |
18
iidear2015 126 天前
感觉你逻辑挺合理的,好像只能写个工具方法重构一下
``` // 一个 task 包含多个异步 job // 每个 job 运行前需要校验当前 task 是否在运行 // 运行新 task 时取消之前的 task const createScheduler = (jobs) => { let runningTask = 0; const run = async () => { runningTask += 1; const thisTask = runningTask; for (let job of jobs) { if (thisTask === runningTask) { await job(); } else { return; } } }; return run; }; const init = createScheduler([ () => sleep(500), () => sleep(500), () => sleep(500) ]); ``` |
19
jones2000 126 天前
做一个任务队列,输入一次,移除上一次剩余的没有完成的任务,再往队列里面加一组异步的任务, 异步的任务处理从任务队列里面取。
|
20
dvsilch OP @pursuer
@amlee 看了下,AbortSignal 大概也需要在没做支持的异步任务上套一层 Promise 或者每次异步结束前做状态检测,也不是很好 @rabbbit @Projection rxjs 之前也有看过,无奈确实是经验不够一眼瞎。目前看来对我而言修改起来最简单的方式,就是自己写一个 generator 然后来手动判定是否执行下一步了,至少把状态判断统一在了 for 循环里,少写不少东西 |
22
edward1987 126 天前
也可以简单的写个装饰函数,把所有异步任务的函数都装饰一遍再调用
function trans(funcN){ return function(...args){ if(state!==current){return}; return funcN(...args) } } 然后重新赋值每个异步任务函数 funcA = trans(funcA); funcB= trans(funcB); |
23
yigefanqie 125 天前
没太理解业务场景,这不应该是给用户输入的事件加 debounce 的事吗
|
24
nzbin 125 天前
只要是异步,毫无疑问 rxjs 就是更好的处理方式
|
25
ddch1997 125 天前
我是用 redux-saga ,fork 一个异步 task 任务,然后 take 一个停止 action ,cancel 掉这个 task 任务就好
|
26
cjd6568358 124 天前
把所有的串行异步用 promise.then 链式连接起来。对外暴露 promise,每次用户有新输入调用 promise.reject 终止异步任务链。然后生成新的
|
27
RabbitDR 124 天前
@yigefanqie 不一样,他是想取消并行的初始化,防抖不会取消函数的执行。比如连续输入 abc 后 abc 三次输入被防抖了,开始初始化,但这时用户再输入 d ,会进行第二次初始化,他想第二次初始化时如果第一次还在初始化则中止第一次的初始化。
|
28
yigefanqie 124 天前
我理解的这样:用户输入时的异步串行任务是不是都是无副作用的,如果是无作用的,直接丢弃之前的初始化,重新初始化就行了,配合上 debounce 降低下初始化触发频率。丢弃掉的初始化如果都是 promise 不用担心内存问题,会自动回收的。
|
29
yigefanqie 124 天前
@RabbitDR 不太会回复。回复发在楼上一层了。
|
30
tsanie 123 天前
@yigefanqie #28 一般初始化都是资源占用型过程,最佳实践还是直接中断。
不过对 op 提到的 CancellationToken 有些疑问,token 自身是无法 Cancel 的,只能通过 IsCancellationRequested 获取终止状态或者调用 ThrowIfCancellationRequested() 在被终止时抛出异常,和 js 中的 AbortSignal 是一样样的( aborted, throwIfAborted()) 可以主动调用 Cancel 的是 CancellationTokenSource ,对应在 js 中是 AbortController |
31
dvsilch OP @tsanie
是我笔误了,确实是 CancellationTokenSource(cts),C#里可以做到外部内部各提供一个 cts ,初始化时将两个 cts link 然后将 ct 链式传递到各个异步任务,需要中断时任意一个 cts 直接 cancel 即可,会立刻走到 catch OperationCanceledException 的分支 但我看了一下 AbortController 似乎只能做到对 fetch api 执行中断,不是特别符合我当前的业务场景...不仅仅是网络请求,还有一些文件的读取、以及等待用户另外输入的 Promise 。这部分如果取消不掉的话,我能想到的形式就是要么把各个 reject 动态推入移出队列,要么就是每一轮异步任务结束后判断状态。两种做法都感觉不太合理,所以目前的做法是暂时跟 #9 #15 一样写一个自定义的 generator 来统一这部分逻辑,有空再去研究研究 rxjs 的实现 |
32
zy445566 123 天前
|
33
forty 93 天前
用单独的线程执行任务,需要取消的时候直接杀线程,简单粗暴又有效
|