本文将为您介绍火山引擎 LAS Kafka 组件相关的高阶使用,方便您更深入的使用 Kafka。
您可以在 LAS 控制台的集群管理页面,进行 Kafka 集群的扩容操作。开源 Kafka 扩容新的 Broker 后,流量不会自动迁移到新 Broker 上。通常有两种方式将流量迁移到新的 broker。
执行以下命令实现扩分区操作:
/usr/lib/emr/current/kafka/bin/kafka-topics.sh --alter --zookeeper {zookeeper_connect} --topic {topic} --partitions {num}
针对已有分区数已经超过 24,且数据占比较大的情况,则考虑使用如下方式进行均衡。
脚本:kafka-reassign-partitions.sh,其主要的三个操作:
将要处理的 topic 信息按照如下格式保存到 JSON 文件。例如要处理的 topic 为 test,则将 topic 信息保存为 topic_test.json,内容如下:
{ "topics": [ { "topic": "test" } ], "version": 1 }
获取执行计划
可以参考下面的命令。
参数 --broker-list 中的 broker 标识,是 broker.id 值。集群的 broker.id 都填写上,即生成建议的分区信息。
sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --topics-to-move-json-file {topic_test.json} --broker-list "1001,1002,1003,1004,1005,1006" --generate
保存目标分区信息
执行完 generate 命令,会有两个 JSON 数据:
将建议的分配保存为 JSON,例如:topic_test_reassignment.json
sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --reassignment-json-file {topic_test_reassignment.json} --execute
sh bin/kafka-reassign-partitions.sh --zookeeper {zookeeper_connect} --reassignment-json-file {topic_test_reassignment.json} --verify
结果都为 successfully 即成功。如果数据量大,可能需要等待一段时间。