无法向Spark发送编码JSON数据,Socket流传输无输出求助
折腾一整晚没搞定确实闹心,我帮你梳理几个最可能导致服务端空白无输出的关键问题,按优先级来排查:
1. 给发送的JSON添加换行符(最可能的根因)
Spark Streaming的socketTextStream默认是按换行符(\n)分割数据流的——只有当它收到带换行符的数据包时,才会把内容当作一条完整记录处理并输出。
看你客户端的发送逻辑:
out = json.dumps(row)# encode it before sending self.conn.send(bytes(out,encoding = "utf8"))
这里发送的JSON字符串末尾没有加换行符,Spark会一直缓存收到的数据,直到触发换行才会输出。修改成这样就行:
out = json.dumps(row) + "\n" # 给每条JSON添加换行符 self.conn.send(bytes(out, encoding="utf8"))
2. 严格遵守Socket连接的启动顺序
你的数据发送程序是Socket服务端(调用了s.bind()和s.listen()),而Spark的socketTextStream是作为Socket客户端去连接它的。所以必须:
- 先启动你的数据发送程序,等它打印出
Listening for a client at localhost 12345之后,再启动Spark Streaming服务端 - 如果先启动Spark,它会一直处于等待连接的状态,直到你的发送程序启动并建立连接,才会开始接收数据
3. 修复数据读取逻辑的潜在bug
你代码里这两行有隐患:
every_1_climate = next(c) # ... alist = each.split(',')
c是你打开的ClimateData文件对象,但你已经用csv.reader(c)创建了ClimateData_csv,直接对文件对象c调用next()会跳过csv.reader的指针,导致数据读取混乱。应该改成:
every_1_climate = next(ClimateData_csv)
另外,next_n_lines(f,5)这个自定义函数你没贴出来,要确保它能正确读取FireData的5行原始字符串(因为你后面用了each.split(',')解析),如果这个函数出问题,可能导致没有数据发送或者格式错误。
4. 完善Spark Streaming的基础配置
你的Spark代码里缺少awaitTermination(),这会导致Spark Streaming启动后直接退出,根本没机会接收数据:
ssc.start() ssc.awaitTermination() # 加上这行,让程序保持运行等待数据
另外,你设置的批次间隔是5秒(StreamingContext(sc, 5)),所以即使数据发送正常,也需要等待最多5秒才会看到输出,别着急。
快速验证方法
可以先做个简单测试:
- 用Python写个极简Socket客户端,手动发送带换行的JSON字符串到
localhost:12345,看Spark是否能输出 - 在你的发送代码里,确认每次
print('Sending line',out)都能正常打印,确保确实有数据在发送
按照上面的步骤排查,应该能解决你的问题。
内容的提问来源于stack exchange,提问作者Damon




