0%

静态方式

直接在bootstrap.yml或者application.yml中写死;

spring:
  #mysql数据配置
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: 123456
    url: jdbc:mysql://192.168.41.128:3306/gw?useUnicode=true&characterEncoding=utf8&useSSL=false

缺点是参数需要改变时,需要重新改写配置,再打包。特别docker部署时,环境变化了,配置改动麻烦。

解决办法

yml文件中,通过${Envirment_variable}的方式可以获取系统环境变量中的值;如上面的配置改写成:

spring:
  #mysql数据配置
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: ${USER_NAME:root}
    password: ${PASSWORD:123456}
    url: jdbc:mysql://${IP:192.168.41.128:3306}/gw?useUnicode=true&characterEncoding=utf8&useSSL=false

以上配置,优先设置系统环境变量(USER_NAME、PASSWORD、IP)的值,如果没有,才会使用默认值。

docker部署传入环境变量

  • 通过-e可以传入环境变量;
  • docker-compose可以通过environment传入系统环境变量;

概述

本章创建一个模拟数据源将模拟数据源源不断写到kafka中,然后使用Flink SQL client查询kafka中的数据。

模拟数据由java对象序列化而成的json格式字符串,包含book的id、type、price和时间。

BookPojo.java

package com.deri.pojo.util;

import com.alibaba.fastjson.annotation.JSONField;

import java.sql.Timestamp;
import java.util.Date;

/**
 * @ClassName: BookPojo
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 9:48
 * @Version: v1.0
 **/
public class BookPojo {
    private int id;
    private String type;
    private int price;
    //Currently, JSON format only allowed the timestamp data is in RFC-3339, i.e. "2019-07-09 02:02:00.040Z".
    //注意引入的是java.util.Date;而不是java.sql.Date;
    @JSONField(format = "yyyy-MM-dd'T'HH:mm:ss'Z'")
    private Date ts;

    public BookPojo() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public Date getTs() {
        return ts;
    }

    public void setTs(Date ts) {
        this.ts = ts;
    }
}

模拟书籍Books.java

package com.deri.pojo.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: Books
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 10:52
 * @Version: v1.0
 **/
public class Books {

    public static BookPojo getBook(){
        List<BookPojo> bookPojos = new ArrayList<>();
        {
            BookPojo book1 = new BookPojo();
            book1.setId(1);
            book1.setType("cs");
            book1.setPrice(80);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(2);
            book1.setType("math");
            book1.setPrice(70);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(3);
            book1.setType("ph");
            book1.setPrice(60);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(4);
            book1.setType("cs");
            book1.setPrice(50);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(5);
            book1.setType("math");
            book1.setPrice(40);
            bookPojos.add(book1);
        }

        {
            BookPojo book1 = new BookPojo();
            book1.setId(6);
            book1.setType("ph");
            book1.setPrice(30);
            bookPojos.add(book1);
        }
        return bookPojos.get(new Random().nextInt(6));
    }
}
package com.deri.pojo.util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: MyNoParalleSource
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:15
 * @Version: v1.0
 **/
//使用并行度为1的source
public class MyPojoSource implements SourceFunction<String> {//1

    private boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            BookPojo book = Books.getBook();
            book.setTs(new Date());
            ctx.collectWithTimestamp(JSON.toJSONString(book),book.getTs().getTime());
            Thread.sleep(new Random().nextInt(4000));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

}

创建flink任务,此任务只是将模拟数据写到kafka中

package com.deri.pojo;

import com.deri.pojo.util.MyPojoSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @ClassName: KafkaProducer
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:16
 * @Version: v1.0
 **/
public class KafkaPojoProducer {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyPojoSource()).setParallelism(1)/*设置并行度*/;

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.41.128:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("pojo4", new SimpleStringSchema(), properties);

        //event-timestamp事件的发生时间
        producer.setWriteTimestampToKafka(true);
        text.addSink(producer);
        env.execute("POJO Kafka Source");
    }

}

配置sql-client-defaults.yaml

配置文件位于flink根目录/conf

配置table

tables: 
  - name: book
    type: source-table
    update-mode: append
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo4
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      # derive-schema: true
      schema: "ROW<id INT, type STRING, price INT, ts TIMESTAMP>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: tss
        data-type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "ts"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"

启动SQL client

进入到flink根目录下,执行

./bin/sql-client.sh embedded

如果没有报错,可以看到一个小松鼠,并进入以下界面

...
Flink SQL> 

一些测试SQL语句

  1. 最简单的查询
SELECT * FROM book;
  1. 查询价格大于50的
SELECT * FROM book GROUP BY  id,price,type,tss  having price > 50;
  1. 每五分钟book的总销售额
SELECT 
id,
type,
TUMBLE_END(tss, INTERVAL '5' MINUTE) AS window_end,
SUM(price) AS sumprice
FROM book
GROUP BY  
id,
type,
TUMBLE(tss, INTERVAL '5' MINUTE);
  1. 创建一个新的view,用于存放SELECT结果
CREATE VIEW book_view AS
SELECT * FROM book GROUP BY  id,price,type,tss  having price > 60;
  1. 根据id查询卖出的总数量
SELECT id,type,count(price) AS cnt FROM book GROUP BY  id,type;
  1. 根据type查询卖出的总数量
SELECT type,count(*) AS cnt FROM book GROUP BY type;
  1. 查询过去5分钟不同id的书卖出的总额,每1分钟刷新一次
SELECT 
id,
type,
HOP_START (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE) AS window_end,
SUM(price) AS sumprice
FROM book
GROUP BY  
id,
type,
HOP (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE);
  1. 创建一个模拟列,存放系统处理的时间
select id,price,tss, PROCTIME() AS ts from book;

滚动窗口(TUMBLE)

滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分成[0:00 - 0:05)、[0:05, 0:10)、 [0:10, 0:15)等窗口。

tumble

语法

TUMBLE函数用在GROUP BY子句中,用来定义滚动窗口。

TUMBLE(<time-attr>, <size-interval>)
<size-interval>: INTERVAL 'string' timeUnit

示例

SELECT
TUMBLE_START(ts, INTERVAL '1' MINUTE),
TUMBLE_END(ts, INTERVAL '1' MINUTE),
username,
COUNT(click_url)
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username

具体参考阿里云

滑动窗口(HOP)

滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。

滑动窗口有两个参数:slide和size。slide为每次滑动的步长,size为窗口的大小。

  • slide < size,则窗口会重叠,每个元素会被分配到多个窗口。
  • slide = size,则等同于滚动窗口(TUMBLE)。
  • slide > size,则为跳跃窗口,窗口之间不重叠且有间隙。

hop

通常情况下大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。

滑动窗口函数语法

HOP函数用在group by子句中,用来定义滑动窗口。

HOP(<time-attr>, <slide-interval>,<size-interval>)
<slide-interval>: INTERVAL 'string' timeUnit
<size-interval>: INTERVAL 'string' timeUnit  

示例

SELECT
    HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    username,
    COUNT (click_url)
FROM
    user_clicks
GROUP BY
    HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    username  

具体参考阿里云

会话窗口(SESSION)

会话窗口(SESSION)通过Session活动来对元素进行分组。会话窗口与滚动窗口和滑动窗口相比,没有窗口重叠,没有固定窗口大小。相反,当它在一个固定的时间周期内不再收到元素,即会话断开时,这个窗口就会关闭。

会话窗口通过一个间隔时间(Gap)来配置,这个间隔定义了非活跃周期的长度。例如,一个表示鼠标点击活动的数据流可能具有长时间的空闲时间,并在两段空闲之间散布着高浓度的点击。 如果数据在指定的间隔(Gap)之后到达,则会开始一个新的窗口。

会话窗口示例如下图。每个Key由于不同的数据分布,形成了不同的Window。
session

会话窗口函数语法

SESSION函数用于在GROUP BY子句中定义会话窗口。

SESSION(<time-attr>, <gap-interval>)
<gap-interval>: INTERVAL 'string' timeUnit

示例

SELECT
SESSION_START(ts, INTERVAL '30' SECOND),
SESSION_END(ts, INTERVAL '30' SECOND),
username,
COUNT(click_url)
FROM user_clicks
GROUP BY SESSION(ts, INTERVAL '30' SECOND), username

具体参考阿里云

时间属性

Flink支持三种与流数据处理相关的时间概念:Processing TimeEvent TimeIngestion Time

time

Flink SQL仅支持其中的两种时间类型Event TimeProcessing Time

  • Event Time:您提供的事件时间(通常是数据的最原始的创建时间).Event Time必须是您提供在数据储存里的数据。
  • Processing Time:系统对事件进行处理的本地系统时间,单位为毫秒。

Event Time

Event Time也称为Row Time。EventTime时间属性必须在源表DDL中声明,可以将源表中的某一字段声明成Event Time。目前只支持将TIMESTAMP类型(将来会支持LONG类型)声明成Row Time字段。如果源表中需要声明为Event Time的列不是TIMESTAMP类型,需要借助计算列,基于现有列构造出一个TIMESTAMP类型的列。

由于数据本身的乱序、网络的抖动(网络堵塞导致的数据传输延迟的变化)或者其它原因,导致了数据到达的顺序和被处理的顺序,可能是不一致的(乱序)。因此定义一个Row Time字段,需要首先明文定义一个Watermark计算方法。

窗口函数基于Event Time聚合的示例如下:

CREATE TABLE tt_stream (
  a VARCHAR,
  b VARCHAR,
  ts TIMESTAMP,
  WATERMARK wk1 FOR ts as withOffset (ts, 1000) --Watermark计算方法。
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);

sql-client-default.yaml中配置如下:

...
format: 
    property-version: 1
    type: json
    # derive-schema: true
    schema: "ROW<id INT, type STRING, price INT, ts LONG>"
schema: 
    - name: tss
    data-type: TIMESTAMP
    rowtime:
        timestamps:
        type: "from-field"
        from: "ts"
        watermarks:
        type: "periodic-bounded"
        delay: "60000"
...

Processing Time

Processing Time是系统产生的,不在您的原始数据中,您需要在数据源表的声明中明文定义一个Processing Time列。

filedName as PROCTIME()

窗口函数基于Processing Time聚合的示例如下。

CREATE TABLE mq_stream (
    a VARCHAR,
    b VARCHAR,
    c BIGINT,
    ts AS PROCTIME () --在数据源表的声明中明文定义一个Processing Time列。
  ) WITH (
    type = 'mq',
    topic = '<yourTopic>',
    accessId = '<yourAccessId>',
    accessKey = '<yourAccessSecret>'
  );
select id,price,tss, PROCTIME() AS ts from book;

RFC3339

本地时间只包括当前的时间,不包含任何时区信息。同一时刻,东八区的本地时间比零时区的本地时间快了8个小时。在不同时区之间交换时间数据,除了用纯数字的时间戳,还有一种更方便人类阅读的表示方式:标准时间的偏移量表示方法。

RFC3339详细定义了互联网上日期/时间的偏移量表示:

2017-12-08T00:00:00.00Z

这个代表了UTC时间的2017年12月08日零时

2017-12-08T00:08:00.00+08:00

这个代表了同一时刻的,东八区北京时间(CST)表示的方法

上面两个时间的时间戳是等价的。两个的区别,就是在本地时间后面增加了时区信息。Z表示零时区。+08:00表示UTC时间增加8小时。

应用

目前很多软件都是采用RFC3339,如PrometheusFlinkInfluxDB等,注意与北京时间相差8小时。

UDX分类

参考

UDX分类 描述
UDF(User Defined Function) 用户自定义标量函数(User Defined Scalar Function)。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
UDAF(User Defined Aggregation Function) 自定义聚合函数,其输入与输出是多对一的关系, 即将多条输入记录聚合成一条输出值。可以与SQL中的GROUP BY语句一起使用。
UDTF(User Defined Table-valued Function) 自定义表函数,调用一次函数输出多行或多列数据。

注册用户定义的函数

在大多数情况下,必须先注册用户定义的函数,然后才能在查询中使用它。没有必要为Scala Table API注册函数。

TableEnvironment通过调用registerFunction()方法来注册函数。注册用户定义的函数时,会将其插入到函数目录中TableEnvironment,以便 Table APISQL解析器可以识别并正确转换它。

请找到如何注册,如何调用每个类型的用户定义函数(详细的例子ScalarFunctionTableFunctionAggregateFunction下面的子会话)。

自定义标量函数UDF

自定义聚合函数UDAF

自定义表值函数UDTF