You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark Streaming无法接收Kafka数据问题求助

解决Spark Streaming集成Kafka时的Py4JError问题

看起来你遇到的问题主要来自版本不兼容依赖配置错误代码拼写问题,下面一步步帮你排查解决:

1. 核心问题:Spark与Kafka Streaming依赖版本不匹配

你使用的是Spark 2.4.6,但引入的是spark-streaming-kafka-0-10_2.12-3.0.0.jar——这个jar包是给Spark 3.0.0用的,和你的Spark 2.4.6完全不兼容,这是导致Py4J通信错误的最主要原因。

你需要下载与Spark 2.4.6版本完全匹配的spark-streaming-kafka-0-10包:

  • 如果你的Spark 2.4.6是基于Scala 2.11构建的(hadoop2.7版本的Spark默认通常是Scala 2.11),下载spark-streaming-kafka-0-10_2.11-2.4.6.jar
  • 如果是Scala 2.12版本的Spark 2.4.6,下载spark-streaming-kafka-0-10_2.12-2.4.6.jar

另外,这个包还依赖kafka-clients的对应版本(建议用2.0.0左右,和Spark 2.4.6兼容),最好一并下载放到Spark的jars目录,或者在启动时指定。

2. 修正SparkSession的依赖配置方式

你当前的spark.sparkContext.config()用法是错误的,直接传jar路径不会被识别。正确的配置方式是指定spark.jars参数:

spark = SparkSession.builder.master("local[*]") \
    .appName("kafkaStreaming") \
    .config("spark.jars", "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/spark-streaming-kafka-0-10_2.11-2.4.6.jar") \
    .getOrCreate()

如果有多个依赖jar(比如kafka-clients),可以用逗号分隔路径:

.config("spark.jars", "/path/to/spark-streaming-kafka.jar,/path/to/kafka-clients.jar")

另外,更推荐的方式是在启动Jupyter Notebook时就指定依赖,比如:

PYSPARK_SUBMIT_ARGS="--jars /path/to/spark-streaming-kafka-0-10_2.11-2.4.6.jar,/path/to/kafka-clients-2.0.0.jar pyspark-shell" jupyter notebook

这样能确保依赖在Spark上下文初始化时就被加载。

3. 修正代码中的拼写错误

你的代码里lines.flatmap(lambda line: line.split(" "))是错误的,PySpark中RDD的正确方法名是**flatMap**(驼峰式,M大写),改成:

words = lines.flatMap(lambda line: line.split(" "))

4. 优化代码的输出逻辑

当前的print(words)只会打印DStream的对象信息,不会打印实际接收到的数据。如果你想看到数据流内容,需要用foreachRDD来处理每个批次的数据:

words.foreachRDD(lambda rdd: print(rdd.collect()))

修正后的完整代码示例

import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

n_secs = 3
topic = "generate"

# 正确配置依赖并创建SparkSession
spark = SparkSession.builder.master("local[*]") \
    .appName("kafkaStreaming") \
    .config("spark.jars", "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/spark-streaming-kafka-0-10_2.11-2.4.6.jar") \
    .getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, n_secs)

# 创建Kafka Direct Stream
kStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers':'localhost:9092',
    'group.id':'test-group',
    'auto.offset.reset':'latest'})

lines = kStream.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))

# 打印每个批次的单词数据
words.foreachRDD(lambda rdd: print("Received data:", rdd.collect()))

ssc.start()
time.sleep(100)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

额外检查点

  • 确保Kafka服务正常运行,localhost:9092可以访问
  • 确保你的Kafka生产者确实在往generate主题发送数据
  • 检查Spark的日志(默认在spark-2.4.6-bin-hadoop2.7/logs目录),如果还有错误,可以从日志里找到更详细的Java端异常信息,帮助进一步排查

内容的提问来源于stack exchange,提问作者Saranya

火山引擎 最新活动