前言
kafka作为高效的消息队列,其数据的维护也是十分重要的。有时候,我们可能存在对broker进行数据迁移,或者增加或者减少broker。对于我们已经创建了对topic。其配置不会随便进行更新。isr中依然存在着已经移除了对broker,这个时候,我们就需要更新它们的信息,告诉kafka数据分布的情况。并且在最少isr的问题到来之前,提前减少事故的事发。
1. 创建topic
1 | bash-4.4# kafka-topics.sh --create --topic test-topic \ |
2.查看test-topic
1 | bash-4.4# kafka-topics.sh --describe --zookeeper mzookeeper --topic test-topic |
3. 产生一批数据
1 | bash-4.4# kafka-console-producer.sh --topic test-topic --broker-list mkafka1:9092 |
4. kafka-reassign-partitions重分区(假设需要减少节点broker 1004,本测试通过关闭对应kafka broker节点模拟)
1 | bash-4.4# kafka-topics.sh --describe --zookeeper mzookeeper --topic test-topic |
这里,我们看到partition=2的Leader由1004变为了1001,并且isr中的1004已经不见了。
5. 新建文件topic-to-move.json ,比加入如下内容
1 | topic-to-move.json<<EOF |
6. 使用–generate生成迁移计划,broker-list根据自己环境设置,我的环境由于broker 10004挂掉了,只剩下1001和1002和1003
1 | bash-4.4# kafka-reassign-partitions.sh --zookeeper mzookeeper --topics-to-move-json-file topic-to-move.json --broker-list "1001,1002,1003" --generate |
我们把最下面分配好的建议的分区副本分布配置拿出来,写入一个新的文件中
(生产上一般会保留当前分区副本分布,仅更改下线的分区,这样数据移动更少)
1 | cat > kafka-reassign-execute.json <<EOF |
7. 使用–execute执行迁移计划 (有数据移动,broker 1004上的数据会移到broker 1001和1002和1003上,如果数据量大,执行的时间会比较久,耐心等待即可)
1 | bash-4.4# kafka-reassign-partitions.sh --zookeeper mzookeeper \ |
这里还是列出了原始的配置和最新的配置,并且有一句提示:如果需要回滚的话,请使用第一份配置,第二份配置已经成功开始重新分发。
8. 使用-verify查看迁移进度
1 | bash-4.4# kafka-reassign-partitions.sh --zookeeper mzookeeper \ |
成功处理完毕了。
9. 查看详情
1 | bash-4.4# kafka-topics.sh --describe --zookeeper mzookeeper --topic test-topic |
可以看到1004的broker_id已经被彻底移除了。
9. 通过消费者验证,可知,并未丢失数据。注意需要加–from-beginning。
1 | bash-4.4# kafka-console-consumer.sh --topic test-topic --from-beginning --zookeeper mzookeeper |