You need to enable JavaScript to run this app.
导航

使用SDK收发消息

最近更新时间2023.12.20 14:25:01

首次发布时间2022.07.26 15:08:01

云原生消息引擎 BMQ(Bytedance Message Queue)是火山引擎自研、基于云原生的全托管、高吞吐、高可扩展的分布式消息引擎服务,广泛应用于日志收集、数据聚合、离线数据分析等业务场景。
本文为您介绍如何通过 Java SDK 收发消息。

准备工作

  1. 跨服务授权。
    首次使用云原生消息引擎 BMQ 创建资源池,您需要进行跨服务授权,允许云原生消息引擎 BMQ 访问当前账户下的其他服务资源,例如 VPC、子网等。
    建议使用主账号完成跨服务授权。如果您的实际业务场景下确实需要 IAM 用户进行跨服务授权,请给 IAM 用户授予 BMQFullAcess 系统权限。具体操作,请参见IAM产品文档
    您在创建资源池时,如果弹出跨服务访问请求页面,请单击授权。 成功授权后,可以在 IAM 的角色列表中查看已创建的 ServiceRoleForBmq 服务关联角色。
  2. 主账号创建项目。
    在体验产品功能前,需要先由主账号创建项目,并添加相关项目成员。具体操作,请参见创建项目添加成员
  3. 创建私有网络和子网。
    资源池默认开通私有网络 VPC 访问,VPC 为资源池提供网络隔离的虚拟网络环境,只有在 VPC 中才能访问资源池。在创建资源池前,请确保目标地域和可用区下有可用的 VPC 和子网。具体操作,请参见创建私有网络创建子网

    注意

    • VPC 和资源池必须在同一地域中,跨地域无法访问。
    • 安全组需要配置 9092 端口,因为 BMQ 资源池的用户接入地址的端口是 9092。

步骤一:创建资源池

  1. 登录云原生消息引擎控制台

  2. 在顶部菜单栏选择地域。

  3. 项目管理页面,单击目标项目区块,进入项目。

  4. 在项目左侧导航栏选择资源管理,然后单击创建资源池

  5. 创建资源池页面,设置资源池的基本信息、资源配置、网络信息、Topic 配置等关键参数,然后单击下一步:确认订单
    图片

    一级配置项

    二级配置项

    说明

    基本信息

    资源类型

    默认为通用资源

    资源模式

    默认为 VCI 模式,即在通用资源-VCI 资源域上创建 BMQ 资源池。

    计费类型

    选择资源池计费的类型。目前仅支持按量计费
    如需了解计费详情,请参见计费说明

    资源池名称

    输入资源池名称。

    • 由小写字母、数字和短横线(-)构成。
    • 长度为 1~64 个字符。

    地域及可用区部署

    地域已选定,不可更改。
    根据业务的网络延迟、高可用容灾等需求,选择单可用区或者多可用区

    • 单可用区:选择一个合适的可用区即可。
    • 多可用区:默认选中多个可用区。

    所属项目

    资源池所属项目。

    标签

    支持为资源池添加标签,可以更方便的识别和管理资源池。最多支持添加 20 个标签。
    标签为键值对样式,设置时注意以下事项:

    • 只支持大小写字母、数字、中文和特殊字符.:/=+-_@,键值大小写敏感。
    • Key 不允许以任何大小写形式的volc:开头,比如volc:Volc:vOlc:volc:......(16 种组合)。
    • Key 长度为 1~128 字符;Value 长度为 0~256 字符。

    资源配置

    计算规格

    根据业务场景预估需要的 Topic 数量、Consumer Group 数量、分区数量等,选择适合的资源池规格。

    存储规格

    默认使用 CloudFS 加速存储,无需额外配置。

    网络信息

    私有网络

    为保证内网顺利访问,建议选择已有云上业务的地域位置所在的 VPC。同一个 VPC 内,不同可用区子网之间是互通的。
    如果还未创建私有网络,请参见创建私有网络

    子网

    从下拉列表中选择子网。系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。

    说明

    如果是多可用区部署的资源池,需要为选择的所有可用区分别配置子网。

    安全组

    从下拉列表中选择安全组。

    Topic 配置

    消息保留时长

    为该资源池下的所有 Topic 设置默认消息保留时长。

    • 默认为 72 小时,可按小时粒度调节。
    • 支持设置的留存范围为 1~336 小时(14天)。
  6. 订单详情页面,确认资源池配置信息,然后阅读并勾选产品相关协议,再单击立即购买

  7. 查看资源池创建进度。
    提交购买订单后,您可以返回资源池页面。购买的资源池显示为初始化中,初始化完成后显示为运行中
    图片

步骤二:创建 Topic

  1. 在项目左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。

  2. 在资源池详情的左侧导航栏选择 Topic,然后单击创建 Topic

  3. 创建 Topic 对话框,设置名称、分区数、消息保留时长等,然后单击确定
    图片

    参数

    说明

    Topic 名称

    输入 Topic 名称。
    只能由小写英文字符、数字、下划线(_)和短横线(-)组成,不超过3~64个字符。

    描述

    填写 Topic 的描述语言。

    分区数

    输入分区数。默认为 12。

    • 输入框下展示剩余可用分区数,随设置分区数不同而变化。
    • 设置分区数超过可用分区数时,超过可用分区数的部分无法增加。

    消息保留时长

    数据在 Topic 中保存的时间。

    • 默认与资源池设置的全局消息保留时长保持一致,但也可按小时粒度自主调节。
    • 支持设置的留存范围为 1~336 小时(14天)。

步骤三:创建 Consumer Group

  1. 在项目与左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。

  2. 在资源池详情的左侧导航栏选择 Consumer Group,然后单击创建 Group

  3. 创建 Group 对话框,设置名称和描述信息,然后单击确定
    图片

    参数

    说明

    Group 名称

    输入 Group 名称。
    由小写英文字母、数字、短横线(-)和下划线(_)组成,且长度为 3~64 个字符。

    描述

    输入描述信息。非必填。

步骤四:使用 SDK 收发消息

此处以 Java SDK 为例介绍如何通过 SDK 接入云原生消息引擎 BMQ 并收发消息。

  1. 安装 Java 环境。

    1. 安装 1.8 或以上版本 JDK。具体操作,请参见安装JDK
    2. 安装 3.5 或以上版本 Maven。具体操作,请参见安装Maven
  2. 安装依赖库。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.0</version>
    </dependency>
    
  3. 发送消息。
    编写并运行BmqProducerDemo.java发送消息。

    //在控制台查看对应接入点信息
    String server = "xxx.";
    //在控制台申请的消息所属Topic
    String topic = "this is your topic.";
    //测试消息内容
    String value = "this is test message value.";
    //发送消息条数
    int count = 100;
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try {
        for (int i = 0; i < count; i++) {
            RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++))
                    .get(5, TimeUnit.SECONDS);
            logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.",
                    recordMetadata.topic(),
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    i);
        }
    } catch (Throwable e) {
        logger.error("produce error", e);
    }
    producer.flush();
    producer.close();
    
  4. 消费消息。
    编写并运行BmqConsumerDemo.java消费消息。

    //在控制台查看对应接入点信息
    String server = "xxx.";
    //在控制台申请的消息所属Topic
    String topic = "this is your topic.";
    //在控制台申请消费消息的consumerGroup
    String group = "this is your group.";
    //消费offset策略:earliest, latest, none
    String offsetReset = "earliest";
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    List<String> topicList = Lists.newArrayList(topic);
    consumer.subscribe(topicList);
    while (true) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value());
            }
        } catch (Exception e) {
            logger.error("consume error", e);
        }
    }