Spark Streaming从Kafka取数本地正常YARN无输出问题求助
解决YARN模式下Spark Streaming消费Kafka无输出的问题
我之前在CDH5.x环境里踩过类似的Spark Streaming YARN模式无输出的坑,结合你的环境(CDH5.12、Spark1.6、Kafka0.10、Python2.6.6),给你几个针对性的排查和解决方向:
1. 清理ZK中残留的消费组元数据
你用的是基于ZK的旧版Kafka消费API(KafkaUtils.createStream),YARN模式下如果之前有同名消费组spark-streaming-consumer的残留offset信息,可能导致新任务无法正确获取最新的消息位置。
- 登录ZK节点,用
zkCli.sh连接:zkCli.sh -server <ZKhostname>:2181 - 查看并删除消费组节点:
ls /consumers/spark-streaming-consumer deleteall /consumers/spark-streaming-consumer
删除后重新提交YARN任务,看是否能正常消费。
2. 确保依赖包版本兼容
Spark1.6与Kafka0.10的适配需要特定的依赖包,你当前指定的spark-examples.jar可能不足以覆盖所有依赖。建议:
- 提交任务时通过
--packages参数指定官方兼容的依赖:
注意Scala版本要和你的CDH Spark匹配(CDH5.12的Spark1.6通常基于Scala2.10)。spark-submit --master yarn --deploy-mode client --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 testfile.py <ZKhostname>:2181 <kafka-topic> - 如果内网无法下载包,可以从Cloudera仓库下载对应的
spark-streaming-kafka_2.10-1.6.3-cdh5.12.0.jar,放到Spark的lib目录或者提交时用--jars指定。
3. 排查YARN容器的网络与权限问题
YARN的Executor节点可能无法访问ZK或Kafka服务:
- 网络连通性:在Executor所在节点执行
telnet <ZKhostname> 2181和telnet <KafkaBroker> 9092,确认端口能通;如果不通,检查防火墙规则或安全组配置。 - 权限问题:YARN运行任务的用户(默认是提交任务的用户)需要有访问ZK节点和Kafka topic的权限。可以尝试用该用户在Executor节点运行Kafka命令行消费者:
kafka-console-consumer.sh --zookeeper <ZKhostname>:2181 --topic <kafka-topic> --from-beginning
如果命令行能消费,说明权限没问题;如果不能,需要调整Kafka的ACL或ZK的权限配置。
4. 强制指定消费起始Offset
旧版ZK消费API可能在YARN模式下默认从已提交的offset开始消费,而如果之前的offset已经是topic末尾,就不会有新输出。可以在代码里强制从最新位置开始消费:
from pyspark.streaming.kafka import TopicAndPartition if __name__ == "__main__": # ... 原有代码 ... # 获取topic的分区信息 topic_partitions = KafkaUtils.getOffsets(ssc, zkQuorum, "spark-streaming-consumer", [topic]) # 构建初始offset,设置为每个分区的最新位置 initial_offsets = {} for tp, offset in topic_partitions: # 这里可以根据需求设置为offset+1(跳过已消费)或直接用Kafka的最新offset initial_offsets[tp] = offset + 1 # 用初始offset创建流 kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}, initialOffsets=initial_offsets) # ... 原有代码 ...
5. 调整Spark Streaming的批次与资源配置
- 批次时间:你当前设置的批次时间是1秒,YARN模式下资源调度有额外开销,可能导致批次处理不及时。尝试把
StreamingContext(sc, 1)改成StreamingContext(sc, 5)或更大的值。 - 资源配置:除了Executor内存,还可以指定Executor数量和核数,比如:
spark-submit --master yarn --deploy-mode client \ --executor-memory 3G \ --num-executors 2 \ --executor-cores 2 \ --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 \ testfile.py <ZKhostname>:2181 <kafka-topic>
验证步骤
- 先运行一个简单的Spark YARN任务(比如
spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi /usr/lib/spark/lib/spark-examples.jar),确认YARN集群正常工作。 - 用Kafka命令行生产者往topic发测试消息,同时用命令行消费者验证消息能被消费。
- 逐步应用上面的排查点,每次调整后重新提交任务并查看YARN日志。
内容的提问来源于stack exchange,提问作者Samhash




