树莓派使用Python 3读取自定义结构原始以太网数据包并存储
嗨,恭喜你已经搞定了发送原始以太网包的环节!接收设备发来的原始包并存储其实没那么复杂,我给你梳理一套可行的方案,基于Python 3和树莓派的环境:
前置准备
首先,确保树莓派的以太网接口能捕获到目标设备的数据包。如果设备和树莓派在同一物理网段,一般默认就能收到,但开启混杂模式可以避免因为MAC地址过滤漏掉包,执行这条命令:
sudo ip link set eth0 promisc on
(把eth0换成你的树莓派实际以太网接口名,用ip link show就能查到)
Python接收与存储代码实现
下面是完整的示例代码,包含包接收、解析自定义结构、存储到本地文件的逻辑,你可以根据自己的实际结构调整细节:
import socket import struct import time # 配置区:根据你的实际情况修改 INTERFACE = "eth0" # 树莓派以太网接口名称 STORAGE_FILE = "captured_packets.bin" # 存储数据包的二进制文件路径 # 自定义Header的struct格式:比如假设你的header是两个16位无符号整数(长度+类型),小端模式 # 你需要根据自己的header结构修改这个格式字符串,参考struct模块文档 CUSTOM_HEADER_FORMAT = "<HH" CUSTOM_HEADER_SIZE = struct.calcsize(CUSTOM_HEADER_FORMAT) CRC_LENGTH = 4 # 假设你的CRC是32位,占4字节 def main(): # 创建原始以太网socket:AF_PACKET是Linux专属的原始套接字类型 try: # ETH_P_ALL(0x0003)表示捕获所有类型的以太网帧,如果你知道设备用的自定义以太网类型号,可以替换成对应值 sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003)) sock.bind((INTERFACE, 0)) # 绑定到指定接口 except PermissionError: print("⚠️ 错误:需要root权限运行,请用sudo执行脚本!") return except OSError as e: print(f"❌ 创建socket失败:{e}") return print(f"✅ 开始监听{INTERFACE}接口的原始数据包...") print(f"📦 数据包将存储到:{STORAGE_FILE}") # 以追加模式打开存储文件,确保断电后数据不丢失 with open(STORAGE_FILE, "ab") as storage_f: while True: # 接收数据包:bufsize设为65535足够覆盖所有以太网帧 raw_frame, _ = sock.recvfrom(65535) # 以太网帧的前14字节是标准帧头:目标MAC(6)+源MAC(6)+以太网类型(2) eth_frame_header = raw_frame[:14] _, src_mac, _ = struct.unpack("!6s6sH", eth_frame_header) # 跳过标准以太网帧头,得到你的自定义数据包(header+data+crc) custom_packet = raw_frame[14:] # 校验数据包长度是否合法(至少要能容纳header和crc) if len(custom_packet) < CUSTOM_HEADER_SIZE + CRC_LENGTH: print("⚠️ 收到不完整数据包,跳过") continue # 解析自定义Header header_data = custom_packet[:CUSTOM_HEADER_SIZE] header_fields = struct.unpack(CUSTOM_HEADER_FORMAT, header_data) # 假设header里第一个字段是data的长度,用来校验数据完整性 expected_data_len = header_fields[0] actual_data_len = len(custom_packet[CUSTOM_HEADER_SIZE:-CRC_LENGTH]) if actual_data_len != expected_data_len: print(f"⚠️ 数据长度不匹配(预期{expected_data_len},实际{actual_data_len}),跳过") continue # 提取data和CRC部分 payload_data = custom_packet[CUSTOM_HEADER_SIZE:-CRC_LENGTH] received_crc = custom_packet[-CRC_LENGTH:] # 打印日志(可选,方便调试) timestamp = time.strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] 收到来自{src_mac.hex()}的包 | Header字段:{header_fields} | Data长度:{actual_data_len} | CRC:{received_crc.hex()}") # 存储数据包:这里我们把时间戳、包长度、原始自定义包一起写入,方便后续解析 # 时间戳用毫秒级,8字节;包长度用2字节;然后是原始包数据 storage_f.write(struct.pack("!Q", int(time.time() * 1000))) storage_f.write(struct.pack("!H", len(custom_packet))) storage_f.write(custom_packet) storage_f.flush() # 强制写入磁盘,避免缓存丢失 if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n🛑 用户中断,程序退出")
关键细节说明
- 权限问题:必须用
sudo运行脚本,因为创建原始套接字需要root权限。 - 自定义结构适配:你需要根据自己的
header实际字段修改CUSTOM_HEADER_FORMAT,比如如果header包含一个32位整数和一个字符串,格式字符串可能是<I10s(小端、32位int、10字节字符串),具体参考Python的struct模块文档。 - 以太网类型过滤:如果你的设备使用了自定义的以太网类型号(比如0x88XX这类非标准值),可以把
socket.htons(0x0003)换成对应的类型号,这样只会捕获目标类型的包,减少无关数据干扰。 - 存储扩展:如果需要更灵活的存储,可以把数据写入SQLite数据库,或者用CSV记录元数据+单独的二进制文件存原始包,方便后续查询和分析。
- CRC校验:可以在代码里添加CRC计算逻辑,对比收到的CRC值,验证数据包是否完整。
后续读取存储的数据
如果要读取存储的captured_packets.bin文件,可以参考这段代码:
import struct def read_stored_packets(file_path): with open(file_path, "rb") as f: while True: # 读取时间戳(8字节) timestamp_bytes = f.read(8) if not timestamp_bytes: break timestamp = struct.unpack("!Q", timestamp_bytes)[0] / 1000 # 读取包长度(2字节) packet_len = struct.unpack("!H", f.read(2))[0] # 读取原始自定义包 custom_packet = f.read(packet_len) # 这里可以继续解析header、data、crc,和接收时的逻辑一致 print(f"时间戳:{timestamp} | 包长度:{packet_len}") read_stored_packets("captured_packets.bin")
内容的提问来源于stack exchange,提问作者Alban Mourier




