
Kafka server side

By default Kafka limits the message size to about 1 MB; raise the limit if you need to send larger messages.

  • server.properties
# 100 MiB
message.max.bytes=104857600
  • Change the configuration of a single topic (the topic-level setting is named max.message.bytes)
# Set the config
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic \
  --alter --add-config max.message.bytes=128000

# Check the config
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic --describe

# Remove the config
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name my-topic \
  --alter --delete-config max.message.bytes
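
The byte values in these settings are easy to mistype, so it can help to compute them instead of writing them by hand. A minimal sketch; the helper name mib_to_bytes is made up for this note and is not part of Kafka:

```shell
# Convert a size in MiB to bytes (1 MiB = 1024 * 1024 bytes).
# mib_to_bytes is a hypothetical helper, not a Kafka tool.
mib_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

# Value used for message.max.bytes in server.properties.
mib_to_bytes 100   # prints 104857600
```

For comparison, the topic-level example value 128000 is 125 KiB (128000 / 1024).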

Client side

  • go: sarama
kafkaConfig := sarama.NewConfig()
kafkaConfig.Producer.MaxMessageBytes = 104857600 // maximum message size: 100 MiB

kafka-connect

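kafka-connect is a data-integration tool that ships with Kafka. It has two kinds of connectors: source connectors and sink connectors. As the names suggest, a source connector writes data from an external system into Kafka, and a sink connector reads data out of Kafka.

# Data flow
jdbc/file/mqtt/http/...  ->  source connector  ->  kafka cluster  ->  sink connector  ->  jdbc/file/mqtt/http/es/...

kafka-connect has two modes: standalone (single process) and distributed. This note tests standalone mode by syncing one file to another file through the connectors.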

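Test

Change into the Kafka installation root directory:

  1. Edit the standalone configuration: config/connect-standalone.properties
# Kafka broker address
bootstrap.servers=172.16.20.152:9092
# Path to the file connector, the jar bundled with the official distribution
plugin.path=/root/kafka_2.13-3.7.1/libs/connect-file-3.7.1.jar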
  1. Source file connector configuration: config/connect-file-source.properties
# Connector name
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# The test.txt file in the Kafka root directory
file=test.txt
topic=connect-test
  1. Sink file connector configuration: config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# Name of the target file that records are written to
file=test.sink.txt
topics=connect-test
  1. Start
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  1. Test
# Append data to test.txt
echo "data1" >> test.txt
echo "data2" >> test.txt
echo "data3" >> test.txt

# The contents of the sink file change along with it
tail -f test.sink.txt
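
Instead of watching the sink file by eye, the two files can be compared directly. This is a rough sketch that assumes test.sink.txt did not exist before the connectors started, so both files should end up with the same lines; files_in_sync is a made-up helper name:

```shell
# Succeeds (exit status 0) when the two files have identical contents.
# files_in_sync is a hypothetical helper for this test, not a Kafka tool.
files_in_sync() {
  cmp -s "$1" "$2"
}
```

For example, `files_in_sync test.txt test.sink.txt && echo "in sync"`. The sink is written asynchronously, so a check made right after appending may fail once and pass a moment later.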

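Other

Deployment

Prepare three servers. Each machine needs a Java runtime, ZooKeeper, and Kafka (the ZooKeeper scripts used below ship inside the Kafka package).

  1. Java environment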

    Your local environment must have Java 8+ installed.

yum install java-1.8.0-openjdk-devel -y 
java -version
  1. Download Kafka
# https://kafka.apache.org/downloads; the Scala 2.13 build is recommended
wget https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz

tar -xzf kafka_2.13-3.7.1.tgz
cd kafka_2.13-3.7.1
  1. Configure config/zookeeper.properties
dataDir=/home/zookeeper/data
dataLogDir=/home/zookeeper/log
clientPort=2181
tickTime=2000
initLimit=10
syncLimit=5
# IPs of the three servers
server.1=172.16.20.152:2888:3888
server.2=172.16.20.153:2888:3888
server.3=172.16.20.154:2888:3888

Create a myid file in the dataDir directory. On the three machines it contains 1, 2, and 3 respectively, matching the server.N entry for that machine.

[root@data9 kafka_2.13-3.7.1]# cat /home/zookeeper/data/myid 
1
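
The myid value has to match the N of the server.N line for the local machine, which is easy to get wrong when done by hand on three hosts. A small sketch that looks the number up from the properties file; the function name myid_for_ip is invented for this note:

```shell
# Print the ZooKeeper id N whose "server.N=<ip>:..." line matches the given IP.
# Arguments: <path to zookeeper.properties> <IP of this machine>
# myid_for_ip is a hypothetical helper, not part of ZooKeeper or Kafka.
myid_for_ip() {
  conf=$1
  # Escape the dots so the IP is matched literally in the regular expression.
  ip_re=$(printf '%s' "$2" | sed 's/\./\\./g')
  sed -n "s/^server\.\([0-9][0-9]*\)=${ip_re}:.*/\1/p" "$conf"
}

# Example (run on 172.16.20.153, writes 2 into the myid file):
# myid_for_ip config/zookeeper.properties 172.16.20.153 > /home/zookeeper/data/myid
```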
  1. Start ZooKeeper
# Start in the foreground
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start in the background
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  1. Configure config/server.properties
# 0, 1, 2 on the three machines respectively
broker.id=0
listeners=PLAINTEXT://:9092
# This machine's own IP
advertised.listeners=PLAINTEXT://172.16.20.154:9092
# Directory for the Kafka log (data) files
log.dirs=/home/kafka/kafka-logs
# Retention time; the default is 7 days
log.retention.hours=24
# ZooKeeper addresses
zookeeper.connect=172.16.20.152:2181,172.16.20.153:2181,172.16.20.154:2181
  1. Start Kafka
# Start in the foreground
bin/kafka-server-start.sh config/server.properties
# Start in the background
bin/kafka-server-start.sh -daemon config/server.properties
  1. Test Kafka
# Connect to any node and create a topic: quickstart-events
# (without --partitions / --replication-factor the broker defaults apply)
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server 172.16.20.152:9092
# Show the topic's status
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 172.16.20.152:9092
# Consume messages from the topic
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server 172.16.20.152:9092

# Produce messages from any node
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server 172.16.20.154:9092
  1. Change Kafka's memory limit
vim bin/kafka-server-start.sh
# Change -Xmx (maximum heap size) and -Xms (initial heap size)
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

# Restart
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties
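
Editing the start script is not the only option. As far as I can tell, the stock kafka-server-start.sh applies its default heap only when KAFKA_HEAP_OPTS is empty, so exporting the variable before starting should have the same effect; check your own copy of the script before relying on this. The sketch below only mirrors that assumed guard, and effective_heap_opts is a made-up name:

```shell
# Mirrors the guard assumed to be in kafka-server-start.sh:
# the default heap is used only when KAFKA_HEAP_OPTS is empty or unset.
# effective_heap_opts is a hypothetical helper for illustration.
effective_heap_opts() {
  if [ -z "${KAFKA_HEAP_OPTS:-}" ]; then
    echo "-Xmx1G -Xms1G"
  else
    echo "$KAFKA_HEAP_OPTS"
  fi
}
```

Under that assumption, a larger heap could be set without touching the script, for example `KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" bin/kafka-server-start.sh -daemon config/server.properties` (the 4G value is only an example).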

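Overview

In this setup postgres 16 could only be installed on CentOS from source, so timescaledb also has to be installed from source.

Installation

  • Official documentation
    Prerequisites:
  • PostgreSQL is already installed
  • CMake version 3.11 or later (see the CMake installation section)
  • gcc
# Download the source
git clone https://github.com/timescale/timescaledb
cd timescaledb
git checkout 2.15.3

# Configure; to build without OpenSSL, run: ./bootstrap -DUSE_OPENSSL=0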
./bootstrap

cd build && make

make install

Post-installation configuration

  • Standalone PostgreSQL
sudo timescaledb-tune --pg-config=/usr/local/pgsql/bin/pg_config --conf-path=/home/pgsql/data/postgresql.conf

systemctl restart postgresql-16
  • With Patroni, direct changes to postgresql.conf are reset by Patroni, so edit patroni.yml instead
# Apply the settings that timescaledb-tune changes to postgres through Patroni
postgresql:
  parameters:
    shared_preload_libraries: 'timescaledb'
    shared_buffers: 1955MB
    effective_cache_size: 5865MB
    maintenance_work_mem: 1001095kB
    work_mem: 5005kB
    timescaledb.max_background_workers: 16
    max_parallel_workers_per_gather: 2
    max_parallel_workers: 4
    wal_buffers: 16MB
    min_wal_size: 512MB
    max_wal_size: 1GB
    default_statistics_target: 100
    random_page_cost: 1.1
    checkpoint_completion_target: 0.9
    autovacuum_max_workers: 10
    autovacuum_naptime: 10
    effective_io_concurrency: 256
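
Copying the tuned settings into patroni.yml by hand is tedious. A rough sketch that turns "key = value" lines into YAML lines with the four-space indent used under parameters: above. It assumes the input has no trailing comments after the values, and conf_to_patroni_params is a name invented here:

```shell
# Read postgresql.conf-style "key = value" lines on stdin and print them as
# "    key: value" lines. Commented-out and blank lines are skipped.
# conf_to_patroni_params is a hypothetical helper, not part of Patroni.
conf_to_patroni_params() {
  sed -n -E 's/^[[:space:]]*([A-Za-z0-9_.]+)[[:space:]]*=[[:space:]]*(.*[^[:space:]])[[:space:]]*$/    \1: \2/p'
}
```

For example, `conf_to_patroni_params < tuned.conf`, where tuned.conf is a hypothetical file holding the lines that timescaledb-tune recommended. Review the output before pasting it into patroni.yml.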

CMake installation

# Download the latest package from the official site https://cmake.org/download/
wget https://github.com/Kitware/CMake/releases/download/v3.30.0/cmake-3.30.0-linux-x86_64.tar.gz

# Unpack
tar -zxvf cmake-3.30.0-linux-x86_64.tar.gz

# Rename it and move it to /usr/local
mv cmake-3.30.0-linux-x86_64 cmake
mv cmake /usr/local/

# Add bin to PATH (this only affects the current shell session)
export PATH=/usr/local/cmake/bin:$PATH

# Test
cmake --version
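
Since the timescaledb build needs CMake 3.11 or later, the installed version can be checked in a script as well. A minimal sketch that relies on the version sort of GNU `sort -V`; version_ge is a made-up helper name:

```shell
# Succeeds when version $1 is greater than or equal to version $2.
# Uses "sort -V" (version sort) from GNU coreutils.
# version_ge is a hypothetical helper for this note.
version_ge() {
  [ "$(printf '%s\n%s\n' "$2" "$1" | sort -V | head -n 1)" = "$2" ]
}

# Example: the first line of "cmake --version" looks like "cmake version 3.30.0".
# version_ge "$(cmake --version | awk 'NR==1 {print $3}')" 3.11 && echo "cmake is new enough"
```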

Installation

# Add the TimescaleDB package
echo "deb https://packagecloud.io/timescale/timescaledb/ubuntu/ $(lsb_release -c -s) main" | sudo tee /etc/apt/sources.list.d/timescaledb.list
# Install the TimescaleDB GPG key
wget --quiet -O - https://packagecloud.io/timescale/timescaledb/gpgkey | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/timescaledb.gpg
# Update your local repository list
sudo apt update
# Install TimescaleDB
sudo apt install timescaledb-2-postgresql-16 postgresql-client-16

Tuning

# Tune your PostgreSQL instance for TimescaleDB
sudo timescaledb-tune
# Restart PostgreSQL
sudo systemctl restart postgresql

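Add the extension

After restarting PostgreSQL, log in to the psql console:

-- Add the extension
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Use the \dx command to check the installed extensions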
postgres=# \dx
                                                List of installed extensions
    Name     | Version |   Schema   |                                      Description                                      
-------------+---------+------------+---------------------------------------------------------------------------------------
 plpgsql     | 1.0     | pg_catalog | PL/pgSQL procedural language
 timescaledb | 2.15.3  | public     | Enables scalable inserts and complex queries for time-series data (Community Edition)
(2 rows)

Detailed log of the configuration changes

postgres@deri:~$ sudo timescaledb-tune
Using postgresql.conf at this path:
/etc/postgresql/16/main/postgresql.conf

Is this correct? [(y)es/(n)o]: y
Writing backup to:
/tmp/timescaledb_tune.backup202407030214

shared_preload_libraries needs to be updated
Current:
#shared_preload_libraries = ''
Recommended:
shared_preload_libraries = 'timescaledb'
Is this okay? [(y)es/(n)o]: y
success: shared_preload_libraries will be updated

Tune memory/parallelism/WAL and other settings? [(y)es/(n)o]: y
Recommendations based on 7.76 GB of available memory and 4 CPUs for PostgreSQL 16

Memory settings recommendations
Current:
shared_buffers = 128MB
#effective_cache_size = 4GB
#maintenance_work_mem = 64MB
#work_mem = 4MB
Recommended:
shared_buffers = 1985MB
effective_cache_size = 5956MB
maintenance_work_mem = 1016498kB
work_mem = 5082kB
Is this okay? [(y)es/(s)kip/(q)uit]: y
success: memory settings will be updated

Parallelism settings recommendations
Current:
missing: timescaledb.max_background_workers
#max_worker_processes = 8
#max_parallel_workers_per_gather = 2
#max_parallel_workers = 8
Recommended:
timescaledb.max_background_workers = 16
max_worker_processes = 23
max_parallel_workers_per_gather = 2
max_parallel_workers = 4
Is this okay? [(y)es/(s)kip/(q)uit]: y
success: parallelism settings will be updated

WAL settings recommendations
Current:
#wal_buffers = -1
min_wal_size = 80MB
Recommended:
wal_buffers = 16MB
min_wal_size = 512MB
Is this okay? [(y)es/(s)kip/(q)uit]: y
success: WAL settings will be updated

Background writer settings recommendations
Current:
Recommended:
Is this okay? [(y)es/(s)kip/(q)uit]: y
success: background writer settings will be updated

Miscellaneous settings recommendations
Current:
#default_statistics_target = 100
#random_page_cost = 4.0
#checkpoint_completion_target = 0.9
#max_locks_per_transaction = 64
#autovacuum_max_workers = 3
#autovacuum_naptime = 1min
#effective_io_concurrency = 1
Recommended:
default_statistics_target = 100
random_page_cost = 1.1
checkpoint_completion_target = 0.9
max_locks_per_transaction = 128
autovacuum_max_workers = 10
autovacuum_naptime = 10
effective_io_concurrency = 256
Is this okay? [(y)es/(s)kip/(q)uit]: y
success: miscellaneous settings will be updated
Saving changes to: /etc/postgresql/16/main/postgresql.conf
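
The memory recommendations in this log look consistent with roughly a quarter of total memory for shared_buffers and three quarters for effective_cache_size. The sketch below reproduces the two numbers under that assumed heuristic. The input of 7942 MB is a hypothetical value picked to agree with the reported 7.76 GB, and recommend_memory is not a timescaledb-tune function:

```shell
# Assumed heuristic: shared_buffers is about 25% and effective_cache_size
# about 75% of total memory, in whole MB (integer division rounds down).
# recommend_memory is a hypothetical helper; argument: total memory in MB.
recommend_memory() {
  total_mb=$1
  echo "shared_buffers = $(( total_mb / 4 ))MB"
  echo "effective_cache_size = $(( total_mb * 3 / 4 ))MB"
}

recommend_memory 7942
# prints:
# shared_buffers = 1985MB
# effective_cache_size = 5956MB
```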