现在有这样一个函数 processBatch ,负责读取数据,执行一些操作后再更新它们,相关的数据库操作都在事务内执行。伪代码如下:
function processBatch():
tx = db.beginTransaction()
// 1. 批量读取:取出最多 N 条“待处理”数据
items = tx.query("SELECT * FROM tasks WHERE status = 'PENDING' LIMIT N")
for item in items:
// 2. 业务处理
doBusinessLogic(item)
// 3. 更新状态
tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
tx.commit()
// 线程 A
spawn threadA:
processBatch()
// 线程 B (几乎同时执行)
spawn threadB:
processBatch()
但由于 processBatch 在多个地方都会被调用,因此存在并发问题。线程 A 和线程 B 执行时可能查询到同一批数据,导致这批数据被处理两次。解决这个问题有两个方案:
我的问题是:
感谢各位赐教
感谢各位的热情回复,但我想问的不是“如何在应用层实现锁来解决这个问题”,而是“为什么”的问题。大多数回复都给出了在应用层解决的方案,所以我再重新描述下我的问题:
![]() |
1
wxyz 13 天前
感觉是外部事务的颗粒度太大了,
一次查询多条数据,但没有立即更新该批次数据的状态,肯定会导致查询到重复的数据的; 建议增加一层内存或缓存级别的互斥锁,锁任务的 id 以及一个任务一个事务,这样可以保证一个任务每次只有一个线程处理。 |
![]() |
2
wyntalgeer 13 天前
噗……不会并发执行的多线程?
|
![]() |
3
luckyrayyy 13 天前 ![]() 互联网公司的习惯一般并发控制都放在业务逻辑上,不太依赖数据库,就是用方案 A ,当然你数据库还是需要设置合理的隔离级别。在你的业务场景里,遇到并发,没抢到锁的线程是等待,还是直接返回报错不处理了,如果是前者的话,可以把所有的处理逻辑放到一个有序队列里,依次执行。
|
4
kanepan19 13 天前
|
5
kanepan19 13 天前
接上面的 加一个状态,处理中
|
6
lvlongxiang199 13 天前
|
7
kanepan19 13 天前
boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)
多线程 只有一个成功。 是否需要抛异常,看业务决定 |
8
Georgedoe 13 天前
3. 哥们不会还没用过 deepseek 和 chatgpt 吧
|
![]() |
9
siweipancc 13 天前 via iPhone
不要在数据库玩这个……
|
10
hwdq0012 13 天前
1.读写分离,写放队列执行
2.读到的数据可能会重复,处理掉,用 etcd ,zookeeper, rides 之类的 3.没写过后端,轻喷 |
![]() |
11
rekulas 13 天前
加锁不够优雅,不如消息队列,不过数据少用不上队列,但你可以用生产者消费者的思维来改写,保证只有一个生产者就行了
|
12
vikaptain 13 天前
悲观锁、乐观锁、队列
|
13
unused 13 天前
除了上面提到的,还可以考虑用某种查询条件对数据分区,让不同线程查询到不同的数据
|
14
yinmin 13 天前
方案 A 和方案 B 都有问题。
如果 doBusinessLogic 是 IO 密集型,推荐使用 ThreadPoolExecutor 。操作步骤如下: 1. 设置 ThreadPoolExecutor 并发 workers ,例如 workers=5 也就是有 5 个并发同时处理; 2. 函数 processBatch:将 pending 的任务 ID submit 到 ThreadPoolExecutor 队列里; 3. ThreadPoolExecutor workers 处理:先 update tasks set status='RUNNING' where id=? and status='PENDING',如果返回更新记录数=0 ,就直接 return 不处理; (这是一个防冲突的技巧),再 doBusinessLogic(id),然后 update tasks set status='DONE' where id=? 好处是:突发高并发时,任务是加入到队列的,不会挤爆服务器;可以设置并发 workers 同时处理。 |
15
Ayanokouji 13 天前
这不是并发的问题,是设计的问题。
简单点,给 processBatch 加一个 start_time 和 end_time 参数,保证查到不一样的数据。 |
16
mooyo 13 天前
两阶段乐观锁吧,
第一阶段先拿一部分,从 Waiting 改成 Running ,拿到了再去执行。 |
![]() |
17
sagaxu 13 天前
SELECT 取出 id 列表,遍历的时候按照随机顺序(如果业务逻辑允许),用 SELECT FOR UPDATE 锁住每一行,检测状态,把 PENDING 更新为 PROCESSING ,处理完成后再更新为 DONE 。这里要有一个机制,把停留在 PROCESSING 超时的任务重新放回 PENDING 或者标记为 DONE 。
如果类似任务比较多的话,可以引入一个任务调度系统,别自己搞了,要填的坑和细节非常多。 |
18
encounter2017 13 天前
这完全没必要用锁,是设计的问题,流程调整下就好了。
我理解你这块是离线的业务对吧。 首先 select id from tasks where status = 'PENDING' 拿出全量需要处理的数据,做成一个离线文件或者放内存里(看你自己的数据规模决定) 接着实现一个缓冲,简单点可以在内存里面构造一个比如长度为 16 的队列,存放下一批需要处理的数据 然后是设置并发度,比如说 4 ,这一块你用线程/纤程/进程 实现都可以,依次从队列里面取任务,队列空了在获取下一批数据到队列里面。 这一块自己实现细节还挺多的,比如任务失败了如何重跑,需不需要做背压之类的。 我之前做过类似的,用框架实现,对应的代码就很简洁,伪代码类似这样 ``` ids.toStream.buffer(16).mapPar(4)(row => processData(row)) ``` |
19
EMMMMMMMMM 13 天前
你就不能给任务分一下片吗?
线程 1:WHERE status = 'PENDING' and id % 2 = 0 线程 2:WHERE status = 'PENDING' and id % 2 = 1 说实话, 在互联网干了七八年了,多线程的代码屈指可数,都是多进程 |
![]() |
20
Romic 13 天前
1. 使用分布式锁 性能一般
2. 使用数据库的乐观锁 加 version 开发成本太高,需要多维护一个 version 字段 3. 推荐方案,将数据分批打散,比如 1000 条数据 2 个现成并行执行,那么 1-500 是线程 A 执行。501-1000 线程 B 执行。经典方案。 话说这种问题直接丢给 ai ,很多方案。以前的 deepseek R1 模型的方案完整 准确率高。现在好像不行啦。 gg 思密达。 |
![]() |
21
netnr 13 天前
这类似发短信的系统,很多地方调用发短信接口,都先写入发送记录表,状态为待发送,然后起一个任务循环执行发送并改状态;
如果是在一个进程的前提下,可以用线程安全的先进先出队列,把 processBatch 添加到队列,另起一个线程来消费队列 |
![]() |
22
netnr 13 天前
|
![]() |
23
ThreeK 12 天前
1 、要是不能控制调用方并发就推介方案 B ,方案 A 容易等不到锁。
2 、2.1 doBusinessLogic 如果 IO 多计算少可以考虑并发执行。2.2 processBatch 可以写成幂等的,最终一致就行,多执行几遍还能申请加资源。 3 、你这不算高并发,高并发一般不存在同一数据并发处理。高并发并发大但调用都带着唯一 id ,直接分布式锁解决同一 id 并发问题,你这种同一数据多处调用应该是锁等待/唤醒问题。 我认为你们这业务槽点太多 1) 事务太大不能拆就只能等报错了。 2 )改 status 像事件驱动(消息通知),又不知你们写的什么。 3 ) limit N 像是要批处理又好多处调用,调用的地方还不加条件,光用 limit N 来确定数据属于一个事务。。。。 |
24
geebos 12 天前
这种场景一般用生产-消费者模型,一个线程查,多个线程处理
|
25
thevita 12 天前 ![]() 没说清楚啊,与你的事务会会发生冲突的都有啥啊,仅仅同一个逻辑的不同任务吗?有没有 读-写冲突?有没有其他不同粒度、不同逻辑的写-写冲突,doBusinessLogic 里面有不有 外部一致性要求?
超大事务呗,某些系统很常见,并不是所有业务都是互联网,上面的不要看到这种就报警 锁放外部(方案 A )正如你所说,只解决了 processBatch 的并发问题,但是不能避免其他事物的更新,依然可能导致 write-skew ,除非你保证只要该这个表,都拿锁,那和表锁其实也没太大差别,就看你们的数据库实现得整么样了 锁表(方案 B )通过合理的加锁,能避免 write-skew, 但是冲突域会变大,影响系统吞吐,甚至某些 db 可能会阻塞读,但是话又说回来,如果你的场景类似,半夜批量计算,冲突可能低那种,耶完全可以接受 其他方案: 其实具体看你能接受 哪部分 可以被适当取舍,比如上面只讨论了锁的情况,取舍的就是与其他事物的冲突 如果你能接受适当若化 这个超大事务的原子性的话还可以: processBatch 内加锁,这个锁止解决 不同 processBatch 任务间的冲突(更好的办法可能是引入一个协调者来保证 不同 processBatch 尽量不冲突),然后更新使用乐观锁+重试,让 这个 batch 实现最终一致,也不失为一种办法(当然,这里没讨论你的 doBusinessLogic 有不有外部一致性的情况) |
26
prosgtsr 12 天前 via iPhone
如果是我的话,我会改成一个线程查,然后多线程领取任务再处理。
要问怎么学,我也不知道,我也是草台班子 |
27
listenerri 12 天前 via Android
问题在哪里发生的,就尽可能在那里解决
|
28
NoDataNoBB 12 天前
select for update
|
![]() |
29
shangfabao 12 天前
上边写的是对的,先 update,毕竟看你的逻辑,是否 update 是没有看执行逻辑的返回结果的
|
![]() |
30
kai1412 12 天前
多线程分页取 分配好每页的数量 线程取完各自根据主键 id 更新也不会有冲突
|
32
heiya 12 天前
实现上来说 A 和 B 都行,不过锁粒度都很大。根据你对 processBatch()发生并发可能性的预测,还有一些别的方案:1.并发频繁发生,可以用#28 的 select for update 2.并发偶尔发生,在表里边加一列版本号,每次更新时对比这个值是否和取出时的值一致,不一致就是有并发。这样的锁粒度控制在行上。不过假如 10 条数据 2 条有并发问题,这种情况又得额外处理。
|
33
chiaoyuja 12 天前 ![]() 应用层加锁
加锁可以做到显式控制并发,你能清楚知道「谁在处理」; 不依赖数据库细节,业务逻辑统一,不受数据库种类、配置影响; 代码层面的锁(如 Redis 分布式锁)支持跨进程、跨服务的同步; 开发和调试更直观,出问题也容易定位。 数据库加锁 • 数据库事务的隔离级别越高,性能越差(如 SERIALIZABLE 会阻塞读取); • 很难用 SQL 实现「原子地读取并更新任务状态」这种逻辑,语法不友好; • 不同数据库实现不同,如 PostgreSQL 支持 SELECT ... FOR UPDATE SKIP LOCKED ,但 MySQL 实现不同,兼容性差; • 数据库锁只在事务内有效,不能用于跨服务同步控制; • 如果多个服务部署在不同机器上,仅靠 DB 锁,还是可能出现争抢/重入的问题。 |
![]() |
34
testliyu 11 天前
我们之前是直接加分布式锁
|