Kafka部署

单机部署

tar -xzf kafka_2.13-3.4.0.tgz
cd kafka_2.13-3.4.0

Apache Kafka可以使用ZooKeeper或KRaft启动。要开始使用任一配置,请遵循以下部分之一,但不能同时遵循两者。
Kafka with ZooKeeper

vim /opt/kafka/config/zookeeper.properties
dataDir=/mnt/zookeeper
clientPort=2181
./bin/zookeeper-server-start.sh config/zookeeper.properties

vim /opt/kafka/config/server.properties
listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181
./bin/kafka-server-start.sh config/server.properties

集群部署

# 下载服务包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.1/kafka_2.11-2.0.1.tgz

# 解压
tar xf kafka_2.11-2.0.1.tgz
# 查看所有配置
grep "^[a-Z]" server.properties 

# 编辑配置文件
vim server.properties 
broker.id=1  # broker的全局唯一编号,不能重复
port=9092  # 监听端口,producer或consumer将在此端口建立连接
num.network.threads=3 # 处理网络请求的线程数量
num.io.threads=8 # 用来处理磁盘IO的线程数量
delete.topic.enable=true # 由Kafka来完成Topic的相关删除,否则需要手动删除zk相关数据

socket.send.buffer.bytes=102400 # 发送套接字的缓存区大小
socket.receive.buffer.bytes=102400 # 接收套接字的缓冲区大小
socket.request.max.bytes=104857600 # 请求套接字的缓冲区大小
log.dirs=/Data/kafka-logs # kafka运行日志存放的路径,需要提前创建好

num.partitions=1 # topic在当前broker上的分片个数  
num.recovery.threads.per.data.dir=1 # 用来恢复和清理data下数据的线程数量

// 2.11-0.8.2.1 版本没有下面三个配置
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
//

log.retention.hours=168 # segment文件保留的最长时间,超时将被删除
log.segment.bytes=1073741824 # 日志中每个segment的大小,默认为1G
log.retention.check.interval.ms=300000 # 周期性检查文件大小的时间
log.cleaner.enable=true # 日志清理是否打开
zookeeper.connect=172.16.150.154:2181,172.16.150.155:2181,172.16.150.156:2181  # zokeeper集群地址,以","为分割!!!!!(使用域名,IP启动失败)
zookeeper.connection.timeout.ms=6000 # zookeeper链接超时时间
 
# partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000

# 消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000 
 
# 此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connect to localhost:9092 unsuccessful错误!
host.name=mini2
advertised.host.name=192.168.33.62

# 启动kafka
./bin/kafka-server-start.sh config/server.properties &

$ 停止kafka
./bin/kafka-server-stop.sh

启动脚本

#!/bin/bash

user=`whoami`

if [ $user = "root" ];then

case "$1" in

start)
kafka_name=`ls -F|grep '/$'`

for i in $kafka_name;
do 
 echo $i
 cd $i
 ./bin/kafka-server-start.sh config/server.properties &
 sleep 5
 cd ../
done
;;

stop)
kafka_name=`ls -F|grep '/$'`

for i in $kafka_name;
do
 echo $i
 cd $i
 ./bin/kafka-server-stop.sh
 sleep 3
 cd ../
done
;;

*)
echo -e "\033[33m后面需追加参数 [start|stop] \033[0m"
;;
esac

else
echo -e "\033[31m请切换启动用户: \033[0m"
fi