0%

flink应用场景

Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。

事件驱动型应用

什么是事件驱动型应用?

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。

事件驱动型应用是在计算存储分离的传统应用基础上进化而来。在传统架构中,应用需要读写远程事务型数据库。

相反,事件驱动型应用是基于状态化流处理来完成。在该设计中,数据和计算不会分离,应用只需访问本地(内存或磁盘)即可获取数据。系统容错性的实现依赖于定期向远程持久化存储写入 checkpoint。下图描述了传统应用和事件驱动型应用架构的区别。
事件驱动型应用

事件驱动型应用的优势?

事件驱动型应用无须查询远程数据库,本地数据访问使得它具有更高的吞吐和更低的延迟。而由于定期向远程持久化存储的 checkpoint 工作可以异步、增量式完成,因此对于正常事件处理的影响甚微。事件驱动型应用的优势不仅限于本地数据访问。传统分层架构下,通常多个应用会共享同一个数据库,因而任何对数据库自身的更改(例如:由应用更新或服务扩容导致数据布局发生改变)都需要谨慎协调。反观事件驱动型应用,由于只需考虑自身数据,因此在更改数据表示或服务扩容时所需的协调工作将大大减少。

事件驱动型应用会受制于底层流处理系统对时间和状态的把控能力,Flink 诸多优秀特质都是围绕这些方面来设计的。它提供了一系列丰富的状态操作原语,允许以精确一次的一致性语义合并海量规模(TB 级别)的状态数据。此外,Flink 还支持事件时间和自由度极高的定制化窗口逻辑,而且它内置的 ProcessFunction 支持细粒度时间控制,方便实现一些高级业务逻辑。同时,Flink 还拥有一个复杂事件处理(CEP)类库,可以用来检测数据流中的模式。

Flink 中针对事件驱动应用的明星特性当属 savepoint。Savepoint 是一个一致性的状态映像,它可以用来初始化任意状态兼容的应用。在完成一次 savepoint 后,即可放心对应用升级或扩容,还可以启动多个版本的应用来完成 A/B 测试。

典型的事件驱动型应用实例

  • 反欺诈
  • 异常检测
  • 基于规则的报警
  • 业务流程监控
  • (社交网络)Web 应用

数据分析应用

什么是数据分析应用?

数据分析任务需要从原始数据中提取有价值的信息和指标。传统的分析方式通常是利用批查询,或将事件记录下来并基于此有限数据集构建应用来完成。为了得到最新数据的分析结果,必须先将它们加入分析数据集并重新执行查询或运行应用,随后将结果写入存储系统或生成报告。

借助一些先进的流处理引擎,还可以实时地进行数据分析。和传统模式下读取有限数据集不同,流式查询或应用会接入实时事件流,并随着事件消费持续产生和更新结果。这些结果数据可能会写入外部数据库系统或以内部状态的形式维护。仪表展示应用可以相应地从外部数据库读取数据或直接查询应用的内部状态。

如下图所示,Apache Flink 同时支持流式及批量分析应用。
数据分析应用

流式分析应用的优势?

和批量分析相比,由于流式分析省掉了周期性的数据导入和查询过程,因此从事件中获取指标的延迟更低。不仅如此,批量查询必须处理那些由定期导入和输入有界性导致的人工数据边界,而流式查询则无须考虑该问题。

另一方面,流式分析会简化应用抽象。批量查询的流水线通常由多个独立部件组成,需要周期性地调度提取数据和执行查询。如此复杂的流水线操作起来并不容易,一旦某个组件出错将会影响流水线的后续步骤。而流式分析应用整体运行在 Flink 之类的高端流处理系统之上,涵盖了从数据接入到连续结果计算的所有步骤,因此可以依赖底层引擎提供的故障恢复机制。

Flink 为持续流式分析和批量分析都提供了良好的支持。具体而言,它内置了一个符合 ANSI 标准的 SQL 接口,将批、流查询的语义统一起来。无论是在记录事件的静态数据集上还是实时事件流上,相同 SQL 查询都会得到一致的结果。同时 Flink 还支持丰富的用户自定义函数,允许在 SQL 中执行定制化代码。如果还需进一步定制逻辑,可以利用 Flink DataStream API 和 DataSet API 进行更低层次的控制。此外,Flink 的 Gelly 库为基于批量数据集的大规模高性能图分析提供了算法和构建模块支持。

典型的数据分析应用实例

  • 电信网络质量监控
  • 移动应用中的产品更新及实验评估分析
  • 消费者技术中的实时数据即席分析
  • 大规模图分析

数据管道应用

什么是数据管道?

提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。

数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。

下图描述了周期性 ETL 作业和持续数据管道的差异。
数据管道应用

数据管道的优势?

和周期性 ETL 作业相比,持续数据管道可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。

很多常见的数据转换和增强操作可以利用 Flink 的 SQL 接口(或 Table API)及用户自定义函数解决。如果数据管道有更高级的需求,可以选择更通用的 DataStream API 来实现。Flink 为多种数据存储系统(如:Kafka、Kinesis、Elasticsearch、JDBC数据库系统等)内置了连接器。同时它还提供了文件系统的连续型数据源及数据汇,可用来监控目录变化和以时间分区的方式写入文件。

典型的数据管道应用实例

  • 电子商务中的实时查询索引构建
  • 电子商务中的持续 ETL

阿里蒋晓伟谈计算引擎Flink和Spark的对比

相比Spark、Hadoop、Storm等,是什么样的场景需求让阿里搜索团队选择了Flink?

首先我们希望有个流计算和批处理的一体化处理方案。Spark和Flink都具有流和批处理能力,但是他们的做法是相反的。Spark Streaming是把流转化成一个个小的批来处理,这种方案的一个问题是我们需要的延迟越低,额外开销占的比例就会越大,这导致了Spark Streaming很难做到秒级甚至亚秒级的延迟。Flink是把批当作一种有限的流,这种做法的一个特点是在流和批共享大部分代码的同时还能够保留批处理特有的一系列的优化。因为这个原因,如果要用一套引擎来解决流和批处理,那就必须以流处理为基础,所以我们决定先选择一个优秀的流处理引擎。从功能上流处理可以分为无状态的和有状态两种。在流处理的框架里引入状态管理大大提升了系统的表达能力,让用户能够很方便地实现复杂的处理逻辑,是流处理在功能上的一个飞跃。流处理引擎对一致性的支持可以分为:best effort,at least once 和 exactly once。Exactly once的语义才能真正保证完全的一致性,Flink采用的架构比较优雅地实现了exactly once的有状态流处理。另外在保证了一致性的前提下Flink在性能上也是相当优秀的。

总结一下:

  • 我们觉得在流处理方面Flink在功能,延迟,一致性和性能上综合来看是目前社区最优秀的。
  • 实现流和批的一体化方案。
  • Flink有一个比较活跃的社区。

您认为Flink未来的杀手级应用会是什么?

…,我觉得Flink在流计算上的优势是非常大的,随着在线学习等流计算需求的增长,在这方面Flink一定会大放异彩。

企业实践经典案例

日均处理万亿数据!Apache Flink在快手的应用实践与技术演进之路

快手

快手计算链路是从 DB/Binlog 以及 WebService Log 实时入到 Kafka 中,然后接入 Flink 做实时计算,其中包括实时 ETL、实时分析、Interval Join 以及实时训练,最后的结果存到 Druid、ES 或者 HBase 里面,后面接入一些数据应用产品;同时这一份 Kafka 数据实时 Dump 一份到 Hadoop 集群,然后接入离线计算。

  • 80% 统计监控:实时统计,包括各项数据的指标,监控项报警,用于辅助业务进行实时分析和监控;
  • 15% 数据处理:对数据的清洗、拆分、Join 等逻辑处理,例如大 Topic 的数据拆分、清洗;
  • 5% 数据处理:实时业务处理,针对特定业务逻辑的实时处理,例如实时调度。
  • 快手是分享短视频跟直播的平台,快手短视频、直播的质量监控是通过 Flink 进行实时统计,比如直播观众端、主播端的播放量、卡顿率、开播失败率等跟直播质量相关的多种监控指标;
  • 用户增长分析,实时统计各投放渠道拉新情况,根据效果实时调整各渠道的投放量;
  • 实时数据处理,广告展现流、点击流实时 Join,客户端日志的拆分等;
  • 直播 CDN 调度,实时监控各 CDN 厂商质量,通过 Flink 实时训练调整各个 CDN 厂商流量配比。

规模

快手目前集群规模有 1500 台左右,作业数量大约是 500 左右,日处理条目数总共有 1.7 万亿,峰值处理条目数大约是 3.7 千万。集群部署都是 On Yarn 模式,分为离线集群和实时集群两类集群,其中离线集群混合部署,机器通过标签进行物理隔离,实时集群是 Flink 专用集群,针对隔离性、稳定性要求极高的业务部署。

Interval Join 应用场景

快手

Interval Join 在快手的一个应用场景是广告展现点击流实时 Join 场景:打开快手 App 可能会收到广告服务推荐的广告视频,用户有时会点击展现的广告视频。这样在后端形成两份数据流,一份是广告展现日志,一份是客户端点击日志。这两份数据需进行实时 Join,将 Join 结果作为样本数据用于模型训练,训练出的模型会被推送到线上的广告服务。

该场景下展现以后 20 分钟的点击被认为是有效点击,实时 Join 逻辑则是点击数据 Join 过去 20 分钟展现。其中,展现流的数据量相对比较大,20 分钟数据在 1 TB 以上。最初实时 Join 过程是业务自己实现,通过 Redis 缓存广告展现日志,Kafka 延迟消费客户端点击日志实现 Join 逻辑,该方式缺点是实时性不高,并且随着业务增长需要堆积更多机器,运维成本非常高。基于 Flink 使用 Interval Join 完美契合此场景,并且实时性高,能够实时输出 Join 后的结果数据,对业务来说维护成本非常低,只需要维护一个 Flink 作业即可。

视频地址

视频地址

基础定义

有界/无界数据流

有状态计算

例如:过去一段时间的点击量
状态可持久化

时间官网

  • 事件发生时间
  • 事件进入flink时间
  • flink处理事件时间

API

3层API

应用场景

  • Data Pipeline
  • Data Analytics
  • Data Driven

flink - maven构建项目命令

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.10.0

下载kafka

在官网下载kafka最新版本。

如:当前最新2.4.0,针对Scala不同版本有不同的编译包,下载Scala 2.12的测试。

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
# 解压
tar -xzf kafka_2.12-2.4.0.tgz

#进入目录
cd kafka_2.12-2.4.0

启动

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

如果需要放到后台运行,可以加上&或者 nohup命令.
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

测试

创建主题

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

查看主题列表

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

向主题发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

从主题消费消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

搭建kafka集群

参考官网

./kafka-topics.sh --create --zookeeper 192.168.3.17:2181,192.168.3.19:2181,192.168.3.20:2181 --replication-factor 1 --partitions 1 --topic first_kafka

./kafka-console-producer.sh --broker-list 192.168.3.17:9092,192.168.3.19:9092,192.168.3.20:9092 --topic first_kafka

./kafka-console-consumer.sh --bootstrap-server 192.168.3.17:9092,192.168.3.19:9092,192.168.3.20:9092 --topic first_kafka --from-beginning

./kafka-topics.sh --describe --zookeeper 192.168.3.17:2181,192.168.3.19:2181,192.168.3.20:2181 --topic first_kafka

./kafka-topics.sh --list --zookeeper 192.168.3.17:2181,192.168.3.19:2181,192.168.3.20:2181

./kafka-topics.sh --delete --zookeeper 192.168.3.17:2181,192.168.3.19:2181,192.168.3.20:2181 --topic first_kafka

下载压缩包

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz

tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz 

cd apache-zookeeper-3.5.7-bin/

注意:zookeeper3.5.5版本开始,需要下载*-bin.tar.gz的包才能运行,否则启动会报错,缺少类。

拷贝zoo.cfg

用默认配置即可

cp conf/zoo_sample.cfg conf/zoo.cfg

启动&检查

启动

./bin/zkServer.sh start

检查

./bin/zkServer.sh status

停止

./bin/zkServer.sh stop

重启

./bin/zkServer.sh restart

修改hosts文件

151.101.72.133 assets-cdn.github.com
151.101.73.194 github.global.ssl.fastly.net
192.30.253.113 github.com
11.238.159.92 git.node5.mirror.et2sqa

如果使用Windows系统,则是配置在C:\Windows\System32\drivers\etc\hosts文件中。
如果使用Linux系统,则是配置在的/etc/hosts文件中。

国内突然被墙了

https://github.com.ipaddress.com/www.github.com

返回上面的地址,获取github.com最新IP,填写到hosts文件中.

Domain github.com
IP Address 140.82.112.4

查找本机安装的openjdk

rpm -qa | grep java

python-javapackages-3.4.1-11.el7.noarch
java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64
java-1.8.0-openjdk-headless-1.8.0.181-7.b13.el7.x86_64
tzdata-java-2018e-3.el7.noarch
javapackages-tools-3.4.1-11.el7.noarch

卸载

依次使用命令rpm -e --nodeps卸载

rpm -e --nodeps java-1.8.0-openjdk-1.8.0.181-7.b13.el7.x86_64

安装jdk1.8

#进入到安装目录
cd /usr/local/

tar -zxvf jdk-8u112-linux-x64.tar.gz
vim /etc/profile

文件末尾增加

#java_home
export JAVA_HOME=/usr/local/bin/jdk1.8.0_112
export PATH=$PATH:$JAVA_HOME/bin

测试

[root@node1 jdk1.8.0_112]# java -version
java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)