Spark Streaming集成Kafka报错:找不到kafka模块求助
解决Spark 3.0与Kafka集成的ImportError问题
兄弟,你遇到的ImportError: No module named kafka本质是API路径不对+依赖版本完全不匹配导致的,结合你用的Spark 3.0.0-preview2版本,我给你一步步梳理解决方案:
问题根源拆解
- 旧API已被淘汰:你代码里导入的
pyspark.streaming.kafka.KafkaUtils是Spark Streaming针对Kafka 0.8版本的旧API,Spark 3.0已经不再维护这个模块,官方推荐用Kafka 0.10+的新API,对应的导入路径是pyspark.streaming.kafka010。 - 依赖版本严重不匹配:你提交命令里用的
spark-streaming-kafka-0-8_2.11:2.0.1和Spark 3.0.0-preview2版本差距太大,而且Spark 3.0默认适配Scala 2.12(部分发行版可能是2.11,但必须和Spark版本对应),旧版本依赖根本兼容不了。 - 依赖混合冲突:你同时指定了0-8和0-10的Kafka jar,这会导致类加载混乱,反而加重问题。
具体修复步骤
1. 修改Python代码(适配Kafka 0.10+ API)
把旧的导入和Kafka连接代码替换成0.10版本的实现,这里用更推荐的createDirectStream(比旧的createStream性能更好,支持精确一次语义):
import os os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7' import findspark findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7') import pyspark import sys from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext # 替换成0.10版本的导入路径 from pyspark.streaming.kafka010 import KafkaUtils, ConsumerStrategies, LocationStrategies if __name__=="__main__": sc = SparkContext(appName="SparkStreamAISfromKAFKA") sc.setLogLevel("WARN") ssc = StreamingContext(sc, 1) # 配置Kafka参数 kafka_params = { "bootstrap.servers": "my-kafka-broker", "group.id": "raw-event-streaming-consumer", "auto.offset.reset": "latest" # 根据需求选latest/earliest } topics = ["enriched_ais_messages"] # 使用createDirectStream创建流 kvs = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topics, kafka_params) ) lines = kvs.map(lambda x: x.value) # 0.10版本里消息内容在value字段 lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint() ssc.start() ssc.awaitTermination()
2. 修正spark-submit命令(依赖版本匹配)
删除旧的0-8依赖和多余的jar,使用和Spark 3.0.0-preview2匹配的Kafka 0.10依赖:
/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit \ --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0-preview2 \ --master spark://mysparkip:7077 \ spark_streamer.py
注意:如果你的Spark发行版是基于Scala 2.11的,把
_2.12改成_2.11,你可以通过Spark安装目录下lib文件夹里的scala-library jar文件名确认(比如scala-library-2.12.10.jar就是2.12版本)。
3. 额外注意事项
- 如果你坚持要用旧的0.8 API(不推荐),必须保证依赖版本和Spark版本严格对应,比如Spark 3.0对应的0-8依赖是
org.apache.spark:spark-streaming-kafka-0-8_2.12:3.0.0-preview2,但官方已标记这个API为废弃,后续版本会移除。 - 确认你的Kafka broker版本是0.10及以上,0.10+的API不兼容低于0.10的Kafka集群。
内容的提问来源于stack exchange,提问作者e7lT2P




