0%

准备k8s环境

准备相关YAML

  • flink-configuration-configmap.yaml
# ConfigMap providing flink-conf.yaml and log4j.properties; both the
# JobManager and TaskManager deployments mount it at /opt/flink/conf.
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  # jobmanager.rpc.address must equal the JobManager Service name
  # (flink-jobmanager) so TaskManagers can resolve it via cluster DNS.
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  # INFO-level logging to the file appender (${log.file}); note the console
  # appender is configured but not attached to the root logger.
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  • jobmanager-deployment.yaml
# Deployment running a single Flink JobManager pod.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: hub.deri.org.cn/library/flink:latest
        workingDir: /opt/flink
        # jobmanager.sh starts the JobManager as a background daemon; the
        # loop then tails the JobManager log, which keeps the container's
        # main process alive and surfaces the log via `kubectl logs`.
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        # Ports match blob.server.port / jobmanager.rpc.port and the web UI
        # settings in flink-conf.yaml.
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        # Restart the container if the RPC port stops accepting connections.
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        # Overlay the default configuration with the flink-config ConfigMap.
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
  • jobmanager-rest-service.yaml
# NodePort Service exposing the Flink web UI / REST endpoint (8081) outside
# the cluster; the allocated node port is visible via `kubectl get svc`.
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  selector:
    app: flink
    component: jobmanager
  ports:
    - name: rest
      port: 8081
      targetPort: 8081
  • jobmanager-service.yaml
# ClusterIP Service giving the JobManager a stable in-cluster DNS name
# (flink-jobmanager) for RPC (6123), the blob server (6124) and the UI (8081).
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  selector:
    app: flink
    component: jobmanager
  ports:
    - name: rpc
      port: 6123
    - name: blob
      port: 6124
    - name: ui
      port: 8081
  • taskmanager-deployment.yaml
# Deployment running the Flink TaskManagers; replicas multiplied by
# taskmanager.numberOfTaskSlots (from flink-conf.yaml) gives the total slots.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: hub.deri.org.cn/library/flink:latest
        workingDir: /opt/flink
        # taskmanager.sh starts a background daemon; the loop tails the log
        # to keep the container's main process alive and expose the log.
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        # Restart the container if the RPC port stops accepting connections.
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          # No trailing slash — kept consistent with the jobmanager deployment.
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

在k8s中部署

  • 安装
kubectl apply -f .
  • 卸载
kubectl delete -f .
[root@master k8s-flink]# kubectl get svc
NAME                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
flink-jobmanager        ClusterIP   10.100.206.17   <none>        6123/TCP,6124/TCP,8081/TCP   3h47m
flink-jobmanager-rest   NodePort    10.109.136.52   <none>        8081:30696/TCP               3h47m
kubernetes              ClusterIP   10.96.0.1       <none>        443/TCP                      4d21h

其中flink-jobmanager-rest对应的30696端口就是UI的访问端口,也可以使用Ingress来访问,这里测试使用NodePort方式.

格式

首先我们可以知道kubeadm join格式如下:

kubeadm join 192.168.99.11:6443 --token 4qcl2f.gtl3h8e5kjltuo0r \
    --discovery-token-ca-cert-hash sha256:7ed5404175cc0bf18dbfe53f19d4a35b1e3d40c19b10924275868ebf2a3bbe6e

获取token

master节点执行:

[root@master ~]# kubeadm token list
TOKEN                     TTL       EXPIRES                     USAGES                   DESCRIPTION   EXTRA GROUPS
xh7hwq.a62kp9myjdde3nif   22h       2020-06-05T15:54:14+08:00   authentication,signing           system:bootstrappers:kubeadm:default-node-token

注意token有效期,使用kubeadm token create创建新的token,kubeadm token create --ttl 0生成一个永不过期的token.

获取cert

openssl x509 -pubkey -in /etc/kubernetes/pki/ca.crt | openssl rsa -pubin -outform der 2>/dev/null | openssl dgst -sha256 -hex | sed 's/^.* //'

最后就可以在新节点执行上面的命令加入集群了

一条命令解决

kubeadm token create --print-join-command

PostStart

容器生命周期钩子(Container Lifecycle Hooks)监听容器生命周期的特定事件,并在事件发生时

# Pod spec fragment showing both lifecycle hooks: postStart runs after the
# container is created, preStop runs before it is terminated.
spec:
  containers:
  - name: lifecycle-demo-container
    image: nginx
    lifecycle:
      postStart:
        exec:
          command: ["/bin/sh", "-c", "echo Hello from the postStart handler > /usr/share/message"]
      preStop:
        exec:
          # Graceful nginx shutdown before the container stops.
          command: ["/usr/sbin/nginx","-s","quit"]

执行已注册的回调函数。支持两种钩子:

  • postStart: 容器启动后执行,注意由于是异步执行,它无法保证一定在ENTRYPOINT之后运行。如果失败,容器会被杀死,并根据RestartPolicy决定是否重启
  • preStop:容器停止前执行,常用于资源清理。如果失败,容器同样也会被杀死

而钩子的回调函数支持两种方式:

  • exec:在容器内执行命令
  • httpGet:向指定URL发起GET请求

关于postStart异步执行测试

# Experiment pod: the entrypoint writes to log.log after 5s, the postStart
# hook after 10s — the order of the two timestamps in log.log shows whether
# postStart blocks the entrypoint or runs concurrently with it.
apiVersion: v1
kind: Pod
metadata:
  name: test-post-start
spec:
  containers:
  - name: test-post-start-container
    image: busybox
    command: ["/bin/sh", "-c", "sleep 5 && echo $(date) 'written by entrypoint' >> log.log && sleep 600"]
    lifecycle:
      postStart:
        exec:
          command: ["/bin/sh", "-c", "sleep 10 && echo $(date) 'written by post start' >> log.log"]

创建上面的pod,通过进入pod,查看log.log打印日志,证明:

  • PostStart是否会挡住主进程的启动
  • PostStart是否是异步执行

如果 PostStart 会阻挡 ENTRYPOINT 的启动,则日志文件内容应该是:

```log
(时间点 T)written by post start
(时间点 T + 约 10 秒)written by entrypoint
```

否则内容应该是:

```log
(时间点 T)written by entrypoint
(时间点 T + 约 5 秒)written by post start
```

实验结果:
```log
/ # cat log.log
Thu Jun 4 06:14:50 UTC 2020 written by entrypoint
Thu Jun 4 06:14:55 UTC 2020 written by post start
```

修改YML

# Second experiment: the entrypoint now sleeps 15s (longer than the hook's
# 10s), so if postStart runs asynchronously its line appears first in log.log.
apiVersion: v1
kind: Pod
metadata:
  name: test-post-start
spec:
  containers:
  - name: test-post-start-container
    image: busybox
    command: ["/bin/sh", "-c", "sleep 15 && echo $(date) 'written by entrypoint' >> log.log && sleep 600"]
    lifecycle:
      postStart:
        exec:
          command: ["/bin/sh", "-c", "sleep 10 && echo $(date) 'written by post start' >> log.log"]

如果 PostStart 不是异步执行,则日志文件内容应该是:

```log
(时间点 T)written by entrypoint
(时间点 T + 约 5 秒)written by post start
```

否则内容应该是:
```log
(时间点 T)written by post start
(时间点 T + 约 5 秒)written by entrypoint
```

实验结果:

[root@master k8s]# kubectl exec -it test-post-start sh
/ # cat log.log
Thu Jun 4 06:17:54 UTC 2020 written by post start
Thu Jun 4 06:17:59 UTC 2020 written by entrypoint
/ # 

实验结论

  • PostStart不会挡住主进程的启动
  • PostStart是异步执行

正反代理

正向代理代理的是客户端,如国内访问国外网站,有防火墙限制,我们可以通过VPN去访问;

反向代理代理的是服务器,如机房内多个后端服务,通过Nginx对外提供服务;

Nginx

  • 默认配置文件nginx.conf

nginx单点问题

可以结合keepalived解决单点问题

实现ElementHandler接口

/**
 * Skeleton dom4j {@code ElementHandler}: the reader invokes these callbacks
 * as it encounters the start and end of each element while streaming the XML.
 */
public class MyHandler implements ElementHandler {

    // Invoked when the opening tag of an element is read.
    @Override
    public void onStart(ElementPath elementPath) {
        // TODO: business logic to run when an element starts
    }

    // Invoked when the closing tag of an element is read.
    @Override
    public void onEnd(ElementPath elementPath) {
        // TODO: business logic to run when an element ends
        // Detach the finished element so it does not stay in the in-memory
        // tree (keeps memory usage low when streaming large documents).
        elementPath.getCurrent().detach();
    }
}

ElementPath获取一些信息

// Depth of the element currently being parsed; the root element is level 1.
public static int getLevel(ElementPath path) {
    final String[] segments = path.getPath().split("/");
    return segments.length - 1;
}
// Returns the tag name of the current element.
public static String getElementName(Element element) {
    return element.getName();
}

// Returns the value of the current element's "attribute-name" attribute,
// or null if the attribute is absent.
// Fixed: Element.attribute(String) returns an org.dom4j.Attribute object,
// not a String, so the original did not compile; attributeValue(String)
// returns the attribute's text value.
public static String getAttribute(Element element) {
    return element.attributeValue("attribute-name");
}

// Returns the text content of the current element.
public static String getText(Element element) {
    return element.getText();
}

解析示例

<!-- The XML format to be parsed -->
<root>
    <book id="1">
        <name>菜鸟编程</name>
        <describe>针对菜鸟编程学习</describe>
    </book>
    <book id="2">
        <name>老鸟编程</name>
        <describe>针对老鸟编程学习</describe>
    </book>
</root>
// POJO mapped from one <book> element; @Data (Lombok) generates the
// getters/setters/equals/hashCode/toString.
@Data
public class Book {
    private int id;           // <book id="..."> attribute
    private String name;      // <name> child element
    private String describe;  // <describe> child element
}
// Pseudocode for parsing book.xml with a streaming dom4j handler.
public class BookHandler implements ElementHandler {
    // Book currently being assembled while its <book> element is open.
    private Book book;
    // Called each time an element's opening tag is read.
    @Override
    public void onStart(ElementPath elementPath) {
        // Element depth: <root> is 1, <book> is 2, <name>/<describe> are 3.
        level = getLevel(elementPath)
        switch(level):
        case 1: // nothing to do for <root>
        case 2: // runs for each <book id="xx"> line:
                // create the Book object and store the id attribute
                book = new Book();
                book.setId();
        case 3: // runs for each child element of <book>
                // nothing to do here
    }

    // Called each time an element's closing tag is read.
    @Override
    public void onEnd(ElementPath elementPath) {
        // Element depth: <root> is 1, <book> is 2, <name>/<describe> are 3.
        level = getLevel(elementPath)
        switch(level):
        case 1: // nothing to do for </root>
        case 2: // runs for each </book> line: the whole book has been read,
                // so persist it (e.g. write to the database) and reset
                saveDB(book); // persistence method
                book = null;
        case 3: // runs after each child element of <book> is fully read:
                // copy name/describe into the Book object
                // NOTE: only one value is available per callback, so some
                // extra bookkeeping is needed here
                String name = getAttribute("name");
                String describe = getAttribute("describe");
                book.setName(name);
                book.setDescribe(describe);
        elementPath.getCurrent().detach();
    }
}
// Read the XML file, routing element events through BookHandler.
BookHandler bookHandler = new BookHandler();
SAXReader reader = new SAXReader();
reader.setDefaultHandler(bookHandler);
reader.read(new File(xmlPath));

dom4j

DOM4J是一个开源XML解析包。DOM4J应用于Java平台,采用了Java集合框架并完全支持DOM、SAX、XPath、JAXP

dom4j接口说明

接口 说明
Attribute 属性
Branch 分支,指能够包含子节点的节点。如Element、Document
CDATA XML CDATA区域
CharacterData 是一个标识接口,标识基于字符的节点。如CDATA、Comment、Text
Comment 注释
Document 文档
DocumentType XML DOCTYPE 声明
Element 定义XML元素
ElementHandler Element对象的处理器
ElementPath ElementHandler使用,用于取得当前正在处理的路径层次信息
Entity 定义XML entity
Node 节点
NodeFilter 节点过滤器
ProcessingInstruction 定义XML处理指令
Text 文本节点
Visitor 用于实现Visitor模式
XPath XPath表达式

dom4j依赖

<!-- https://mvnrepository.com/artifact/dom4j/dom4j -->
<dependency>
    <groupId>dom4j</groupId>
    <artifactId>dom4j</artifactId>
    <version>1.6.1</version>
</dependency>
<!--dom4j使用xpath依赖的包-->
<dependency>
    <groupId>jaxen</groupId>
    <artifactId>jaxen</artifactId>
    <version>1.1.6</version>
</dependency>

文档操作

SAXReader saxReader = new SAXReader();
Document document = saxReader.read(new File(filename));  // read the XML file and obtain a Document object
// Document document = saxReader.read(url);  // alternatively, read from a URL