本文记录本人使用kafka过程中常用的运维指令。
一 准备客户端环境
安装好jdk和kafka
1. 配置producer/consumer.properties
修改config/producer.properties和config/consumer.properties,添加以下内容
| 12
 3
 4
 5
 
 | security.protocol=SASL_PLAINTEXTsasl.mechanism=PLAIN
 sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='1qaz@WSX';"
 
 
 
 | 
2. 配置静态jaas文件(可选)
如果使用的kafka版本太旧,或者properties文件的sasl.jaas.config无法生效的时候,可配置静态jaas文件
添加config/kafka_jaas.conf文件,然后配置环境变量使之启用
| 12
 
 | export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_jaas.conf"
 
 | 
账号密码认证方式,文件格式如下
| 12
 3
 4
 5
 6
 
 | KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule required
 username="admin"
 password="123456";
 };
 
 
 | 
如果是keytab的话,格式应该是这样
| 12
 3
 4
 5
 6
 7
 8
 
 | KafkaClient {com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 storeKey=true
 serviceName="kafka"
 keyTab="/mnt/XXX.keytab"
 principal="XXX/XXX@XXX.COM";
 };
 
 | 
二 常用命令
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | # 查看主题列表:注意使用zk连接,也可自行替换成--bootstrap-serverbin/kafka-topics.sh --zookeeper $HOSTNAME:$ZK_CLIENT_PORT/kafka --list
 # 创建主题:注意使用zk连接,也可自行替换成--bootstrap-server
 bin/kafka-topics.sh --zookeeper $HOSTNAME:$ZK_CLIENT_PORT/kafka --create  --topic $TOPIC_NAME --partitions 1 --replication-factor 1
 # 生产消息
 bin/kafka-console-producer.sh --bootstrap-server $KAFKA_HOSTNAME:$KAFKA_PORT --topic $TOPIC_NAME --producer.config config/producer.properties
 # 消费消息:从头开始接收数据并打印时间戳
 bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_HOSTNAME:$KAFKA_PORT --from-beginning  --topic $TOPIC_NAME --consumer.config config/consumer.properties --property print.timestamp=true
 # 接收1条最新的数据
 bin/kafka-console-consumer.sh --bootstrap-server $HOSTNAME:$KAFKA_PORT --max-messages 1 --topic $TOPIC_NAME --consumer.config config/consumer.properties
 # 查看消费组信息列表
 bin/kafka-consumer-groups.sh --bootstrap-server  $KAFKA_HOSTNAME:$KAFKA_PORT --list  --command-config config/consumer.properties
 # 查看指定消费组的消费/堆积情况
 bin/kafka-consumer-groups.sh --bootstrap-server  $KAFKA_HOSTNAME:$KAFKA_PORT --describe --group $groupname  --command-config config/consumer.properties
 
 
 | 
三 压测命令
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 
 | # 压测 预设吞吐量为5w/s,总共发送100w条数据,每条数据10字节(自动生成)
 bin/kafka-producer-perf-test.sh --topic $TOPIC_NAME \
 --num-record 1000000  \
 --throughput 50000 \
 --record-size=10 \
 --producer-props bootstrap.servers=$KAFKA_HOSTNAME:$KAFKA_PORT \
 --producer.config config/producer.properties
 
 # 压测 预设吞吐量为1w/s,总共发送150w条数据,数据来自data.txt(按行切割)
 bin/kafka-producer-perf-test.sh --topic $TOPIC_NAME \
 --num-record 1500000  \
 --throughput 10000 \
 --payload-file=data.txt \
 --producer-props bootstrap.servers=$KAFKA_HOSTNAME:$KAFKA_PORT \
 --producer.config config/producer.properties
 
 # 压测数据可使用如下方法获取
 # 从指定主题中获取1000条数据写入到 data.txt文件中
 bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_HOSTNAME:$KAFKA_PORT \
 --topic $TOPIC_NAME \
 --max-messages=1000 \
 --consumer.config config/consumer.properties >> data.txt
 
 
 
 |