0%

canal介绍

canal是阿里开源的数据库同步框架,采用非侵入式方式,解析mysql的binary log,再发送到目的地,目的地可是mq,hbase,mysql,es等.

本章流程

  1. 开启mysql的bin-log日志
  2. 创建mysql用户获取bin-log日志
  3. canal采集bin-log日志
  4. canal-client获取mysql变化信息

开启bin-log日志

只需要在mysqld.cnf新增配置

server-id=1
log-bin=mysql-bin

创建mysql用户

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

配置canal

# 配置文件1:canal-server/conf/canal.properties
# 端口
canal.port = 11111

# 配置文件2:canal-server/conf/example/instance.properties
# 数据库连接信息
canal.instance.master.address=192.168.41.128:3307
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 监听的表(正则表达式)
canal.instance.filter.regex=.*\\..*
# 主题
canal.mq.topic=example

启动mysql/canal

# 本地测试采用docker方式启动
docker run -d --name mysql -p 3307:3306 -e MYSQL_ROOT_PASSWORD=123456 -v $PWD/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf  hub.c.163.com/library/mysql:5.7

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

编写canal-client

package com.deri.stream.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @ClassName: Main
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2021/6/11 9:41
 * @Version: v1.0
 **/
public class Main {
    public static void main(String[] args) throws InterruptedException {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.41.128",11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 没有变化,等一秒钟再去拉取数据
                   Thread.sleep(1000);
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

参考连接

项目结构

  • GOROOT设置go安装目录;
  • GOPATH设置项目目录,目录默认包括srcbinpkgsrc下面是一个个创建的项目,pkg是项目引用的包,bin下面是可执行程序;
  • GO111MODULE=on
  • GOPROXY=https://goproxy.io,direct

命令

  • go build会在当前目录下生成可执行程序,window下是exe
  • go install会在%GOPATH%/bin目录下生成可执行程序

import使用

参考链接:Go import使用

  • import "fmt"最常用的一种形式
  • import "./test"导入同一目录下test包中的内容
  • import f "fmt"导入fmt,并给他启别名f
  • import . "fmt",将fmt启用别名".",这样就可以直接使用其内容,而不用再添加fmt,如fmt.Println可以直接写成Println
  • import _ "fmt" 表示不使用该包,而是只是使用该包的init函数,并不显示的使用该包的其他内容。注意:这种形式的import,当import时就执行了fmt包中的init函数,而不能够使用该包的其他函数。

命名规范

Go是一门区分大小写的语言.

参考链接:Go中的命名规范

任何需要对外暴露的名字必须以大写字母开头,不需要对外暴露的则应该以小写字母开头;

  • 当命名(包括常量、变量、类型、函数名、结构字段等等)以一个大写字母开头,如:Analysize,那么使用这种形式的标识符的对象就可以被外部包的代码所使用(客户端程序需要先导入这个包),这被称为导出(像面向对象语言中的 public);
  • 命名如果以小写字母开头,则对包外是不可见的,但是他们在整个包的内部是可见并且可用的(像面向对象语言中的 private )

init函数

  • init函数会自动执行,用于初始化变量,不可被其他函数调用;
  • init函数可以有多个,执行顺序没有明确定义,不同包的init执行顺序由其依赖关系决定;

指针

  • & :取地址
  • * :取值

空接口使用

// 空接口类型的变量可以保存任何类型的值
var emptyInterface interface{}
fmt.Printf("emptyInterface is of type %T\n", emptyInterface)
emptyInterface = 100
fmt.Printf("emptyInterface is of type %T\n", emptyInterface)
emptyInterface = "Golang"
fmt.Printf("emptyInterface is of type %T\n", emptyInterface)
// 创建一个map
make(map[string]interface{})
// 
make(map[string]map[string]interface{})

创建对象的几种方式

// 对象
type People struct {
    Name string
}
// 1: 结果为指针类型
p := new(People)
// 2: 结果为值类型
p := People{}
// 3: 结果为指针类型
p := &People{}
// 4: 创建并初始化
p := People{Name: "userName"}
p := People{"userName"}
p := &People{"userName"}
// 5: 构造函数
func NewPeople(name string)*People  {
    return &People{name}
}

GO111MODULE

  • Go ModulesGo 语言的一种依赖管理方式,使用go module管理依赖后会在项目根目录下生成两个文件go.modgo.sum.
  • 要使用go module首先要设置GO111MODULE=onGO111MODULE有三个值,offonautoauto则会根据当前目录下是否有 go.mod文件来判断是否使用modules功能。
  • go module 默认不在 GOPATH 目录下查找依赖文件,其首先会在GOPATH/pkg/mod中查找有没有所需要的依赖,没有的直接会进行下载。可以使用go mod download下载好所需要的依赖。
  • go build 会将项目的依赖添加到 go.mod

go mod vendor

// 1. 使用 go mod vendor把依赖下载到本地调试
go mod vendor
// 2. 使用本地依赖
go build -mod=vendor main.go

为对象创建方法

type People struct{
    name string
}

func (p People) GetName() string{
    return p.name
}

go项目打包成docker镜像

直接执行go build或者go build -o xxx打包成的二进制文件是在window中运行的,如果要打包成在Linux中运行的,go build之前需要增加环境变量:

  • set GOOS=linux
  • set GOARCH=amd64
# Dockerfile文件
FROM alpine
MAINTAINER wuzhiyong wuzhiyong@deri.energy
WORKDIR /app
COPY restful /app

EXPOSE 8806
CMD ["./restful"]

说明

dockerfly提供一个web页面,可以非常方便的管理主机上镜像容器网络等。

安装

# 采用docker部署即可
docker pull helyho/dockerfly:latest

运行

  • 28083:web端口
  • 2735:管理端口
docker run --name dockerfly -d -v /var/run/docker.sock:/var/run/docker.sock --restart always -p 2735:2735 -p 28083:28083 helyho/dockerfly

访问

编写docker-compose.yml

version: '3.1'

services:
  zoo1:
    image: zookeeper:3.5
    restart: always
    hostname: zoo1
    volumes:
      - /root/zookeeper/1/data:/data
      - /root/zookeeper/1/datalog:/datalog
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper:3.5
    restart: always
    hostname: zoo2
    volumes:
      - /root/zookeeper/2/data:/data
      - /root/zookeeper/2/datalog:/datalog
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper:3.5
    restart: always
    hostname: zoo3
    volumes:
      - /root/zookeeper/3/data:/data
      - /root/zookeeper/3/datalog:/datalog
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

启停

# 启动
docker-compose up -d

# 停止
docker-compose down

参考链接

说明

giteegithub都提供了丰富的restful API,用于开发人员操作仓库.

token获取

giteegithub私有仓库或者部分接口都需要用户权限认证,本文均使用token方式验证.

  • gitee token 获取
    选择【用户】 - 【设置】 - 【私人令牌】 - 【生成新令牌】. 使用私人令牌,可以通过Gitee Open API访问你授权的数据.

  • github token 获取
    选择【用户】 - 【Settings】 - 【Developer settings】 - 【Personal access tokens】 - 【Generate new token】. Tokens you have generated that can be used to access the GitHub API.

gitee 接口测试

  • 示例: 添加新的issue

接口在线测试地址

curl -X POST --header 'Content-Type: application/json;charset=UTF-8' 'https://gitee.com/api/v5/repos/happywzy/issues' -d '{"access_token":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx","repo":"test","title":"测试ISSUE","body":"测试内容","labels":"bug"}'

github 接口测试

接口格式:https://api.github.com/repos/{用户名}/{仓库}/xxx

  • 获取公开仓库的issue列表
# 添加Accept头,指定返回内容类型
curl \
  -H "Accept: application/vnd.github.v3+json" \
  https://api.github.com/repos/octocat/hello-world/issues
  • 获取私有仓库的issue列表

    github API传入token的几种方式,官方文档.

# header添加Authorization字段,注意字段的值是[token xxxxxx]格式
curl -v -H "Authorization: token xxxxxxxxxxxxxx" https://api.github.com/repos/xxx/admin/issues
# 请求URL后面增加access_token
https://api.github.com/repos/alfalfaw/vue-shop-admin?access_token=xxxxxxx

下载软件包

关闭防火墙

本次安装使用centos7.5操作系统.

# 关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
# 查看防火墙状态
systemctl status firewalld

# 关闭selinux
setenforce 0
vim /etc/selinux/config
# 查看selinux状态
/usr/sbin/sestatus -v

安装docker

  • 准备docker.servicedocker服务化配置文件
[Unit]
Description=Docker Application Container Engine
Documentation=https://docs.docker.com
After=network-online.target firewalld.service
Wants=network-online.target

[Service]
Type=notify
# the default is not to use systemd for cgroups because the delegate issues still
# exists and systemd currently does not support the cgroup feature set required
# for containers run by docker
ExecStart=/usr/bin/dockerd
ExecReload=/bin/kill -s HUP $MAINPID
# Having non-zero Limit*s causes performance problems due to accounting overhead
# in the kernel. We recommend using cgroups to do container-local accounting.
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
# Uncomment TasksMax if your systemd version supports it.
# Only systemd 226 and above support this version.
#TasksMax=infinity
TimeoutStartSec=0
# set delegate yes so that systemd does not reset the cgroups of docker containers
Delegate=yes
# kill only the docker process, not all processes in the cgroup
KillMode=process
# restart the docker process if it exits prematurely
Restart=on-failure
StartLimitBurst=3
StartLimitInterval=60s

[Install]
WantedBy=multi-user.target
  • 依次执行命令安装
# 解压tar包
tar -xvf docker-19.03.9.tgz
# 将docker目录移到/usr/bin目录下
cp docker/* /usr/bin/
# 将docker.service 移到/etc/systemd/system/ 目录
cp docker.service /etc/systemd/system/
# 添加文件权限
chmod +x /etc/systemd/system/docker.service
# 重新加载配置文件
systemctl daemon-reload
# 启动docker
systemctl start docker
# 设置开机自启
systemctl enable docker.service
# 验证docker安装是否成功
docker -v

安装docker-compose

# 添加文件权限
chmod +x /xxx/docker-compose
# 创建软链接
ln -s /xxx/docker-compose /usr/bin/docker-compose
# 验证安装是否成功
docker-compose --version

配置普通用户可以使用docker命令

# 创建用户组,默认安装docker可能已经创建了
sudo groupadd docker
# 示例
sudo usermod -aG docker testuser
# 重新服务再登录即可使用docker命令
sudo systemctl restart docker