最近更新时间:2024.02.28 19:27:40
首次发布时间:2022.07.26 15:08:01
云原生消息引擎 BMQ(Bytedance Message Queue)是火山引擎自研、基于云原生的全托管、高吞吐、高可扩展的分布式消息引擎服务,广泛应用于日志收集、数据聚合、离线数据分析等业务场景。
本文为您介绍通过 BMQ Java SDK 收发消息。
BMQFullAcess
系统权限。具体操作,请参见IAM产品文档。ServiceRoleForBmq
服务关联角色。登录云原生消息引擎控制台。
在顶部菜单栏选择地域。
在左侧导航栏的项目下拉框中,查询并单击目标项目。
在左侧导航栏选择资源管理,然后单击创建资源池。
在创建资源池页面,设置资源池的基本信息、资源配置、网络信息、Topic 配置等关键参数,然后单击下一步:确认订单。
一级配置项 | 二级配置项 | 说明 |
---|---|---|
基本信息 | 计费类型 | 选择资源池计费的类型。 |
资源池名称 | 输入资源池名称。 | |
地域及可用区 | 地域已选定,不可更改。
| |
所属项目 | 资源池所属项目。 | |
标签 | 支持为资源池添加标签,可以更方便的识别和管理资源池。最多支持添加 20 个标签。
| |
资源池配置 | 计算规格 | 根据业务场景预估 Topic 数量、Consumer Group 数量、分区数量、读写流量峰值,然后选择适合的资源池规格。 |
存储规格 | 默认使用 CloudFS 加速存储,以 CloudFS 按量计费进行核算,无需额外配置。 | |
网络信息 | 私有网络 | 为保证内网顺利访问,建议选择已有云上业务的地域位置所在的 VPC。同一个 VPC 内,不同可用区子网之间是互通的。 |
子网 | 从下拉列表中选择子网。系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。 说明 如果是多可用区部署的资源池,需要为选择的所有可用区分别配置子网。 | |
安全组 | 从下拉列表中选择安全组。 | |
Topic 配置 | 消息保留时长 | 为该资源池下的所有 Topic 设置默认消息保留时长。
|
访问设置 | 公网访问 | 支持为实例开通公网访问。 |
在订单详情页面,确认资源池配置信息,然后阅读并勾选产品相关协议,再单击立即购买。
查看资源池创建进度。
提交购买订单后,您可以返回控制台资源池列表页面。购买的资源池显示为初始化中,初始化完成后显示为运行中。
在左侧导航栏的项目下拉框中,查询并单击目标项目。
在项目左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。
在资源池详情页面选择 Topic 页签,然后单击创建 Topic。
在创建 Topic 对话框,设置名称、分区数、消息保留时长等,然后单击确定。
参数 | 说明 |
---|---|
Topic 名称 | 输入 Topic 名称。 |
描述 | 填写 Topic 的描述语言。 |
分区数 | 输入分区数,默认为 12。 |
消息保留时长 | 数据在 Topic 中保存的时间。
|
在左侧导航栏的项目下拉框中,查询并单击目标项目。
在项目左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。
在资源池详情页面选择 Consumer Group 页签,然后单击创建 Group。
在创建 Group 对话框,设置名称和描述信息,然后单击确定。
参数 | 说明 |
---|---|
Group 名称 | 输入 Group 名称。 |
描述 | 输入描述信息。非必填。 |
本文以调用 Java SDK 为例,介绍如何通过 SDK 接入云原生消息引擎 BMQ 并收发消息。
安装 Java 环境。
安装依赖库。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>
发送消息。
编写并运行BmqProducerDemo.java发送消息。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //测试消息内容 String value = "this is test message value."; //发送消息条数 int count = 100; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); try { for (int i = 0; i < count; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++)) .get(5, TimeUnit.SECONDS); logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), i); } } catch (Throwable e) { logger.error("produce error", e); } producer.flush(); producer.close();
消费消息。
编写并运行BmqConsumerDemo.java消费消息。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //在控制台申请消费消息的consumerGroup String group = "this is your group."; //消费offset策略:earliest, latest, none String offsetReset = "earliest"; Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); List<String> topicList = Lists.newArrayList(topic); consumer.subscribe(topicList); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } catch (Exception e) { logger.error("consume error", e); } }