时间属性
Flink支持三种与流数据处理相关的时间概念:Processing Time
、Event Time
和Ingestion Time
。
Flink SQL仅支持其中的两种时间类型Event Time
和Processing Time
:
Event Time
:您提供的事件时间(通常是数据的最原始的创建时间).Event Time
必须是您提供在数据储存里的数据。Processing Time
:系统对事件进行处理的本地系统时间,单位为毫秒。
Event Time
Event Time
也称为Row Time
。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列
,基于现有列构造出一个TIMESTAMP类型的列。
由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要首先明文定义一个Watermark计算方法。
窗口函数基于Event Time聚合的示例如下:
CREATE TABLE tt_stream (
a VARCHAR,
b VARCHAR,
ts TIMESTAMP,
WATERMARK wk1 FOR ts as withOffset (ts, 1000) --Watermark计算方法。
) WITH (
type = 'sls',
topic = '<yourTopicName>',
accessId = '<yourAccessId>',
accessKey = '<yourAccessSecret>'
);
sql-client-default.yaml
中配置如下:
...
format:
property-version: 1
type: json
# derive-schema: true
schema: "ROW<id INT, type STRING, price INT, ts LONG>"
schema:
- name: tss
data-type: TIMESTAMP
rowtime:
timestamps:
type: "from-field"
from: "ts"
watermarks:
type: "periodic-bounded"
delay: "60000"
...
Processing Time
Processing Time
是系统产生的,不在您的原始数据中,您需要在数据源表的声明中明文定义一个Processing Time
列。
filedName as PROCTIME()
窗口函数基于Processing Time聚合的示例如下。
CREATE TABLE mq_stream (
a VARCHAR,
b VARCHAR,
c BIGINT,
ts AS PROCTIME () --在数据源表的声明中明文定义一个Processing Time列。
) WITH (
type = 'mq',
topic = '<yourTopic>',
accessId = '<yourAccessId>',
accessKey = '<yourAccessSecret>'
);
select id,price,tss, PROCTIME() AS ts from book;