You need to enable JavaScript to run this app.
导航

创建 Group

最近更新时间2024.01.02 20:16:54

首次发布时间2023.03.30 15:42:05

消息队列 Kafka版支持通过控制台创建 Group,或通过消费 SDK 解析获取并展示 Group 的信息。关闭自由使用 Group 功能后,只能通过控制台创建 Group。本文档展示创建 Group 的方式及操作步骤。

背景信息

消费组(Consumer Group)是 Kafka 提供的可扩展且具有容错性的消费者机制。在消息队列 Kafka版中,Group 是消费者的虚拟分组,组内所有的消费者协调在一起,共同消费订阅主题中的所有分区。
消息队列 Kafka版通过自由使用 Group 功能控制 Kafka 实例支持的 Group 创建方式,该功能默认为开启状态。

  • 开启时,不仅可通过控制台创建 Group,还可以通过消费 SDK 解析获取并展示 Group 的信息。未在控制台创建的 Group 也可以正常调用开源 Kafka 的相关 API 进行消息消费、提交消费位点操作。
  • 关闭后,只能通过控制台创建 Group。

注意事项

  • 每个实例可创建的最大 Group 数量与分区额度有关,实例的 Group 额度是其分区额度的 2 倍。例如当前实例的计算规格为 kafka.n3.x2.tiny,如果除免费分区额度 250 以外,未购买更多分区,则当前可创建 Group 500个;如果为该实例购买额外 50 个分区,则可创建 Group 600 个。
    不同规格支持的最大分区数量不同,详细规格说明请参考产品规格
  • 自由使用 Group 功能默认为开启状态。关闭后,只能通过控制台创建 Group,不能通过消费 SDK 解析 Group 信息,但不影响已创建的 Group 消费状态,也可以创建新的数据同步任务或 Connector。

通过控制台创建 Group

  1. 登录消息队列 Kafka版控制台

  2. 在顶部菜单栏中选择地域,并在选择左侧导航栏中单击实例列表

  3. 找到目标实例,单击实例名称。

  4. 在顶部页签栏中单击Group管理。

  5. 在页面左上角单击新建消费组

  6. 填写 Group 基本信息。

    配置项

    说明

    Group ID

    Group ID 应符合以下要求:

    • 长度范围为 3~128 个字符。
    • 仅支持英文、数字、@、连字符(-)以及下划线(_)。
    • 创建 Group 后,不支持修改 Group ID,请谨慎设置。

    描述

    Group 的简单描述。创建 Group 后可以在 Group 管理页面中查看或修改描述。

  7. 单击确定

通过 SDK 设置 Group

开启自由使用 Group 功能后,您可以直接在消费 SDK 中指定一个符合命名要求的 Group ID 进行消费,消息队列 Kafka版会自动解析、获取并展示此 Group 的信息。

说明

通过 SDK 设置 Group 时,如果 Group ID 中指定了不符合规范的字符,则可能出现无法采集 Group 监控数据等问题。Group ID 命名规范请参考通过控制台创建 Group

1 开启自动创建 Group 功能

自动创建 Group 功能默认为开启状态,如果之前关闭了该功能,您可以参考以下步骤重新开启。

  1. 登录消息队列 Kafka版控制台
  2. 在顶部菜单栏中选择地域,并在选择左侧导航栏中单击实例列表
  3. 找到目标实例,单击实例名称。
  4. 在顶部页签栏中单击Group管理。
  5. 在页面左上角开启自动创建Group功能开关。
  6. 在弹出对话框中单击确定

2 消费时指定 Group

开启自动创建 Group 功能后,您可以在消费端调用相关的 API 创建消费者、设置消费组和订阅的 Topic 等信息,并启动消费。您可以参考以下示例代码在消费 SDK 中为消费者实例设置消费组,也可以参考消息队列 Kafka版提供的示例项目,编写相关业务逻辑。

// 设置消费者的启动参数
    private void setProps(Properties kafkaProperties) {
        .......
        //设置当前消费实例所属的消费组,属于同一个消费组的消费实例,会负载消费消息。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("consumer.group.id"));
        .......
    }

// 构造消费者对象,即生成一个消费实例。
    private void newConsumer() {
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
    }

    // 订阅Topic
    private void subscribed(Properties kafkaProperties) {
        //设置消费组订阅的Topic。topicName为已创建的Topic名称。
        List<String> subscribedTopic =  new ArrayList<String>();
        subscribedTopic.add(kafkaProperties.getProperty("topicName"));
        consumer.subscribe(subscribedTopic);
    }