Flink Deployment [Deploying on k8s]


Prepare the k8s environment

Prepare the YAML files

  • flink-configuration-configmap.yaml

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        jobmanager.heap.size: 1024m
        taskmanager.memory.process.size: 1024m
      log4j.properties: |+
        log4j.rootLogger=INFO, file
        log4j.logger.akka=INFO
        log4j.logger.org.apache.kafka=INFO
        log4j.logger.org.apache.hadoop=INFO
        log4j.logger.org.apache.zookeeper=INFO
        log4j.appender.console=org.apache.log4j.ConsoleAppender
        log4j.appender.console.layout=org.apache.log4j.PatternLayout
        log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        log4j.appender.file=org.apache.log4j.FileAppender
        log4j.appender.file.file=${log.file}
        log4j.appender.file.layout=org.apache.log4j.PatternLayout
        log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  • jobmanager-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: hub.deri.org.cn/library/flink:latest
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
              while :;
              do
                if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*jobmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
  • jobmanager-rest-service.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081
        targetPort: 8081
      selector:
        app: flink
        component: jobmanager
  • jobmanager-service.yaml

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124
      - name: ui
        port: 8081
      selector:
        app: flink
        component: jobmanager
  • taskmanager-deployment.yaml

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: hub.deri.org.cn/library/flink:latest
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
              while :;
              do
                if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*taskmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6122
              name: rpc
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
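
The capacity of this cluster follows from two values in the manifests above: `replicas: 2` in taskmanager-deployment.yaml and `taskmanager.numberOfTaskSlots: 2` in flink-conf.yaml. As a rough sketch of that arithmetic (the shell variable names are illustrative and not part of Flink or kubectl):

```shell
#!/usr/bin/env bash
# Values copied from the manifests above; the variable names are illustrative.
taskmanager_replicas=2    # spec.replicas in taskmanager-deployment.yaml
slots_per_taskmanager=2   # taskmanager.numberOfTaskSlots in flink-conf.yaml

# Each TaskManager pod offers its own set of slots, so the totals multiply.
total_slots=$((taskmanager_replicas * slots_per_taskmanager))
echo "total task slots: ${total_slots}"   # prints: total task slots: 4
```

If more slots are needed, one option is to raise the replica count, for example with `kubectl scale deployment/flink-taskmanager --replicas=3` once the Deployment exists, which would give 6 slots under the same assumptions.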

Deploy to k8s

  • Install

    kubectl apply -f .
  • Uninstall

    kubectl delete -f .

After installing, list the services:

    [root@master k8s-flink]# kubectl get svc
    NAME                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
    flink-jobmanager        ClusterIP   10.100.206.17   <none>        6123/TCP,6124/TCP,8081/TCP   3h47m
    flink-jobmanager-rest   NodePort    10.109.136.52   <none>        8081:30696/TCP               3h47m
    kubernetes              ClusterIP   10.96.0.1       <none>        443/TCP                      4d21h

Port 30696, mapped to the flink-jobmanager-rest service, is the access port for the web UI. The UI can also be exposed through an Ingress; NodePort is used here for testing.
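
To make the NodePort access concrete, here is a minimal sketch that builds the UI address from a node IP and the assigned port. The function names `flink_ui_url` and `lookup_rest_nodeport` are hypothetical helpers, and 192.0.2.10 is a placeholder address, not a node from this cluster. The NodePort is assigned by k8s, so it will likely differ from 30696 on another cluster.

```shell
#!/usr/bin/env bash
# Build the Flink web UI URL from a node address and the NodePort.
# Hypothetical helper, shown for illustration only.
flink_ui_url() {
  local node_ip="$1" node_port="$2"
  printf 'http://%s:%s\n' "$node_ip" "$node_port"
}

# Read the NodePort that k8s assigned to the REST service.
# Requires kubectl access to the cluster, so it is only defined here, not called.
lookup_rest_nodeport() {
  kubectl get svc flink-jobmanager-rest -o jsonpath='{.spec.ports[0].nodePort}'
}

# Example with the port from the output above and a placeholder node IP.
flink_ui_url 192.0.2.10 30696   # prints: http://192.0.2.10:30696
```

On a live cluster, something like `curl "$(flink_ui_url <node-ip> "$(lookup_rest_nodeport)")/overview"` should return the cluster overview from the Flink REST API, assuming the node is reachable from where the command runs.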


Author: wuzhiyong
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit wuzhiyong when reposting.