flinkSQL-client初体验


概述

本章创建一个模拟数据源将模拟数据源源不断写到kafka中,然后使用Flink SQL client查询kafka中的数据。

模拟数据由java对象序列化而成的json格式字符串,包含book的id、type、price和时间。

BookPojo.java

package com.deri.pojo.util;

import com.alibaba.fastjson.annotation.JSONField;

import java.sql.Timestamp;
import java.util.Date;

/**
 * @ClassName: BookPojo
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 9:48
 * @Version: v1.0
 **/
public class BookPojo {
    private int id;
    private String type;
    private int price;
    //Currently, JSON format only allowed the timestamp data is in RFC-3339, i.e. "2019-07-09 02:02:00.040Z".
    //注意引入的是java.util.Date;而不是java.sql.Date;
    @JSONField(format = "yyyy-MM-dd'T'HH:mm:ss'Z'")
    private Date ts;

    public BookPojo() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public Date getTs() {
        return ts;
    }

    public void setTs(Date ts) {
        this.ts = ts;
    }
}

模拟书籍Books.java

package com.deri.pojo.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: Books
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 10:52
 * @Version: v1.0
 **/
public class Books {

    public static BookPojo getBook(){
        List<BookPojo> bookPojos = new ArrayList<>();
        {
            BookPojo book1 = new BookPojo();
            book1.setId(1);
            book1.setType("cs");
            book1.setPrice(80);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(2);
            book1.setType("math");
            book1.setPrice(70);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(3);
            book1.setType("ph");
            book1.setPrice(60);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(4);
            book1.setType("cs");
            book1.setPrice(50);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(5);
            book1.setType("math");
            book1.setPrice(40);
            bookPojos.add(book1);
        }

        {
            BookPojo book1 = new BookPojo();
            book1.setId(6);
            book1.setType("ph");
            book1.setPrice(30);
            bookPojos.add(book1);
        }
        return bookPojos.get(new Random().nextInt(6));
    }
}
package com.deri.pojo.util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: MyNoParalleSource
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:15
 * @Version: v1.0
 **/
//使用并行度为1的source
public class MyPojoSource implements SourceFunction<String> {//1

    private boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            BookPojo book = Books.getBook();
            book.setTs(new Date());
            ctx.collectWithTimestamp(JSON.toJSONString(book),book.getTs().getTime());
            Thread.sleep(new Random().nextInt(4000));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

}

创建flink任务,此任务只是将模拟数据写到kafka中

package com.deri.pojo;

import com.deri.pojo.util.MyPojoSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @ClassName: KafkaProducer
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:16
 * @Version: v1.0
 **/
public class KafkaPojoProducer {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyPojoSource()).setParallelism(1)/*设置并行度*/;

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.41.128:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("pojo4", new SimpleStringSchema(), properties);

        //event-timestamp事件的发生时间
        producer.setWriteTimestampToKafka(true);
        text.addSink(producer);
        env.execute("POJO Kafka Source");
    }

}

配置sql-client-defaults.yaml

配置文件位于flink根目录/conf

配置table

tables: 
  - name: book
    type: source-table
    update-mode: append
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo4
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      # derive-schema: true
      schema: "ROW<id INT, type STRING, price INT, ts TIMESTAMP>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: tss
        data-type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "ts"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"

启动SQL client

进入到flink根目录下,执行

./bin/sql-client.sh embedded

如果没有报错,可以看到一个小松鼠,并进入以下界面

...
Flink SQL> 

一些测试SQL语句

  1. 最简单的查询

    SELECT * FROM book;
  2. 查询价格大于50的

    SELECT * FROM book GROUP BY  id,price,type,tss  having price > 50;
  3. 每五分钟book的总销售额

    SELECT 
    id,
    type,
    TUMBLE_END(tss, INTERVAL '5' MINUTE) AS window_end,
    SUM(price) AS sumprice
    FROM book
    GROUP BY  
    id,
    type,
    TUMBLE(tss, INTERVAL '5' MINUTE);
  4. 创建一个新的view,用于存放SELECT结果

    CREATE VIEW book_view AS
    SELECT * FROM book GROUP BY  id,price,type,tss  having price > 60;
  5. 根据id查询卖出的总数量

    SELECT id,type,count(price) AS cnt FROM book GROUP BY  id,type;
  6. 根据type查询卖出的总数量

    SELECT type,count(*) AS cnt FROM book GROUP BY type;
  7. 查询过去5分钟不同id的书卖出的总额,每1分钟刷新一次

    SELECT 
    id,
    type,
    HOP_START (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE) AS window_end,
    SUM(price) AS sumprice
    FROM book
    GROUP BY  
    id,
    type,
    HOP (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE);
  8. 创建一个模拟列,存放系统处理的时间

    select id,price,tss, PROCTIME() AS ts from book;

文章作者: wuzhiyong
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wuzhiyong !
评论
 上一篇
springboot利用系统变量和配置文件实现灵活配置 springboot利用系统变量和配置文件实现灵活配置
静态方式直接在bootstrap.yml或者application.yml中写死; spring: #mysql数据配置 datasource: driver-class-name: com.mysql.jdbc.Drive
2020-03-31
下一篇 
FlinkSQL中窗口函数 FlinkSQL中窗口函数
滚动窗口(TUMBLE)滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分成[0:00 - 0:05)、[0:0
2020-03-26
  目录