You need to enable JavaScript to run this app.
导航

通过 Spark Streaming 消费日志

最近更新时间2023.11.13 16:31:34

首次发布时间2023.02.13 18:01:42

日志服务提供 Kafka 协议消费功能,您可以使用 Spark Streaming 的 spark-streaming-kafka 组件对接日志服务,通过 Spark Streaming 将日志服务中采集的日志数据消费到下游的大数据组件或者数据仓库。

场景概述

Spark Streaming 是构建在 Spark 上的实时计算框架,在 Spark 的基础上提供了可拓展、高吞吐、容错的流计算能力。Spark Streaming 可整合多种数据源,例如通过 spark-streaming-kafka 组件整合 Kafka,实现消费 Kafka 消息的能力。日志服务支持为指定的日志主题开启 Kafka 协议消费功能,开启后,Spark Streaming 可以将日志主题作为 Kafka 的 Topic 进行消费,例如消费到下游的大数据组件或者数据仓库,适用于流式计算或大数据存储场景。

前提条件

  • 已创建日志项目和日志主题。详细操作步骤请参考创建资源
  • 已为指定日志主题开启 Kafka 协议消费功能,并获取Kafka协议消费主题ID。详细说明请参考通过 Kafka 协议消费日志
  • 推荐使用 IAM 用户进行访问鉴权。使用 IAM 用户前,需确认火山引擎主账号已创建 IAM 用户,且已为其授予消费相关的权限。详细说明请参考可授予的权限
  • 已获取当前登录账号的密钥 Access Key。详细信息请参考创建密钥

配置步骤

1 添加 Maven 依赖

通过 Spark Streaming 消费火山引擎日志服务的日志数据时,需要使用 Spark Streaming 提供的 spark-streaming-kafka-0-10。
在项目中添加 spark-streaming-kafka 相关的 Maven 依赖。依赖的详细信息如下:

说明

日志服务 Kafka 消费功能仅支持 0.11.0 及以上的 Kafka 协议版本,需要手动指定 Spark 的 Kafka client 版本为 0.11.0及以上。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!--版本大于等于0.11.0即可 -->
    <version>1.0.0</version>
</dependency>

2 配置 Spark input stream

参考以下示例代码完成 Spark input stream 的相关配置。详细的配置说明请参考 Spark 官方文档,参数说明请参考下表。
以下示例展示了如何构建 Spark input stream 来消费日志主题 “0fdaa6b6-3c9f-424c-8664-fc0d222c****” 中的日志数据。

// 构建SparkStreaming上下文
SparkConf conf = new SparkConf().setAppName("TlsDemo").setMaster("local").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses((Class<?>[]) Arrays.asList(ConsumerRecord.class).toArray());

// 每隔5秒钟,sparkStreaming作业就会收集最近5秒内的数据源接收过来的数据
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

// 构建kafka参数map
// 主要要放置的是连接的kafka集群的地址(broker集群的地址列表)
Map<String, Object> kafkaParams = new HashMap<>();
//初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中:
//服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。
//端口号固定为 9093。
kafkaParams.put("bootstrap.servers", tlsEndConsumePoint);
//指定kafka输出key的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("key.deserializer", StringDeserializer.class);
//指定kafka输出value的数据类型及编码格式(默认为字符串类型编码格式为uft-8)
kafkaParams.put("value.deserializer", StringDeserializer.class);
//消费组ID,随意指定
kafkaParams.put("group.id", consumeGroupID);
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
//安全模式,tls只支持SASL_SSL
kafkaParams.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
//鉴权模式,tls只支持PLAIN
kafkaParams.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
//账号密码设置:
//用户名:应配置为日志服务的日志项目ID
//密码:应配置为火山引擎账户密钥
// 格式为 ${access-key-id}#${access-key-secret},其中:
//${access-key-id} 应替换为您的 AccessKey ID。
//${access-key-secret} 应替换为您的 AccessKey Secret。
kafkaParams.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule " +
                "required username=\"" + bd4d5b20-479a-4f33-9099-a44db3be**** + "\" password=\"" + AK****NWE2MTI#WldWa1p****dZd05JKJKHHUI== + "\";");

// 构建topic set
Collection<String> topics = new HashSet<>();
//要消费的topic,格式为out-xxxxxxxxx
topics.add(out-0fdaa6b6-3c9f-424c-8664-fc0d222c****);

try {
    // 获取kafka的数据
    final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));
            
    // todo 消费到下游大数据组件
    stream.print();
    jssc.start();
    jssc.awaitTermination();
    jssc.close();

} catch (Exception e) {
    e.printStackTrace();
}

其中,必选的设置如下:

配置项

说明

bootstrap.servers

初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093。

  • 服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务地址,详细信息请参见服务地址
  • 端口号固定为 9093。

consumeTopic

要订阅的 Kafka Topic。此处应配置为日志服务的 Kafka 协议消费主题 ID,格式为 out-日志主题ID,例如 out-0fdaa6b6-3c9f-424c-8664-fc0d222c****。 您可以在日志服务控制台的 Topic 详情页中查看并复制 Kafka 协议消费主题 ID

SECURITY_PROTOCOL_CONFIG

安全模式,应指定为 SASL_SSL。

SASL_MECHANISM

鉴权机制,指定为 PLAIN。

SASL_JAAS_CONFIG

当前登录用户的鉴权信息。其中:

  • useranme:Kafka SASL 用户名。应配置为日志服务的日志项目 ID。
  • password:Kafka SASL 用户密码。应配置为火山引擎账户密钥。格式为 ${access-key-id}#${access-key-secret},其中:
    • ${access-key-id} 应替换为您的 AccessKey ID。
    • ${access-key-secret} 应替换为您的 AccessKey Secret。