You need to enable JavaScript to run this app.
导航

体验 BMQ 收发消息

最近更新时间2024.02.28 19:27:40

首次发布时间2022.07.26 15:08:01

云原生消息引擎 BMQ(Bytedance Message Queue)是火山引擎自研、基于云原生的全托管、高吞吐、高可扩展的分布式消息引擎服务,广泛应用于日志收集、数据聚合、离线数据分析等业务场景。
本文为您介绍通过 BMQ Java SDK 收发消息。

准备工作

  1. 跨服务授权。
    首次使用云原生消息引擎 BMQ 创建资源池,您需要进行跨服务授权,允许云原生消息引擎 BMQ 访问当前账户下的其他服务资源,例如 VPC、子网等。
    建议使用主账号完成跨服务授权。如果您的实际业务场景下确实需要 IAM 用户进行跨服务授权,请给 IAM 用户授予 BMQFullAcess 系统权限。具体操作,请参见IAM产品文档
    您在创建资源池时,如果弹出跨服务访问请求页面,请单击授权。 成功授权后,可以在 IAM 的角色列表中查看已创建的 ServiceRoleForBmq 服务关联角色。
  2. 创建私有网络和子网。
    资源池默认开通私有网络 VPC 访问,VPC 为资源池提供网络隔离的虚拟网络环境,只有在 VPC 中才能访问资源池。在创建资源池前,请确保目标地域和可用区下有可用的 VPC 和子网。具体操作,请参见创建私有网络创建子网

步骤一:创建资源池

  1. 登录云原生消息引擎控制台

  2. 在顶部菜单栏选择地域。

  3. 在左侧导航栏的项目下拉框中,查询并单击目标项目。

  4. 在左侧导航栏选择资源管理,然后单击创建资源池

  5. 创建资源池页面,设置资源池的基本信息、资源配置、网络信息、Topic 配置等关键参数,然后单击下一步:确认订单
    图片

    一级配置项

    二级配置项

    说明

    基本信息

    计费类型

    选择资源池计费的类型。
    目前支持按量计费包年包月两种计费类型,如需了解计费详情,请参见计费说明
    如果选择包年包月计费类型,还需要选择购买时长,以及确认是否需要自动续费

    资源池名称

    输入资源池名称。
    由小写字母、数字和短横线(-)构成,长度为 1~64 个字符。

    地域及可用区

    地域已选定,不可更改。
    根据业务的网络延迟、高可用容灾等需求,选择单可用区或者多可用区

    • 单可用区:选择一个合适的可用区即可。
    • 多可用区:默认选中多个可用区,可按照实际需求选择可用区。

    所属项目

    资源池所属项目。
    云原生消息引擎 BMQ 控制台的项目与火山引擎的项目融合。您可以对不同业务或项目使用的云资源进行分组管理。基于项目进行 IAM 授权,有利于维护资源独立、数据安全;同时可从项目维度查看资源消费账单,便于计算云资源使用成本。

    标签

    支持为资源池添加标签,可以更方便的识别和管理资源池。最多支持添加 20 个标签。
    标签为键值对样式,设置时注意以下事项:

    • 只支持大小写字母、数字、中文和特殊字符.:/=+-_@,键值大小写敏感。
    • Key 不允许以任何大小写形式的volc:开头,比如volc:Volc:vOlc:volc:......(16 种组合)。
    • Key 长度为 1~128 字符;Value 长度为 0~256 字符。

    资源池配置

    计算规格

    根据业务场景预估 Topic 数量、Consumer Group 数量、分区数量、读写流量峰值,然后选择适合的资源池规格。
    如需了解资源池规格详情,请参见资源池规格

    存储规格

    默认使用 CloudFS 加速存储,以 CloudFS 按量计费进行核算,无需额外配置。

    网络信息

    私有网络

    为保证内网顺利访问,建议选择已有云上业务的地域位置所在的 VPC。同一个 VPC 内,不同可用区子网之间是互通的。
    如果还未创建私有网络,请参见创建私有网络

    子网

    从下拉列表中选择子网。系统会自动根据您选择的地域、可用区、私有网络筛选出可用的子网。

    说明

    如果是多可用区部署的资源池,需要为选择的所有可用区分别配置子网。

    安全组

    从下拉列表中选择安全组。

    Topic 配置

    消息保留时长

    为该资源池下的所有 Topic 设置默认消息保留时长。

    • 默认为 72 小时,可按小时粒度调节。
    • 支持设置的留存范围为 1~336 小时(14天)。

    访问设置

    公网访问

    支持为实例开通公网访问。
    开启公网访问需要与弹性公网 IP(EIP)绑定。请提前创建 EIP。操作步骤,请参见申请公网IP
    公网地址将绑弹性公网 IP,费用由 EIP 收取,详情请参见 EIP 计费指引

  6. 订单详情页面,确认资源池配置信息,然后阅读并勾选产品相关协议,再单击立即购买

  7. 查看资源池创建进度。
    提交购买订单后,您可以返回控制台资源池列表页面。购买的资源池显示为初始化中,初始化完成后显示为运行中
    图片

步骤二:创建 Topic

  1. 在左侧导航栏的项目下拉框中,查询并单击目标项目。

  2. 在项目左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。

  3. 在资源池详情页面选择 Topic 页签,然后单击创建 Topic
    图片

  4. 创建 Topic 对话框,设置名称、分区数、消息保留时长等,然后单击确定
    图片

    参数

    说明

    Topic 名称

    输入 Topic 名称。
    只能由小写英文字符、数字、下划线(_)和短横线(-)组成,不超过 3~128 个字符。

    描述

    填写 Topic 的描述语言。

    分区数

    输入分区数,默认为 12。
    输入框下展示剩余可用分区数,你所设置的分区数不可超过可用分区数。

    消息保留时长

    数据在 Topic 中保存的时间。

    • 默认与资源池设置的全局消息保留时长保持一致,但也可按小时粒度自主调节。
    • 支持设置的留存范围为 1~336 小时(14天)。

步骤三:创建 Consumer Group

  1. 在左侧导航栏的项目下拉框中,查询并单击目标项目。

  2. 在项目左侧导航栏选择资源管理,然后单击目标资源池名称,进入资源池详情页面。

  3. 在资源池详情页面选择 Consumer Group 页签,然后单击创建 Group
    图片

  4. 创建 Group 对话框,设置名称和描述信息,然后单击确定
    图片

    参数

    说明

    Group 名称

    输入 Group 名称。
    由小写英文字母、数字、短横线(-)和下划线(_)组成,且长度为 3~128 个字符。

    描述

    输入描述信息。非必填。

步骤四:使用 SDK 收发消息

本文以调用 Java SDK 为例,介绍如何通过 SDK 接入云原生消息引擎 BMQ 并收发消息。

  1. 安装 Java 环境。

    1. 安装 1.8 或以上版本 JDK。具体操作,请参见安装JDK
    2. 安装 3.5 或以上版本 Maven。具体操作,请参见安装Maven
  2. 安装依赖库。

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.0</version>
    </dependency>
    
  3. 发送消息。
    编写并运行BmqProducerDemo.java发送消息。

    //在控制台查看对应接入点信息
    String server = "xxx.";
    //在控制台申请的消息所属Topic
    String topic = "this is your topic.";
    //测试消息内容
    String value = "this is test message value.";
    //发送消息条数
    int count = 100;
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try {
        for (int i = 0; i < count; i++) {
            RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++))
                    .get(5, TimeUnit.SECONDS);
            logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.",
                    recordMetadata.topic(),
                    recordMetadata.partition(),
                    recordMetadata.offset(),
                    i);
        }
    } catch (Throwable e) {
        logger.error("produce error", e);
    }
    producer.flush();
    producer.close();
    
  4. 消费消息。
    编写并运行BmqConsumerDemo.java消费消息。

    //在控制台查看对应接入点信息
    String server = "xxx.";
    //在控制台申请的消息所属Topic
    String topic = "this is your topic.";
    //在控制台申请消费消息的consumerGroup
    String group = "this is your group.";
    //消费offset策略:earliest, latest, none
    String offsetReset = "earliest";
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    List<String> topicList = Lists.newArrayList(topic);
    consumer.subscribe(topicList);
    while (true) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value());
            }
        } catch (Exception e) {
            logger.error("consume error", e);
        }
    }