0%

kafka-connect初体验

kafka-connect

kafka-connectkafka 自带的一个用于数据集成工具,包含两类 connector: source connectorsink connector,顾名思义一个是作为数据源往 kafka 写数据,一个是从 kafka 读数据。

# 数据流
jdbc/file/mqtt/http/...  ->  source connector  ->  kafka cluster  -> sink connector  ->  jdbc/file/mqtt/http/es/...

kafka-connect 有两个模式:单机 standalone 和分布式 distributed,本文以单机模式从一个文件经过 connector 同步到另一个文件为例进行测试。

测试

进入到 kafka 安装的根目录:

  1. 修改 standalone 配置:config/connect-standalone.properties
# kakfa 节点
bootstrap.servers=172.16.20.152:9092
# file connector 路径,官方自带的 jar 包
plugin.path=/root/kafka_2.13-3.7.1/libs/connect-file-3.7.1.jar
  1. Source file connector 配置:config/connect-file-source.properties
# connector 名称
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# kafka 根目录下 test.txt 文件
file=test.txt
topic=connect-test
  1. Sink file connector 配置:config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# 同步到目标文件名
file=test.sink.txt
topics=connect-test
  1. 启动
bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-source.properties config/connect-file-sink.properties
  1. 测试
# 往 test.txt 文件里追加写数据
echo "data1" >> test.txt
echo "data2" >> test.txt
echo "data3" >> test.txt

# 可以看到另一个文件内容也跟着改变
tail -f test.sink.txt

其他