0%

flink常用算子及代码

以下列举下常用的算子,用到的代码例子都是Flink监听9000端口做为数据源。以下方法可以启动一个9000的socket端口服务。

Linux平台上可以使用

bash
nc -lk 9000

如果是 Windows 平台,可以通过 https://nmap.org/ncat/ 安装 ncat 然后运行:

bash
ncat -lk 9000

map

map可以理解为映射,对每个元素进行一定的变换后,映射为另一个元素。

举例1:

package operators;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

//这个例子是监听9000 socket端口,对于发送来的数据,以\n为分隔符分割后进行处理,
//将分割后的每个元素,添加上一个字符串后,打印出来。
public class MapDemo {
    private static int index = 1;
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.map操作。
        DataStream<String> result = textStream.map(s -> (index++) + ".您输入的是:" + s);
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

举例2:

package com.bigdata.flink.dataStreamMapOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Map: 一对一转换
 */
public class DataStreamMapOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 商品的价格乘以8
        SingleOutputStreamOperator<UserAction> result = source.map(new MapFunction<UserAction, UserAction>() {
            @Override
            public UserAction map(UserAction value) throws Exception {

                int newPrice = value.getProductPrice() * 8;
                return new UserAction(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
            }
        });

        // 输出: 输出到控制台
        // UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=64)
        result.print();

        env.execute();
    }
}

flatmap

flatmap可以理解为将元素摊平,每个元素可以变为0个、1个、或者多个元素。

举例1:

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//这个例子是用Flink监听9000端口,将接受的字符串用\n分割为一个个的元素
//然后将每个元素拆为一个个的字符,并打印出来
public class FlatMapDemo {
    private static int index1 = 1;
    private static int index2 = 1;

    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.flatMap操作,对每一行字符串进行分割
        DataStream<String> result = textStream.flatMap((String s, Collector<String> collector) -> {
            for (String str : s.split("")) {
                collector.collect(str);
            }
        })
        //这个地方要注意,在flatMap这种参数里有泛型算子中。
        //如果用lambda表达式,必须将参数的类型显式地定义出来。
        //并且要有returns,指定返回的类型
        //详情可以参考Flink官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/java_lambdas.html
        .returns(Types.STRING);

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

举例2:

package com.bigdata.flink.dataStreamFlatMapOperator;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Summary:
 *      FlatMap: 一行变任意行(0~多行)
 */
public class DataStreamFlatMapOperator {
  public static void main(String[] args) throws Exception{
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 输入: 英文电影台词
      DataStreamSource<String> source = env
              .fromElements(
                      "You jump I jump",
                      "Life was like a box of chocolates"
              );

      // 转换: 将包含chocolates的句子转换为每行一个单词
      SingleOutputStreamOperator<String> result = source.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public void flatMap(String value, Collector<String> out) throws Exception {
              if(value.contains("chocolates")){
                  String[] words = value.split(" ");
                  for (String word : words) {
                      out.collect(word);
                  }
              }
          }
      });

      // 输出: 输出到控制台
      // Life
      // was
      // like
      // a
      // box
      // of
      // chocolates
      result.print();

      env.execute();
  }
}

filter

filter是进行筛选。

举例:

package operators;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo {
    private static int index = 1;
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.filter操作,筛选非空行。
        DataStream<String> result = textStream.filter(line->!line.trim().equals(""));
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamFilterOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Fliter: 过滤出需要的数据
 */
public class DataStreamFilterOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 过滤: 过滤出用户ID为userID1的用户行为
        SingleOutputStreamOperator<UserAction> result = source.filter(new FilterFunction<UserAction>() {
            @Override
            public boolean filter(UserAction value) throws Exception {
                return value.getUserID().equals("userID1");
            }
        });

        // 输出: 输出到控制台
        // UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=10)
        // UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=10)
        result.print();

        env.execute();

    }
}

keyBy

KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。

注意:

  • 分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。
  • 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。
  • 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。
  • 对于一般类型,如上, KeyBy可以通过keyBy(new KeySelector {…})指定字段进行分区。

举例:

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

//这个例子是每行输入一个单词,以单词为key进行计数
//每10秒统计一次每个单词的个数
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        DataStream<Tuple2<String, Integer>> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                //keyBy进行分区,按照第一列,也就是按照单词进行分区
                .keyBy(0)
                //指定窗口,每10秒个计算一次
                .timeWindow(Time.of(10, TimeUnit.SECONDS))
                //计算个数,计算第1列
                .sum(1);
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamKeyByOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。
 */
public class DataStreamKeyByOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 按指定的Key(这里,用户ID)对数据重分区,将相同Key(用户ID)的数据分到同一个分区
        KeyedStream<UserAction, String> result = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // 输出: 输出到控制台
        //3> UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=10)
        //3> UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=10)
        //2> UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=8)
        result.print().setParallelism(3);

        env.execute();

    }
}

reduce

reduce是归并操作,它可以将KeyedStream 转变为 DataStream。

Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
注意: Reduce会输出每一次滚动聚合的结果。

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

//这个例子是对流进行分组,分组后进归并操作。
//是wordcount的另外一种实现方法
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        DataStream<Tuple2<String, Integer>> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                //keyBy进行分区,按照第一列,也就是按照单词进行分区
                .keyBy(0)
                //指定窗口,每10秒个计算一次
                .timeWindow(Time.of(10, TimeUnit.SECONDS))
                //对每一组内的元素进行归并操作,即第一个和第二个归并,结果再与第三个归并...
                .reduce((Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) -> new Tuple2(t1.f0, t1.f1 + t2.f1));

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamReduceOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *     Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
 */
public class DataStreamReduceOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID2", 1293984002, "browse", "productID2", 8),
                new UserAction("userID2", 1293984003, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10),
                new UserAction("userID1", 1293984003, "click", "productID3", 10),
                new UserAction("userID1", 1293984004, "click", "productID1", 10)
        ));

        // 转换: KeyBy对数据重分区
        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // 转换: Reduce滚动聚合。这里,滚动聚合每个用户对应的商品总价格。
        SingleOutputStreamOperator<UserAction> result = keyedStream.reduce(new ReduceFunction<UserAction>() {
            @Override
            public UserAction reduce(UserAction value1, UserAction value2) throws Exception {
                int newProductPrice = value1.getProductPrice() + value2.getProductPrice();
                return new UserAction(value1.getUserID(), -1, "", "", newProductPrice);
            }
        });

        // 输出: 将每次滚动聚合后的结果输出到控制台。
        //3> UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=8)
        //3> UserAction(userID=userID2, eventTime=-1, eventType=, productID=, productPrice=16)
        //3> UserAction(userID=userID2, eventTime=-1, eventType=, productID=, productPrice=24)
        //4> UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=10)
        //4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=20)
        //4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=30)
        //4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=40)
        result.print();

        env.execute();
    }
}

fold

基于初始值和FoldFunction进行滚动折叠(Fold),并向下游算子输出每次滚动折叠后的结果。
注意: Fold会输出每一次滚动折叠的结果。

举例1:

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

public class FoldDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        DataStream<String> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                //keyBy进行分区,按照第一列,也就是按照单词进行分区
                .keyBy(0)
                //指定窗口,每10秒个计算一次
                .timeWindow(Time.of(10, TimeUnit.SECONDS))
                //指定一个开始的值,对每一组内的元素进行归并操作,即第一个和第二个归并,结果再与第三个归并...
                .fold("结果:",(String current, Tuple2<String, Integer> t2) -> current+t2.f0+",");

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamFoldOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Fold: 基于初始值和自定义的FoldFunction滚动折叠后发出新值
 */
public class DataStreamFoldOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID2", 1293984002, "browse", "productID2", 8),
                new UserAction("userID2", 1293984003, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10),
                new UserAction("userID1", 1293984003, "click", "productID3", 10),
                new UserAction("userID1", 1293984004, "click", "productID1", 10)
        ));

        // 转换: KeyBy对数据重分区
        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // 转换: Fold 基于初始值和FoldFunction滚动折叠
        SingleOutputStreamOperator<String> result = keyedStream.fold("浏览的商品及价格:", new FoldFunction<UserAction, String>() {
            @Override
            public String fold(String accumulator, UserAction value) throws Exception {
                if(accumulator.startsWith("userID")){
                    return accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
                }else {
                    return value.getUserID()+" " +accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
                }
            }
        });

        // 输出: 输出到控制台
        // 每一条数据都会触发计算并输出
        // userID1 浏览的商品及价格: -> productID1:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10 -> productID3:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10 -> productID3:10 -> productID1:10
        // userID2 浏览的商品及价格: -> productID2:8
        // userID2 浏览的商品及价格: -> productID2:8 -> productID2:8
        // userID2 浏览的商品及价格: -> productID2:8 -> productID2:8 -> productID2:8
        result.print();

        env.execute();

    }
}

Aggregate

Aggregate 对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果。默认的聚合函数有:sum、min、minBy、max、mabBy。

注意:

  • max(field)与maxBy(field)的区别: maxBy返回field最大的那条数据;而max则是将最大的field的值赋值给第一条数据并返回第一条数据。同理,min与minBy。
  • Aggregate聚合算子会滚动输出每一次聚合后的结果。
package com.bigdata.flink.dataStreamAggregateOperator;

import com.bigdata.flink.beans.UserActionLogPOJO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Summary:
 *     Aggregate: min()、minBy()、max()、maxBy() 滚动聚合并输出每次滚动聚合后的结果
 */
public class DataStreamAggregateOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        ArrayList<UserActionLogPOJO> userActionLogs = new ArrayList<>();
        UserActionLogPOJO userActionLog1 = new UserActionLogPOJO();
        userActionLog1.setUserID("userID1");
        userActionLog1.setProductID("productID3");
        userActionLog1.setProductPrice(10);
        userActionLogs.add(userActionLog1);

        UserActionLogPOJO userActionLog2 = new UserActionLogPOJO();
        userActionLog2.setUserID("userID2");
        userActionLog2.setProductPrice(10);
        userActionLogs.add(userActionLog2);

        UserActionLogPOJO userActionLog3 = new UserActionLogPOJO();
        userActionLog3.setUserID("userID1");
        userActionLog3.setProductID("productID5");
        userActionLog3.setProductPrice(30);
        userActionLogs.add(userActionLog3);

        DataStreamSource<UserActionLogPOJO> source = env.fromCollection(userActionLogs);

        // 转换: KeyBy对数据重分区
        // 这里, UserActionLog是POJO类型,也可通过keyBy("userID")进行分区
        KeyedStream<UserActionLogPOJO, String> keyedStream = source.keyBy(new KeySelector<UserActionLogPOJO, String>() {
            @Override
            public String getKey(UserActionLogPOJO value) throws Exception {
                return value.getUserID();
            }
        });

        // 转换: Aggregate并输出
        // 滚动求和并输出
        //keyedStream.sum("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.max("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.maxBy("productPrice").print();
        // 滚动求最小值并输出
        //keyedStream.min("productPrice").print();
        // 滚动求最小值并输出
        //keyedStream.minBy("productPrice").print();

        env.execute();
    }
}

union

union可以将多个流合并到一个流中,以便对合并的流进行统一处理。是对多个流的水平拼接。

参与合并的流必须是同一种类型。

举例:

package operators;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

//这个例子是将三个socket端口发送来的数据合并到一个流中
//可以对这三个流发送来的数据,集中处理。
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        DataStream<String> textStream9002 = env.socketTextStream("localhost", 9002, "\n");

        DataStream<String> mapStream9000=textStream9000.map(s->"来自9000端口:"+s);
        DataStream<String> mapStream9001=textStream9001.map(s->"来自9001端口:"+s);
        DataStream<String> mapStream9002=textStream9002.map(s->"来自9002端口:"+s);

        //3.union用来合并两个或者多个流的数据,统一到一个流中
        DataStream<String> result =  mapStream9000.union(mapStream9001,mapStream9002);

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

join

根据指定的Key将两个流进行关联。

举例:

package operators;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        //将输入处理一下,变为tuple2
        DataStream<Tuple2<String,String>> mapStream9000=textStream9000
                .map(new MapFunction<String, Tuple2<String,String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s,"来自9000端口:"+s);
                    }
                });

        DataStream<Tuple2<String,String>> mapStream9001=textStream9001
                .map(new MapFunction<String, Tuple2<String,String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s,"来自9001端口:"+s);
                    }
                });

        //3.两个流进行join操作,是inner join,关联上的才能保留下来
        DataStream<String> result =  mapStream9000.join(mapStream9001)
                //关联条件,以第0列关联(两个source输入的字符串)
                .where(t1->t1.getField(0)).equalTo(t2->t2.getField(0))
                //以处理时间,每10秒一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                //关联后输出
                .apply((t1,t2)->t1.getField(1)+"|"+t2.getField(1))
                ;

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

coGroup

关联两个流,关联不上的也保留下来。

举例:

package operators;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        //将输入处理一下,变为tuple2
        DataStream<Tuple2<String, String>> mapStream9000 = textStream9000
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s, "来自9000端口:" + s);
                    }
                });

        DataStream<Tuple2<String, String>> mapStream9001 = textStream9001
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s, "来自9001端口:" + s);
                    }
                });

        //3.两个流进行coGroup操作,没有关联上的也保留下来,功能更强大
        DataStream<String> result = mapStream9000.coGroup(mapStream9001)
                //关联条件,以第0列关联(两个source输入的字符串)
                .where(t1 -> t1.getField(0)).equalTo(t2 -> t2.getField(0))
                //以处理时间,每10秒一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                //关联后输出
                .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<String> collector) throws Exception {
                        StringBuffer stringBuffer = new StringBuffer();
                        stringBuffer.append("来自9000的stream:");
                        for (Tuple2<String, String> item : iterable) {
                            stringBuffer.append(item.f1 + ",");
                        }
                        stringBuffer.append("来自9001的stream:");
                        for (Tuple2<String, String> item : iterable1) {
                            stringBuffer.append(item.f1 + ",");
                        }
                        collector.collect(stringBuffer.toString());
                    }
                });

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

connect

参考:https://www.jianshu.com/p/5b0574d466f8

将两个流纵向地连接起来。DataStream的connect操作创建的是ConnectedStreams或BroadcastConnectedStream,它用了两个泛型,即不要求两个dataStream的element是同一类型。

举例:

package operators;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        //转为Integer类型流
        DataStream<Integer> intStream = textStream9000.filter(s -> isNumeric(s)).map(s -> Integer.valueOf(s));
        //连接起来,分别处理,返回同样的一种类型。
        SingleOutputStreamOperator result = intStream.connect(textStream9001)
                .map(new CoMapFunction<Integer, String, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map1(Integer value) throws Exception {
                        return Tuple2.of(value, "");
                    }

                    @Override
                    public Tuple2<Integer, String> map2(String value) throws Exception {
                        return Tuple2.of(null, value);
                    }
                });
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }

    private static boolean isNumeric(String str) {
        Pattern pattern = Pattern.compile("[0-9]*");
        Matcher isNum = pattern.matcher(str);
        if (!isNum.matches()) {
            return false;
        }
        return true;
    }
}

split

参考:https://cloud.tencent.com/developer/article/1382892

将一个流拆分为多个流。

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SplitDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        SplitStream<Tuple2<String, Integer>> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .split(t -> {
                    List<String> list = new ArrayList<>();
                    //根据逻辑拆分,并定义outputName
                    if (isNumeric(t.f0)) {
                        list.add("num");
                    } else {
                        list.add("str");
                    }
                    return list;
                });
        //选择指定名称的流
        DataStream<Tuple2<String, Integer>> strSplitStream = result.select("str")
                .map(t -> Tuple2.of("字符串:" + t.f0, t.f1))
                .returns(Types.TUPLE(Types.STRING,Types.INT));
        //选择指定名称的流
        DataStream<Tuple2<String, Integer>> intSplitStream = result.select("num")
                .map(t -> Tuple2.of("数字:" + t.f0, t.f1))
                .returns(Types.TUPLE(Types.STRING,Types.INT));

        //4.打印输出sink
        strSplitStream.print();
        intSplitStream.print();
        //5.开始执行
        env.execute();
    }

    private static boolean isNumeric(String str) {
        Pattern pattern = Pattern.compile("[0-9]*");
        Matcher isNum = pattern.matcher(str);
        if (!isNum.matches()) {
            return false;
        }
        return true;
    }
}

原文链接