V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
MySQL 5.5 Community Server
MySQL 5.6 Community Server
Percona Configuration Wizard
XtraBackup 搭建主从复制
Great Sites on MySQL
Percona
MySQL Performance Blog
Severalnines
推荐管理工具
Sequel Pro
phpMyAdmin
推荐书目
MySQL Cookbook
MySQL 相关项目
MariaDB
Drizzle
参考文档
http://mysql-python.sourceforge.net/MySQLdb.html
Keshawn
V2EX  ›  MySQL

mysql 实现任务队列的疑问

  •  
  •   Keshawn · 2016-05-14 13:10:54 +08:00 · 7554 次点击
    这是一个创建于 3117 天前的主题,其中的信息可能已经有所发展或是发生改变。

    请教各位,我现在有个系统,想由原来的单个 worker 处理任务扩展成多个 worker ,有些问题没考虑明白,希望能够得到指点。

    场景

    用户请求对应一个作业 job ,存放在 Mysql 的 job 表中,通过 status 字段判断状态( pending :待处理, finished :已完成,等等),后台有多个 worker 轮询 job 表,读取 pending 状态的 job ,然后进行处理

    问题

    1. worker 轮询的条件仅仅是根据 job 的状态,如何保证多个 worker 不会读到同一条记录?读到后立即 job 状态吗?
    2. 因为不限制的话, worker 会把当前所有 pending job 读取出来,假如让 worker 每次只读取一条记录,如何实现?需要什么判断条件?
    3. 有没有必要使用 redis 来实现任务队列?
    4. 不知道还有没有其他更好的方案
    22 条回复    2016-07-19 11:03:25 +08:00
    des
        1
    des  
       2016-05-14 13:45:03 +08:00 via Android
    如果不多的话可以用锁,
    多了可以开一个 worker 专门用来分发任务
    spance
        2
    spance  
       2016-05-14 14:09:34 +08:00
    用 RDBMS 做共享通信,代价太大,还不合适。
    这不就是典型的消息中间件的应用场景么, activeMQ, rabbitMQ 还有很多。
    非要用 RDBMS 基本上就是锁表了,如果你要严格的并发一致性的话,基本上就没什么性能可言了。
    neoblackcap
        3
    neoblackcap  
       2016-05-14 14:57:08 +08:00
    redis 去做消息队列比你 MySQL 强多了。
    redis 是在内存里面跑,还没有锁,一个 pub/sub 即可搞定。
    当然你对消息要求更高的话就上各种消息队列中间件, rabbitMQ 什么的,能对你的消息进行持久化。
    cevincheung
        4
    cevincheung  
       2016-05-14 15:16:38 +08:00
    轻量级消息队列 beanstalkd
    lecher
        5
    lecher  
       2016-05-14 15:31:18 +08:00
    redis 做起来更方便, lpub/lsub 都是原子操作。
    业务逻辑实现起来更简单,代码量也少。

    mysql 一定要做,可以考虑这样的操作:
    update task set status='process' where status='pending' limit 1;
    只要并发不是很恐怖,这个操作可以当成是原子操作,只有符合 status='pending'的才会被更新,拿到成功更新的状态就说明成功取到任务,然后取具体的数据进行处理。
    armoni
        6
    armoni  
       2016-05-14 15:33:07 +08:00
    如上, MQ 即可
    miaoever
        7
    miaoever  
       2016-05-14 15:55:16 +08:00 via Android   ❤️ 1
    我们线上某个系统就使用了 db 作为消息队列(各种历史原因吧)。 系统有多个 slave 作为生产者会向 db 中插入消息( insert ), 同时,根据(固定的)消费者数量,插入的时候还会给每条消息上的某个字段填上任意一个消费者的唯一标示。有一个 master 会起若干线程作为消费者去 db 中捞取属于自己的消息进行消费( select, delete )。可能是目前消息量还不够大?(每天百万级别没到千万),遇到的问题基本都不在 db 的性能上。最大的困扰是为了实现很多 MQ 基本的语义(比如上面提到的 consume once ),你需要自己在业务上做很多工作,而你会发现自己在业务代码中来保证语义正确同时兼顾水平扩展性,确实不是件容易的事情。
    miaoever
        8
    miaoever  
       2016-05-14 16:02:37 +08:00 via Android
    再补充一下我的观点,遇到需要做持久化的场景,数据库的性能很有竞争力。更多需要考虑的是业务上的场景和需求。
    yangyaofei
        9
    yangyaofei  
       2016-05-14 17:46:25 +08:00
    内存里维护一份 job 然后加锁不行么?
    yangyaofei
        10
    yangyaofei  
       2016-05-14 18:06:30 +08:00
    最近也在做类似的东西,但是我的 job 的数量会很少....

    我是直接进程间队列传输 job(我叫 task)的参数和要执行的 action,给 taskManager,启动一个进程,或者终结一个进程等等任务.然后确定启动或者终止后会在相应的 taskList 中添加或者除去这个 task.

    处理的结果和 task 运行中的状态由 task 自己写入数据库

    获取结果的时候直接从另一个进程中获取


    当然我这个简单,没有考虑到 task 很多需要排队的问题,关于这个我是这样设想的.用信号量(其实一个 queue 啊什么的都一样),信号量初始值是 process_MAX,taskManager 将其-1 一次就生成一个 task,等于 0 就 block,task 完成就将这个信号量加 1.
    另外有一个进程专门将没有开始的 job 扔进一个 queue 里面,到了 max_size 就 block.taskManager 每次从这个 queue 里面取一个 task 执行就好了


    当然我这个是小量的时候管用,大量了锁使瓶颈吧...可以将这个开 N 倍,每个部分负责 task_ID%N=1,2,3,4 等等部分就可以了吧....或者还可以精简构架,但是大概意思就是这个意思
    gamexg
        11
    gamexg  
       2016-05-14 18:16:58 +08:00 via Android
    用过数据库做过任务队列,不过我每个任务最少需要半个小时,所以不用在意性能。
    当时是使用的事务+ select update + limit 1 来取任务,在事务内更新状态。
    realpg
        12
    realpg  
       2016-05-14 18:53:09 +08:00
    非要用 mysql
    select for update
    做好 limit 即可
    mahone3297
        13
    mahone3297  
       2016-05-15 17:24:51 +08:00
    大家都说锁。
    锁的问题是,当你多个 worker 起来去消费这个消息的时候,大家都在争这个消息,最后只有 1 个人得到了,其他的人都没得到,你多个 worker 的效果是完全没有的。
    还是最好要限制某个消息只能某个 worker 能获取比较好。
    kimmykuang
        14
    kimmykuang  
       2016-05-15 22:33:47 +08:00
    我们现在线上的做法是开了 10 张 queue 表,起 10 个进程来消费这 10 张表,其实对于每个 queue 都是单进程消费,所以不存在竞争的问题,然后就是有个调度器来负责将 job 分配到这 10 个 queue 里去,但是还是有一些可优化的地方的,如 job 的优先级等,只能说目前满足了线上业务的需求,也一直想换成 redis 或者干脆用 MQ ,但是一直没有这个时间。
    wengang285
        15
    wengang285  
       2016-05-16 09:50:09 +08:00
    1 、读取之后立马 update set status = 处理中 where status= pending
    2 、 limit 1
    3 、有,但是需要自己监控队列长度,达到一定长度之后拒绝写入,否则容易吃满内存
    4 、 kafka
    realpg
        16
    realpg  
       2016-05-16 12:29:26 +08:00
    @wengang285
    高并发系统是不能这么搞的。
    1ms 的时间差就会被处理掉。
    之前有一个超高并发的系统用 MYISAM ,没有事务支持,没有 select for update
    当时我做的方案是以 update 代 select
    字段加 status1 status2 类型 unsigned bigint 20
    从程序生成两个随机数 a1 a2 如果要消费 5 个 那就
    update xxxxxx set status1=a1 status2=a2 limit 5
    然后再二次 select where status1=a1 status2=a2
    这样才能保证不会出问题
    wengang285
        17
    wengang285  
       2016-05-16 14:38:29 +08:00
    @realpg 我不知道你说的哪条有问题,如果是第一条, staus 新增一个处理中的状态,一个 worker 读取一条记录之后,立马带 where 条件 update staus 为处理中,如果失败,表示这条记录已经被别的 worker 修改为处理中或者处理完成了,那么流程退出;如果 update 成功,那么这条记录就不会被别的 worker 拿到了,可以满足 lz 的要求啊,而且这种方法我在你说的高并发环境下也用过,并没有什么问题啊。
    你说的方法,有点把简单问题复杂化,想象一下这样一种情况,如果两个 worker 随机的 a1 和 a2 是一样的,那你的代码会不会一次消耗 10 个呢?
    changwei
        18
    changwei  
       2016-05-16 15:34:06 +08:00
    这个要看用户规模吧, t.changwei.me/2 这种云端抢二楼应用也是用 mysql 实现类似于任务队列的功能,用户数量不过几百人,执行成功率之类的数据看起来还行。
    changwei
        19
    changwei  
       2016-05-16 15:38:43 +08:00
    @kimmykuang 但是如果后期要扩展 worker ,那么表数量也要跟着改,是不是有紧耦合问题,还有就是新增任务的时候也要考虑到十张表都是均衡插入,不会出现一张表加多了任务,另一张表加少了任务?
    kimmykuang
        20
    kimmykuang  
       2016-05-16 17:19:08 +08:00
    @changwei 一张表多几个任务另一张表少几个任务在目前还是可以容忍的点,基本是轮询分配任务的
    alsotang
        21
    alsotang  
       2016-06-06 20:06:12 +08:00
    感觉这篇文章是为你这个问题写的。。。你的 4 个疑问都提到了,实现起来也不难:

    https://zhuanlan.zhihu.com/p/20293493?refer=alsotang 《一个简单的 mysql 队列问题》
    fuxkcsdn
        22
    fuxkcsdn  
       2016-07-19 11:03:25 +08:00
    BEGIN;
    SELECT id,field1,field2 FROM job WHERE status IS NULL LIMIT 1 FOR UPDATE;
    UPDATE job SET status='working' WHERE id=:id;
    COMMIT;
    之前用 SELECT FOR UPDATE 实现过,曾经用单条语句实现过查询更新功能(不需要用事务),数据量小的情况下速度和上面的事务差不多,数据量一多就卡到爆...
    上面的语句虽然会锁表( SELECT FOR UPDATE 没用到索引的话就都是表级锁),但速度上还行,至少扛百万数据量, 30 个 work 还是可以的(数据库用的阿里云 RDS,1C1G )

    单条语句的 SQL
    UPDATE job SET status='working', id=(@id := id), field1=(@field1 := field1),field2=(@field2 := field2) WHERE status IS NULL LIMIT 1;
    SELECT @id AS id,@field1 AS field1,@field2 AS field2;
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1856 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 16:26 · PVG 00:26 · LAX 08:26 · JFK 11:26
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.