topic 相关
新建topic
bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_test --partitions 30 --replication-factor 2
- –zookeeper 指定zk地址
- –topic 后面”t_test”为定义的topic name
- –partition 指定分区数
- –relication-factor 副本数
查看所有topic
bin/kafka-topics.sh --zookeeper node01:2181 --list
查看指定topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --descirbe --topic topic_name
手动产生消息
需要交互输入
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hello Kafka!
nihao shijie
手动消费消息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
查看未被消费的消息数(topic 消息总数)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –zookeeper localhost:2181 –group logstash
副本相关
这里介绍下kafka 副本复制的过程,很巧妙。Kafka每个主题分区有N个副本,其中n是主题的复制因子。Kafka通过多副本机制实现自动故障转移,当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。一个replica为leader,其他都为follower,leader处理对这个partition的所有读写请求,与此同时,follower会被动地去复制leader上的数据。
复制过程参考()[]
增加topic副本数量
- 首先查看topic信息
1 | ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_byhands |
可以看到这个topic信息有3个partition 两个副本
- 手工构造一个json文件
1 | { |
- 然后执行
1 | ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file add_replicas.json --execute |
- 检查确认
1 |
|
副本添加完成。。
删除topic
在kafka老的版本里面好像是不支持直接删除topic的,但是0.9之后的都可以。。
集群扩容
当集群压力增大,或者磁盘不够用了之后,集群就需要扩容。一般是先加broker,然后更改之前partition数,然后rebanlance
- 正常安装broker
平时
参数优化
num.replica.fetchers 默认1 副本同步线程
以下两个参数调整messages 大小
message.max.bytes=10485760
replica.fetch.max.bytes=10485760