You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Spark Streaming集成Kafka报错:找不到kafka模块求助

解决Spark 3.0与Kafka集成的ImportError问题

兄弟,你遇到的ImportError: No module named kafka本质是API路径不对+依赖版本完全不匹配导致的,结合你用的Spark 3.0.0-preview2版本,我给你一步步梳理解决方案:

问题根源拆解

  • 旧API已被淘汰:你代码里导入的pyspark.streaming.kafka.KafkaUtils是Spark Streaming针对Kafka 0.8版本的旧API,Spark 3.0已经不再维护这个模块,官方推荐用Kafka 0.10+的新API,对应的导入路径是pyspark.streaming.kafka010
  • 依赖版本严重不匹配:你提交命令里用的spark-streaming-kafka-0-8_2.11:2.0.1和Spark 3.0.0-preview2版本差距太大,而且Spark 3.0默认适配Scala 2.12(部分发行版可能是2.11,但必须和Spark版本对应),旧版本依赖根本兼容不了。
  • 依赖混合冲突:你同时指定了0-8和0-10的Kafka jar,这会导致类加载混乱,反而加重问题。

具体修复步骤

1. 修改Python代码(适配Kafka 0.10+ API)

把旧的导入和Kafka连接代码替换成0.10版本的实现,这里用更推荐的createDirectStream(比旧的createStream性能更好,支持精确一次语义):

import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
# 替换成0.10版本的导入路径
from pyspark.streaming.kafka010 import KafkaUtils, ConsumerStrategies, LocationStrategies

if __name__=="__main__":
    sc = SparkContext(appName="SparkStreamAISfromKAFKA")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 1)
    
    # 配置Kafka参数
    kafka_params = {
        "bootstrap.servers": "my-kafka-broker",
        "group.id": "raw-event-streaming-consumer",
        "auto.offset.reset": "latest"  # 根据需求选latest/earliest
    }
    topics = ["enriched_ais_messages"]
    
    # 使用createDirectStream创建流
    kvs = KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.Subscribe(topics, kafka_params)
    )
    
    lines = kvs.map(lambda x: x.value)  # 0.10版本里消息内容在value字段
    lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
    
    ssc.start()
    ssc.awaitTermination()

2. 修正spark-submit命令(依赖版本匹配)

删除旧的0-8依赖和多余的jar,使用和Spark 3.0.0-preview2匹配的Kafka 0.10依赖:

/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0-preview2 \
--master spark://mysparkip:7077 \
spark_streamer.py

注意:如果你的Spark发行版是基于Scala 2.11的,把_2.12改成_2.11,你可以通过Spark安装目录下lib文件夹里的scala-library jar文件名确认(比如scala-library-2.12.10.jar就是2.12版本)。

3. 额外注意事项

  • 如果你坚持要用旧的0.8 API(不推荐),必须保证依赖版本和Spark版本严格对应,比如Spark 3.0对应的0-8依赖是org.apache.spark:spark-streaming-kafka-0-8_2.12:3.0.0-preview2,但官方已标记这个API为废弃,后续版本会移除。
  • 确认你的Kafka broker版本是0.10及以上,0.10+的API不兼容低于0.10的Kafka集群。

内容的提问来源于stack exchange,提问作者e7lT2P

火山引擎 最新活动