You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python多线程并发运行时如何实现有序打印输出?

解决多线程端口扫描输出乱序问题

这个问题太常见了——多线程环境下多个线程同时往标准输出(stdout)写内容,因为操作系统的输出流没有同步机制,就会导致打印内容交错混乱。别担心,我们有两种完全不影响扫描速度的解决方案:

方案一:用线程锁同步输出操作

线程锁(threading.Lock)可以保证同一时间只有一个线程能执行打印操作,从根源上避免输出乱序。这种方案支持实时输出,锁的开销极小,几乎不会拖慢扫描速度。

修改后的代码示例

首先创建全局锁对象,再修改connCheck函数和线程创建逻辑:

import threading
import socket

# 创建全局打印锁,确保同一时间只有一个线程能写输出
print_lock = threading.Lock()

def connCheck(newurl, schemess, port, print_lock):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        result = sock.connect_ex((ip, port))
        newip = schemess + ip + slash
        output = ""
        
        if port == 21:
            if result == 0:
                output = f"\t [+] {port}"
                ftpurls.append(newip)
                dicta(newip, port)
        else:
            if result == 0:
                output = f"\t {port}\n"
                dicta(newip, port)
        
        # 只有有内容要打印时才获取锁
        if output:
            with print_lock:
                print(output, end='')
        
        sock.close()
    except Exception as e:
        error_msg = f"{str(e)}\n"
        with print_lock:
            print(error_msg, end='')

# 创建线程时传入锁对象
th1 = threading.Thread(target=connCheck, args=(newurl, schemess, 21, print_lock))
th2 = threading.Thread(target=connCheck, args=(newurl, schemess, 22, print_lock))
th3 = threading.Thread(target=connCheck, args=(newurl, schemess, 80, print_lock))
th4 = threading.Thread(target=connCheck, args=(newurl, schemess, 8080, print_lock))

# 启动所有线程
th1.start()
th2.start()
th3.start()
th4.start()

# 注意:必须等待所有线程结束,原代码只join th4会导致主线程提前退出
th1.join()
th2.join()
th3.join()
th4.join()

方案二:用队列收集结果,主线程统一排序输出

这种方法效率更高——线程不需要等待锁,只需要把扫描结果放入线程安全的队列,主线程等所有扫描任务完成后,再取出结果排序输出。完全不影响扫描速度,还能实现严格的端口号有序输出。

修改后的代码示例

import threading
import socket
from queue import Queue

# 创建线程安全的结果队列
result_queue = Queue()

def connCheck(newurl, schemess, port, result_queue):
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        result = sock.connect_ex((ip, port))
        newip = schemess + ip + slash
        
        if result == 0:
            if port == 21:
                ftpurls.append(newip)
                dicta(newip, port)
                # 存入队列:(端口号, 输出内容),方便后续排序
                result_queue.put( (port, f"\t [+] {port}") )
            else:
                dicta(newip, port)
                result_queue.put( (port, f"\t {port}") )
        
        sock.close()
    except Exception as e:
        # 错误信息标记为端口号-1,后续排序会放在最后
        result_queue.put( (-1, f"{str(e)}") )

# 创建线程时传入队列
th1 = threading.Thread(target=connCheck, args=(newurl, schemess, 21, result_queue))
th2 = threading.Thread(target=connCheck, args=(newurl, schemess, 22, result_queue))
th3 = threading.Thread(target=connCheck, args=(newurl, schemess, 80, result_queue))
th4 = threading.Thread(target=connCheck, args=(newurl, schemess, 8080, result_queue))

# 启动线程
th1.start()
th2.start()
th3.start()
th4.start()

# 等待所有线程完成扫描
th1.join()
th2.join()
th3.join()
th4.join()

# 收集结果并按端口号排序(错误信息放在最后)
results = []
while not result_queue.empty():
    results.append(result_queue.get())

results.sort(key=lambda x: x[0] if x[0] != -1 else float('inf'))

# 统一输出有序结果
for item in results:
    print(item[1])

方案选择建议

  • 如果需要实时看到扫描结果,选方案一,锁的开销可以忽略不计;
  • 如果更在意扫描效率,或者希望最后严格按端口号有序输出,选方案二,线程完全不需要等待IO操作,专注于扫描任务。

内容的提问来源于stack exchange,提问作者januu agrawal

火山引擎 最新活动