You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用Python(Scapy)在TCP流中注入FIN段提前终止连接

解决Scapy发送FIN包延迟及序列号同步问题

首先,咱们先拆解你当前遇到的两个核心问题:FIN包延迟到达序列号/确认号不同步,这两个问题大多来自脚本触发条件不准确、序列号计算错误,以及脚本与curl请求的执行时机不匹配。

你的当前脚本存在的问题

  1. 触发条件错误:你用pkt[TCP].flags == 16(单纯ACK)来捕获数据包,但服务器发送数据段时,通常会带PSH+ACK标志(对应flags值为0x18=24),而非单纯的ACK,这会导致你无法正确捕获到服务器发送的带数据的数据包,自然触发FIN的时机就不对。
  2. 序列号/确认号计算错误ack=pkt[TCP].seq + 1完全不符合TCP规则——服务器发送的每个MSS数据包携带1460字节的数据,所以客户端的ACK号应该是服务器的seq + 数据包的实际数据长度(也就是len(pkt[TCP].payload)),而FIN包的序列号应该是客户端当前已发送的最后一个字节的下一个序号(可以通过跟踪服务器的ACK值来获取)。
  3. 执行时机不匹配:你先启动sniff再执行curl,如果sniff启动慢了,可能会漏掉前几个数据包;另外,Scapy发送的包不经过系统TCP栈,系统本身的连接状态会干扰,导致FIN延迟。

修正后的解决方案

下面是调整后的脚本,我会逐段解释关键逻辑:

from scapy.all import *
from time import sleep
import subprocess

# 先设置iptables规则,阻止系统发送RST包,避免干扰Scapy的手动TCP操作
os.system("iptables -I OUTPUT -p tcp --tcp-flags ALL RST,ACK -j DROP")
os.system("iptables -I OUTPUT -p tcp --tcp-flags ALL RST -j DROP")

# 全局变量:跟踪服务器发送的数据包数量,以及客户端的当前序列号
packet_count = 0
client_seq = 0

def handle_packet(pkt):
    global packet_count, client_seq
    # 只处理目标服务器发送的TCP流量
    if pkt.haslayer(TCP) and pkt[IP].src == "1.1.1.1":
        # 从服务器的ACK包中记录客户端的当前序列号
        if pkt[TCP].flags & 0x10:  # 检查ACK标志位
            client_seq = pkt[TCP].ack
        
        # 只统计带实际数据的数据包(过滤空ACK包)
        if len(pkt[TCP].payload) > 0:
            packet_count += 1
            print(f"捕获到服务器第{packet_count}个数据包,Seq: {pkt[TCP].seq}, 数据长度: {len(pkt[TCP].payload)}")
            
            # 当捕获到第5个数据包时,立即发送FIN包终止连接
            if packet_count == 5:
                # 计算正确的确认号:服务器的Seq + 本次数据包的长度
                ack_num = pkt[TCP].seq + len(pkt[TCP].payload)
                # 构造符合TCP规范的FIN包
                fin_pkt = IP(dst=pkt[IP].src, src=pkt[IP].dst) / \
                          TCP(dport=pkt[TCP].sport, sport=pkt[TCP].dport,
                              seq=client_seq, ack=ack_num, flags='F')
                # 静默发送FIN包,避免冗余输出
                send(fin_pkt, verbose=0)
                print(f"已发送FIN包,客户端Seq: {client_seq}, 确认号: {ack_num}")
                # 发送完成后终止嗅探
                return True

# 先后台启动curl请求,确保不会错过连接初期的数据包
subprocess.Popen(["curl", "http://1.1.1.1/your-target-resource"], 
                 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# 短暂等待,让TCP握手完成
sleep(1)

# 启动嗅探:指定网卡、过滤目标服务器的TCP流量,发送FIN后自动停止
sniff(iface="ens160", prn=handle_packet, filter="tcp host 1.1.1.1", 
      stop_filter=lambda x: packet_count >=5)

# 测试完成后清理iptables规则(可选,避免影响后续正常连接)
os.system("iptables -D OUTPUT -p tcp --tcp-flags ALL RST,ACK -j DROP")
os.system("iptables -D OUTPUT -p tcp --tcp-flags ALL RST -j DROP")

关键调整点说明

  • 触发逻辑优化:通过检查len(pkt[TCP].payload) > 0准确统计服务器发送的带数据的数据包,确保第5个包触发FIN。
  • 序列号修正:从服务器的ACK包中获取客户端的当前序列号,ACK号则用服务器的Seq加上数据包的实际长度,完全符合TCP协议规范。
  • 执行时机调整:先后台启动curl,再启动sniff,避免错过连接初期的数据包;添加短暂延时确保TCP握手完成。
  • 自动停止嗅探:用stop_filter在发送完FIN后立即停止sniff,减少资源占用。

测试注意事项

  1. 确保脚本以root权限运行(Scapy和iptables操作都需要)。
  2. 替换http://1.1.1.1/your-target-resource为你实际的测试资源地址。
  3. 测试完成后记得清理iptables规则,避免影响其他TCP连接。

内容的提问来源于stack exchange,提问作者Martin S.

火山引擎 最新活动