Converting timestamps to date format in Flink SQL


Problem

{"id":5,"price":40,"ts":1585125854697,"type":"math"}
{"id":3,"price":60,"ts":1585125861687,"type":"ph"}
{"id":1,"price":80,"ts":1585125862380,"type":"cs"}

The mock data is shown above. The time field ts is a 13-digit (millisecond) epoch timestamp, and converting it straight to TIMESTAMP in Flink SQL runs into problems.
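
A quick way to verify what the field encodes is to decode one sample value as epoch milliseconds outside of Flink, e.g. with java.time (a standalone sketch; the class name is made up):

import java.time.Instant;

public class TsCheck {
    public static void main(String[] args) {
        // 13 digits => epoch milliseconds, not seconds
        System.out.println(Instant.ofEpochMilli(1585125854697L));
        // prints 2020-03-25T08:44:14.697Z
    }
}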

The Alibaba Cloud documentation for the date function TO_TIMESTAMP lists three supported signatures:

TIMESTAMP TO_TIMESTAMP(BIGINT time)
TIMESTAMP TO_TIMESTAMP(VARCHAR date)
TIMESTAMP TO_TIMESTAMP(VARCHAR date, VARCHAR format)

Testing against Flink 1.10, however, TO_TIMESTAMP cannot convert a BIGINT directly to TIMESTAMP:

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TO_TIMESTAMP' to arguments of type 'TO_TIMESTAMP(<BIGINT>)'. Supported form(s): 'TO_TIMESTAMP(<CHARACTER>)' 'TO_TIMESTAMP(<CHARACTER>, <CHARACTER>)'

Creating a UDF and testing it with the SQL client

Create a UDF that takes a Long (millisecond epoch) timestamp and returns a Timestamp:

package com.deri.udx;

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;

public class TimeUdf extends ScalarFunction {

    private Long timestamp;

    // The SQL client instantiates the function through this constructor;
    // the argument matches the `constructor` entry in sql-client-defaults.yaml.
    public TimeUdf(Long timestamp) {
        this.timestamp = timestamp;
    }

    // Converts an epoch timestamp in milliseconds to java.sql.Timestamp.
    public Timestamp eval(Long timestamp) {
        return new Timestamp(timestamp);
    }
}
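
Before wiring the function into the SQL client, it can be sanity-checked in plain Java (a hypothetical test class, assuming TimeUdf is on the classpath):

import java.sql.Timestamp;

public class TimeUdfTest {
    public static void main(String[] args) {
        // The constructor argument is only stored in a field and never used by
        // eval(); it mirrors the `constructor` entry in the YAML config below.
        TimeUdf udf = new TimeUdf(111111111L);
        Timestamp converted = udf.eval(1585125854697L);
        System.out.println(converted); // rendered in the JVM default time zone
    }
}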

Dependencies needed by the UDF:

<!--Table Program Dependencies-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

Build the jar with Maven, copy it into Flink's lib directory, and restart Flink:

mvn clean package

Modify the sql-client-defaults.yaml configuration:

...
# define the source table
  - name: bookpojo
    type: source-table
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      # derive-schema: true
      schema: "ROW<id INT, type STRING, price INT, ts BIGINT>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: ts
        data-type: BIGINT

# register the UDF
functions: 
  - name: TimeUdf
    from: class
    class: com.deri.udx.TimeUdf
    constructor: 
      - type: BIGINT
        value: 111111111
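
As an alternative to the YAML entry, the same function could be registered programmatically in a Table API job (a minimal sketch against the Flink 1.10 API; the class name is illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RegisterTimeUdf {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Equivalent of the YAML `functions` block: instantiate with the
        // constructor argument and register under the name used in SQL.
        tableEnv.registerFunction("TimeUdf", new TimeUdf(111111111L));
    }
}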

Start the SQL client:

./bin/sql-client.sh embedded

Run the queries:

-- raw timestamps
select * from bookpojo;
-- converted to dates
select id, price, type, TimeUdf(ts) AS ts from bookpojo;

Follow-up: a LONG timestamp can be converted to TIMESTAMP directly

The trick is to declare the field as LONG in the format's ROW and as TIMESTAMP in the table schema; conceptually:

CREATE TABLE `t` (
  ctm TIMESTAMP
) WITH (
  'format.schema' = 'ROW<ctm LONG>'
)

The complete sql-client-defaults.yaml configuration, verified to work. Note that the JSON field ts stays LONG, while the schema exposes it as the TIMESTAMP rowtime attribute tss:

tables: 
  - name: bookpojo
    type: source-table
    update-mode: append
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      schema: "ROW<id INT, type STRING, price INT, ts LONG>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: tss
        data-type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "ts"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
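
With tss declared as a rowtime attribute and a 60 s bounded watermark, the table supports event-time processing. As an illustration of what that enables, a hypothetical tumbling-window aggregation (continuing from the tableEnv in the registration sketch above; the query is not from the original post):

import org.apache.flink.table.api.Table;

// One-minute tumbling event-time windows over the rowtime attribute `tss`.
Table windowed = tableEnv.sqlQuery(
        "SELECT type, " +
        "       TUMBLE_START(tss, INTERVAL '1' MINUTE) AS window_start, " +
        "       SUM(price) AS total_price " +
        "FROM bookpojo " +
        "GROUP BY type, TUMBLE(tss, INTERVAL '1' MINUTE)");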

Author: wuzhiyong
License: unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit wuzhiyong when reposting!