虚拟机意外关机后启动报错
以独占方式锁定此配置文件失败。可能其它正在运行VMware进程在使用此配置文件
解决办法
任务管理器 -> 启动 -> VMware Tray Process -> 禁用
重启机器
VMware Tray Process -> 启用 -> 重启打开虚拟机
不收费的Hadoop版本主要有三个(均是国外厂商),分别是:
Cloudera’s Distribution Including Apache Hadoop
,简称CDH
)Hortonworks Data Platform
,简称HDP
)版本 | 优点 | 缺点 |
---|---|---|
Apache社区版 | 完全开源免费、社区活跃、文档、资料详实 | 1. 版本管理比较混乱,各种版本层出不穷,很难选择,选择其他生态组件时,比如Hive,Sqoop,Flume,Spark等,需要考虑兼容性问题、版本匹配问题、组件冲突问题、编译问题等。2. 集群安装部署复杂,需要编写大量配置文件,分发到每台节点,容易出错,效率低。3. 集群运维复杂,需要安装第三方软件辅助。 |
CDH | 1. 版本管理清晰。2. 版本更新快。3. 集群安装部署简单。提供了部署、安装、配置工具,大大提高了集群部署的效率.运维简单。提供了管理、监控、诊断、配置修改的工具,管理配置方便,定位问题快速、准确,使运维工作简单,有效。 | 厂商固定 |
参考以下链接:
JAVA_HOME
环境变量HADOOP_CONF_DIR
环境变量配置上修改Flink binary
目录的conf
子目录中的masters
和slaves
两个文件:
$cat conf/masters
master:8081
$cat conf/slaves
node1
node2
node3
修改conf/flink-conf.yaml
配置
jobmanager.rpc.address: master
然后把修改后的这三个文件同步到其他机器的相同conf
目录下.
然后启动flink集群:
./bin/start-cluster.sh
提交WordCount
作业
./bin/flink run examples/streaming/WordCount.jar
上传WordCount
的input
文件:
hdfs dfs -copyFromLocal story /test_dir/input_dir/story
提交读写hdfs的WordCount作业:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output
hdfs:///test_dir/output_dir/output
增加WordCount作业的并发度(注意输出文件重名会提交失败):
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output
hdfs:///test_dir/output_dir/output --parallelism 20
JobManager是整个系统中最可能导致系统不可用的角色。一个TaskManager挂了,如果资源足够(空闲TaskSlot足够)的话,则只需要把相关task调度到其他空闲TaskSlot上,然后job从checkpoint中恢复即可。而如果当前集群中只配置了一个JobManager,则一旦JobManager挂了,就必须等待这个JobManager重新恢复,如果恢复时间过长,就可能导致整个job失败。
官方HA部署说明
flink-1.10
zookeeper
hadoop
flink
内部也提供了zookeeper
插件,通过修改/conf/zoo.cfg
文件即可,启动命令在/bin/start-zookeeper-quorum.sh
conf/flink-conf.yaml
# 高可用相关重要配置
# 配置high-availability mode
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster /可以不配,自动生成
high-availability.storageDir: hdfs:///flink/recovery
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
conf/masters
和slaves
master1:8081
master2:8081
slave001
slave002
slave003
conf/zoo.cfg
server.1=slave001:2888:3888
server.2=slave002:2888:3888
server.3=slave003:2888:3888
修改完成后,再把这几个文件同步到不同机器到相同conf目录下。
$ bin/start-zookeeper-quorum.sh
HA-cluster
$ bin/start-cluster.sh
分别打开:
http://master1:8081
http://master2:8081
可以看到两个页面最后都转到了同一个地址上,这个地址就是当前主JobManager所在机器,另一个就是standby JobManager。
当我们知道主JobManager后,我们可以把主JobManager进程kill掉,比如当前主JobManager在matser1这个机器上,就把这个进程杀掉.
$ bin/stop-cluster.sh
$ bin/stop-zookeeper-quorum.sh
flink-1.10.0
与hadoop
集成vi /etc/profile
# 添加如下内容:
export HADOOP_CONF_DIR=/home/hadoop-2.6.5/etc/hadoop
# 使环境变量生效
source /etc/profile
jar
包放到/flink-1.10.0/lib
目录下.什么情况下适合使用yarn模式跑flink job?
相对于standalone模式,yarn模式允许flink job的好处有:
YARN Session
模式只会启动一个JobManager
,如果启动失败会重启../bin/yarn-session.sh
命令参数用法:
# 查看命令参数
./bin/yarn-session.sh -h
# 用法:
必选
-n,--container 分配多少个yarn容器 (=taskmanager的数量)
可选
-D 动态属性
-d,--detached 独立运行
-jm,--jobManagerMemory JobManager的内存 [in MB]
-nm,--name 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue 指定YARN队列.
-s,--slots 每个TaskManager使用的slots数量
-tm,--taskManagerMemory 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace 针对HA模式在zookeeper上创建NameSpace
client必须要设置
HADOOP_HOME
,YARN_CONF_DIR
或者HADOOP_CONF_DIR
环境变量,通过这个环境变量来读取YARN
和HDFS
的配置信息,否则启动会失败。
创建一个YARN模式的flink集群:
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
提交一个flink job到flink集群:
./bin/flink run examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
这次提交
flink job
,虽然没有指定对应yarn application
的信息,确可以提交到对应的flink集群,原因在于/tmp/.yarn-properties-${user}
文件中保存了上一次创建yarn session的集群信息。所以如果同一用户在同一机器上再次创建一个yarn session
,则这个文件会被覆盖掉。
那如果删掉
/tmp/.yarn-properties-${user}
或者在另一个机器上提交作业能否提交到预期到yarn session
中呢?这也是可以的,如果配置了HighAvailability
,则可以根据cluster-id
,从zookeeper上获取到JobManager的地址和端口,从而提交作业。
high-availability.cluster-id
如果Yarn session没有配置HA,又该如何提交呢?这个时候就必须要在提交flink job的命令中指明YARN上的
application id
,通过“-yid”参数传入:
/bin/flink run -yid application_1548056325049_0048 examples/streaming/WordCount.jar --
input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
我们可以发现,每次跑完任务不久,
TaskManager
就没有了,下次在提交任务的时候,TaskManager
又会重新拉起来。如果希望TaskManager
启动后就持续运行,可以在conf/flink-conf.yaml
文件中配置下面这个参数,单位是milliseconds
,默认值是30000L
,即30秒
:
resourcemanager.taskmanager-timeout
如果你只想运行单个flink job后就退出,那么可以用下面这个命令:
./bin/flink run -m yarn-cluster -yn 2 examples/streaming/WordCount.jar --input hdfs:///test_dir/input_dir/story --output hdfs:///test_dir/output_dir/output
常用的配置有:
./bin/flink run -h
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
log4j.properties: |+
log4j.rootLogger=INFO, file
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
jobmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
spec:
containers:
- name: jobmanager
image: hub.deri.org.cn/library/flink:latest
workingDir: /opt/flink
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
while :;
do
if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
then tail -f -n +1 log/*jobmanager*.log;
fi;
done"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 8081
name: ui
livenessProbe:
tcpSocket:
port: 6123
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j.properties
path: log4j.properties
jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
selector:
app: flink
component: jobmanager
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: ui
port: 8081
selector:
app: flink
component: jobmanager
taskmanager-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: hub.deri.org.cn/library/flink:latest
workingDir: /opt/flink
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
while :;
do
if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
then tail -f -n +1 log/*taskmanager*.log;
fi;
done"]
ports:
- containerPort: 6122
name: rpc
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j.properties
path: log4j.properties
kubectl apply -f .
kubectl delete -f .
[root@master k8s-flink]# kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flink-jobmanager ClusterIP 10.100.206.17 6123/TCP,6124/TCP,8081/TCP 3h47m
flink-jobmanager-rest NodePort 10.109.136.52 8081:30696/TCP 3h47m
kubernetes ClusterIP 10.96.0.1 443/TCP 4d21h
其中flink-jobmanager-rest
对应的30696
端口就是UI
的访问端口,也可以使用Ingress
来访问,这里测试使用NodePort
方式.
首先我们可以知道kubeadm join
格式如下:
kubeadm join 192.168.99.11:6443 --token 4qcl2f.gtl3h8e5kjltuo0r \
--discovery-token-ca-cert-hash sha256:7ed5404175cc0bf18dbfe53f19d4a35b1e3d40c19b10924275868ebf2a3bbe6e
master
节点执行:
[root@master ~]# kubeadm token list
TOKEN TTL EXPIRES USAGES DESCRIPTION EXTRA GROUPS
xh7hwq.a62kp9myjdde3nif 22h 2020-06-05T15:54:14+08:00 authentication,signing system:bootstrappers:kubeadm:default-node-token
注意
token
有效期,使用kubeadm token create
创建新的token
,kubeadm token create --ttl 0
生成一个永不过期的token
.
openssl x509 -pubkey -in /etc/kubernetes/pki/ca.crt | openssl rsa -pubin -outform der 2>/dev/null | openssl dgst -sha256 -hex | sed 's/^.* //'
kubeadm token create --print-join-command
容器生命周期钩子(Container Lifecycle Hooks
)监听容器生命周期的特定事件,并在事件发生时
spec:
containers:
- name: lifecycle-demo-container
image: nginx
lifecycle:
postStart:
exec:
command: ["/bin/sh", "-c", "echo Hello from the postStart handler > /usr/share/message"]
preStop:
exec:
command: ["/usr/sbin/nginx","-s","quit"]
执行已注册的回调函数。支持两种钩子:
postStart
: 容器启动后执行,注意由于是异步执行,它无法保证一定在ENTRYPOINT之后运行。如果失败,容器会被杀死,并根据RestartPolicy决定是否重启preStop
:容器停止前执行,常用于资源清理。如果失败,容器同样也会被杀死而钩子的回调函数支持两种方式:
exec
:在容器内执行命令httpGet
:向指定URL发起GET请求postStart
异步执行测试apiVersion: v1
kind: Pod
metadata:
name: test-post-start
spec:
containers:
- name: test-post-start-container
image: busybox
command: ["/bin/sh", "-c", "sleep 5 && echo $(date) 'written by entrypoint' >> log.log && sleep 600"]
lifecycle:
postStart:
exec:
command: ["/bin/sh", "-c", "sleep 10 && echo $(date) 'written by post start' >> log.log"]
创建上面的pod
,通过进入pod
,查看log.log
打印日志,证明:
如果 PostStart 会阻挡 ENTRYPOINT 的启动,则日志文件内容应该是:
(时间点 T)written by post start
(时间点 T + 约 10 秒)written by entrypoint
否则内容应该是:
(时间点 T)written by entrypoint
(时间点 T + 约 5 秒)written by post start
```log
实验结果:
```log
/ # cat log.log
Thu Jun 4 06:14:50 UTC 2020 written by entrypoint
Thu Jun 4 06:14:55 UTC 2020 written by post start
修改YML
apiVersion: v1
kind: Pod
metadata:
name: test-post-start
spec:
containers:
- name: test-post-start-container
image: busybox
command: ["/bin/sh", "-c", "sleep 15 && echo $(date) 'written by entrypoint' >> log.log && sleep 600"]
lifecycle:
postStart:
exec:
command: ["/bin/sh", "-c", "sleep 10 && echo $(date) 'written by post start' >> log.log"]
如果 PostStart 不是异步执行,则日志文件内容应该是:
(时间点 T)written by entrypoint
(时间点 T + 约 5 秒)written by post start
```log
否则内容应该是:
```log
(时间点 T)written by post start
(时间点 T + 约 5 秒)written by entrypoint
实验结果:
[root@master k8s]# kubectl exec -it test-post-start sh
/ # cat log.log
Thu Jun 4 06:17:54 UTC 2020 written by post start
Thu Jun 4 06:17:59 UTC 2020 written by entrypoint
/ #