You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

树莓派使用Python 3读取自定义结构原始以太网数据包并存储

嗨,恭喜你已经搞定了发送原始以太网包的环节!接收设备发来的原始包并存储其实没那么复杂,我给你梳理一套可行的方案,基于Python 3和树莓派的环境:

前置准备

首先,确保树莓派的以太网接口能捕获到目标设备的数据包。如果设备和树莓派在同一物理网段,一般默认就能收到,但开启混杂模式可以避免因为MAC地址过滤漏掉包,执行这条命令:

sudo ip link set eth0 promisc on

(把eth0换成你的树莓派实际以太网接口名,用ip link show就能查到)

Python接收与存储代码实现

下面是完整的示例代码,包含包接收、解析自定义结构、存储到本地文件的逻辑,你可以根据自己的实际结构调整细节:

import socket
import struct
import time

# 配置区:根据你的实际情况修改
INTERFACE = "eth0"  # 树莓派以太网接口名称
STORAGE_FILE = "captured_packets.bin"  # 存储数据包的二进制文件路径
# 自定义Header的struct格式:比如假设你的header是两个16位无符号整数(长度+类型),小端模式
# 你需要根据自己的header结构修改这个格式字符串,参考struct模块文档
CUSTOM_HEADER_FORMAT = "<HH"
CUSTOM_HEADER_SIZE = struct.calcsize(CUSTOM_HEADER_FORMAT)
CRC_LENGTH = 4  # 假设你的CRC是32位,占4字节

def main():
    # 创建原始以太网socket:AF_PACKET是Linux专属的原始套接字类型
    try:
        # ETH_P_ALL(0x0003)表示捕获所有类型的以太网帧,如果你知道设备用的自定义以太网类型号,可以替换成对应值
        sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x0003))
        sock.bind((INTERFACE, 0))  # 绑定到指定接口
    except PermissionError:
        print("⚠️ 错误:需要root权限运行,请用sudo执行脚本!")
        return
    except OSError as e:
        print(f"❌ 创建socket失败:{e}")
        return

    print(f"✅ 开始监听{INTERFACE}接口的原始数据包...")
    print(f"📦 数据包将存储到:{STORAGE_FILE}")

    # 以追加模式打开存储文件,确保断电后数据不丢失
    with open(STORAGE_FILE, "ab") as storage_f:
        while True:
            # 接收数据包:bufsize设为65535足够覆盖所有以太网帧
            raw_frame, _ = sock.recvfrom(65535)
            
            # 以太网帧的前14字节是标准帧头:目标MAC(6)+源MAC(6)+以太网类型(2)
            eth_frame_header = raw_frame[:14]
            _, src_mac, _ = struct.unpack("!6s6sH", eth_frame_header)
            
            # 跳过标准以太网帧头,得到你的自定义数据包(header+data+crc)
            custom_packet = raw_frame[14:]
            
            # 校验数据包长度是否合法(至少要能容纳header和crc)
            if len(custom_packet) < CUSTOM_HEADER_SIZE + CRC_LENGTH:
                print("⚠️ 收到不完整数据包,跳过")
                continue
            
            # 解析自定义Header
            header_data = custom_packet[:CUSTOM_HEADER_SIZE]
            header_fields = struct.unpack(CUSTOM_HEADER_FORMAT, header_data)
            # 假设header里第一个字段是data的长度,用来校验数据完整性
            expected_data_len = header_fields[0]
            actual_data_len = len(custom_packet[CUSTOM_HEADER_SIZE:-CRC_LENGTH])
            if actual_data_len != expected_data_len:
                print(f"⚠️ 数据长度不匹配(预期{expected_data_len},实际{actual_data_len}),跳过")
                continue
            
            # 提取data和CRC部分
            payload_data = custom_packet[CUSTOM_HEADER_SIZE:-CRC_LENGTH]
            received_crc = custom_packet[-CRC_LENGTH:]
            
            # 打印日志(可选,方便调试)
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] 收到来自{src_mac.hex()}的包 | Header字段:{header_fields} | Data长度:{actual_data_len} | CRC:{received_crc.hex()}")
            
            # 存储数据包:这里我们把时间戳、包长度、原始自定义包一起写入,方便后续解析
            # 时间戳用毫秒级,8字节;包长度用2字节;然后是原始包数据
            storage_f.write(struct.pack("!Q", int(time.time() * 1000)))
            storage_f.write(struct.pack("!H", len(custom_packet)))
            storage_f.write(custom_packet)
            storage_f.flush()  # 强制写入磁盘,避免缓存丢失

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n🛑 用户中断,程序退出")

关键细节说明

  1. 权限问题:必须用sudo运行脚本,因为创建原始套接字需要root权限。
  2. 自定义结构适配:你需要根据自己的header实际字段修改CUSTOM_HEADER_FORMAT,比如如果header包含一个32位整数和一个字符串,格式字符串可能是<I10s(小端、32位int、10字节字符串),具体参考Python的struct模块文档。
  3. 以太网类型过滤:如果你的设备使用了自定义的以太网类型号(比如0x88XX这类非标准值),可以把socket.htons(0x0003)换成对应的类型号,这样只会捕获目标类型的包,减少无关数据干扰。
  4. 存储扩展:如果需要更灵活的存储,可以把数据写入SQLite数据库,或者用CSV记录元数据+单独的二进制文件存原始包,方便后续查询和分析。
  5. CRC校验:可以在代码里添加CRC计算逻辑,对比收到的CRC值,验证数据包是否完整。

后续读取存储的数据

如果要读取存储的captured_packets.bin文件,可以参考这段代码:

import struct

def read_stored_packets(file_path):
    with open(file_path, "rb") as f:
        while True:
            # 读取时间戳(8字节)
            timestamp_bytes = f.read(8)
            if not timestamp_bytes:
                break
            timestamp = struct.unpack("!Q", timestamp_bytes)[0] / 1000
            # 读取包长度(2字节)
            packet_len = struct.unpack("!H", f.read(2))[0]
            # 读取原始自定义包
            custom_packet = f.read(packet_len)
            # 这里可以继续解析header、data、crc,和接收时的逻辑一致
            print(f"时间戳:{timestamp} | 包长度:{packet_len}")

read_stored_packets("captured_packets.bin")

内容的提问来源于stack exchange,提问作者Alban Mourier

火山引擎 最新活动