本文以 C++ 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_PLAINTEXT 接入点 SCRAM 机制接入消息队列 Kafka版,并收发消息。
已完成准备工作。详细说明请参考准备工作。
创建消息发送程序 producer.cpp。
执行以下命令编译 producer.cpp。
gcc ./producer.cpp -o producer -lrdkafka
Shell
执行以下命令发送消息。
从命令行接收消息并发送至 Kafka。
./producer -b <bootstrap_servers> -t <topic> -u <user> -p <password> -m SCRAM-SHA-256
Shell
查看运行结果。
运行结果示例如下。
说明
消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。
./producer -b <bootstrap_servers> -t <topic> -u <user> -p <password> -m SCRAM-SHA-256
Shell
通过 SASL_PLAINTEXT 接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/producer.cpp,实现相关业务逻辑。
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Apache Kafka producer example programs
* using the Kafka driver from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
#include "librdkafka/rdkafka.h" /* for Kafka driver */
/* Main-loop flag: starts at 1; the stop() signal handler sets it to 0. */
static volatile sig_atomic_t run = 1;
/* Kafka client handle; assigned in main() and read by sig_usr1(). */
static rd_kafka_t *rk;
/**
 * SIGINT handler: ask the produce loop in main() to finish.
 *
 * Only async-signal-safe operations are used. fclose() is not on the POSIX
 * list of async-signal-safe functions, so the stdin descriptor is closed with
 * close() instead; the read behind the blocking fgets() call then fails and
 * the loop gets to observe run == 0.
 *
 * @param sig  signal number (unused)
 */
static void stop(int sig) {
        (void)sig;
        run = 0;
        close(STDIN_FILENO); /* abort fgets() */
}
/**
 * Kafka logger callback (optional).
 *
 * Writes one line to stderr made of the current wall-clock time
 * (seconds.milliseconds), the log level, the facility, the client name and
 * the message text.
 *
 * @param rk     client instance the log line belongs to; may be NULL
 * @param level  log level
 * @param fac    facility string
 * @param buf    log message
 */
static void
logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        /* Passing NULL to a %s conversion is undefined behavior, so a
         * placeholder string is printed when there is no client instance.
         * %d is used because both time values are cast to int. */
        fprintf(stderr, "%d.%03d RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec,
                (int)(tv.tv_usec / 1000), level, fac,
                rk ? rd_kafka_name(rk) : "(null)", buf);
}
/**
 * Message delivery report callback using the richer rd_kafka_message_t object.
 *
 * Prints the outcome of one produced message to stderr: the error string when
 * rkmessage->err is set, otherwise the payload size, offset, partition and
 * the payload itself.
 *
 * @param rk         client instance (unused)
 * @param rkmessage  delivery report for a single message
 * @param opaque     application opaque (unused)
 */
static void msg_delivered(rd_kafka_t *rk,
                          const rd_kafka_message_t *rkmessage,
                          void *opaque) {
        (void)rk;
        (void)opaque;

        if (rkmessage->err) {
                fprintf(stderr,
                        "%% Message delivery failed: %s\n",
                        rd_kafka_err2str(rkmessage->err));
                return;
        }

        /* len is a size_t, so it is printed with %zu (not %zd). An empty
         * message may carry a NULL payload; never hand NULL to %.*s. */
        fprintf(stderr,
                "%% Message delivered (%zu bytes, offset %" PRId64
                ", "
                "partition %" PRId32 "): %.*s\n",
                rkmessage->len, rkmessage->offset, rkmessage->partition,
                (int)rkmessage->len,
                rkmessage->payload ? (const char *)rkmessage->payload : "");
}
/**
 * SIGUSR1 handler: calls rd_kafka_dump() on the global client handle with
 * stdout as the destination.
 *
 * @param sig  signal number (unused)
 */
static void sig_usr1(int sig) {
        (void)sig;
        rd_kafka_dump(stdout, rk);
}
/** Print the command-line help to stderr and terminate with exit status 1. */
static void usage_exit(void) {
        fprintf(stderr,
                "\n"
                " Options:\n"
                " -t <topic> 必填,需要生产的主题,可以从kafka主题页获取\n"
                " -b <brokers> 必填,kafka实例的接入点,可以从实例详情页获取\n"
                " -u <user> 选填,使用SASL接入点时必填,可以从用户管理页面获取用户名称\n"
                " -p <password> 选填,使用SASL接入点时必填,需要user对应的密码\n"
                " -m <mechanisms> 选填, 使用SASL接入点时必填,加密类型,取值为PLAIN|SCRAM-SHA-256,可以从用户管理页面查询\n"
                "\n");
        exit(1);
}

/**
 * Producer entry point: reads lines from stdin and produces each line as one
 * message to the topic given with -t.
 *
 * Options: -t <topic> and -b <brokers> are required; -u/-p/-m select SASL
 * authentication over sasl_plaintext; -d <contexts> sets the librdkafka
 * "debug" property. Any other option (including -h) prints the help text.
 *
 * @return 0 after a normal shutdown; the process exits with status 1 on
 *         invalid arguments or when the client cannot be set up.
 */
int main(int argc, char **argv) {
        rd_kafka_topic_t *rkt;
        char *brokers    = NULL;
        char *topic      = NULL;
        char *user       = NULL;
        char *password   = NULL;
        char *mechanisms = NULL;
        int partition    = RD_KAFKA_PARTITION_UA;
        int opt;
        int attempts;
        rd_kafka_conf_t *conf;
        char errstr[512];
        char tmp[16];
        char buf[2048];
        rd_kafka_resp_err_t err;

        /* Kafka configuration */
        conf = rd_kafka_conf_new();

        /* Set logger */
        rd_kafka_conf_set_log_cb(conf, logger);

        /* Quick termination */
        snprintf(tmp, sizeof(tmp), "%i", SIGIO);
        rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

        /* "d:" - the debug option takes a value; without the colon optarg
         * would be NULL when it is handed to rd_kafka_conf_set(). */
        while ((opt = getopt(argc, argv, "ht:b:u:p:m:d:")) != -1) {
                switch (opt) {
                case 't':
                        topic = optarg;
                        break;
                case 'b':
                        brokers = optarg;
                        break;
                case 'u':
                        user = optarg;
                        break;
                case 'p':
                        password = optarg;
                        break;
                case 'm':
                        mechanisms = optarg;
                        break;
                case 'd':
                        if (rd_kafka_conf_set(conf, "debug", optarg, errstr,
                                              sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr,
                                        "%% Debug configuration failed: "
                                        "%s: %s\n",
                                        errstr, optarg);
                                exit(1);
                        }
                        break;
                default:
                        usage_exit();
                        break;
                }
        }

        /* Topic and brokers are mandatory; a NULL topic would otherwise be
         * passed straight to rd_kafka_topic_new(). This also covers the
         * "no options at all" case. */
        if (topic == NULL || brokers == NULL)
                usage_exit();

        /* SASL needs both credentials; never pass NULL values on. */
        if (mechanisms != NULL && (user == NULL || password == NULL)) {
                fprintf(stderr,
                        "%% -u <user> and -p <password> are required "
                        "together with -m <mechanisms>\n");
                usage_exit();
        }

        signal(SIGINT, stop);
        signal(SIGUSR1, sig_usr1);

        /* Set bootstrap servers */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
        }

        if (mechanisms != NULL) {
                if (rd_kafka_conf_set(conf, "security.protocol",
                                      "sasl_plaintext", errstr,
                                      sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK || /* security protocol */
                    rd_kafka_conf_set(conf, "sasl.mechanisms", mechanisms,
                                      errstr, sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK || /* SASL mechanism */
                    rd_kafka_conf_set(conf, "sasl.username", user, errstr,
                                      sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK || /* user name */
                    rd_kafka_conf_set(conf, "sasl.password", password, errstr,
                                      sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK) { /* password */
                        fprintf(stderr, "%% %s\n", errstr);
                        exit(1);
                }
        }

        /*
         * Producer
         */

        /* Set up a message delivery report callback.
         * It will be called once for each message, either on successful
         * delivery to broker, or upon failure to deliver to broker. */
        rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered);

        /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
                                sizeof(errstr)))) {
                fprintf(stderr,
                        "%% Failed to create new producer: %s\n",
                        errstr);
                exit(1);
        }

        /* Create topic */
        rkt = rd_kafka_topic_new(rk, topic, NULL);
        if (rkt == NULL) {
                fprintf(stderr, "%% Failed to create topic object: %s\n",
                        rd_kafka_err2str(rd_kafka_last_error()));
                rd_kafka_destroy(rk);
                exit(1);
        }

        fprintf(stderr,
                "%% Type stuff and hit enter to send\n");

        while (run && fgets(buf, sizeof(buf), stdin)) {
                size_t len = strlen(buf);

                /* len can be 0 when the input line starts with a NUL byte;
                 * do not index buf[-1] in that case. */
                if (len > 0 && buf[len - 1] == '\n')
                        buf[--len] = '\0';

                err = RD_KAFKA_RESP_ERR_NO_ERROR;

                /* Send/Produce message. */
                if (rd_kafka_produce(
                        rkt, partition, RD_KAFKA_MSG_F_COPY,
                        /* Payload and length */
                        buf, len,
                        /* Optional key and its length */
                        NULL, 0,
                        /* Message opaque, provided in
                         * delivery report callback as
                         * msg_opaque. */
                        NULL) == -1) {
                        err = rd_kafka_last_error();
                }

                if (err) {
                        fprintf(stderr,
                                "%% Failed to produce to topic %s "
                                "partition %i: %s\n",
                                rd_kafka_topic_name(rkt), partition,
                                rd_kafka_err2str(err));
                        /* Poll to handle delivery reports */
                        rd_kafka_poll(rk, 0);
                        continue;
                }

                /* len is a size_t: %zu */
                fprintf(stderr,
                        "%% Sent %zu bytes to topic "
                        "%s partition %i\n",
                        len, rd_kafka_topic_name(rkt),
                        partition);
                /* Poll to handle delivery reports */
                rd_kafka_poll(rk, 0);
        }

        /* Poll to handle delivery reports */
        rd_kafka_poll(rk, 0);

        /* Wait for messages to be delivered */
        while (run && rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 100);

        /* The SIGUSR1 handler dereferences the global handle, so stop
         * reacting to that signal before the handle goes away. */
        signal(SIGUSR1, SIG_IGN);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy the handle; rk must not be used after this point. */
        rd_kafka_destroy(rk);

        /* Let background threads clean up and terminate cleanly:
         * up to five waits of one second each. */
        for (attempts = 5; attempts > 0; attempts--) {
                if (rd_kafka_wait_destroyed(1000) != -1)
                        break;
                printf("Waiting for librdkafka to decommission\n");
        }
        if (attempts == 0)
                fprintf(stderr,
                        "%% librdkafka did not decommission in time\n");

        return 0;
}
C++
创建 Consumer 订阅消息程序 consumer.cpp。
执行以下命令编译 consumer.cpp。
gcc ./consumer.cpp -o consumer -lrdkafka
Shell
执行如下命令消费消息。
./consumer -b <bootstrap_servers> -t <topic> -g <group> -u <user> -p <password> -m SCRAM-SHA-256
Shell
查看运行结果。
运行结果示例如下。
通过 SASL_PLAINTEXT 接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/consumer.cpp,实现相关业务逻辑。
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Apache Kafka consumer example programs
* using the Kafka driver from librdkafka
* (https://github.com/edenhill/librdkafka)
*/
#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is builtin from within the librdkafka source tree and thus differs. */
#include "librdkafka/rdkafka.h" /* for Kafka driver */
/* Consume-loop flag: starts at 1; stop() and msg_consume() set it to 0. */
static volatile sig_atomic_t run = 1;
/* Kafka client handle; assigned in main() and read by sig_usr1(). */
static rd_kafka_t *rk;
/* Number of partitions awaiting EOF; updated by rebalance_cb(). */
static int wait_eof = 0;
/**
 * SIGINT handler: ask the consume loop in main() to finish.
 *
 * Only the sig_atomic_t flag is written. The consumer never reads from
 * stdin, and fclose() is not async-signal-safe, so the stream is left alone.
 *
 * @param sig  signal number (unused)
 */
static void stop(int sig) {
        (void)sig;
        run = 0;
}
/**
 * Kafka logger callback (optional).
 *
 * Writes one line to stderr made of the current wall-clock time
 * (seconds.milliseconds), the log level, the facility, the client name and
 * the message text.
 *
 * @param rk     client instance the log line belongs to; may be NULL
 * @param level  log level
 * @param fac    facility string
 * @param buf    log message
 */
static void
logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) {
        struct timeval tv;
        gettimeofday(&tv, NULL);
        /* Passing NULL to a %s conversion is undefined behavior, so a
         * placeholder string is printed when there is no client instance.
         * %d is used because both time values are cast to int. */
        fprintf(stderr, "%d.%03d RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec,
                (int)(tv.tv_usec / 1000), level, fac,
                rk ? rd_kafka_name(rk) : "(null)", buf);
}
/**
 * Handle one message returned by the consumer poll.
 *
 * - Partition EOF: reported on stderr, nothing else happens.
 * - Any other error: reported on stderr; an unknown topic or partition
 *   additionally clears the global run flag so the consume loop ends.
 * - Regular message: the key (when present) and the payload are printed to
 *   stdout.
 *
 * @param rkmessage  message or error event to process; still owned by the
 *                   caller, which destroys it afterwards
 */
static void msg_consume(rd_kafka_message_t *rkmessage) {
        if (rkmessage->err) {
                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        fprintf(stderr,
                                "%% Consumer reached end of %s [%" PRId32
                                "] "
                                "message queue at offset %" PRId64 "\n",
                                rd_kafka_topic_name(rkmessage->rkt),
                                rkmessage->partition, rkmessage->offset);
                        return;
                }

                /* Error events that are not tied to a topic carry no topic
                 * object; do not pass a NULL rkt to rd_kafka_topic_name(). */
                if (rkmessage->rkt)
                        fprintf(stderr,
                                "%% Consume error for topic \"%s\" [%" PRId32
                                "] "
                                "offset %" PRId64 ": %s\n",
                                rd_kafka_topic_name(rkmessage->rkt),
                                rkmessage->partition, rkmessage->offset,
                                rd_kafka_message_errstr(rkmessage));
                else
                        fprintf(stderr, "%% Consumer error: %s: %s\n",
                                rd_kafka_err2str(rkmessage->err),
                                rd_kafka_message_errstr(rkmessage));

                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
                        run = 0;
                return;
        }

        if (rkmessage->key_len) {
                printf("Key: %.*s\n", (int)rkmessage->key_len,
                       rkmessage->key ? (const char *)rkmessage->key : "");
        }

        /* An empty message may have a NULL payload; never hand NULL to
         * %.*s. */
        printf("%.*s\n", (int)rkmessage->len,
               rkmessage->payload ? (const char *)rkmessage->payload : "");
}
static void
print_partition_list(FILE *fp,
const rd_kafka_topic_partition_list_t *partitions) {
int i;
for (i = 0; i < partitions->cnt; i++) {
fprintf(fp, "%s %s [%" PRId32 "] offset %" PRId64,
i > 0 ? "," : "", partitions->elems[i].topic,
partitions->elems[i].partition,
partitions->elems[i].offset);
}
fprintf(fp, "\n");
}
/**
 * Rebalance callback registered on the consumer configuration.
 *
 * Logs the event to stderr and updates the assignment:
 * - assign: prints the list, assigns it and adds its size to wait_eof;
 * - revoke: prints the list, clears the assignment and resets wait_eof;
 * - anything else: prints the error string and clears the assignment.
 *
 * @param rk          client instance
 * @param err         rebalance event / error code
 * @param partitions  partitions being assigned or revoked
 * @param opaque      application opaque (unused)
 */
static void rebalance_cb (rd_kafka_t *rk,
                          rd_kafka_resp_err_t err,
                          rd_kafka_topic_partition_list_t *partitions,
                          void *opaque) {
        (void)opaque;

        fputs("% Consumer group rebalanced: ", stderr);

        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                fputs("assigned:\n", stderr);
                print_partition_list(stderr, partitions);
                rd_kafka_assign(rk, partitions);
                wait_eof += partitions->cnt;
                return;
        }

        if (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
                fputs("revoked:\n", stderr);
                print_partition_list(stderr, partitions);
                rd_kafka_assign(rk, NULL);
                wait_eof = 0;
                return;
        }

        fprintf(stderr, "failed: %s\n", rd_kafka_err2str(err));
        rd_kafka_assign(rk, NULL);
}
/**
 * SIGUSR1 handler: calls rd_kafka_dump() on the global client handle with
 * stdout as the destination.
 *
 * @param sig  signal number (unused)
 */
static void sig_usr1(int sig) {
        (void)sig;
        rd_kafka_dump(stdout, rk);
}
/** Print the command-line help to stderr and terminate with exit status 1. */
static void usage_exit(void) {
        fprintf(stderr,
                "\n"
                " Options:\n"
                " -t <topic> 必填,需要消费的主题,可以从kafka主题页获取\n"
                " -b <brokers> 必填,kafka实例的接入点,可以从实例详情页获取\n"
                " -g <group> 必填, kafka消费组,当消费组不存在时,客户端会自动创建消费组\n"
                " -u <user> 选填,使用SASL接入点时必填,可以从用户管理页面获取用户名称\n"
                " -p <password> 选填,使用SASL接入点时必填,需要user对应的密码\n"
                " -m <mechanisms> 选填, 使用SASL接入点时必填,加密类型,取值为PLAIN|SCRAM-SHA-256,可以从用户管理页面查询\n"
                "\n");
        exit(1);
}

/**
 * Consumer entry point: subscribes to the topic given with -t as a member of
 * the consumer group given with -g and prints every received message until
 * the run flag is cleared.
 *
 * Options: -t <topic>, -b <brokers> and -g <group> are required; -u/-p/-m
 * select SASL authentication over sasl_plaintext; -d <contexts> sets the
 * librdkafka "debug" property. Any other option (including -h) prints the
 * help text.
 *
 * @return 0 after a normal shutdown; the process exits with status 1 on
 *         invalid arguments or when the client cannot be set up.
 */
int main(int argc, char **argv) {
        char *brokers    = NULL;
        char *topic      = NULL;
        char *user       = NULL;
        char *password   = NULL;
        char *mechanisms = NULL;
        char *group_id   = NULL;
        int partition    = RD_KAFKA_PARTITION_UA;
        int opt;
        int attempts;
        rd_kafka_conf_t *conf;
        char errstr[512];
        char tmp[16];
        rd_kafka_topic_partition_list_t *topics;
        rd_kafka_resp_err_t err;

        /* Kafka configuration */
        conf = rd_kafka_conf_new();

        /* Set logger */
        rd_kafka_conf_set_log_cb(conf, logger);

        /* Quick termination */
        snprintf(tmp, sizeof(tmp), "%i", SIGIO);
        rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

        /* "d:" - the debug option takes a value; without the colon optarg
         * would be NULL when it is handed to rd_kafka_conf_set(). */
        while ((opt = getopt(argc, argv, "ht:b:u:p:g:m:d:")) != -1) {
                switch (opt) {
                case 't':
                        topic = optarg;
                        break;
                case 'b':
                        brokers = optarg;
                        break;
                case 'u':
                        user = optarg;
                        break;
                case 'p':
                        password = optarg;
                        break;
                case 'm':
                        mechanisms = optarg;
                        break;
                case 'g':
                        group_id = optarg;
                        break;
                case 'd':
                        if (rd_kafka_conf_set(conf, "debug", optarg, errstr,
                                              sizeof(errstr)) !=
                            RD_KAFKA_CONF_OK) {
                                fprintf(stderr,
                                        "%% Debug configuration failed: "
                                        "%s: %s\n",
                                        errstr, optarg);
                                exit(1);
                        }
                        break;
                default:
                        usage_exit();
                        break;
                }
        }

        /* Topic, brokers and group are mandatory; NULL values would
         * otherwise reach librdkafka. This also covers the "no options at
         * all" case. */
        if (topic == NULL || brokers == NULL || group_id == NULL)
                usage_exit();

        /* SASL needs both credentials; never pass NULL values on. */
        if (mechanisms != NULL && (user == NULL || password == NULL)) {
                fprintf(stderr,
                        "%% -u <user> and -p <password> are required "
                        "together with -m <mechanisms>\n");
                usage_exit();
        }

        signal(SIGINT, stop);
        signal(SIGUSR1, sig_usr1);

        /* Set bootstrap servers */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
                              sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
        }

        /* Set group id */
        if (rd_kafka_conf_set(conf, "group.id", group_id,
                              errstr, sizeof(errstr)) !=
            RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
        }

        /* Callback called on partition assignment changes */
        rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
        rd_kafka_conf_set(conf, "enable.partition.eof", "true", NULL, 0);

        /* Set the starting offset policy */
        if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                              errstr, sizeof(errstr)) !=
            RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% %s\n", errstr);
                exit(1);
        }

        if (mechanisms != NULL) {
                if (rd_kafka_conf_set(conf, "security.protocol",
                                      "sasl_plaintext", errstr,
                                      sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK || /* security protocol */
                    rd_kafka_conf_set(conf, "sasl.mechanisms", mechanisms,
                                      errstr, sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK || /* SASL mechanism */
                    rd_kafka_conf_set(conf, "sasl.username", user, errstr,
                                      sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK || /* user name */
                    rd_kafka_conf_set(conf, "sasl.password", password, errstr,
                                      sizeof(errstr)) !=
                        RD_KAFKA_CONF_OK) { /* password */
                        fprintf(stderr, "%% %s\n", errstr);
                        exit(1);
                }
        }

        /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                                sizeof(errstr)))) {
                fprintf(stderr,
                        "%% Failed to create new consumer: %s\n",
                        errstr);
                exit(1);
        }

        /* Redirect rd_kafka_poll() to consumer_poll() */
        rd_kafka_poll_set_consumer(rk);

        topics = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, topic, partition);

        /* Subscribe to the topic */
        if ((err = rd_kafka_subscribe(rk, topics))) {
                fprintf(stderr, "%% Failed to start consuming topics: %s\n",
                        rd_kafka_err2str(err));
                exit(1);
        }

        while (run) {
                rd_kafka_message_t *rkmessage;

                rkmessage = rd_kafka_consumer_poll(rk, 1000);
                if (rkmessage) {
                        msg_consume(rkmessage);
                        rd_kafka_message_destroy(rkmessage);
                }
        }

        /* Stop consuming */
        err = rd_kafka_consumer_close(rk);
        if (err)
                fprintf(stderr, "%% Failed to close consumer: %s\n",
                        rd_kafka_err2str(err));
        else
                fprintf(stderr, "%% Consumer closed\n");

        rd_kafka_topic_partition_list_destroy(topics);

        /* The SIGUSR1 handler dereferences the global handle, so stop
         * reacting to that signal before the handle goes away. */
        signal(SIGUSR1, SIG_IGN);

        /* Destroy the handle; rk must not be used after this point. */
        rd_kafka_destroy(rk);

        /* Let background threads clean up and terminate cleanly:
         * up to five waits of one second each. */
        for (attempts = 5; attempts > 0; attempts--) {
                if (rd_kafka_wait_destroyed(1000) != -1)
                        break;
                printf("Waiting for librdkafka to decommission\n");
        }
        if (attempts == 0)
                fprintf(stderr,
                        "%% librdkafka did not decommission in time\n");

        return 0;
}
C++