Overview
This chapter creates a simulated data source that continuously writes mock data to Kafka, and then uses the Flink SQL Client to query the data in Kafka.
The mock data consists of JSON strings serialized from a Java object; each record contains a book's id, type, price, and a timestamp.
BookPojo.java
package com.deri.pojo.util;
import com.alibaba.fastjson.annotation.JSONField;
import java.util.Date;
/**
* @ClassName: BookPojo
* @Description: POJO representing a book record, serialized to JSON
* @Author: wuzhiyong
* @Time: 2020/3/20 9:48
* @Version: v1.0
**/
public class BookPojo {
private int id;
private String type;
private int price;
//Currently, the JSON format only accepts timestamp data in RFC-3339 form, i.e. "2019-07-09 02:02:00.040Z".
//Note: import java.util.Date, not java.sql.Date.
@JSONField(format = "yyyy-MM-dd'T'HH:mm:ss'Z'")
private Date ts;
public BookPojo() {
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
}
public Date getTs() {
return ts;
}
public void setTs(Date ts) {
this.ts = ts;
}
}
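For reference, here is a minimal sketch of how fastjson serializes this POJO (the BookJsonDemo class name and the sample values are illustrative; the output shown in the comment is only an example):
package com.deri.pojo.util;

import com.alibaba.fastjson.JSON;
import java.util.Date;

public class BookJsonDemo {
    public static void main(String[] args) {
        BookPojo book = new BookPojo();
        book.setId(1);
        book.setType("cs");
        book.setPrice(80);
        book.setTs(new Date());
        // prints something like {"id":1,"price":80,"ts":"2020-03-20T09:48:00Z","type":"cs"}
        // (field order may differ; the ts format follows the @JSONField pattern above)
        System.out.println(JSON.toJSONString(book));
    }
}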
Simulated books: Books.java
package com.deri.pojo.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* @ClassName: Books
* @Description: Returns a randomly chosen sample BookPojo
* @Author: wuzhiyong
* @Time: 2020/3/20 10:52
* @Version: v1.0
**/
public class Books {
public static BookPojo getBook(){
List<BookPojo> bookPojos = new ArrayList<>();
{
BookPojo book1 = new BookPojo();
book1.setId(1);
book1.setType("cs");
book1.setPrice(80);
bookPojos.add(book1);
}
{
BookPojo book1 = new BookPojo();
book1.setId(2);
book1.setType("math");
book1.setPrice(70);
bookPojos.add(book1);
}
{
BookPojo book1 = new BookPojo();
book1.setId(3);
book1.setType("ph");
book1.setPrice(60);
bookPojos.add(book1);
}
{
BookPojo book1 = new BookPojo();
book1.setId(4);
book1.setType("cs");
book1.setPrice(50);
bookPojos.add(book1);
}
{
BookPojo book1 = new BookPojo();
book1.setId(5);
book1.setType("math");
book1.setPrice(40);
bookPojos.add(book1);
}
{
BookPojo book1 = new BookPojo();
book1.setId(6);
book1.setType("ph");
book1.setPrice(30);
bookPojos.add(book1);
}
return bookPojos.get(new Random().nextInt(6));
}
}
Create the Flink source: MyPojoSource.java
package com.deri.pojo.util;
import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Date;
import java.util.Random;
/**
* @ClassName: MyPojoSource
* @Description: Source that emits a random BookPojo as a JSON string every few seconds
* @Author: wuzhiyong
* @Time: 2020/3/4 10:15
* @Version: v1.0
**/
//A source intended to run with parallelism 1
public class MyPojoSource implements SourceFunction<String> {
private boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
BookPojo book = Books.getBook();
book.setTs(new Date());
ctx.collectWithTimestamp(JSON.toJSONString(book),book.getTs().getTime());
Thread.sleep(new Random().nextInt(4000));
}
}
@Override
public void cancel() {
isRunning = false;
}
}
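Before wiring the source to Kafka, you can sanity-check it locally by printing its output to stdout. A minimal sketch (the PojoSourcePrintTest class name and job name are arbitrary):
package com.deri.pojo;

import com.deri.pojo.util.MyPojoSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PojoSourcePrintTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // print each generated JSON record to stdout instead of sending it to Kafka
        env.addSource(new MyPojoSource()).setParallelism(1).print();
        env.execute("POJO source print test");
    }
}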
Create the Flink job; this job simply writes the simulated data to Kafka.
package com.deri.pojo;
import com.deri.pojo.util.MyPojoSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;
/**
* @ClassName: KafkaPojoProducer
* @Description: Writes the simulated book records to the Kafka topic pojo4
* @Author: wuzhiyong
* @Time: 2020/3/4 10:16
* @Version: v1.0
**/
public class KafkaPojoProducer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.addSource(new MyPojoSource()).setParallelism(1)/* set the source parallelism */;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.41.128:9092");
//new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("pojo4", new SimpleStringSchema(), properties);
//write the event timestamp (the time the event occurred) into the Kafka records
producer.setWriteTimestampToKafka(true);
text.addSink(producer);
env.execute("POJO Kafka Source");
}
}
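Once the job is running, you can optionally check that records are arriving in the topic with Kafka's console consumer (assuming the Kafka command-line tools are available on the broker host; adjust the address to your environment):
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.41.128:9092 --topic pojo4 --from-beginning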
Configure sql-client-defaults.yaml
The configuration file is located in the conf directory under the Flink root directory.
Configure the table. The JSON field ts is declared as the rowtime attribute and exposed to SQL as the column tss, with a 60-second bounded watermark:
tables:
  - name: book
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo4
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format:
      property-version: 1
      type: json
      # derive-schema: true
      schema: "ROW<id INT, type STRING, price INT, ts TIMESTAMP>"
    schema:
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: tss
        data-type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "ts"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"
Start the SQL client
From the Flink root directory, run:
./bin/sql-client.sh embedded
If there are no errors, you will see the squirrel banner and enter the following prompt:
...
Flink SQL>
Some test SQL statements
- The simplest query
SELECT * FROM book;
- Query books with a price greater than 50
SELECT * FROM book GROUP BY id,price,type,tss having price > 50;
- Total book sales for every 5-minute window
SELECT
id,
type,
TUMBLE_END(tss, INTERVAL '5' MINUTE) AS window_end,
SUM(price) AS sumprice
FROM book
GROUP BY
id,
type,
TUMBLE(tss, INTERVAL '5' MINUTE);
- Create a new view to hold the SELECT result
CREATE VIEW book_view AS
SELECT * FROM book GROUP BY id,price,type,tss having price > 60;
- Total number of books sold, grouped by id
SELECT id,type,count(price) AS cnt FROM book GROUP BY id,type;
- Total number of books sold, grouped by type
SELECT type,count(*) AS cnt FROM book GROUP BY type;
- Total sales per book id over the last 5 minutes, refreshed every minute
SELECT
id,
type,
HOP_START (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE) AS window_start,
SUM(price) AS sumprice
FROM book
GROUP BY
id,
type,
HOP (tss, INTERVAL '60' SECOND, INTERVAL '5' MINUTE);
- Add a computed column holding the system processing time
SELECT id, price, tss, PROCTIME() AS ts FROM book;