0%

flink多数据流转换操作:join、union、connect

操作概览

操作概览

本文主要讲flink中多条数据流转换操作:join、union、connect

join

批处理经常要解决的问题是将两个数据源做关联Join操作。比如,很多手机APP都有一个用户数据源User,同时APP会记录用户的行为,我们称之为Behavior,两个表按照userId来进行Join。在流处理场景下,Flink也支持了Join,只不过Flink是在一个时间窗口上来进行两个表的Join。
操作概览
目前,Flink支持了两种Join:Window Join(窗口连接)和Interval Join(时间间隔连接)。

Window Join

从名字中能猜到,Window Join主要在Flink的窗口上进行操作,它将两个流中落在相同窗口的元素按照某个Key进行Join。一个Window Join的大致骨架结构为:

input1.join(input2)
    .where(<KeySelector>)      <- input1使用哪个字段作为Key
    .equalTo(<KeySelector>)    <- input2使用哪个字段作为Key
    .window(<WindowAssigner>)  <- 指定WindowAssigner
    [.trigger(<Trigger>)]      <- 指定Trigger(可选)
    [.evictor(<Evictor>)]      <- 指定Evictor(可选)
    .apply(<JoinFunction>)     <- 指定JoinFunction

下图展示了Join的大致过程。两个输入数据流先分别按Key进行分组,然后将元素划分到窗口中。窗口的划分需要使用WindowAssigner来定义,这里可以使用Flink提供的滚动窗口、滑动窗口或会话窗口等默认的WindowAssigner。随后两个数据流中的元素会被分配到各个窗口上,也就是说一个窗口会包含来自两个数据流的元素。相同窗口内的数据会以INNER JOIN的语义来相互关联,形成一个数据对。当窗口的时间结束,Flink会调用JoinFunction来对窗口内的数据对进行处理。当然,我们也可以使用Trigger或Evictor做一些自定义优化,他们的使用方法和普通窗口的使用方法一样。
join

接下来我们重点分析一下两个数据流是如何INNER JOIN的:
join
一般滴,INNER JOIN只对两个数据源都出现的元素做Join,形成一个数据对,即数据源input1中的某个元素与数据源input2中的所有元素逐个配对。当数据源某个窗口内没数据时,比如图中的第三个窗口,Join的结果也是空的。

如果INNER JOIN不能满足我们的需求,CoGroupFunction提供了更多可自定义的功能。需要注意的是,在调用时,要写成input1.coGroup(input2).where(<KeySelector>).equalTo(<KeySelecotr>)

Interval Join

与Window Join不同,Interval Join不依赖Flink的WindowAssigner,而是根据一个时间间隔(Interval)界定时间。Interval需要一个时间下界(lower bound)和上界(upper bound),如果我们将input1和input2进行Interval Join,input1中的某个元素为input1.element1,时间戳为input1.element1.ts,那么一个Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在这个时间段内的元素将会和input1.element1组成一个数据对。

用数学公式表达为,凡是符合下面公式input1.element1.ts + lower bound <= input2.elementx.ts <=input1.element1.ts + upper bound的元素使用INNER JOIN语义,两两组合在一起。上下界可以是正数也可以是负数。

注意,目前Flink(1.9)的Interval Join只支持Event Time语义。

join

默认的时间间隔是包含上下界的,我们可以使用.lowerBoundExclusive().upperBoundExclusive来确定是否需要包含上下界。

val intervalJoinResult = input1.keyBy(_._1)
      .intervalJoin(input2.keyBy(_._1))
      .between(Time.milliseconds(-5), Time.milliseconds(10))
      .upperBoundExclusive()
      .lowerBoundExclusive()
      .process(new MyProcessFunction)

Interval Join内部是用缓存来存储所有数据的,因此需要注意缓存数据不能太大,以免对内存造成绝大压力。

union

DataStream上使用union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。下图union对白色和深色两个数据流进行合并,生成一个数据流。

union

connect

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  • connect只能连接两个数据流union可以连接多个数据流
  • connect所连接的两个数据流的数据类型可以不一致union所连接的两个数据流的数据类型必须一致
  • 两个DataStream经过connect之后被转化为ConnectedStreamsConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上,如下图所示。控制流可以是阈值、规则、机器学习模型或其他参数。
connect

  • 对于ConnectedStreams,我们需要重写CoMapFunction或CoFlatMapFunction。这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。
  • 在重写函数时,对于CoMapFunction,map1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunction,flatMap1处理第一个流的数据,flatMap2处理第二个流的数据。
  • Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1或flatMap1会被调用,第二个数据流有数据到达时,map2或flatMap2会被调用。
  • Flink允许我们将connect和keyBy或broadcast结合起来使用。

原文链接