You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用SELECT FOR UPDATE锁定同一用户关联的所有任务行?

解决同一Worker锁定同一person_id所有任务行的简便方案

我之前在处理类似的分布式任务调度场景时,遇到过完全一样的问题,给你几个实用的实现思路,都是不需要锁persons表的:

方法1:用CTE原子性锁定目标person_id的所有行

这个方案利用CTE(公共表表达式)的原子性,先选出一个未被其他Worker锁定的person_id,再一次性锁定该person_id下所有status='new'的任务行,完全避免了竞态问题:

WITH target_person AS (
    -- 先选一个未被锁定的、有新任务的person_id,同时锁定对应的行
    SELECT DISTINCT person_id
    FROM jobs
    WHERE status = 'new'
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
-- 锁定该person_id下所有新任务行
SELECT j.*
FROM jobs j
JOIN target_person tp ON j.person_id = tp.person_id
WHERE j.status = 'new'
FOR UPDATE;

为什么这个方案可行?

  • 整个CTE+主查询在同一个事务中执行,是原子操作:CTE里的SELECT会先锁定选中person_id对应的至少一行任务,其他Worker的SKIP LOCKED会直接跳过这个person_id;
  • 主查询会进一步锁定该person_id下所有剩余的new状态任务行,确保同一Worker能拿到该用户的所有待处理任务。

方法2:先标记任务状态再处理(更安全的生产级方案)

如果你的业务允许,可以先把目标person_id的所有新任务标记为「处理中」,再取出这些任务处理,这样能彻底避免重复调度的风险:

WITH target_person AS (
    SELECT DISTINCT person_id
    FROM jobs
    WHERE status = 'new'
    LIMIT 1
    FOR UPDATE SKIP LOCKED
), marked_jobs AS (
    -- 原子性更新任务状态为processing,同时返回这些任务
    UPDATE jobs
    SET status = 'processing'
    WHERE status = 'new'
      AND person_id IN (SELECT person_id FROM target_person)
    RETURNING *
)
SELECT * FROM marked_jobs;

这个方案的优势:

  • 任务一旦被标记为processing,其他Worker的查询会自动过滤掉这些行,完全杜绝重复处理;
  • 即使Worker进程意外崩溃,也可以通过定时任务把超时的processing任务重置为new,保证任务不丢失。

为什么你之前的方案不行?

  • GROUP BY的查询无法直接用FOR UPDATE,因为聚合操作返回的不是原表行,数据库无法锁定原始记录;
  • 普通子查询的方式存在竞态:子查询选出person_id后,主查询执行前,其他Worker可能已经锁定了该person_id的部分行,导致你无法拿到全部任务。

内容的提问来源于stack exchange,提问作者Andrew Rozdolsky

火山引擎 最新活动