You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python多线程Kahoot代码扫描程序输出重复混乱问题求助

问题排查与解决方案

我来帮你拆解这个输出混乱的问题——这本质是多线程环境下的标准输出竞争,再加上代码里的一些小逻辑问题,咱们一步步来解决:

为什么会出现输出混乱?

  • print操作的线程竞争:Python的print()看起来简单,但它并不是完全线程安全的。如果多个线程(包括你自己启动的线程,或是Kahoot客户端库内部的异步线程)同时调用print(),它们的输出内容可能会“挤”在一起——比如线程A刚输出了一半数字,线程B的print就插进来,导致你看到重复、错乱的数字串。
  • 计数器逻辑的小坑:你的代码里print(code)是在code +=1之后执行的,这意味着你打印的数字其实是下一个要检查的代码,不是当前正在请求的那个。如果后续你增加线程数量,每个线程各自维护自己的code变量,还会导致大量重复请求,输出彻底无序。

修复方案

我们可以通过线程锁解决输出竞争,同时改用全局共享计数器避免重复请求,调整后的代码如下:

from kahoot import client
import requests
import time
import threading

# 全局共享计数器和锁,确保线程安全
current_code = 100
max_code = 1428567
# 用于同步print输出的锁
print_lock = threading.Lock()
# 用于同步计数器递增的锁
counter_lock = threading.Lock()

def kahoot_join(thread_name):
    global current_code
    while True:
        # 先安全获取当前要检查的代码,避免多个线程拿到重复值
        with counter_lock:
            if current_code > max_code:
                break
            code_to_check = current_code
            current_code += 1
        
        converted_code = str(code_to_check)
        try:
            # 增加超时,避免请求卡住线程
            request = requests.get(f"https://kahoot.it/reserve/session/{converted_code}", timeout=5)
        except requests.exceptions.RequestException as e:
            # 线程安全地打印错误信息
            with print_lock:
                print(f"{thread_name} 请求失败 [{code_to_check}]: {str(e)[:50]}...")
            continue
        
        # 安全打印当前检查的代码
        with print_lock:
            print(f"{thread_name} 正在检查: {code_to_check}")
        
        if request.status_code == 200:
            bot = client()
            bot.join(converted_code, "bot")
            
            def joinHandle():
                pass
            bot.on("joined", joinHandle)
            
            # 安全打印找到有效代码的信息
            with print_lock:
                print(f"✅ {thread_name} 找到有效代码: {converted_code} | 下一个待检查代码: {current_code}")

if __name__ == "__main__":
    # 可以调整线程数量,建议2-5个,避免被限流
    thread_count = 3
    threads = []
    for i in range(thread_count):
        thread = threading.Thread(target=kahoot_join, args=(f"线程-{i+1}",))
        threads.append(thread)
        thread.start()
    
    # 等待所有线程执行完毕
    for thread in threads:
        thread.join()

关键改进点

  • 全局计数器+锁:用current_code作为共享计数器,配合counter_lock确保每个线程只会拿到唯一的、未被检查过的代码,彻底避免重复请求,同时保证计数器递增的原子性。
  • print锁:所有print()调用都用print_lock包裹,确保同一时间只有一个线程能输出内容,解决了输出混乱的问题。
  • 修复代码逻辑:现在打印的code_to_check是当前正在请求的代码,找到有效代码时也会输出正确的数值。
  • 异常处理:增加了请求超时和异常捕获,避免单个请求失败导致线程崩溃。

额外提醒

  • 不要设置太多线程(建议2-5个),Kahoot服务器可能会对频繁请求限流甚至封禁IP。
  • 可以在请求之间加短暂延迟(比如time.sleep(0.1)),降低被封禁的风险。

内容的提问来源于stack exchange,提问作者Sheep

火山引擎 最新活动