kafka-connect
kafka-connect
是 kafka
自带的一个用于数据集成工具,包含两类 connector
: source connector
和 sink connector
,顾名思义一个是作为数据源往 kafka 写数据,一个是从 kafka 读数据。
# 数据流
jdbc/file/mqtt/http/... -> source connector -> kafka cluster -> sink connector -> jdbc/file/mqtt/http/es/...
kafka-connect
有两个模式:单机 standalone
和分布式 distributed
,本文以单机模式从一个文件经过 connector 同步到另一个文件为例进行测试。
测试
进入到 kafka 安装的根目录:
- 修改 standalone 配置:
config/connect-standalone.properties
# kakfa 节点
bootstrap.servers=172.16.20.152:9092
# file connector 路径,官方自带的 jar 包
plugin.path=/root/kafka_2.13-3.7.1/libs/connect-file-3.7.1.jar
- Source file connector 配置:
config/connect-file-source.properties
# connector 名称
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# kafka 根目录下 test.txt 文件
file=test.txt
topic=connect-test
- Sink file connector 配置:
config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# 同步到目标文件名
file=test.sink.txt
topics=connect-test
- 启动
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- 测试
# 往 test.txt 文件里追加写数据
echo "data1" >> test.txt
echo "data2" >> test.txt
echo "data3" >> test.txt
# 可以看到另一个文件内容也跟着改变
tail -f test.sink.txt