V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
AboPlus
V2EX  ›  程序员

1024 快乐!请教下各位大佬一个关于 celery worker 的问题!

  •  
  •   AboPlus · 2023-10-24 12:04:52 +08:00 · 1006 次点击
    这是一个创建于 402 天前的主题,其中的信息可能已经有所发展或是发生改变。

    说下现在的使用场景,我们目前有一个需要计算的任务提交给超算上的求解器进行求解计算,在计算的过程中会产生大量的计算结果文件,但是文件过多后续不容易管理,准备改成将数据都追加到一个二进制文件中,在这个过程中,我们需要实时的把计算结果文件进行解析、存储等操作,这个过程是需要实时交互的,所以每次得到结果都需要实时做处理。

    这个处理的过程使用的是 celery 完成的,现在的问题是同一个任务我们会推送到同一个队列中去,但是同一个任务会开启多个 worker 进行工作,这就导致可能会出现处理速度不一致、即后处理的任务先完成的可能(简单来说就是不会按照顺序执行),在这种情况下如果我想使用对象存储( COS )的追加上传,就需要保证每次追加上传数据的起始位置的确定的,也就是需要一个一个结果文件才能保证应该从哪个位置进行追加上传。

    对于这种场景,想请教下各位大佬有没有什么好的建议?

    比如 celery 存不存在什么机制帮助完成这个场景,或者有什么其他的方案完成?

    16 条回复    2023-10-26 11:26:42 +08:00
    AboPlus
        1
    AboPlus  
    OP
       2023-10-24 13:50:41 +08:00
    补充下,后端框架用的 django ,目前问题蛮多的,包括 celery 僵死进程和内存泄漏问题等...不知道各位大佬是否遇到过这样的问题?以及有什么解决方案或者思路没
    Raikiriww
        2
    Raikiriww  
       2023-10-24 13:50:57 +08:00   ❤️ 1
    设置 worker --concurrency=1 可以使同时只有一个任务执行。
    AboPlus
        3
    AboPlus  
    OP
       2023-10-24 13:59:30 +08:00
    @Raikiriww
    是这样的,不同的任务(任务 1 ,任务 2 ,任务 3 )产生的结果可能推送到同一个队列中,同时这个队列如果有三个 worker ( worker1 ,worker2 ,worker3 )的话,那么 worker1 处理任务 1 的数据那么后续任务 1 的结果数据都由 worker1 来处理不会被 worker2 和 worker3 处理,其他同理,如果能这样的话就既能实现不同任务间的并发处理,又能实现一个任务间的顺序处理,不知道这种思路能否实现
    zhuxuanyu0720
        4
    zhuxuanyu0720  
       2023-10-24 14:12:38 +08:00   ❤️ 1
    1.在 COS 存储端,可以考虑用每个 object name 来决定顺序,例如加上时间戳等唯一 ID,而不是用 append 的方式。
    2.也可以考虑使用消息队列,Worker 按序从 MQ 中消费结果文件路径,进一步确保顺序。
    3.对结果文件本身,也可以添加序号或标识,这样即使顺序错乱,解析时也能恢复正确顺序
    slark2020
        5
    slark2020  
       2023-10-24 14:20:38 +08:00   ❤️ 1
    下发任务的时候先生成任务的序列号 seq ,把序列号也下发给任务。任务完成后可以根据自己的序列号,来处理结果存储顺序的问题。
    AboPlus
        6
    AboPlus  
    OP
       2023-10-24 14:21:19 +08:00
    @zhuxuanyu0720
    1.准备采用 append 的方式主要是计算出的结果文件太多了,所以考虑准备在计算过程中就以追加的形式将结果追加到一个二进制文件中,如果这样修改那么在上传的时候要么反复上传(这样每次会越来越慢),要么就用追加的形式,文件太多不好进行管理也是其中一个原因
    2.Worker 按序从 MQ 中消费结果文件路径这一步得保证一个队列中只能有一个 worker 在工作,这样就不能使用多 worker 了,提前多创建几个队列代替,这样需要更大的内存?
    AboPlus
        7
    AboPlus  
    OP
       2023-10-24 14:35:18 +08:00
    @slark2020
    目前类似是这么做的,但是这么做在进行上传的时候做不到 append 上传,假如任务产生了数据 1-数据 10 ,我一个队列有三个 worker 在工作,那么同一时间可能数据 1 数据 2 数据 3 都有可能被处理并且处理的顺序确定不下来,就导致数据上传的时候可能出现问题
    slark2020
        8
    slark2020  
       2023-10-24 14:51:57 +08:00   ❤️ 2
    那用 celery workflow 呢: https://docs.celeryq.dev/en/stable/userguide/canvas.html#groups
    使用 Group 形式编排 10 个任务下发到 worker ,任务处理完成后,用一个后续的任务处理这 10 个任务的结果,后续任务只需要按编排的顺序,处理每个 Task 的结果上传到 COS 。不过这种需要 celery mq 可以存储得下你的结果文件。
    Raikiriww
        9
    Raikiriww  
       2023-10-24 15:42:15 +08:00
    @AboPlus #3 这个我就不清楚了,我是最近刚部署 Celery 用,只知道设置并发这种简单的用法。
    AboPlus
        10
    AboPlus  
    OP
       2023-10-24 18:06:52 +08:00
    @slark2020 我试试这种方式能不能行,谢谢大佬提供的思路!
    AboPlus
        11
    AboPlus  
    OP
       2023-10-24 18:07:06 +08:00
    @Raikiriww 好的,感谢!
    SmiteChow
        12
    SmiteChow  
       2023-10-24 18:32:18 +08:00   ❤️ 1
    建议把顺序写入参数保存
    AboPlus
        13
    AboPlus  
    OP
       2023-10-25 09:08:04 +08:00
    @SmiteChow 之前考虑过这个方式,但是并发情况下不太好处理
    misoomang
        14
    misoomang  
       2023-10-25 14:50:57 +08:00   ❤️ 1
    似乎使用 celery 的 chord 方法场景更合适,可以并发处理任务的结果,当所有结果准备好以后进行回调做顺序的编排

    https://docs.celeryq.dev/en/stable/userguide/canvas.html#chords
    AboPlus
        15
    AboPlus  
    OP
       2023-10-25 18:43:19 +08:00
    @misoomang 是的,今天一直在研究这个,恰好遇到一个没能解决的问题,就是示例中的在执行 result.get()这一步操作的时候,我每次执行都会卡住程序,实际上程序已经执行完了,但是不清楚为什么没有返回结果,导致程序卡住了,一直没搞明白是什么原因导致的,大佬是否有一些头绪?
    AboPlus
        16
    AboPlus  
    OP
       2023-10-26 11:26:42 +08:00
    @misoomang 找到原因啦,被自己蠢到了,我在配置文件中添加了 CELERY_IGNORE_RESULT = True,所以 get 不到结果
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2882 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 03:44 · PVG 11:44 · LAX 19:44 · JFK 22:44
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.