You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python多线程竞态条件行为随时间变化的原因及线程同步正确实现方式问询

理解Python多线程的竞态条件与线程同步问题

一、为什么Jupyter Notebook中COUNTER的值会随时间变化?

你观察到的现象核心原因有两个:

  1. 线程执行的异步性
    当你在Jupyter Notebook中运行启动线程的单元格时,这些线程会在后台并发执行,而单元格本身的执行会立即结束(不会等待线程完成)。当你后续运行检查COUNTER的单元格时,读取到的值完全取决于此时有多少个线程完成了它们的循环:

    • 间隔5秒运行检查单元格:此时10个线程刚好全部完成,所以COUNTER达到预期的10*10000=100000
    • 间隔更长时间(比如你提到的10秒):如果不小心重复运行了启动线程的单元格,就会再启动10个线程,导致COUNTER翻倍到200000;或者单纯是线程执行延迟,直到10秒才全部完成
    • 同一单元格内print和assert的结果不同:因为print执行时还有线程在修改COUNTER,等到assert执行时,又有几个线程完成了循环,所以值会变化。
  2. 竞态条件的潜在影响
    即使你等所有线程都执行完毕,COUNTER的值也可能小于预期——这才是竞态条件的核心问题。COUNTER += 1看起来是一个原子操作,但实际上它分为三步:

    • 读取当前COUNTER的值
    • 将值加1
    • 把新值写回COUNTER
      在多线程环境下,多个线程可能同时读取到同一个旧值,各自加1后写回,最终导致一次“无效”的增量。比如线程A和B都读取到COUNTER=5,A写回6,B也写回6,两次操作只让COUNTER增加了1,而不是2。你的代码里加了time.sleep(0.001),会让线程切换更频繁,这种竞态条件的表现会更明显。

二、如何在Python脚本中等待所有线程并发完成后处理COUNTER?

你提到的join()用法错误导致线程串行执行,是因为你把start()join()放在了同一个循环里——启动一个线程就立刻等待它完成,自然无法并发。正确的做法是先启动所有线程,再统一等待它们全部结束

import threading, time, random

COUNTER = 0
def increement(n):
    global COUNTER
    for _ in range(n):
        COUNTER += 1
        time.sleep(0.001)
    print(f"{threading.current_thread().name} finished counting")

ITERATIONS = 10000
threads = [threading.Thread(target=increement, args=(ITERATIONS,)) for _ in range(10)]

# 第一步:启动所有线程(此时所有线程并发运行)
for t in threads:
    t.start()

# 第二步:等待每个线程完成(主线程阻塞,直到所有子线程结束)
for t in threads:
    t.join()

# 现在可以安全地访问COUNTER了
assert COUNTER == (len(threads) * ITERATIONS), f"Invalid value for counter: {COUNTER}, expected value: {len(threads) * ITERATIONS}"

这样操作后,所有线程会同时启动并并发执行,join()只是让主线程暂停,直到所有子线程都完成,不会影响线程之间的并行性。


三、解决竞态条件的根本方案

上面的代码能保证等待线程完成,但依然无法解决竞态条件导致的COUNTER值不正确。要彻底解决这个问题,需要使用线程同步机制,最常用的是threading.Lock()

import threading, time, random

COUNTER = 0
# 创建一个全局锁对象
lock = threading.Lock()

def increement(n):
    global COUNTER
    for _ in range(n):
        # 使用with语句自动获取和释放锁,确保同一时间只有一个线程执行COUNTER +=1
        with lock:
            COUNTER += 1
        time.sleep(0.001)
    print(f"{threading.current_thread().name} finished counting")

ITERATIONS = 10000
threads = [threading.Thread(target=increement, args=(ITERATIONS,)) for _ in range(10)]

for t in threads:
    t.start()
for t in threads:
    t.join()

# 现在COUNTER的值一定会等于预期
assert COUNTER == (len(threads) * ITERATIONS), f"Invalid value for counter: {COUNTER}, expected value: {len(threads) * ITERATIONS}"

锁的作用是把COUNTER +=1这个非原子操作变成原子操作:同一时间只有一个线程能进入with lock代码块,确保读取、修改、写回的过程不会被其他线程打断,从而保证最终结果的正确性。


内容的提问来源于stack exchange,提问作者Bharath Ram

火山引擎 最新活动