0%

时间属性

Flink支持三种与流数据处理相关的时间概念:Processing TimeEvent TimeIngestion Time

time

Flink SQL仅支持其中的两种时间类型Event TimeProcessing Time

  • Event Time:您提供的事件时间(通常是数据的最原始的创建时间).Event Time必须是您提供在数据储存里的数据。
  • Processing Time:系统对事件进行处理的本地系统时间,单位为毫秒。

Event Time

Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。

由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要首先明文定义一个Watermark计算方法。

窗口函数基于Event Time聚合的示例如下:

CREATE TABLE tt_stream (
  a VARCHAR,
  b VARCHAR,
  ts TIMESTAMP,
  WATERMARK wk1 FOR ts as withOffset (ts, 1000) --Watermark计算方法。
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);

sql-client-default.yaml中配置如下:

...
format: 
    property-version: 1
    type: json
    # derive-schema: true
    schema: "ROW<id INT, type STRING, price INT, ts LONG>"
schema: 
    - name: tss
    data-type: TIMESTAMP
    rowtime:
        timestamps:
        type: "from-field"
        from: "ts"
        watermarks:
        type: "periodic-bounded"
        delay: "60000"
...

Processing Time

Processing Time是系统产生的,不在您的原始数据中,您需要在数据源表的声明中明文定义一个Processing Time列。

filedName as PROCTIME()

窗口函数基于Processing Time聚合的示例如下。

CREATE TABLE mq_stream (
    a VARCHAR,
    b VARCHAR,
    c BIGINT,
    ts AS PROCTIME () --在数据源表的声明中明文定义一个Processing Time列。
  ) WITH (
    type = 'mq',
    topic = '<yourTopic>',
    accessId = '<yourAccessId>',
    accessKey = '<yourAccessSecret>'
  );
select id,price,tss, PROCTIME() AS ts from book;

RFC3339

本地时间只包括当前的时间,不包含任何时区信息。同一时刻,东八区的本地时间比零时区的本地时间快了8个小时。在不同时区之间交换时间数据,除了用纯数字的时间戳,还有一种更方便人类阅读的表示方式:标准时间的偏移量表示方法。

RFC3339详细定义了互联网上日期/时间的偏移量表示:

2017-12-08T00:00:00.00Z

这个代表了UTC时间的2017年12月08日零时

2017-12-08T00:08:00.00+08:00

这个代表了同一时刻的,东八区北京时间(CST)表示的方法

上面两个时间的时间戳是等价的。两个的区别,就是在本地时间后面增加了时区信息。Z表示零时区。+08:00表示UTC时间增加8小时。

应用

目前很多软件都是采用RFC3339,如PrometheusFlinkInfluxDB等,注意与北京时间相差8小时。

UDX分类

参考

UDX分类 描述
UDF(User Defined Function) 用户自定义标量函数(User Defined Scalar Function)。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
UDAF(User Defined Aggregation Function) 自定义聚合函数,其输入与输出是多对一的关系, 即将多条输入记录聚合成一条输出值。可以与SQL中的GROUP BY语句一起使用。
UDTF(User Defined Table-valued Function) 自定义表函数,调用一次函数输出多行或多列数据。

注册用户定义的函数

在大多数情况下,必须先注册用户定义的函数,然后才能在查询中使用它。没有必要为Scala Table API注册函数。

TableEnvironment通过调用registerFunction()方法来注册函数。注册用户定义的函数时,会将其插入到函数目录中TableEnvironment,以便 Table APISQL解析器可以识别并正确转换它。

请找到如何注册,如何调用每个类型的用户定义函数(详细的例子ScalarFunctionTableFunctionAggregateFunction下面的子会话)。

自定义标量函数UDF

自定义聚合函数UDAF

自定义表值函数UDTF

问题

{"id":5,"price":40,"ts":1585125854697,"type":"math"}
{"id":3,"price":60,"ts":1585125861687,"type":"ph"}
{"id":1,"price":80,"ts":1585125862380,"type":"cs"}

模拟数据如上所示,时间字段是13位时间戳格式,在flink sql中直接转成TIMESTAMP格式会有问题。

参考阿里云日期函数TO_TIMESTAMP,文档中示例支持三种入参,

TIMESTAMP TO_TIMESTAMP(BIGINT time)
TIMESTAMP TO_TIMESTAMP(VARCHAR date)
TIMESTAMP TO_TIMESTAMP(VARCHAR date, VARCHAR format)

实际使用flink 1.10版本测试,TO_TIMESTAMP不能直接将BIGINT转成TIMESTAMP.

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TO_TIMESTAMP' to arguments of type 'TO_TIMESTAMP()'. Supported form(s): 'TO_TIMESTAMP()' 'TO_TIMESTAMP(, )'

创建UDF,并用SQL client测试

创建一个UDF,传入一个Long型时间戳,返回Timestamp格式

public class TimeUdf extends ScalarFunction {

    private Long timestamp;

    public TimeUdf(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Timestamp eval(Long timestamp){
        return new Timestamp(timestamp);
    }
}

UDF需要的依赖

<!--Table Program Dependencies-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

使用maven打包成jar,并拷贝到flink目录lib中,重启flink。

mvn clean package

修改sql-client-defaults.yaml配置

...
#创建表
  - name: bookpojo
    type: source-table
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      # derive-schema: true
      schema: "ROW<id INT, type STRING, price INT, ts BIGINT>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: ts
        data-type: BIGINT

# 创建UDF
functions: 
  - name: TimeUdf
    from: class
    class: com.deri.udx.TimeUdf
    constructor: 
      - type: BIGINT
        value: 111111111

启动SQL client

./bin/sql-client.sh embedded

SQL查询

-- 时间戳
select * from bookpojo;
-- 转成日期
select id,price,type,TimeUdf(ts) AS ts from bookpojo;

最新研究:LONG型时间戳可以直接转成TIMESTAMP格式

需要在ROW中定义为LONG,Schema中定义为TIMESTAMP

CREATE TABLE `t` (
   ctm TIMESTAMP,
) WITH (
  'format.schema' = 'ROW<ctm LONG>'
)

sql-client-defaults.yaml中详细配置,测试成功

tables: 
  - name: bookpojo
    type: source-table
    update-mode: append
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      schema: "ROW<id INT, type STRING, price INT, ts LONG>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: tss
        data-type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "ts"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"

流处理状态管理

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能:

  • 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
  • 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

flink状态管理

Flink的一个算子有多个子任务,每个子任务分布在不同实例上,我们可以把状态理解为某个算子子任务在其当前实例上的一个变量,变量记录了数据流的历史信息。当新数据流入时,我们可以结合历史信息来进行计算。实际上,Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内输入流的某个整数字段求和,那么当算子子任务接收到新元素时,会获取已经存储在状态中的数值,然后将当前输入加到状态上,并将状态数据更新。
状态管理

获取和更新状态的逻辑其实并不复杂,但流处理框架还需要解决以下几类问题:

  • 数据的产出要保证实时性,延迟不能太高。
  • 需要保证数据不丢不重,恰好计算一次,尤其是当状态数据非常大或者应用出现故障需要恢复时,要保证状态的计算不出任何错误。
  • 一般流处理任务都是7*24小时运行的,程序的可靠性非常高。

基于上述要求,我们不能将状态直接交由内存管理,因为内存的容量是有限制的,当状态数据稍微大一些时,就会出现内存不够的问题。作为一个计算框架,Flink提供了有状态的计算,封装了一些底层的实现,比如状态的高效存储、Checkpoint和Savepoint持久化备份机制、计算资源扩缩容等问题。因为Flink接管了这些问题,开发者只需调用Flink API,这样可以更加专注于业务逻辑。

Flink的几种状态类型

Managed State和Raw State

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。从名称中也能读出两者的区别:Managed State是由Flink管理的,Flink帮忙存储、恢复和优化,Raw State是开发者自己管理的,需要自己序列化。

Managed State Raw State
状态管理方式 Flink Runtime自动管理,自动存储,自动恢复,内存管理优化 用户自己管理;需要自己序列化
状态数据结构 已知的数据结构:value,list,map… 字节数组:byte[]
推荐使用场景 大多数场景 自定义Operator时可使用

两者的具体区别有:

  • 从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当我们横向伸缩,或者说我们修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。
  • 从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。
  • 从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。

Keyed State和Operator State

对Managed State继续细分,它又有两种类型:Keyed State和Operator State。

Keyed State

Keyed State是KeyedStream上的状态。假如输入流按照id为Key进行了keyBy分组,形成一个KeyedStream,数据流中所有id为1的数据共享一个状态,可以访问和更新这个状态,以此类推,每个Key对应一个自己的状态。下图展示了Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务1处理了两种Key,两种Key分别对应自己的状态。

状态管理

Operator State

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。

状态管理

无论是Keyed State还是Operator State,Flink的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。

在之前各算子的介绍中曾提到,为了自定义Flink的算子,我们可以重写Rich Function接口类,比如RichFlatMapFunction。使用Keyed State时,我们也可以通过重写Rich Function接口类,在里面创建和访问状态。对于Operator State,我们还需进一步实现CheckpointedFunction接口。

Keyed State Operator State
适用算子类型 只适用keyedStream上的算子 适用所有算子
状态分配 每一个key对应一个状态 一个算子上的子任务对应一个状态
创建和访问方式 重写Rich Function,通过里面的RuntimeContext访问 实现CheckpointedFunctionListCheckpointed接口
横向扩展 状态随着key自动在多个算子子任务上迁移 有多种状态重新分配的方式:均匀分配、合并后每个得到全量
支持的数据结构 ValueStateListStateMapStateReducingState ListState

几种KeyedState之间的关系

状态管理

几种KeyedState之间的差异

状态管理

状态的保存与恢复

状态管理

可选的状态存储方式

MemoryStateBackend

Checkpoint 的存储,第一种是内存存储,即 MemoryStateBackend,构造方法是设置最大的StateSize,选择是否做异步快照,这种存储状态本身存储在 TaskManager 节点也就是执行节点内存中的,因为内存有容量限制,所以单个 State maxStateSize 默认 5 M,且需要注意 maxStateSize <= akka.framesize 默认 10 M。Checkpoint 存储在 JobManager 内存中,因此总大小不超过 JobManager 的内存。推荐使用的场景为:本地测试、几乎无状态的作业,比如 ETL、JobManager 不容易挂,或挂掉影响不大的情况。不推荐在生产场景使用

FsStateBackend

在文件系统上的 FsStateBackend ,构建方法是需要传一个文件路径和是否异步快照。State 依然在 TaskManager 内存中,但不会像 MemoryStateBackend 有 5 M 的设置上限,Checkpoint 存储在外部文件系统(本地或 HDFS),打破了总大小 Jobmanager 内存的限制。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。推荐使用的场景、常规使用状态的作业、例如分钟级窗口聚合或 join、需要开启HA的作业

RocksDBStateBacked

还有一种存储为 RocksDBStateBackend ,RocksDB 是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中,但需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单 Key最大 2G,总大小不超过配置的文件系统容量即可。推荐使用的场景为:超大状态的作业,例如天级窗口聚合、需要开启 HA 的作业、最好是对状态读写性能要求不高的作业

参考链接

ProcessFunction简介

ProcessFunction是flink中最底层的API。
low-api

Flink的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限。如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。

目前,这个系列函数主要包括KeyedProcessFunctionProcessFunctionCoProcessFunctionKeyedCoProcessFunctionProcessJoinFunctionProcessWindowFunction等多种函数,这些函数各有侧重,但核心功能比较相似,主要包括两点:

  • 状态:我们可以在这些函数中访问和更新Keyed State
  • 定时器(Timer):像定闹钟一样设置定时器,我们可以在时间维度上设计更复杂的业务逻辑。使用前先在Timer中注册一个未来的时间,当这个时间到达,闹钟会“响起”,程序会执行一个回调函数,回调函数中执行一定的业务逻辑。

ProcessFunction使用

ProcessFunction有两个重要的接口processElementonTimer

其中processElement函数在源码中的Java签名如下:

// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)

processElement方法处理数据流中的一条元素,并通过Collector<O>输出出来。Context是它的区别于FlatMapFunction等普通函数的特色,开发者可以通过Context来获取时间戳,访问TimerService,设置Timer

另外一个接口是onTimer

// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

这是一个回调函数,当到了“闹钟”时间,Flink会调用onTimer,并执行一些业务逻辑。这里也有一个参数OnTimerContext,它实际上是继承了前面的Context,与Context几乎相同。

使用Timer的方法主要逻辑为:

  1. processElement方法中通过Context注册一个未来的时间戳t。这个时间戳的语义可以是Processing Time,也可以是Event Time,根据业务需求来选择。
  2. onTimer方法中实现一些逻辑,到达t时刻,onTimer方法被自动调用。

获取、注册和删除Timer

  • Context中,我们可以获取一个TimerService,这是一个访问时间戳和Timer的接口。

  • 我们可以通过Context.timerService.registerProcessingTimeTimerContext.timerService.registerEventTimeTimer这两个方法来注册Timer,只需要传入一个时间戳即可。

  • 我们可以通过Context.timerService.deleteProcessingTimeTimerContext.timerService.deleteEventTimeTimer来删除之前注册的Timer。

  • 此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTimeContext.timerService.currentWatermark

注意,我们只能在KeyedStream上注册Timer。每个Key下可以使用不同的时间戳注册不同的Timer,但是每个Key的每个时间戳只能注册一个Timer。如果想在一个DataStream上应用Timer,可以将所有数据映射到一个伪造的Key上,但这样所有数据会流入一个算子子任务。

Flink 框架会自动忽略同一时间的重复注册Timer。

使用ProcessFunction实现Join

如果想从更细的粒度上实现两个数据流的Join,可以使用CoProcessFunctionKeyedCoProcessFunction。这两个函数都有processElement1processElement2方法,分别对第一个数据流和第二个数据流的每个元素进行处理。两个数据流的数据类型以及输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现Join

  • 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。
  • processElement1方法处理第一个数据流,更新状态a
  • processElement2方法处理第二个数据流,根据状态a中的数据,生成相应的输出。

参考链接