本文以 Java 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_PLAINTEXT 接入点 SCRAM 机制接入消息队列 Kafka版,并收发消息。
已完成准备工作。详细说明请参考准备工作。
在 Java 项目的 pom.xml
中添加相关依赖。此处以 Kafka 2.2.2 版本为例。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.6</version>
</dependency>
创建消息队列 Kafka版配置文件 config.properties
。
配置文件字段的详细说明,请参考配置文件。
通过 SASL_PLAINTEXT 接入点 SCRAM 机制接入时,配置文件示例如下。
说明
SCRAM 机制下,应使用具备对应 Topic 访问权限的 SCRAM 用户进行 SASL 认证。获取用户名及密码的方式请参考2 收集连接信息。
bootstrap.servers=xxxxx
security.protocol=SASL_PLAINTEXT
topic=my-topic
consumer.group.id=test
consumer.auto.offset.reset=earliest
consumer.enable.auto.commit=false
client.dns.lookup=use_all_dns_ips
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="xxxx" password="xxxxx";
创建配置文件加载程序 KafkaConfigurer.java
。
package com.volcengine.openservice.kafka;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
public class KafkaConfigurer {
private static Properties properties;
public synchronized static Properties getKafkaProperties(String path) {
if (null != properties) {
return properties;
}
//获取配置文件config.properties的内容
Properties kafkaProperties = new Properties();
try {
FileInputStream conf = new FileInputStream(path);
kafkaProperties.load(conf);
} catch (Exception e) {
//没加载到文件,程序要考虑退出
e.printStackTrace();
}
properties = kafkaProperties;
return kafkaProperties;
}
}
创建发送消息程序 ProducerDemo.java
。
编译并运行 ProducerDemo.java
发送消息。
查看运行结果。
运行结果示例如下。
Produce ok:sasl-0@0 Produce ok:sasl-0@1 Produce ok:sasl-0@2 Produce ok:sasl-0@3
说明
消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送消息。
java -cp kafka-demo.jar com.volcengine.openservice.kafka.ProducerDemo ./config.properties
通过 SASL_PLAINTEXT 接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/src/main/java/com/volcengine/openservice/kafka/ProducerDemo.java
,实现相关业务逻辑。
package com.volcengine.openservice.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
class Producer {
// 生产者使用的topic
private static String topic;
// 生产者使用的配置
private static Properties props = new Properties();
private static KafkaProducer<String, String> producer;
// 构造生产者
public Producer(Properties kafkaProperties) {
setProps(kafkaProperties);
setSasl(kafkaProperties);
setTopic(kafkaProperties);
newProducer();
}
// 设置生成的topic, 请在控制台申请之后,填写在这里
private void setTopic(Properties kafkaProperties) {
topic = kafkaProperties.getProperty("topic");
}
// 设置安全协议, kafka支持SASL_PLAINTEXT和PLAINTEXT协议
private void setSasl(Properties kafkaProperties) {
String protocol = kafkaProperties.getProperty("security.protocol");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);
// 如果安全协议为PLAINTEXT,则不需要填充用户名和密码,直接返回
if (protocol.equals("PLAINTEXT")) {
return;
}
// 如果安全协议为SASL_PLAINTEXT,需要获取加密类型以及sasl的config
if (protocol.equals("SASL_PLAINTEXT")) {
props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getProperty("sasl.mechanism"));
props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProperties.getProperty("sasl.jaas.config"));
return;
}
throw new IllegalArgumentException("security.protocol is not correct");
}
// 设置生产者的启动参数
private void setProps(Properties kafkaProperties) {
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//Kafka消息的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//请求的最长等待时间
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
//设置客户端内部重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 5);
//设置客户端内部重试间隔
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
// 设置客户端的dns策略
props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, kafkaProperties.getProperty("client.dns.lookup"));
}
// 构造生产者对象,也即生成一个生产实例
private void newProducer() {
// 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
// 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
producer = new KafkaProducer<String, String>(props);
}
public void Produce() {
//构造一个Kafka消息
String value = "this is demo message "; //消息的内容
//批量获取 futures 可以加快速度, 但注意,批量不要太大
List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
for (int i =0; i < 100; i++) {
//发送消息,并获得一个Future对象
ProducerRecord<String, String> kafkaMessage = new ProducerRecord<String, String>(topic, value + ": " + i);
Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
futures.add(metadataFuture);
}
producer.flush();
for (Future<RecordMetadata> future: futures) {
//同步获得Future对象的结果
try {
RecordMetadata recordMetadata = future.get();
System.out.println("Produce ok:" + recordMetadata.toString());
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}
public class ProducerDemo {
public static void main(String args[]) {
if (args.length < 1) {
System.out.println("please input config path");
return;
}
// 获取生产者对象
Producer producer = new Producer(KafkaConfigurer.getKafkaProperties(args[0]));
try {
// 生产消息
producer.Produce();
} catch (Exception e) {
// 客户端内部重试之后,仍然发送失败,业务要应对此类错误
System.out.println("error occurred");
e.printStackTrace();
}
}
}
创建 Consumer 订阅消息程序 ConsumerDemo.java
。
编译并运行 ConsumerDemo.java
消费消息。
查看运行结果。
运行结果示例如下。
Consume: ConsumerRecord(topic = sasl, patition = 6, leaderEpoch=0, offset=0, CreateTime = 1637207092476, serialized key size = -1, serialized key size = 24, headers = ReacordHeaders(headers = [], isReadOnly = false), key = null, value = this is demo, message : 0) Consume: ConsumerRecord(topic = sasl, patition = 6, leaderEpoch=0, offset=0, CreateTime = 1637207092476, serialized key size = -1, serialized key size = 24, headers = ReacordHeaders(headers = [], isReadOnly = false), key = null, value = this is demo, message : 1)
说明
消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令消费消息。
java -cp kafka-demo.jar com.volcengine.openservice.kafka.ConsumerDemo ./config.properties
通过 SASL_PLAINTEXT 接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/src/main/java/com/volcengine/openservice/kafka/ConsumerDemo.java
,实现相关业务逻辑。
package com.volcengine.openservice.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
class Consumer {
// 消费者的启动配置
private static Properties props = new Properties();
private KafkaConsumer<String, String> consumer;
// 消费者实例构造函数
public Consumer(Properties kafkaProperties) {
setProps(kafkaProperties);
setSasl(kafkaProperties);
newConsumer();
subscribed(kafkaProperties);
}
// 设置安全协议, kafka支持SASL_PLAINTEXT和PLAINTEXT协议
private void setSasl(Properties kafkaProperties) {
String protocol = kafkaProperties.getProperty("security.protocol");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);
// 如果安全协议为PLAINTEXT,则不需要填充用户名和密码,直接返回
if (protocol.equals("PLAINTEXT")) {
return;
}
// 如果安全协议为SASL_PLAINTEXT,需要获取加密类型以及sasl的config
if (protocol.equals("SASL_PLAINTEXT")) {
props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getProperty("sasl.mechanism"));
props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProperties.getProperty("sasl.jaas.config"));
return;
}
throw new IllegalArgumentException("security.protocol is not correct");
}
// 设置消费者的启动参数
private void setProps(Properties kafkaProperties) {
//设置接入点,请通过控制台获取对应Topic的接入点
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
//可根据实际拉取数据和客户的版本等设置此值,默认30s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
//每次poll的最大数量
//注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
//消息的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//当前消费实例所属的消费组
//属于同一个组的消费实例,会负载消费消息
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("consumer.group.id"));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getProperty("consumer.auto.offset.reset"));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getProperty("consumer.enable.auto.commit"));
// 设置客户端的dns策略
props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, kafkaProperties.getProperty("client.dns.lookup"));
}
// 构造消费者对象,也即生成一个消费实例
private void newConsumer() {
consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
}
// 订阅topic
private void subscribed(Properties kafkaProperties) {
//设置消费组订阅的Topic
List<String> subscribedTopic = new ArrayList<String>();
//如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
//Topic需要先在控制台进行创建
subscribedTopic.add(kafkaProperties.getProperty("topic"));
consumer.subscribe(subscribedTopic);
}
// 消费消息
public void poll() {
ConsumerRecords<String, String> records = consumer.poll(1000);
//必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG
//建议开一个单独的线程池来消费消息,然后异步返回结果
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consume: %s%n", record.toString());
}
}
}
public class ConsumerDemo {
public static void main(String args[]) {
if (args.length < 1) {
System.out.println("please input config path");
return;
}
// 获取消费者对象
Consumer consumer = new Consumer(KafkaConfigurer.getKafkaProperties(args[0]));
//循环消费消息
while (true){
try {
// 消费消息
consumer.poll();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
}
}