You need to enable JavaScript to run this app.
导航

开发指南

最近更新时间:2023.01.16 10:10:10

首次发布时间:2023.01.16 10:10:10

Java 依赖库

<dependency>
    <!-- Apache Kafka Java client; provides the KafkaProducer / KafkaConsumer classes used in the examples below. -->
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

发送消息

//在控制台查看对应接入点信息String server = "xxx.";
//在控制台申请的消息所属TopicString topic = "this is your topic.";
//测试消息内容String value = "this is test message value.";
//发送消息条数int count = 100;
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
try {
    for (int i = 0; i < count; i++) {
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++))
                .get(5, TimeUnit.SECONDS);
        logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.",
                recordMetadata.topic(),
                recordMetadata.partition(),
                recordMetadata.offset(),
                i);
    }
} catch (Throwable e) {
    logger.error("produce error", e);
}
producer.flush();
producer.close();

消费消息

// Consumer example: subscribes to `topic` as part of `group` and logs every
// record it receives until the running thread is interrupted.

// Access-point (bootstrap server) address; look it up in the console.
String server = "xxx.";
// Topic the messages belong to, as requested in the console.
String topic = "this is your topic.";
// Consumer group requested in the console for consuming messages.
String group = "this is your group.";
// Offset reset policy: earliest, latest, or none.
String offsetReset = "earliest";

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// try-with-resources closes the consumer once the poll loop exits.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    List<String> topicList = Lists.newArrayList(topic);
    consumer.subscribe(topicList);
    // Stop when the thread is interrupted instead of looping forever.
    // NOTE(review): poll() is expected to fail repeatedly on an interrupted
    // thread, which would otherwise spin in the catch block - confirm against
    // the kafka-clients version in use.
    while (!Thread.currentThread().isInterrupted()) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value());
            }
        } catch (Exception e) {
            // Best effort: log the failure and keep polling.
            logger.error("consume error", e);
        }
    }
}