You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

无法向Spark发送编码JSON数据,Socket流传输无输出求助

解决Spark Streaming无输出的排查方案

折腾一整晚没搞定确实闹心,我帮你梳理几个最可能导致服务端空白无输出的关键问题,按优先级来排查:

1. 给发送的JSON添加换行符(最可能的根因)

Spark Streaming的socketTextStream默认是按换行符(\n)分割数据流的——只有当它收到带换行符的数据包时,才会把内容当作一条完整记录处理并输出。

看你客户端的发送逻辑:

out = json.dumps(row)# encode it before sending
self.conn.send(bytes(out,encoding = "utf8"))

这里发送的JSON字符串末尾没有加换行符,Spark会一直缓存收到的数据,直到触发换行才会输出。修改成这样就行:

out = json.dumps(row) + "\n"  # 给每条JSON添加换行符
self.conn.send(bytes(out, encoding="utf8"))

2. 严格遵守Socket连接的启动顺序

你的数据发送程序是Socket服务端(调用了s.bind()s.listen()),而Spark的socketTextStream是作为Socket客户端去连接它的。所以必须:

  • 先启动你的数据发送程序,等它打印出Listening for a client at localhost 12345之后,再启动Spark Streaming服务端
  • 如果先启动Spark,它会一直处于等待连接的状态,直到你的发送程序启动并建立连接,才会开始接收数据

3. 修复数据读取逻辑的潜在bug

你代码里这两行有隐患:

every_1_climate = next(c)
# ...
alist = each.split(',')

c是你打开的ClimateData文件对象,但你已经用csv.reader(c)创建了ClimateData_csv,直接对文件对象c调用next()会跳过csv.reader的指针,导致数据读取混乱。应该改成:

every_1_climate = next(ClimateData_csv)

另外,next_n_lines(f,5)这个自定义函数你没贴出来,要确保它能正确读取FireData的5行原始字符串(因为你后面用了each.split(',')解析),如果这个函数出问题,可能导致没有数据发送或者格式错误。

4. 完善Spark Streaming的基础配置

你的Spark代码里缺少awaitTermination(),这会导致Spark Streaming启动后直接退出,根本没机会接收数据:

ssc.start()
ssc.awaitTermination()  # 加上这行,让程序保持运行等待数据

另外,你设置的批次间隔是5秒(StreamingContext(sc, 5)),所以即使数据发送正常,也需要等待最多5秒才会看到输出,别着急。

快速验证方法

可以先做个简单测试:

  1. 用Python写个极简Socket客户端,手动发送带换行的JSON字符串到localhost:12345,看Spark是否能输出
  2. 在你的发送代码里,确认每次print('Sending line',out)都能正常打印,确保确实有数据在发送

按照上面的步骤排查,应该能解决你的问题。

内容的提问来源于stack exchange,提问作者Damon

火山引擎 最新活动