0%

flink时间属性

时间属性

Flink支持三种与流数据处理相关的时间概念:Processing TimeEvent TimeIngestion Time

time

Flink SQL仅支持其中的两种时间类型Event TimeProcessing Time

  • Event Time:您提供的事件时间(通常是数据的最原始的创建时间).Event Time必须是您提供在数据储存里的数据。
  • Processing Time:系统对事件进行处理的本地系统时间,单位为毫秒。

Event Time

Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。

由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要首先明文定义一个Watermark计算方法。

窗口函数基于Event Time聚合的示例如下:

CREATE TABLE tt_stream (
  a VARCHAR,
  b VARCHAR,
  ts TIMESTAMP,
  WATERMARK wk1 FOR ts as withOffset (ts, 1000) --Watermark计算方法。
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);

sql-client-default.yaml中配置如下:

...
format: 
    property-version: 1
    type: json
    # derive-schema: true
    schema: "ROW<id INT, type STRING, price INT, ts LONG>"
schema: 
    - name: tss
    data-type: TIMESTAMP
    rowtime:
        timestamps:
        type: "from-field"
        from: "ts"
        watermarks:
        type: "periodic-bounded"
        delay: "60000"
...

Processing Time

Processing Time是系统产生的,不在您的原始数据中,您需要在数据源表的声明中明文定义一个Processing Time列。

filedName as PROCTIME()

窗口函数基于Processing Time聚合的示例如下。

CREATE TABLE mq_stream (
    a VARCHAR,
    b VARCHAR,
    c BIGINT,
    ts AS PROCTIME () --在数据源表的声明中明文定义一个Processing Time列。
  ) WITH (
    type = 'mq',
    topic = '<yourTopic>',
    accessId = '<yourAccessId>',
    accessKey = '<yourAccessSecret>'
  );
select id,price,tss, PROCTIME() AS ts from book;