Kafka
Kafka
一、安装部署
1.1 群集规划
node01 | node02 | node03 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
JAR包下载: 从Kafka官方下载Jar包
1.2 群集部署
- 解压安装包
[hadoop@node01 opt]$ sudo wget https://downloads.apache.org/kafka/3.4.1/kafka_2.13-3.4.1.tgz
[hadoop@node01 opt]$ sudo tar -xzf kafka_2.13-3.4.1.tgz
- 创建logs文件夹
[hadoop@node01 kafka]$ mkdir /opt/kafka/logs
- 修改配置文件
node03-➜ /opt vim /opt/kafka/config/server.properties
# 每台broker的id需要唯一,不能重复
broker.id=0
# 设置 kafka topic 数据存放目录
log.dirs=/opt/kafka/data
# 设置zookeeper的节点
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka
- 配置环境变量
[root@node01 kafka]# vim /etc/profile.d/my_env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
- 分发文件
[root@node01 kafka]# xsync /etc/profile.d/my_env.sh
[root@node01 opt]# xsync kafka
分别在node02,node03上修改 broker.id
启动脚本
#!/bin/bash
case $1 in
"start"){
for i in node01 node02 node03
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties"
done
};;
"stop"){
for i in node01 node02 node03
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/kafka/bin/kafka-server-stop.sh"
done
};;
esac
- 启动停止kafka群集
[root@node01 script]# kf.sh start
[root@node01 script]# kf.sh stop
- 在zk中查看kafka状态
node01-➜ ~ /opt/zookeeper/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 4] ls /kafka/brokers/ids
[0, 1, 2]
二、命令行操作
官方快速指南
2.1 创建topic
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first --partitions 1 --replication-factor 3
2.2 查看topic列表
kafka-topics.sh --list --bootstrap-server localhost:9092
2.3 创造消息
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
2.4 消费消息
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
2.5 删除数据
停掉服务,然后清除数据。
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
2.6 删除某个topic的数据
bin/kafka-topics.sh --delete --topic bin_log_coupon --zookeeper localhost:2181
三、KnowStreaming 监控
3.1 安装 MySQL 服务
3.2 安装ElasticSearch
3.3 KnowStreaming 实例搭建
# 下载安装包
[root@node01 opt]# wget https://s3-gzpu.didistatic.com/pub/knowstreaming/KnowStreaming-3.3.0.tar.gz
# 解压安装包到指定目录
[root@node01 opt]# tar -xzf KnowStreaming-3.3.0.tar.gz
# 修改启动脚本并加入systemd管理
cd KnowStreaming
# 创建相应的库和导入初始化数据
mysql -uroot -proot -e "create database know_streaming;"
mysql -uroot -proot know_streaming < ./init/sql/ddl-ks-km.sql
mysql -uroot -proot know_streaming < ./init/sql/ddl-logi-job.sql
mysql -uroot -proot know_streaming < ./init/sql/ddl-logi-security.sql
mysql -uroot -proot know_streaming < ./init/sql/dml-ks-km.sql
mysql -uroot -proot know_streaming < ./init/sql/dml-logi.sql
# 创建elasticsearch初始化数据
sh ./bin/init_es_template.sh
# 修改配置文件
vim ./conf/application.yml
# 监听端口
server:
port: 8080 # web 服务端口
tomcat:
accept-count: 1000
max-connections: 10000
# ES地址
es.client.address: 127.0.0.1:8060
# 数据库配置(一共三处地方,修改正确的mysql地址和数据库名称以及用户名密码)
jdbc-url: jdbc:mariadb://127.0.0.1:3306/know_streaming?.....
username: root
password: root
# 启动服务
cd /opt/KnowStreaming/bin/
sh startup.sh
3.4 使用KnowStreaming
3.4.1 登录系统
默认用户名密码admin:admin