You need to enable JavaScript to run this app.
导航

高阶使用

最近更新时间2023.10.17 23:07:57

首次发布时间2023.01.16 10:09:55

本文将为您介绍火山引擎 E-MapReduce(EMR)kafka 组件相关的高阶使用,方便您更深入的使用 Kafka。

扩容

您可以在 EMR 控制台的集群管理页面,进行 Kafka 集群的扩容操作。
开源 Kafka 扩容新的 broker 后,流量不会自动迁移到新 broker 上。通常有两种方式将流量迁移到新的 broker。

  • 扩分区:脚本直接扩容分区。比如之前有 12 个分区,扩容到 24 个分区。新分区会根据策略分配到新的 broker 上,是最简单的方式。缺点是老的分区还是在老的 broker 上,集群整体上流量是不均衡的。

  • Reassign:这种方式即迁移分区数据到新的 broker,步骤相对复杂。

1 扩分区

执行以下命令实现扩分区操作:

/usr/lib/emr/current/kafka/bin/kafka-topics.sh --alter --zookeeper {zookeeper_connect} --topic {topic} --partitions {num}

2 Reassign 进行数据均衡

针对已有分区数已经超过 24,且数据占比较大的情况,则考虑使用如下方式进行均衡。

脚本:kafka-reassign-partitions.sh,其主要的三个操作:

  • --generate:生成分区重分配计划

  • --execute:执行分区重分配计划

  • --verify:验证分区重分配结果

2.1 选择要处理的 topic

将要处理的 topic 信息按照如下格式保存到 JSON 文件。例如要处理的 topic 为 test,则将 topic 信息保存为 topic_test.json,内容如下:

{
    "topics":
    [
        {
            "topic": "test"
        }
    ],
    "version": 1
}

2.2 生成分区重分配计划

2.2.1 获取执行计划

可以参考下面的命令。

注意

参数 --broker-list 中的 broker 标识,是 broker.id 值。集群的 broker.id 都填写上,即生成建议的分区信息。

sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect}  --topics-to-move-json-file {topic_test.json} --broker-list "1001,1002,1003,1004,1005,1006" --generate

2.2.2 保存目标分区信息

执行完 generate 命令,会有两个 JSON 数据:

  • 第一个是 Current partition replica assignment,即当前的分配。

  • 另一个是建议的分配:Proposed partition reassignment configuration,即要保存的文件。

建议的分配保存为 JSON,例如:topic_test_reassignment.json

2.3 执行分区重分配计划

sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --reassignment-json-file  {topic_test_reassignment.json} --execute

2.4验证分区重分配结果

sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --reassignment-json-file  {topic_test_reassignment.json}  --verify

结果都为 successfully 即成功。如果数据量大,可能需要等待一段时间。

3 创建时绑定了公网 IP 的 Kafka 集群在扩容 Core 节点后需要做的配置操作

说明

本节内容适用于 EMR 3.4.X 及以前的软件栈版本。EMR 3.5.0 及以后的软件栈版本无需做这里的配置操作。

Kafka 集群在创建时如果给 Broker 所在的 Core 节点组绑定了公网 IP,会自动配置 Kafka Broker 的 advertised.listeners 等参数,使 Kafka Broker 可以通过公网 IP(端口号:19092)和内网地址(端口号:9092)访问。
在 Kafka 集群扩容 Core 节点后,如果仍需要通过公网地址访问 Kafka Broker,需要执行如下操作:

说明

在 Kafka 集群扩容 Core 节点后,如果不执行下述操作,仍可以通过内网地址访问 Kafka Broker。

  1. 给所有新扩容出来的 Core 节点绑定公网 IP。

  2. 集群列表 > Kafka 集群名称 > 服务列表 > Kafka服务名称 > 服务参数中,修改该集群的 Kafka 服务的配置参数:kafka_broker_hostname_eip_map_str。该参数的值为一个 JSON 字符串,包含 Kafka Broker 各个节点的 hostname 到公网 IP 的映射。在该参数的值中,增加新扩容出来的 Core 节点的 hostname 到公网 IP 的映射。

  3. 重启 Kafka 服务中心扩容出来的 Core 节点下的 Kafka Broker 组件。

完成上述操作执行成功后,就可以继续通过公网地址访问 Kafka Broker 了。