0%

like

-- 没有用到索引
select * from t_user where user_name like '%test%';
-- 用到索引但是不通用
select * from t_user where user_name like 'test%';

locate

-- LOCATE('substr',str,pos)
-- pos不填:返回substr在str中第一次出现的位置,如果不存在,返回值为0
-- pos存在:返回pos之后第一次出现的位置
select * from t_user where locate('test',user_name)>0;

position

-- 同locate
select * from t_user where position('test' in user_name);

instr

-- 查找一个字符串在另一个字符串中首次出现的位置
select * from t_user where instr(user_name,'test')>0;

find_in_set

FIND_IN_SET(str,strList)

  • str 要查询的字符串
  • strList 字段名,参数以 , 分隔,如 (1,2,6,8)
-- 测试及结果
mysql> select find_in_set('1','1,2,3,4,5,6');
+--------------------------------+
| find_in_set('1','1,2,3,4,5,6') |
+--------------------------------+
|                              1 |
+--------------------------------+
1 row in set

mysql> select find_in_set('2','1,2,3,4,5,6');
+--------------------------------+
| find_in_set('2','1,2,3,4,5,6') |
+--------------------------------+
|                              2 |
+--------------------------------+
1 row in set

mysql> select find_in_set('','1,2,3,4,5,6');
+-------------------------------+
| find_in_set('','1,2,3,4,5,6') |
+-------------------------------+
|                             0 |
+-------------------------------+
1 row in set

mysql> select find_in_set('2','');
+---------------------+
| find_in_set('2','') |
+---------------------+
|                   0 |
+---------------------+
1 row in set

mysql> select find_in_set('7','1,2,3,4,5,6');
+--------------------------------+
| find_in_set('7','1,2,3,4,5,6') |
+--------------------------------+
|                              0 |
+--------------------------------+
1 row in set

mysql> select find_in_set(null,'1,2,3,4,5,6');
+---------------------------------+
| find_in_set(null,'1,2,3,4,5,6') |
+---------------------------------+
| NULL                            |
+---------------------------------+
1 row in set

mysql> select find_in_set('7',null);
+-----------------------+
| find_in_set('7',null) |
+-----------------------+
| NULL                  |
+-----------------------+
1 row in set

虚拟机意外关机后启动报错

以独占方式锁定此配置文件失败。可能其它正在运行VMware进程在使用此配置文件

解决办法

任务管理器 -> 启动 -> VMware Tray Process -> 禁用
重启机器
VMware Tray Process -> 启用 -> 重启打开虚拟机

简介

不收费的Hadoop版本主要有三个(均是国外厂商),分别是:

  • Apache(最原始的版本,所有发行版均基于这个版本进行改进)
  • Cloudera版本(Cloudera’s Distribution Including Apache Hadoop,简称CDH
  • Hortonworks版本(Hortonworks Data Platform,简称HDP

比较

版本 优点 缺点
Apache社区版 完全开源免费、社区活跃、文档、资料详实 1. 版本管理比较混乱,各种版本层出不穷,很难选择,选择其他生态组件时,比如Hive,Sqoop,Flume,Spark等,需要考虑兼容性问题、版本匹配问题、组件冲突问题、编译问题等。2. 集群安装部署复杂,需要编写大量配置文件,分发到每台节点,容易出错,效率低。3. 集群运维复杂,需要安装第三方软件辅助。
CDH 1. 版本管理清晰。2. 版本更新快。3. 集群安装部署简单。提供了部署、安装、配置工具,大大提高了集群部署的效率.运维简单。提供了管理、监控、诊断、配置修改的工具,管理配置方便,定位问题快速、准确,使运维工作简单,有效。 厂商固定

CDH安装

参考以下链接:

部署前要注意的要点

  • 每台机器上配置好java以及JAVA_HOME环境变量
  • 最好挑选一台机器,和其他机器ssh 打通
  • 每台机器上部署的Flink binary的目录要保证是同一个目录
  • 如果需要用hdfs,需要配置HADOOP_CONF_DIR环境变量配置上

配置

  • JobManager机器:master
  • TaskManager机器:node1,node2,node3

修改Flink binary目录的conf子目录中的mastersslaves两个文件:

$cat conf/masters
master:8081
$cat conf/slaves
node1
node2
node3

修改conf/flink-conf.yaml配置

jobmanager.rpc.address: master

然后把修改后的这三个文件同步到其他机器的相同conf目录下.

然后启动flink集群:

./bin/start-cluster.sh

提交WordCount作业

./bin/flink run examples/streaming/WordCount.jar

上传WordCountinput文件:

hdfs dfs -copyFromLocal story /test_dir/input_dir/story

提交读写hdfs的WordCount作业:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output
hdfs:///test_dir/output_dir/output

增加WordCount作业的并发度(注意输出文件重名会提交失败):

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output
hdfs:///test_dir/output_dir/output --parallelism 20

standalone 模式的HighAvailability(HA)部署和配置

JobManager是整个系统中最可能导致系统不可用的角色。一个TaskManager挂了,如果资源足够(空闲TaskSlot足够)的话,则只需要把相关task调度到其他空闲TaskSlot上,然后job从checkpoint中恢复即可。而如果当前集群中只配置了一个JobManager,则一旦JobManager挂了,就必须等待这个JobManager重新恢复,如果恢复时间过长,就可能导致整个job失败。
官方HA部署说明

所需环境

  • flink-1.10
  • zookeeper
  • hadoop

flink内部也提供了zookeeper插件,通过修改/conf/zoo.cfg文件即可,启动命令在/bin/start-zookeeper-quorum.sh

配置

  1. conf/flink-conf.yaml
# 高可用相关重要配置
# 配置high-availability mode
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster /可以不配,自动生成
high-availability.storageDir: hdfs:///flink/recovery
state.checkpoints.dir:  hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
  1. conf/mastersslaves
master1:8081
master2:8081
slave001
slave002
slave003
  1. conf/zoo.cfg
server.1=slave001:2888:3888
server.2=slave002:2888:3888
server.3=slave003:2888:3888

修改完成后,再把这几个文件同步到不同机器到相同conf目录下。

  1. 启动zookeeper(如果已经有zookeeper集群可以不使用flink自带的)
$ bin/start-zookeeper-quorum.sh
  1. 启动HA-cluster
$ bin/start-cluster.sh

分别打开:
http://master1:8081
http://master2:8081

可以看到两个页面最后都转到了同一个地址上,这个地址就是当前主JobManager所在机器,另一个就是standby JobManager。
当我们知道主JobManager后,我们可以把主JobManager进程kill掉,比如当前主JobManager在matser1这个机器上,就把这个进程杀掉.

  1. 关闭zookeeper和flink
$ bin/stop-cluster.sh
$ bin/stop-zookeeper-quorum.sh

注意事项

  1. 配置环境变量
vi /etc/profile
# 添加如下内容:
export HADOOP_CONF_DIR=/home/hadoop-2.6.5/etc/hadoop
# 使环境变量生效
source /etc/profile
  1. 添加hadoop依赖
    flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
    下载jar包放到/flink-1.10.0/lib目录下.

参考链接-Flink 1.10.0基于高可用部署

什么情况下适合使用yarn模式跑flink job?
相对于standalone模式,yarn模式允许flink job的好处有:

  • 资源按需使用,提高集群的资源利用率
  • 任务有优先级,根据优先级运行作业
  • 基于YARN调度系统,能够自动化地处理各个角色的failover
    • JobManager进程和TaskManager进程都由Yarn NodeManager监控
    • 如果JobManager进程异常退出,则Yarn ResourceManager会重新调度JobManager到其他机器
    • 如果TaskManager进程异常退出,JobManager会收到消息并重新向Yarn ResourceManager申请资源,重新启动TaskManager

在YARN上启动long running的flink集群(yarn session)

  • Highly Available YARN Session模式
  • yarn是一个集群资源管理框架。它运行在集群之上运行各种分布式应用程序。flink像其他程序一样,也可以在yarn上运行。用户不需要设置或者安装任何东西,如果已经有一个安装配置好的yarn。
  • 一个session将会包含所有必须的flink 服务(jobmanager和taskmanager),这样你就可以向这个集群提交程序了。注意:每个session会话你可以运行多个程序。
  • YARN Session模式只会启动一个JobManager,如果启动失败会重启.
  • 启动一个session命令:./bin/yarn-session.sh

命令参数用法:

# 查看命令参数
./bin/yarn-session.sh -h
# 用法:
   必选
     -n,--container    分配多少个yarn容器 (=taskmanager的数量)
   可选
     -D                         动态属性
     -d,--detached                   独立运行
     -jm,--jobManagerMemory     JobManager的内存 [in MB]
     -nm,--name                     在YARN上为一个自定义的应用设置一个名字
     -q,--query                      显示yarn中可用的资源 (内存, cpu核数)
     -qu,--queue                指定YARN队列.
     -s,--slots                 每个TaskManager使用的slots数量
     -tm,--taskManagerMemory    每个TaskManager的内存 [in MB]
     -z,--zookeeperNamespace    针对HA模式在zookeeper上创建NameSpace

client必须要设置HADOOP_HOME,YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARNHDFS的配置信息,否则启动会失败。

创建一个YARN模式的flink集群:

./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m

提交一个flink job到flink集群:

./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

这次提交flink job,虽然没有指定对应yarn application的信息,确可以提交到对应的flink集群,原因在于/tmp/.yarn-properties-${user}文件中保存了上一次创建yarn session的集群信息。所以如果同一用户在同一机器上再次创建一个yarn session,则这个文件会被覆盖掉。

那如果删掉/tmp/.yarn-properties-${user}或者在另一个机器上提交作业能否提交到预期到yarn session中呢?这也是可以的,如果配置了HighAvailability,则可以根据cluster-id,从zookeeper上获取到JobManager的地址和端口,从而提交作业。

high-availability.cluster-id

如果Yarn session没有配置HA,又该如何提交呢?这个时候就必须要在提交flink job的命令中指明YARN上的application id,通过“-yid”参数传入:

/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --
input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

我们可以发现,每次跑完任务不久,TaskManager就没有了,下次在提交任务的时候,TaskManager又会重新拉起来。如果希望TaskManager启动后就持续运行,可以在conf/flink-conf.yaml文件中配置下面这个参数,单位是milliseconds,默认值是30000L,即30秒

resourcemanager.taskmanager-timeout

如果你只想运行单个flink job后就退出,那么可以用下面这个命令:

./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output

常用的配置有:

  • -yn,–yarncontainer Number of Task Managers
  • -yqu,–yarnqueue Specify YARN queue.
  • -ys,–yarnslots Number of slots per TaskManager
  • -yqu,–yarnqueue Specify YARN queue.
    可以通过help命令查看run的可用参数:
./bin/flink run -h

参考链接

准备k8s环境

准备相关YAML

  • flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  • jobmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: hub.deri.org.cn/library/flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
          while :;
          do
            if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*jobmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
  • jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
  selector:
    app: flink
    component: jobmanager
  • jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
  • taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: hub.deri.org.cn/library/flink:latest
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

在k8s中部署

  • 安装
kubectl apply -f .
  • 卸载
kubectl delete -f .
[root@master k8s-flink]# kubectl get svc
NAME                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
flink-jobmanager        ClusterIP   10.100.206.17           6123/TCP,6124/TCP,8081/TCP   3h47m
flink-jobmanager-rest   NodePort    10.109.136.52           8081:30696/TCP               3h47m
kubernetes              ClusterIP   10.96.0.1               443/TCP                      4d21h

其中flink-jobmanager-rest对应的30696端口就是UI的访问端口,也可以使用Ingress来访问,这里测试使用NodePort方式.

格式

首先我们可以知道kubeadm join格式如下:

kubeadm join 192.168.99.11:6443 --token 4qcl2f.gtl3h8e5kjltuo0r \
    --discovery-token-ca-cert-hash sha256:7ed5404175cc0bf18dbfe53f19d4a35b1e3d40c19b10924275868ebf2a3bbe6e

获取token

master节点执行:

[root@master ~]# kubeadm token list
TOKEN                     TTL       EXPIRES                     USAGES                   DESCRIPTION   EXTRA GROUPS
xh7hwq.a62kp9myjdde3nif   22h       2020-06-05T15:54:14+08:00   authentication,signing           system:bootstrappers:kubeadm:default-node-token

注意token有效期,使用kubeadm token create创建新的token,kubeadm token create --ttl 0生成一个永不过期的token.

获取cert

openssl x509 -pubkey -in /etc/kubernetes/pki/ca.crt | openssl rsa -pubin -outform der 2>/dev/null | openssl dgst -sha256 -hex | sed 's/^.* //'

最后就可以在新节点执行上面的命令加入集群了

一条命令解决

kubeadm token create --print-join-command