如何基于Ryu控制器实现QoS深度包检测并区分语音视频流量
解决语音/视频流量识别及Ryu SDN规则下发问题
刚好碰到过类似的场景,给你几个实用的解决方案,帮你搞定语音/视频流量识别+Ryu规则下发的问题:
一、为什么TOS字段没用?
首先得说清楚:现在很多网络设备(路由器、交换机)会重置或忽略TOS字段,导致你抓到的包都是tos=0x0,所以靠TOS区分流量根本不靠谱,得换其他识别方式。
二、替代TOS的流量识别方法(基于Scapy)
1. 端口匹配(最简单直接)
语音/视频协议大多有标准端口,用Scapy过滤这些端口就能快速识别:
- SIP(语音信令):UDP/TCP 5060(明文)、5061(加密)
- RTP(语音/视频媒体流):通常用10000-20000之间的UDP端口
- RTMP(视频流):TCP 1935
- WebRTC:虽然用随机端口,但可以结合SIP/SDP信令先获取端口
Scapy示例代码:
from scapy.all import sniff, UDP, TCP def classify_traffic(packet): # 识别RTP流量 if UDP in packet: if 10000 <= packet[UDP].dport <= 20000 or 10000 <= packet[UDP].sport <= 20000: print("识别到RTP媒体流") # 这里可以调用Ryu下发规则的逻辑 # 识别SIP信令 elif TCP in packet or UDP in packet: if (packet[TCP].dport == 5060 or packet[TCP].sport == 5060) or \ (packet[UDP].dport == 5060 or packet[UDP].sport == 5060): print("识别到SIP信令") # 可以解析SDP获取媒体端口 sniff(prn=classify_traffic, store=0)
2. 协议头部特征识别(更准确)
对于RTP这类有固定头部的协议,直接解析头部特征更可靠,比如RTP的版本号固定为2,还有负载类型(Payload Type)对应不同的编码格式:
- 语音常用:G.711(PT=0/8)、G.729(PT=18)
- 视频常用:H.264(PT=96)、VP8(PT=100)
Scapy支持RTP解析,需要先导入对应模块:
from scapy.layers.rtp import RTP from scapy.all import sniff, UDP def classify_rtp(packet): if UDP in packet and RTP in packet: rtp_layer = packet[RTP] # 检查RTP版本 if rtp_layer.version == 2: # 根据负载类型判断是语音还是视频 if rtp_layer.payload_type in [0,8,18]: print("识别到语音RTP流") elif rtp_layer.payload_type in [96,100,101]: print("识别到视频RTP流") # 触发Ryu规则下发逻辑 sniff(prn=classify_rtp, store=0)
3. 流量行为特征识别(应对动态端口)
像WebRTC这种用随机端口的场景,没法靠端口或头部直接识别,可以分析流量的行为:
- 语音流:UDP包大小均匀(通常100-300字节),包间隔稳定(20-50ms左右)
- 视频流:UDP包大小波动较大(500-1500字节),包间隔相对稳定
你可以在Scapy里统计连续数据包的大小和时间差,设置阈值来判断。
三、识别后向Ryu下发流表规则
识别到目标流量后,有两种方式给Ryu控制器下发规则:
1. 用Ryu REST API(快速上手)
Ryu默认支持REST API,只要加载ryu.app.ofctl_rest模块,就可以用HTTP请求下发流表。比如用Python的requests库发送POST请求:
import requests import json def send_flow_rule(controller_ip, src_ip, dst_port): url = f"http://{controller_ip}:8080/stats/flowentry/add" flow_rule = { "dpid": 1, # 目标交换机的DPID "priority": 1000, # 高优先级,确保先匹配 "match": { "eth_type": 0x0800, "ip_proto": 17, # UDP "udp_dst": dst_port }, "actions": [ {"type": "SET_QUEUE", "queue_id": 0}, # 放到高优先级队列 {"type": "OUTPUT", "port": 2} # 转发到指定端口 ], "idle_timeout": 300, # 空闲超时5分钟,自动删除规则 "hard_timeout": 600 # 硬超时10分钟 } response = requests.post(url, json=flow_rule) if response.status_code == 200: print("流表规则下发成功")
2. 集成到Ryu应用(更高效)
如果不想用REST API,可以把流量识别逻辑直接写到Ryu应用里,控制器通过PacketIn事件获取数据包,用Scapy解析后直接下发规则,这样不需要单独的Scapy抓包脚本:
from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER from ryu.controller.handler import set_ev_cls from ryu.ofproto import ofproto_v1_3 from scapy.all import IP, UDP, RTP class QoSSwitch(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self, *args, **kwargs): super(QoSSwitch, self).__init__(*args, **kwargs) @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) def packet_in_handler(self, ev): msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto parser = datapath.ofproto_parser # 用Scapy解析数据包 packet = IP(msg.data) if UDP in packet and RTP in packet: rtp = packet[RTP] if rtp.version == 2 and rtp.payload_type in [0,8,18]: # 识别到语音流,下发高优先级规则 match = parser.OFPMatch(eth_type=0x0800, ip_proto=17, udp_dst=packet[UDP].dport) actions = [parser.OFPActionSetQueue(0), parser.OFPActionOutput(ofproto.OFPP_FLOOD)] self.add_flow(datapath, 1000, match, actions, idle_timeout=300) def add_flow(self, datapath, priority, match, actions, idle_timeout=0, hard_timeout=0): ofproto = datapath.ofproto parser = datapath.ofproto_parser inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)] mod = parser.OFPFlowMod(datapath=datapath, priority=priority, match=match, instructions=inst, idle_timeout=idle_timeout, hard_timeout=hard_timeout) datapath.send_msg(mod)
四、注意事项
- 很多网络设备会改写TOS/DSCP字段,所以永远不要依赖TOS来识别流量;
- 对于动态端口的流量(比如WebRTC),建议先通过信令协议(比如SIP的SDP)获取媒体端口,再结合RTP头部识别;
- 下发流表时一定要设置超时时间,避免流表条目过多导致交换机性能下降;
- 如果是加密的语音/视频流(比如SRTP),头部特征会被加密,这时候只能靠行为特征或端口识别。
内容的提问来源于stack exchange,提问作者alaeddinebenhassir




