V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
tuoov
V2EX  ›  数据库

请教一个关于并发控制的问题

  •  
  •   tuoov · 13 天前 · 2992 次点击

    现在有这样一个函数 processBatch ,负责读取数据,执行一些操作后再更新它们,相关的数据库操作都在事务内执行。伪代码如下:

    function processBatch():
        tx = db.beginTransaction()
        // 1. 批量读取:取出最多 N 条“待处理”数据
        items = tx.query("SELECT * FROM tasks WHERE status = 'PENDING' LIMIT N")
        
        for item in items:
            // 2. 业务处理
            doBusinessLogic(item)
            // 3. 更新状态
            tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
        tx.commit()
    
    // 线程 A
    spawn threadA:
        processBatch()
    
    // 线程 B (几乎同时执行)
    spawn threadB:
        processBatch()
    

    但由于 processBatch 在多个地方都会被调用,因此存在并发问题。线程 A 和线程 B 执行时可能查询到同一批数据,导致这批数据被处理两次。解决这个问题有两个方案:

    • 方案 A:在 processBatch 的逻辑中增加锁,这样在任意时刻,该函数都不会并发执行
    • 方案 B:调整数据库事务的隔离级别或锁表,即使 processBatch 并发执行了,底层的数据操作不会出现并发的情况

    我的问题是:

    1. 哪个方案更符合最佳实践?原因是什么
    2. 在保持 processBatch 会被多个地方调用不变的前提下,有没有更好的方案?
    3. 如果想学习这类并发相关的问题和解决方案,应该搜索什么关键词

    感谢各位赐教

    第 1 条附言  ·  12 天前

    感谢各位的热情回复,但我想问的不是“如何在应用层实现锁来解决这个问题”,而是“为什么”的问题。大多数回复都给出了在应用层解决的方案,所以我再重新描述下我的问题:

    1. 为什么选择在应用层解决这个问题(即方案 A 或它的变种),而不是在数据库层面解决 (即方案B)
    2. 基于什么样的原则或考虑,让 方案 A 成大多数人的选择,而非方案 B?
    34 条回复    2025-04-25 09:50:44 +08:00
    wxyz
        1
    wxyz  
       13 天前
    感觉是外部事务的颗粒度太大了,
    一次查询多条数据,但没有立即更新该批次数据的状态,肯定会导致查询到重复的数据的;
    建议增加一层内存或缓存级别的互斥锁,锁任务的 id 以及一个任务一个事务,这样可以保证一个任务每次只有一个线程处理。
    wyntalgeer
        2
    wyntalgeer  
       13 天前
    噗……不会并发执行的多线程?
    luckyrayyy
        3
    luckyrayyy  
       13 天前   ❤️ 1
    互联网公司的习惯一般并发控制都放在业务逻辑上,不太依赖数据库,就是用方案 A ,当然你数据库还是需要设置合理的隔离级别。在你的业务场景里,遇到并发,没抢到锁的线程是等待,还是直接返回报错不处理了,如果是前者的话,可以把所有的处理逻辑放到一个有序队列里,依次执行。
    kanepan19
        4
    kanepan19  
       13 天前
    简单的很

    for item in items:
    //乐观锁
    boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)
    if(!flag){
    // 抛异常
    }
    // 2. 业务处理
    doBusinessLogic(item)
    // 3. 更新状态
    tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
    tx.commit()
    kanepan19
        5
    kanepan19  
       13 天前
    接上面的 加一个状态,处理中
    lvlongxiang199
        6
    lvlongxiang199  
       13 天前
    ` tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)` -> ` tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ? and status = 'PENDING' ", item.id)`

    `processBatch` 也可以想办法做成串行的
    kanepan19
        7
    kanepan19  
       13 天前
    boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)

    多线程 只有一个成功。
    是否需要抛异常,看业务决定
    Georgedoe
        8
    Georgedoe  
       13 天前
    3. 哥们不会还没用过 deepseek 和 chatgpt 吧
    siweipancc
        9
    siweipancc  
       13 天前 via iPhone
    不要在数据库玩这个……
    hwdq0012
        10
    hwdq0012  
       13 天前
    1.读写分离,写放队列执行
    2.读到的数据可能会重复,处理掉,用 etcd ,zookeeper, rides 之类的
    3.没写过后端,轻喷
    rekulas
        11
    rekulas  
       13 天前
    加锁不够优雅,不如消息队列,不过数据少用不上队列,但你可以用生产者消费者的思维来改写,保证只有一个生产者就行了
    vikaptain
        12
    vikaptain  
       13 天前
    悲观锁、乐观锁、队列
    unused
        13
    unused  
       13 天前
    除了上面提到的,还可以考虑用某种查询条件对数据分区,让不同线程查询到不同的数据
    yinmin
        14
    yinmin  
       13 天前
    方案 A 和方案 B 都有问题。

    如果 doBusinessLogic 是 IO 密集型,推荐使用 ThreadPoolExecutor 。操作步骤如下:

    1. 设置 ThreadPoolExecutor 并发 workers ,例如 workers=5 也就是有 5 个并发同时处理;
    2. 函数 processBatch:将 pending 的任务 ID submit 到 ThreadPoolExecutor 队列里;
    3. ThreadPoolExecutor workers 处理:先 update tasks set status='RUNNING' where id=? and status='PENDING',如果返回更新记录数=0 ,就直接 return 不处理; (这是一个防冲突的技巧),再 doBusinessLogic(id),然后 update tasks set status='DONE' where id=?

    好处是:突发高并发时,任务是加入到队列的,不会挤爆服务器;可以设置并发 workers 同时处理。
    Ayanokouji
        15
    Ayanokouji  
       13 天前
    这不是并发的问题,是设计的问题。
    简单点,给 processBatch 加一个 start_time 和 end_time 参数,保证查到不一样的数据。
    mooyo
        16
    mooyo  
       13 天前
    两阶段乐观锁吧,

    第一阶段先拿一部分,从 Waiting 改成 Running ,拿到了再去执行。
    sagaxu
        17
    sagaxu  
       13 天前
    SELECT 取出 id 列表,遍历的时候按照随机顺序(如果业务逻辑允许),用 SELECT FOR UPDATE 锁住每一行,检测状态,把 PENDING 更新为 PROCESSING ,处理完成后再更新为 DONE 。这里要有一个机制,把停留在 PROCESSING 超时的任务重新放回 PENDING 或者标记为 DONE 。

    如果类似任务比较多的话,可以引入一个任务调度系统,别自己搞了,要填的坑和细节非常多。
    encounter2017
        18
    encounter2017  
       13 天前
    这完全没必要用锁,是设计的问题,流程调整下就好了。
    我理解你这块是离线的业务对吧。

    首先 select id from tasks where status = 'PENDING' 拿出全量需要处理的数据,做成一个离线文件或者放内存里(看你自己的数据规模决定)
    接着实现一个缓冲,简单点可以在内存里面构造一个比如长度为 16 的队列,存放下一批需要处理的数据

    然后是设置并发度,比如说 4 ,这一块你用线程/纤程/进程 实现都可以,依次从队列里面取任务,队列空了在获取下一批数据到队列里面。

    这一块自己实现细节还挺多的,比如任务失败了如何重跑,需不需要做背压之类的。

    我之前做过类似的,用框架实现,对应的代码就很简洁,伪代码类似这样

    ```
    ids.toStream.buffer(16).mapPar(4)(row => processData(row))
    ```
    EMMMMMMMMM
        19
    EMMMMMMMMM  
       13 天前
    你就不能给任务分一下片吗?
    线程 1:WHERE status = 'PENDING' and id % 2 = 0
    线程 2:WHERE status = 'PENDING' and id % 2 = 1

    说实话, 在互联网干了七八年了,多线程的代码屈指可数,都是多进程
    Romic
        20
    Romic  
       13 天前
    1. 使用分布式锁 性能一般
    2. 使用数据库的乐观锁 加 version 开发成本太高,需要多维护一个 version 字段
    3. 推荐方案,将数据分批打散,比如 1000 条数据 2 个现成并行执行,那么 1-500 是线程 A 执行。501-1000 线程 B 执行。经典方案。
    话说这种问题直接丢给 ai ,很多方案。以前的 deepseek R1 模型的方案完整 准确率高。现在好像不行啦。
    gg 思密达。
    netnr
        21
    netnr  
       13 天前
    这类似发短信的系统,很多地方调用发短信接口,都先写入发送记录表,状态为待发送,然后起一个任务循环执行发送并改状态;

    如果是在一个进程的前提下,可以用线程安全的先进先出队列,把 processBatch 添加到队列,另起一个线程来消费队列
    netnr
        22
    netnr  
       13 天前
    贴一个 C# 实现的类

    ThreeK
        23
    ThreeK  
       12 天前
    1 、要是不能控制调用方并发就推介方案 B ,方案 A 容易等不到锁。
    2 、2.1 doBusinessLogic 如果 IO 多计算少可以考虑并发执行。2.2 processBatch 可以写成幂等的,最终一致就行,多执行几遍还能申请加资源。
    3 、你这不算高并发,高并发一般不存在同一数据并发处理。高并发并发大但调用都带着唯一 id ,直接分布式锁解决同一 id 并发问题,你这种同一数据多处调用应该是锁等待/唤醒问题。
    我认为你们这业务槽点太多
    1) 事务太大不能拆就只能等报错了。
    2 )改 status 像事件驱动(消息通知),又不知你们写的什么。
    3 ) limit N 像是要批处理又好多处调用,调用的地方还不加条件,光用 limit N 来确定数据属于一个事务。。。。
    geebos
        24
    geebos  
       12 天前
    这种场景一般用生产-消费者模型,一个线程查,多个线程处理
    thevita
        25
    thevita  
       12 天前   ❤️ 1
    没说清楚啊,与你的事务会会发生冲突的都有啥啊,仅仅同一个逻辑的不同任务吗?有没有 读-写冲突?有没有其他不同粒度、不同逻辑的写-写冲突,doBusinessLogic 里面有不有 外部一致性要求?

    超大事务呗,某些系统很常见,并不是所有业务都是互联网,上面的不要看到这种就报警

    锁放外部(方案 A )正如你所说,只解决了 processBatch 的并发问题,但是不能避免其他事物的更新,依然可能导致 write-skew ,除非你保证只要该这个表,都拿锁,那和表锁其实也没太大差别,就看你们的数据库实现得整么样了

    锁表(方案 B )通过合理的加锁,能避免 write-skew, 但是冲突域会变大,影响系统吞吐,甚至某些 db 可能会阻塞读,但是话又说回来,如果你的场景类似,半夜批量计算,冲突可能低那种,耶完全可以接受

    其他方案:
    其实具体看你能接受 哪部分 可以被适当取舍,比如上面只讨论了锁的情况,取舍的就是与其他事物的冲突

    如果你能接受适当若化 这个超大事务的原子性的话还可以: processBatch 内加锁,这个锁止解决 不同 processBatch 任务间的冲突(更好的办法可能是引入一个协调者来保证 不同 processBatch 尽量不冲突),然后更新使用乐观锁+重试,让 这个 batch 实现最终一致,也不失为一种办法(当然,这里没讨论你的 doBusinessLogic 有不有外部一致性的情况)
    prosgtsr
        26
    prosgtsr  
       12 天前 via iPhone
    如果是我的话,我会改成一个线程查,然后多线程领取任务再处理。
    要问怎么学,我也不知道,我也是草台班子
    listenerri
        27
    listenerri  
       12 天前 via Android
    问题在哪里发生的,就尽可能在那里解决
    NoDataNoBB
        28
    NoDataNoBB  
       12 天前
    select for update
    shangfabao
        29
    shangfabao  
       12 天前
    上边写的是对的,先 update,毕竟看你的逻辑,是否 update 是没有看执行逻辑的返回结果的
    kai1412
        30
    kai1412  
       12 天前
    多线程分页取 分配好每页的数量 线程取完各自根据主键 id 更新也不会有冲突
    kai1412
        31
    kai1412  
       12 天前
    @kai1412 分页查的时候记得根据主键 id 排序
    heiya
        32
    heiya  
       12 天前
    实现上来说 A 和 B 都行,不过锁粒度都很大。根据你对 processBatch()发生并发可能性的预测,还有一些别的方案:1.并发频繁发生,可以用#28 的 select for update 2.并发偶尔发生,在表里边加一列版本号,每次更新时对比这个值是否和取出时的值一致,不一致就是有并发。这样的锁粒度控制在行上。不过假如 10 条数据 2 条有并发问题,这种情况又得额外处理。
    chiaoyuja
        33
    chiaoyuja  
       12 天前   ❤️ 1
    应用层加锁
    加锁可以做到显式控制并发,你能清楚知道「谁在处理」;
    不依赖数据库细节,业务逻辑统一,不受数据库种类、配置影响;
    代码层面的锁(如 Redis 分布式锁)支持跨进程、跨服务的同步;
    开发和调试更直观,出问题也容易定位。
    数据库加锁
    • 数据库事务的隔离级别越高,性能越差(如 SERIALIZABLE 会阻塞读取);
    • 很难用 SQL 实现「原子地读取并更新任务状态」这种逻辑,语法不友好;
    • 不同数据库实现不同,如 PostgreSQL 支持 SELECT ... FOR UPDATE SKIP LOCKED ,但 MySQL 实现不同,兼容性差;
    • 数据库锁只在事务内有效,不能用于跨服务同步控制;
    • 如果多个服务部署在不同机器上,仅靠 DB 锁,还是可能出现争抢/重入的问题。
    testliyu
        34
    testliyu  
       11 天前
    我们之前是直接加分布式锁
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3065 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 13:19 · PVG 21:19 · LAX 06:19 · JFK 09:19
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.