0%

问题

{"id":5,"price":40,"ts":1585125854697,"type":"math"}
{"id":3,"price":60,"ts":1585125861687,"type":"ph"}
{"id":1,"price":80,"ts":1585125862380,"type":"cs"}

模拟数据如上所示,时间字段是13位时间戳格式,在flink sql中直接转成TIMESTAMP格式会有问题。

参考阿里云日期函数TO_TIMESTAMP,文档中示例支持三种入参,

TIMESTAMP TO_TIMESTAMP(BIGINT time)
TIMESTAMP TO_TIMESTAMP(VARCHAR date)
TIMESTAMP TO_TIMESTAMP(VARCHAR date, VARCHAR format)

实际使用flink 1.10版本测试,TO_TIMESTAMP不能直接将BIGINT转成TIMESTAMP.

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 'TO_TIMESTAMP' to arguments of type 'TO_TIMESTAMP()'. Supported form(s): 'TO_TIMESTAMP()' 'TO_TIMESTAMP(, )'

创建UDF,并用SQL client测试

创建一个UDF,传入一个Long型时间戳,返回Timestamp格式

public class TimeUdf extends ScalarFunction {

    private Long timestamp;

    public TimeUdf(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Timestamp eval(Long timestamp){
        return new Timestamp(timestamp);
    }
}

UDF需要的依赖

<!--Table Program Dependencies-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.10.0</version>
    <scope>provided</scope>
</dependency>

使用maven打包成jar,并拷贝到flink目录lib中,重启flink。

mvn clean package

修改sql-client-defaults.yaml配置

...
#创建表
  - name: bookpojo
    type: source-table
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      # derive-schema: true
      schema: "ROW<id INT, type STRING, price INT, ts BIGINT>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: ts
        data-type: BIGINT

# 创建UDF
functions: 
  - name: TimeUdf
    from: class
    class: com.deri.udx.TimeUdf
    constructor: 
      - type: BIGINT
        value: 111111111

启动SQL client

./bin/sql-client.sh embedded

SQL查询

-- 时间戳
select * from bookpojo;
-- 转成日期
select id,price,type,TimeUdf(ts) AS ts from bookpojo;

最新研究:LONG型时间戳可以直接转成TIMESTAMP格式

需要在ROW中定义为LONG,Schema中定义为TIMESTAMP

CREATE TABLE `t` (
   ctm TIMESTAMP,
) WITH (
  'format.schema' = 'ROW<ctm LONG>'
)

sql-client-defaults.yaml中详细配置,测试成功

tables: 
  - name: bookpojo
    type: source-table
    update-mode: append
    connector: 
      property-version: 1
      type: kafka
      version: "universal"
      topic: pojo
      startup-mode: earliest-offset
      properties:
        zookeeper.connect: localhost:2181
        bootstrap.servers: localhost:9092
        group.id: testGroup
    format: 
      property-version: 1
      type: json
      schema: "ROW<id INT, type STRING, price INT, ts LONG>"
    schema: 
      - name: id
        data-type: INT
      - name: type
        data-type: STRING
      - name: price
        data-type: INT
      - name: tss
        data-type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "ts"
          watermarks:
            type: "periodic-bounded"
            delay: "60000"

流处理状态管理

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入数据的基础上不断更新状态。下面的几个场景都需要使用流处理的状态功能:

  • 数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重。
  • 检查输入流是否符合某个特定的模式,需要将之前流入的元素以状态的形式缓存下来。比如,判断一个温度传感器数据流中的温度是否在持续上升。
  • 对一个时间窗口内的数据进行聚合分析,分析一个小时内某项指标的75分位或99分位的数值。
  • 在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数。

flink状态管理

Flink的一个算子有多个子任务,每个子任务分布在不同实例上,我们可以把状态理解为某个算子子任务在其当前实例上的一个变量,变量记录了数据流的历史信息。当新数据流入时,我们可以结合历史信息来进行计算。实际上,Flink的状态是由算子的子任务来创建和管理的。一个状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。一个简单的例子是对一个时间窗口内输入流的某个整数字段求和,那么当算子子任务接收到新元素时,会获取已经存储在状态中的数值,然后将当前输入加到状态上,并将状态数据更新。
状态管理

获取和更新状态的逻辑其实并不复杂,但流处理框架还需要解决以下几类问题:

  • 数据的产出要保证实时性,延迟不能太高。
  • 需要保证数据不丢不重,恰好计算一次,尤其是当状态数据非常大或者应用出现故障需要恢复时,要保证状态的计算不出任何错误。
  • 一般流处理任务都是7*24小时运行的,程序的可靠性非常高。

基于上述要求,我们不能将状态直接交由内存管理,因为内存的容量是有限制的,当状态数据稍微大一些时,就会出现内存不够的问题。作为一个计算框架,Flink提供了有状态的计算,封装了一些底层的实现,比如状态的高效存储、Checkpoint和Savepoint持久化备份机制、计算资源扩缩容等问题。因为Flink接管了这些问题,开发者只需调用Flink API,这样可以更加专注于业务逻辑。

Flink的几种状态类型

Managed State和Raw State

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。从名称中也能读出两者的区别:Managed State是由Flink管理的,Flink帮忙存储、恢复和优化,Raw State是开发者自己管理的,需要自己序列化。

Managed State Raw State
状态管理方式 Flink Runtime自动管理,自动存储,自动恢复,内存管理优化 用户自己管理;需要自己序列化
状态数据结构 已知的数据结构:value,list,map… 字节数组:byte[]
推荐使用场景 大多数场景 自定义Operator时可使用

两者的具体区别有:

  • 从状态管理的方式上来说,Managed State由Flink Runtime托管,状态是自动存储、自动恢复的,Flink在存储管理和持久化上做了一些优化。当我们横向伸缩,或者说我们修改Flink应用的并行度时,状态也能自动重新分布到多个并行实例上。Raw State是用户自定义的状态。
  • 从状态的数据结构上来说,Managed State支持了一系列常见的数据结构,如ValueState、ListState、MapState等。Raw State只支持字节,任何上层数据结构需要序列化为字节数组。使用时,需要用户自己序列化,以非常底层的字节数组形式存储,Flink并不知道存储的是什么样的数据结构。
  • 从具体使用场景来说,绝大多数的算子都可以通过继承Rich函数类或其他提供好的接口类,在里面使用Managed State。Raw State是在已有算子和Managed State不够用时,用户自定义算子时使用。

Keyed State和Operator State

对Managed State继续细分,它又有两种类型:Keyed State和Operator State。

Keyed State

Keyed State是KeyedStream上的状态。假如输入流按照id为Key进行了keyBy分组,形成一个KeyedStream,数据流中所有id为1的数据共享一个状态,可以访问和更新这个状态,以此类推,每个Key对应一个自己的状态。下图展示了Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务1处理了两种Key,两种Key分别对应自己的状态。

状态管理

Operator State

Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。

状态管理

无论是Keyed State还是Operator State,Flink的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。

在之前各算子的介绍中曾提到,为了自定义Flink的算子,我们可以重写Rich Function接口类,比如RichFlatMapFunction。使用Keyed State时,我们也可以通过重写Rich Function接口类,在里面创建和访问状态。对于Operator State,我们还需进一步实现CheckpointedFunction接口。

Keyed State Operator State
适用算子类型 只适用keyedStream上的算子 适用所有算子
状态分配 每一个key对应一个状态 一个算子上的子任务对应一个状态
创建和访问方式 重写Rich Function,通过里面的RuntimeContext访问 实现CheckpointedFunctionListCheckpointed接口
横向扩展 状态随着key自动在多个算子子任务上迁移 有多种状态重新分配的方式:均匀分配、合并后每个得到全量
支持的数据结构 ValueStateListStateMapStateReducingState ListState

几种KeyedState之间的关系

状态管理

几种KeyedState之间的差异

状态管理

状态的保存与恢复

状态管理

可选的状态存储方式

MemoryStateBackend

Checkpoint 的存储,第一种是内存存储,即 MemoryStateBackend,构造方法是设置最大的StateSize,选择是否做异步快照,这种存储状态本身存储在 TaskManager 节点也就是执行节点内存中的,因为内存有容量限制,所以单个 State maxStateSize 默认 5 M,且需要注意 maxStateSize <= akka.framesize 默认 10 M。Checkpoint 存储在 JobManager 内存中,因此总大小不超过 JobManager 的内存。推荐使用的场景为:本地测试、几乎无状态的作业,比如 ETL、JobManager 不容易挂,或挂掉影响不大的情况。不推荐在生产场景使用

FsStateBackend

在文件系统上的 FsStateBackend ,构建方法是需要传一个文件路径和是否异步快照。State 依然在 TaskManager 内存中,但不会像 MemoryStateBackend 有 5 M 的设置上限,Checkpoint 存储在外部文件系统(本地或 HDFS),打破了总大小 Jobmanager 内存的限制。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。推荐使用的场景、常规使用状态的作业、例如分钟级窗口聚合或 join、需要开启HA的作业

RocksDBStateBacked

还有一种存储为 RocksDBStateBackend ,RocksDB 是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中,但需要注意 RocksDB 不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过 RocksDB 支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单 Key最大 2G,总大小不超过配置的文件系统容量即可。推荐使用的场景为:超大状态的作业,例如天级窗口聚合、需要开启 HA 的作业、最好是对状态读写性能要求不高的作业

参考链接

ProcessFunction简介

ProcessFunction是flink中最底层的API。
low-api

Flink的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限。如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。

目前,这个系列函数主要包括KeyedProcessFunctionProcessFunctionCoProcessFunctionKeyedCoProcessFunctionProcessJoinFunctionProcessWindowFunction等多种函数,这些函数各有侧重,但核心功能比较相似,主要包括两点:

  • 状态:我们可以在这些函数中访问和更新Keyed State
  • 定时器(Timer):像定闹钟一样设置定时器,我们可以在时间维度上设计更复杂的业务逻辑。使用前先在Timer中注册一个未来的时间,当这个时间到达,闹钟会“响起”,程序会执行一个回调函数,回调函数中执行一定的业务逻辑。

ProcessFunction使用

ProcessFunction有两个重要的接口processElementonTimer

其中processElement函数在源码中的Java签名如下:

// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)

processElement方法处理数据流中的一条元素,并通过Collector<O>输出出来。Context是它的区别于FlatMapFunction等普通函数的特色,开发者可以通过Context来获取时间戳,访问TimerService,设置Timer

另外一个接口是onTimer

// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

这是一个回调函数,当到了“闹钟”时间,Flink会调用onTimer,并执行一些业务逻辑。这里也有一个参数OnTimerContext,它实际上是继承了前面的Context,与Context几乎相同。

使用Timer的方法主要逻辑为:

  1. processElement方法中通过Context注册一个未来的时间戳t。这个时间戳的语义可以是Processing Time,也可以是Event Time,根据业务需求来选择。
  2. onTimer方法中实现一些逻辑,到达t时刻,onTimer方法被自动调用。

获取、注册和删除Timer

  • Context中,我们可以获取一个TimerService,这是一个访问时间戳和Timer的接口。

  • 我们可以通过Context.timerService.registerProcessingTimeTimerContext.timerService.registerEventTimeTimer这两个方法来注册Timer,只需要传入一个时间戳即可。

  • 我们可以通过Context.timerService.deleteProcessingTimeTimerContext.timerService.deleteEventTimeTimer来删除之前注册的Timer。

  • 此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTimeContext.timerService.currentWatermark

注意,我们只能在KeyedStream上注册Timer。每个Key下可以使用不同的时间戳注册不同的Timer,但是每个Key的每个时间戳只能注册一个Timer。如果想在一个DataStream上应用Timer,可以将所有数据映射到一个伪造的Key上,但这样所有数据会流入一个算子子任务。

Flink 框架会自动忽略同一时间的重复注册Timer。

使用ProcessFunction实现Join

如果想从更细的粒度上实现两个数据流的Join,可以使用CoProcessFunctionKeyedCoProcessFunction。这两个函数都有processElement1processElement2方法,分别对第一个数据流和第二个数据流的每个元素进行处理。两个数据流的数据类型以及输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现Join

  • 创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。
  • processElement1方法处理第一个数据流,更新状态a
  • processElement2方法处理第二个数据流,根据状态a中的数据,生成相应的输出。

参考链接

List排序问题

java8中引入了流的概念,还有Lambda函数的概念,那么针对List<Object>排序有哪些方法呢?

首先我们创建一个User实体类:

public static class User {
        public String name;
        public int age;

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
        //重写了toString方法,方便打印观察
        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    "}\n";
        }
}

创建List<User>对象

List<User> users = Arrays.asList(
        new User("zhang1", 14),
        new User("zhang2", 25),
        new User("zhang3", 16),
        new User("zhang4", 27),
        new User("zhang5", 18),
        new User("zhang6", 12)
);

我们想根据年龄来做个排序

方法一:

users.sort(new Comparator<User>() {
    @Override
    public int compare(User o1, User o2) {
        return o1.age - o2.age;
    }
});
users.forEach(System.out::print);

有了Lambda,直接可以简写成

users.sort((o1, o2) -> o1.age - o2.age);
users.forEach(System.out::print);

方法二:

users.stream().sorted((o1, o2) -> o1.age - o2.age).forEach(System.out::print);

以上都可以实现List<User>根据年龄排序,打印结果

User{name='zhang6', age=12}
User{name='zhang1', age=14}
User{name='zhang3', age=16}
User{name='zhang5', age=18}
User{name='zhang2', age=25}
User{name='zhang4', age=27}

案例说明

本案例主要结合kafka,实现:

  • 通过flink,向kafka中写入模拟数据book贩卖信息,包括书籍id、类型、价格、时间戳;
  • flink任务每五秒输出最近五分钟,根据id,不同书籍卖出的总价。

代码

BookPojo.java

书籍book基本类

package com.deri.pojo.util;

/**
 * @ClassName: BookPojo
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 9:48
 * @Version: v1.0
 **/
public class BookPojo {
    private int id;
    private String type;
    private int price;
    private long timestamp;

    public BookPojo() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}

Books.java

创建模拟数据,6本书,随机返回一本书。

package com.deri.pojo.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: Books
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/20 10:52
 * @Version: v1.0
 **/
public class Books {

    public static BookPojo getBook(){
        List<BookPojo> bookPojos = new ArrayList<>();
        {
            BookPojo book1 = new BookPojo();
            book1.setId(1);
            book1.setType("cs");
            book1.setPrice(80);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(2);
            book1.setType("math");
            book1.setPrice(70);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(3);
            book1.setType("ph");
            book1.setPrice(60);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(4);
            book1.setType("cs");
            book1.setPrice(50);
            bookPojos.add(book1);
        }
        {
            BookPojo book1 = new BookPojo();
            book1.setId(5);
            book1.setType("math");
            book1.setPrice(40);
            bookPojos.add(book1);
        }

        {
            BookPojo book1 = new BookPojo();
            book1.setId(6);
            book1.setType("ph");
            book1.setPrice(30);
            bookPojos.add(book1);
        }
        return bookPojos.get(new Random().nextInt(6));
    }
}

MyPojoSource.java

flink数据源,随机返回一本书并设置当时时间戳当作流水,并随机暂停几秒钟。

package com.deri.pojo.util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * @ClassName: MyNoParalleSource
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:15
 * @Version: v1.0
 **/
//使用并行度为1的source
public class MyPojoSource implements SourceFunction<String> {//1

    private boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            BookPojo book = Books.getBook();
            book.setTimestamp(System.currentTimeMillis());
            ctx.collect(JSON.toJSONString(book));
            Thread.sleep(new Random().nextInt(8000));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

}

KafkaPojoProducer.java

flink将书籍贩卖流水转成json字符串,输入到kafka中。

package com.deri.pojo;

import com.deri.pojo.util.MyPojoSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @ClassName: KafkaProducer
 * @Description: TODO
 * @Author: wuzhiyong
 * @Time: 2020/3/4 10:16
 * @Version: v1.0
 **/
public class KafkaPojoProducer {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyPojoSource()).setParallelism(1)/*设置并行度*/;

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.41.128:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("pojosource", new SimpleStringSchema(), properties);
/*
        //event-timestamp事件的发生时间
        producer.setWriteTimestampToKafka(true);
*/
        text.addSink(producer);
        env.execute("POJO Kafka Source");
    }

}

KafkaPojoStream.java

从上面pojosource主题中,获取书籍贩卖流水,每10秒打印出过去5分钟每种书贩卖的总价。

package com.deri.pojo;

import com.alibaba.fastjson.JSON;
import com.deri.pojo.util.BookPojo;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * @ClassName: KafkaWordCount
 * @Description: 从kafka主题中读取数据,进行word count
 * @Author: wuzhiyong
 * @Time: 2020/3/18 15:05
 * @Version: v1.0
 **/
public class KafkaPojoStream {

    public static void main(String[] args) throws Exception {
        // 创建Flink执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.41.128:9092");
        properties.setProperty("group.id", "flink-group");
        String inputTopic = "pojosource";
        String outputTopic = "pojosink";

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(inputTopic, new SimpleStringSchema(), properties);
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>(outputTopic, new SimpleStringSchema(), properties);

        //设置EventTime,结合assignTimestampsAndWatermarks一起使用
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> stream = env.addSource(consumer);

        DataStream<String> bookPojoDataStream = stream.map(s -> JSON.parseObject(s, BookPojo.class))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<BookPojo>() {
                    @Override
                    public long extractAscendingTimestamp(BookPojo bookPojo) {
                        return bookPojo.getTimestamp();
                    }
                })
//                .filter(bookPojo -> bookPojo.getType().equalsIgnoreCase("cs"))
                .keyBy("id")
//                .timeWindow(Time.seconds(10))
                //设置一个5分钟滑动窗口,每10秒滑动一次
                .timeWindow(Time.minutes(5), Time.seconds(10))
                //这边使用更加通用的reduce处理,累加书籍贩卖总流水
                .reduce(new ReduceFunction<BookPojo>() {
                    @Override
                    public BookPojo reduce(BookPojo bookPojo, BookPojo t1) throws Exception {
                        BookPojo book = new BookPojo();
                        book.setId(bookPojo.getId());
                        book.setPrice(bookPojo.getPrice()+ t1.getPrice());
                        book.setType(t1.getType());
                        book.setTimestamp(t1.getTimestamp());
                        return book;
                    }
                })
                .map(s -> JSON.toJSONString(s));

//        bookPojoDataStream.addSink(producer);
        bookPojoDataStream.print();
        // execute
        env.execute("kafka streaming pojo");

    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

pom.xml

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.deri</groupId>
    <artifactId>flink_kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.54</version>
        </dependency>
        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.deri.kafka.KafkaProducer</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

问题

国内git clone github项目由于网络原因比较慢

解决办法

git clone https://github.com/apache/flink.git

改成

git clone https://github.com.cnpmjs.org/apache/flink.git
# 全局配置, 不用每次都去修改地址了
git config --global url."https://github.com/".insteadOf https://github.com.cnpmjs.org/

# 删除全局配置
gti config --global --unset url."https://github.com/".insteadOf