0%

kafka常用操作

topic 相关

新建topic

bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_test --partitions 30  --replication-factor 2
  • –zookeeper 指定zk地址
  • –topic 后面”t_test”为定义的topic name
  • –partition 指定分区数
  • –relication-factor 副本数

查看所有topic

bin/kafka-topics.sh --zookeeper node01:2181 --list

查看指定topic信息

bin/kafka-topics.sh --zookeeper localhost:2181 --descirbe --topic topic_name

手动产生消息

需要交互输入
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Hello Kafka!
nihao shijie

手动消费消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

查看未被消费的消息数(topic 消息总数)

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  –zookeeper localhost:2181  –group logstash

副本相关

这里介绍下kafka 副本复制的过程,很巧妙。Kafka每个主题分区有N个副本,其中n是主题的复制因子。Kafka通过多副本机制实现自动故障转移,当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。一个replica为leader,其他都为follower,leader处理对这个partition的所有读写请求,与此同时,follower会被动地去复制leader上的数据。

复制过程参考()[]

增加topic副本数量

  • 首先查看topic信息
1
2
3
4
5
6
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_byhands

Topic:test_byhands PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test_byhands Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: test_byhands Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: test_byhands Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3
可以看到这个topic信息有3个partition 两个副本
  • 手工构造一个json文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"version": 1,
"partitions": [
{
"topic": "test_byhands",
"partition": 0,
"replicas": [
1,
3,
2
]
},
{
"topic": "test_byhands",
"partition": 1,
"replicas": [
3,
2,
1
]
},
{
"topic": "test_byhands",
"partition": 2,
"replicas": [
1,
3,
2
]
}
]
}
  • 然后执行
1
2
3
4
5
6
7
8
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file add_replicas.json --execute

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test_byhands","partition":1,"replicas":[3,2]},{"topic":"test_byhands","partition":0,"replicas":[1,3]},{"topic":"test_byhands","partition":2,"replicas":[1,3]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"test_byhands","partition":0,"replicas":[1,3,2]},{"topic":"test_byhands","partition":1,"replicas":[3,2,1]},{"topic":"test_byhands","partition":2,"replicas":[1,3,2]}]}
  • 检查确认
1
2
3
4
5
6
7

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test_byhands

Topic:test_byhands PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test_byhands Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test_byhands Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test_byhands Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2

副本添加完成。。

删除topic

在kafka老的版本里面好像是不支持直接删除topic的,但是0.9之后的都可以。。

集群扩容

当集群压力增大,或者磁盘不够用了之后,集群就需要扩容。一般是先加broker,然后更改之前partition数,然后rebanlance

  • 正常安装broker

平时

参数优化

num.replica.fetchers 默认1 副本同步线程

以下两个参数调整messages 大小
message.max.bytes=10485760
replica.fetch.max.bytes=10485760