You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

Spark Streaming从Kafka取数本地正常YARN无输出问题求助

解决YARN模式下Spark Streaming消费Kafka无输出的问题

我之前在CDH5.x环境里踩过类似的Spark Streaming YARN模式无输出的坑,结合你的环境(CDH5.12、Spark1.6、Kafka0.10、Python2.6.6),给你几个针对性的排查和解决方向:

1. 清理ZK中残留的消费组元数据

你用的是基于ZK的旧版Kafka消费API(KafkaUtils.createStream),YARN模式下如果之前有同名消费组spark-streaming-consumer的残留offset信息,可能导致新任务无法正确获取最新的消息位置。

  • 登录ZK节点,用zkCli.sh连接:
    zkCli.sh -server <ZKhostname>:2181
    
  • 查看并删除消费组节点:
    ls /consumers/spark-streaming-consumer
    deleteall /consumers/spark-streaming-consumer
    

删除后重新提交YARN任务,看是否能正常消费。

2. 确保依赖包版本兼容

Spark1.6与Kafka0.10的适配需要特定的依赖包,你当前指定的spark-examples.jar可能不足以覆盖所有依赖。建议:

  • 提交任务时通过--packages参数指定官方兼容的依赖:
    spark-submit --master yarn --deploy-mode client --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 testfile.py <ZKhostname>:2181 <kafka-topic>
    
    注意Scala版本要和你的CDH Spark匹配(CDH5.12的Spark1.6通常基于Scala2.10)。
  • 如果内网无法下载包,可以从Cloudera仓库下载对应的spark-streaming-kafka_2.10-1.6.3-cdh5.12.0.jar,放到Spark的lib目录或者提交时用--jars指定。

3. 排查YARN容器的网络与权限问题

YARN的Executor节点可能无法访问ZK或Kafka服务:

  • 网络连通性:在Executor所在节点执行telnet <ZKhostname> 2181telnet <KafkaBroker> 9092,确认端口能通;如果不通,检查防火墙规则或安全组配置。
  • 权限问题:YARN运行任务的用户(默认是提交任务的用户)需要有访问ZK节点和Kafka topic的权限。可以尝试用该用户在Executor节点运行Kafka命令行消费者:
    kafka-console-consumer.sh --zookeeper <ZKhostname>:2181 --topic <kafka-topic> --from-beginning
    

如果命令行能消费,说明权限没问题;如果不能,需要调整Kafka的ACL或ZK的权限配置。

4. 强制指定消费起始Offset

旧版ZK消费API可能在YARN模式下默认从已提交的offset开始消费,而如果之前的offset已经是topic末尾,就不会有新输出。可以在代码里强制从最新位置开始消费:

from pyspark.streaming.kafka import TopicAndPartition

if __name__ == "__main__":
    # ... 原有代码 ...
    # 获取topic的分区信息
    topic_partitions = KafkaUtils.getOffsets(ssc, zkQuorum, "spark-streaming-consumer", [topic])
    # 构建初始offset,设置为每个分区的最新位置
    initial_offsets = {}
    for tp, offset in topic_partitions:
        # 这里可以根据需求设置为offset+1(跳过已消费)或直接用Kafka的最新offset
        initial_offsets[tp] = offset + 1
    # 用初始offset创建流
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", 
                                  {topic: 1}, initialOffsets=initial_offsets)
    # ... 原有代码 ...

5. 调整Spark Streaming的批次与资源配置

  • 批次时间:你当前设置的批次时间是1秒,YARN模式下资源调度有额外开销,可能导致批次处理不及时。尝试把StreamingContext(sc, 1)改成StreamingContext(sc, 5)或更大的值。
  • 资源配置:除了Executor内存,还可以指定Executor数量和核数,比如:
    spark-submit --master yarn --deploy-mode client \
      --executor-memory 3G \
      --num-executors 2 \
      --executor-cores 2 \
      --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 \
      testfile.py <ZKhostname>:2181 <kafka-topic>
    

验证步骤

  1. 先运行一个简单的Spark YARN任务(比如spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi /usr/lib/spark/lib/spark-examples.jar),确认YARN集群正常工作。
  2. 用Kafka命令行生产者往topic发测试消息,同时用命令行消费者验证消息能被消费。
  3. 逐步应用上面的排查点,每次调整后重新提交任务并查看YARN日志。

内容的提问来源于stack exchange,提问作者Samhash

火山引擎 最新活动