0%

以下列举下常用的算子,用到的代码例子都是Flink监听9000端口做为数据源。以下方法可以启动一个9000的socket端口服务。

Linux平台上可以使用

bash
nc -lk 9000

如果是 Windows 平台,可以通过 https://nmap.org/ncat/ 安装 ncat 然后运行:

bash
ncat -lk 9000

map

map可以理解为映射,对每个元素进行一定的变换后,映射为另一个元素。

举例1:

package operators;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

//这个例子是监听9000 socket端口,对于发送来的数据,以\n为分隔符分割后进行处理,
//将分割后的每个元素,添加上一个字符串后,打印出来。
public class MapDemo {
    private static int index = 1;
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.map操作。
        DataStream<String> result = textStream.map(s -> (index++) + ".您输入的是:" + s);
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

举例2:

package com.bigdata.flink.dataStreamMapOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Map: 一对一转换
 */
public class DataStreamMapOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 商品的价格乘以8
        SingleOutputStreamOperator<UserAction> result = source.map(new MapFunction<UserAction, UserAction>() {
            @Override
            public UserAction map(UserAction value) throws Exception {

                int newPrice = value.getProductPrice() * 8;
                return new UserAction(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
            }
        });

        // 输出: 输出到控制台
        // UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=80)
        // UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=64)
        result.print();

        env.execute();
    }
}

flatmap

flatmap可以理解为将元素摊平,每个元素可以变为0个、1个、或者多个元素。

举例1:

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

//这个例子是用Flink监听9000端口,将接受的字符串用\n分割为一个个的元素
//然后将每个元素拆为一个个的字符,并打印出来
public class FlatMapDemo {
    private static int index1 = 1;
    private static int index2 = 1;

    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.flatMap操作,对每一行字符串进行分割
        DataStream<String> result = textStream.flatMap((String s, Collector<String> collector) -> {
            for (String str : s.split("")) {
                collector.collect(str);
            }
        })
        //这个地方要注意,在flatMap这种参数里有泛型算子中。
        //如果用lambda表达式,必须将参数的类型显式地定义出来。
        //并且要有returns,指定返回的类型
        //详情可以参考Flink官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/java_lambdas.html
        .returns(Types.STRING);

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

举例2:

package com.bigdata.flink.dataStreamFlatMapOperator;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Summary:
 *      FlatMap: 一行变任意行(0~多行)
 */
public class DataStreamFlatMapOperator {
  public static void main(String[] args) throws Exception{
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // 输入: 英文电影台词
      DataStreamSource<String> source = env
              .fromElements(
                      "You jump I jump",
                      "Life was like a box of chocolates"
              );

      // 转换: 将包含chocolates的句子转换为每行一个单词
      SingleOutputStreamOperator<String> result = source.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public void flatMap(String value, Collector<String> out) throws Exception {
              if(value.contains("chocolates")){
                  String[] words = value.split(" ");
                  for (String word : words) {
                      out.collect(word);
                  }
              }
          }
      });

      // 输出: 输出到控制台
      // Life
      // was
      // like
      // a
      // box
      // of
      // chocolates
      result.print();

      env.execute();
  }
}

filter

filter是进行筛选。

举例:

package operators;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterDemo {
    private static int index = 1;
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.filter操作,筛选非空行。
        DataStream<String> result = textStream.filter(line->!line.trim().equals(""));
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamFilterOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Fliter: 过滤出需要的数据
 */
public class DataStreamFilterOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 过滤: 过滤出用户ID为userID1的用户行为
        SingleOutputStreamOperator<UserAction> result = source.filter(new FilterFunction<UserAction>() {
            @Override
            public boolean filter(UserAction value) throws Exception {
                return value.getUserID().equals("userID1");
            }
        });

        // 输出: 输出到控制台
        // UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=10)
        // UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=10)
        result.print();

        env.execute();

    }
}

keyBy

KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。

注意:

  • 分区结果和KeyBy下游算子的并行度强相关。如下游算子只有一个并行度,不管怎么分,都会分到一起。
  • 对于POJO类型,KeyBy可以通过keyBy(fieldName)指定字段进行分区。
  • 对于Tuple类型,KeyBy可以通过keyBy(fieldPosition)指定字段进行分区。
  • 对于一般类型,如上, KeyBy可以通过keyBy(new KeySelector {…})指定字段进行分区。

举例:

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

//这个例子是每行输入一个单词,以单词为key进行计数
//每10秒统计一次每个单词的个数
public class KeyByDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        DataStream<Tuple2<String, Integer>> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                //keyBy进行分区,按照第一列,也就是按照单词进行分区
                .keyBy(0)
                //指定窗口,每10秒个计算一次
                .timeWindow(Time.of(10, TimeUnit.SECONDS))
                //计算个数,计算第1列
                .sum(1);
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamKeyByOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。
 */
public class DataStreamKeyByOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 按指定的Key(这里,用户ID)对数据重分区,将相同Key(用户ID)的数据分到同一个分区
        KeyedStream<UserAction, String> result = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // 输出: 输出到控制台
        //3> UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=10)
        //3> UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=10)
        //2> UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=8)
        result.print().setParallelism(3);

        env.execute();

    }
}

reduce

reduce是归并操作,它可以将KeyedStream 转变为 DataStream。

Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
注意: Reduce会输出每一次滚动聚合的结果。

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

//这个例子是对流进行分组,分组后进归并操作。
//是wordcount的另外一种实现方法
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        DataStream<Tuple2<String, Integer>> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                //keyBy进行分区,按照第一列,也就是按照单词进行分区
                .keyBy(0)
                //指定窗口,每10秒个计算一次
                .timeWindow(Time.of(10, TimeUnit.SECONDS))
                //对每一组内的元素进行归并操作,即第一个和第二个归并,结果再与第三个归并...
                .reduce((Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) -> new Tuple2(t1.f0, t1.f1 + t2.f1));

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamReduceOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *     Reduce: 基于ReduceFunction进行滚动聚合,并向下游算子输出每次滚动聚合后的结果。
 */
public class DataStreamReduceOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID2", 1293984002, "browse", "productID2", 8),
                new UserAction("userID2", 1293984003, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10),
                new UserAction("userID1", 1293984003, "click", "productID3", 10),
                new UserAction("userID1", 1293984004, "click", "productID1", 10)
        ));

        // 转换: KeyBy对数据重分区
        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // 转换: Reduce滚动聚合。这里,滚动聚合每个用户对应的商品总价格。
        SingleOutputStreamOperator<UserAction> result = keyedStream.reduce(new ReduceFunction<UserAction>() {
            @Override
            public UserAction reduce(UserAction value1, UserAction value2) throws Exception {
                int newProductPrice = value1.getProductPrice() + value2.getProductPrice();
                return new UserAction(value1.getUserID(), -1, "", "", newProductPrice);
            }
        });

        // 输出: 将每次滚动聚合后的结果输出到控制台。
        //3> UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=8)
        //3> UserAction(userID=userID2, eventTime=-1, eventType=, productID=, productPrice=16)
        //3> UserAction(userID=userID2, eventTime=-1, eventType=, productID=, productPrice=24)
        //4> UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=10)
        //4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=20)
        //4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=30)
        //4> UserAction(userID=userID1, eventTime=-1, eventType=, productID=, productPrice=40)
        result.print();

        env.execute();
    }
}

fold

基于初始值和FoldFunction进行滚动折叠(Fold),并向下游算子输出每次滚动折叠后的结果。
注意: Fold会输出每一次滚动折叠的结果。

举例1:

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

public class FoldDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        DataStream<String> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                //keyBy进行分区,按照第一列,也就是按照单词进行分区
                .keyBy(0)
                //指定窗口,每10秒个计算一次
                .timeWindow(Time.of(10, TimeUnit.SECONDS))
                //指定一个开始的值,对每一组内的元素进行归并操作,即第一个和第二个归并,结果再与第三个归并...
                .fold("结果:",(String current, Tuple2<String, Integer> t2) -> current+t2.f0+",");

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}
package com.bigdata.flink.dataStreamFoldOperator;

import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Fold: 基于初始值和自定义的FoldFunction滚动折叠后发出新值
 */
public class DataStreamFoldOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID2", 1293984002, "browse", "productID2", 8),
                new UserAction("userID2", 1293984003, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10),
                new UserAction("userID1", 1293984003, "click", "productID3", 10),
                new UserAction("userID1", 1293984004, "click", "productID1", 10)
        ));

        // 转换: KeyBy对数据重分区
        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // 转换: Fold 基于初始值和FoldFunction滚动折叠
        SingleOutputStreamOperator<String> result = keyedStream.fold("浏览的商品及价格:", new FoldFunction<UserAction, String>() {
            @Override
            public String fold(String accumulator, UserAction value) throws Exception {
                if(accumulator.startsWith("userID")){
                    return accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
                }else {
                    return value.getUserID()+" " +accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
                }
            }
        });

        // 输出: 输出到控制台
        // 每一条数据都会触发计算并输出
        // userID1 浏览的商品及价格: -> productID1:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10 -> productID3:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10 -> productID3:10 -> productID1:10
        // userID2 浏览的商品及价格: -> productID2:8
        // userID2 浏览的商品及价格: -> productID2:8 -> productID2:8
        // userID2 浏览的商品及价格: -> productID2:8 -> productID2:8 -> productID2:8
        result.print();

        env.execute();

    }
}

Aggregate

Aggregate 对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果。默认的聚合函数有:sum、min、minBy、max、mabBy。

注意:

  • max(field)与maxBy(field)的区别: maxBy返回field最大的那条数据;而max则是将最大的field的值赋值给第一条数据并返回第一条数据。同理,min与minBy。
  • Aggregate聚合算子会滚动输出每一次聚合后的结果。
package com.bigdata.flink.dataStreamAggregateOperator;

import com.bigdata.flink.beans.UserActionLogPOJO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Summary:
 *     Aggregate: min()、minBy()、max()、maxBy() 滚动聚合并输出每次滚动聚合后的结果
 */
public class DataStreamAggregateOperator {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品,以及商品的价格。
        ArrayList<UserActionLogPOJO> userActionLogs = new ArrayList<>();
        UserActionLogPOJO userActionLog1 = new UserActionLogPOJO();
        userActionLog1.setUserID("userID1");
        userActionLog1.setProductID("productID3");
        userActionLog1.setProductPrice(10);
        userActionLogs.add(userActionLog1);

        UserActionLogPOJO userActionLog2 = new UserActionLogPOJO();
        userActionLog2.setUserID("userID2");
        userActionLog2.setProductPrice(10);
        userActionLogs.add(userActionLog2);

        UserActionLogPOJO userActionLog3 = new UserActionLogPOJO();
        userActionLog3.setUserID("userID1");
        userActionLog3.setProductID("productID5");
        userActionLog3.setProductPrice(30);
        userActionLogs.add(userActionLog3);

        DataStreamSource<UserActionLogPOJO> source = env.fromCollection(userActionLogs);

        // 转换: KeyBy对数据重分区
        // 这里, UserActionLog是POJO类型,也可通过keyBy("userID")进行分区
        KeyedStream<UserActionLogPOJO, String> keyedStream = source.keyBy(new KeySelector<UserActionLogPOJO, String>() {
            @Override
            public String getKey(UserActionLogPOJO value) throws Exception {
                return value.getUserID();
            }
        });

        // 转换: Aggregate并输出
        // 滚动求和并输出
        //keyedStream.sum("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.max("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.maxBy("productPrice").print();
        // 滚动求最小值并输出
        //keyedStream.min("productPrice").print();
        // 滚动求最小值并输出
        //keyedStream.minBy("productPrice").print();

        env.execute();
    }
}

union

union可以将多个流合并到一个流中,以便对合并的流进行统一处理。是对多个流的水平拼接。

参与合并的流必须是同一种类型。

举例:

package operators;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

//这个例子是将三个socket端口发送来的数据合并到一个流中
//可以对这三个流发送来的数据,集中处理。
public class UnionDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        DataStream<String> textStream9002 = env.socketTextStream("localhost", 9002, "\n");

        DataStream<String> mapStream9000=textStream9000.map(s->"来自9000端口:"+s);
        DataStream<String> mapStream9001=textStream9001.map(s->"来自9001端口:"+s);
        DataStream<String> mapStream9002=textStream9002.map(s->"来自9002端口:"+s);

        //3.union用来合并两个或者多个流的数据,统一到一个流中
        DataStream<String> result =  mapStream9000.union(mapStream9001,mapStream9002);

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

join

根据指定的Key将两个流进行关联。

举例:

package operators;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        //将输入处理一下,变为tuple2
        DataStream<Tuple2<String,String>> mapStream9000=textStream9000
                .map(new MapFunction<String, Tuple2<String,String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s,"来自9000端口:"+s);
                    }
                });

        DataStream<Tuple2<String,String>> mapStream9001=textStream9001
                .map(new MapFunction<String, Tuple2<String,String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s,"来自9001端口:"+s);
                    }
                });

        //3.两个流进行join操作,是inner join,关联上的才能保留下来
        DataStream<String> result =  mapStream9000.join(mapStream9001)
                //关联条件,以第0列关联(两个source输入的字符串)
                .where(t1->t1.getField(0)).equalTo(t2->t2.getField(0))
                //以处理时间,每10秒一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                //关联后输出
                .apply((t1,t2)->t1.getField(1)+"|"+t2.getField(1))
                ;

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

coGroup

关联两个流,关联不上的也保留下来。

举例:

package operators;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        //将输入处理一下,变为tuple2
        DataStream<Tuple2<String, String>> mapStream9000 = textStream9000
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s, "来自9000端口:" + s);
                    }
                });

        DataStream<Tuple2<String, String>> mapStream9001 = textStream9001
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        return Tuple2.of(s, "来自9001端口:" + s);
                    }
                });

        //3.两个流进行coGroup操作,没有关联上的也保留下来,功能更强大
        DataStream<String> result = mapStream9000.coGroup(mapStream9001)
                //关联条件,以第0列关联(两个source输入的字符串)
                .where(t1 -> t1.getField(0)).equalTo(t2 -> t2.getField(0))
                //以处理时间,每10秒一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                //关联后输出
                .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<String> collector) throws Exception {
                        StringBuffer stringBuffer = new StringBuffer();
                        stringBuffer.append("来自9000的stream:");
                        for (Tuple2<String, String> item : iterable) {
                            stringBuffer.append(item.f1 + ",");
                        }
                        stringBuffer.append("来自9001的stream:");
                        for (Tuple2<String, String> item : iterable1) {
                            stringBuffer.append(item.f1 + ",");
                        }
                        collector.collect(stringBuffer.toString());
                    }
                });

        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }
}

connect

参考:https://www.jianshu.com/p/5b0574d466f8

将两个流纵向地连接起来。DataStream的connect操作创建的是ConnectedStreams或BroadcastConnectedStream,它用了两个泛型,即不要求两个dataStream的element是同一类型。

举例:

package operators;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
        DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
        //转为Integer类型流
        DataStream<Integer> intStream = textStream9000.filter(s -> isNumeric(s)).map(s -> Integer.valueOf(s));
        //连接起来,分别处理,返回同样的一种类型。
        SingleOutputStreamOperator result = intStream.connect(textStream9001)
                .map(new CoMapFunction<Integer, String, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map1(Integer value) throws Exception {
                        return Tuple2.of(value, "");
                    }

                    @Override
                    public Tuple2<Integer, String> map2(String value) throws Exception {
                        return Tuple2.of(null, value);
                    }
                });
        //4.打印输出sink
        result.print();
        //5.开始执行
        env.execute();
    }

    private static boolean isNumeric(String str) {
        Pattern pattern = Pattern.compile("[0-9]*");
        Matcher isNum = pattern.matcher(str);
        if (!isNum.matches()) {
            return false;
        }
        return true;
    }
}

split

参考:https://cloud.tencent.com/developer/article/1382892

将一个流拆分为多个流。

package operators;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SplitDemo {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境配置信息
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.定义加载或创建数据源(source),监听9000端口的socket消息
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        //3.
        SplitStream<Tuple2<String, Integer>> result = textStream
                //map是将每一行单词变为一个tuple2
                .map(line -> Tuple2.of(line.trim(), 1))
                //如果要用Lambda表示是,Tuple2是泛型,那就得用returns指定类型。
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .split(t -> {
                    List<String> list = new ArrayList<>();
                    //根据逻辑拆分,并定义outputName
                    if (isNumeric(t.f0)) {
                        list.add("num");
                    } else {
                        list.add("str");
                    }
                    return list;
                });
        //选择指定名称的流
        DataStream<Tuple2<String, Integer>> strSplitStream = result.select("str")
                .map(t -> Tuple2.of("字符串:" + t.f0, t.f1))
                .returns(Types.TUPLE(Types.STRING,Types.INT));
        //选择指定名称的流
        DataStream<Tuple2<String, Integer>> intSplitStream = result.select("num")
                .map(t -> Tuple2.of("数字:" + t.f0, t.f1))
                .returns(Types.TUPLE(Types.STRING,Types.INT));

        //4.打印输出sink
        strSplitStream.print();
        intSplitStream.print();
        //5.开始执行
        env.execute();
    }

    private static boolean isNumeric(String str) {
        Pattern pattern = Pattern.compile("[0-9]*");
        Matcher isNum = pattern.matcher(str);
        if (!isNum.matches()) {
            return false;
        }
        return true;
    }
}

原文链接

Java8出来之后,lambda表达式由于简单易读,在流式计算中的使用开始变得普遍。

同样,Flink也支持lambda表达式,例如我们改写一下wordcount样例

DataSource<String> lines = env.fromElements(
    "Apache Flink is a community-driven open source framework for distributed big data analytics,",
    "like Hadoop and Spark. The core of Apache Flink is a distributed streaming dataflow engine written",
            ...
);
lines.flatMap(new FlatMapFunction<String, Object>() {
    @Override
    public void flatMap(String line, Collector< Object> out) throws Exception {
        for (String word : line.split("\\W+")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}).groupBy(0).sum(1).print();

这段代码很简单,先把每一行按空格拆分成若干单词,并将每个单词和数字1组成一个Tuple,然后把所有Tuple按照单词聚合,计算出每个单词的出现次数

尝试用lambda表达式来替换FlatMapFunction,代码如下

lines.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
}).groupBy(0).sum(1).print();

但当运行这段代码时,会抛出如下异常:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1653)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1639)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:573)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:188)
    at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
    at TestFlink.main(TestFlink.java:21)

这是因为Flink在用户自定义的函数中会使用泛型来创建serializer,当我们使用匿名函数时,类型信息会被保留。但Lambda表达式并不是匿名函数,所以javac编译的时候并不会把泛型保存到class文件里。

解决办法有两种:

第一种办法在异常中已经提示,使用Eclipse JDT编译器会保留对lambda表达式来说必要的类型信息。在Maven中使用Eclipse JDT编译器,只需要在把下面的插件加入到pom.xml中

<plugins>
    <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <compilerId>jdt</compilerId>
        </configuration>
        <dependencies>
            <dependency>
                <groupId>org.eclipse.tycho</groupId>
                <artifactId>tycho-compiler-jdt</artifactId>
                <version>0.21.0</version>
            </dependency>
        </dependencies>
    </plugin>
</plugins>

另一种办法是,使用Flink提供的returns方法来指定flatMap的返回类型,

text.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
}).returns((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class)).groupBy(0).sum(1)
        .print();
text.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
}).returns(Types.TUPLE(Types.STRING, Types.INT)).groupBy(0).sum(1)
        .print();

returns函数接收TypeInformation类型的参数,这里我们创建TupleTypeInfo来指定Tuple的参数类型。

操作概览

操作概览

本文主要讲flink中多条数据流转换操作:join、union、connect

join

批处理经常要解决的问题是将两个数据源做关联Join操作。比如,很多手机APP都有一个用户数据源User,同时APP会记录用户的行为,我们称之为Behavior,两个表按照userId来进行Join。在流处理场景下,Flink也支持了Join,只不过Flink是在一个时间窗口上来进行两个表的Join。
操作概览
目前,Flink支持了两种Join:Window Join(窗口连接)和Interval Join(时间间隔连接)。

Window Join

从名字中能猜到,Window Join主要在Flink的窗口上进行操作,它将两个流中落在相同窗口的元素按照某个Key进行Join。一个Window Join的大致骨架结构为:

input1.join(input2)
    .where(<KeySelector>)      <- input1使用哪个字段作为Key
    .equalTo(<KeySelector>)    <- input2使用哪个字段作为Key
    .window(<WindowAssigner>)  <- 指定WindowAssigner
    [.trigger(<Trigger>)]      <- 指定Trigger(可选)
    [.evictor(<Evictor>)]      <- 指定Evictor(可选)
    .apply(<JoinFunction>)     <- 指定JoinFunction

下图展示了Join的大致过程。两个输入数据流先分别按Key进行分组,然后将元素划分到窗口中。窗口的划分需要使用WindowAssigner来定义,这里可以使用Flink提供的滚动窗口、滑动窗口或会话窗口等默认的WindowAssigner。随后两个数据流中的元素会被分配到各个窗口上,也就是说一个窗口会包含来自两个数据流的元素。相同窗口内的数据会以INNER JOIN的语义来相互关联,形成一个数据对。当窗口的时间结束,Flink会调用JoinFunction来对窗口内的数据对进行处理。当然,我们也可以使用Trigger或Evictor做一些自定义优化,他们的使用方法和普通窗口的使用方法一样。
join

接下来我们重点分析一下两个数据流是如何INNER JOIN的:
join
一般滴,INNER JOIN只对两个数据源都出现的元素做Join,形成一个数据对,即数据源input1中的某个元素与数据源input2中的所有元素逐个配对。当数据源某个窗口内没数据时,比如图中的第三个窗口,Join的结果也是空的。

如果INNER JOIN不能满足我们的需求,CoGroupFunction提供了更多可自定义的功能。需要注意的是,在调用时,要写成input1.coGroup(input2).where(<KeySelector>).equalTo(<KeySelecotr>)

Interval Join

与Window Join不同,Interval Join不依赖Flink的WindowAssigner,而是根据一个时间间隔(Interval)界定时间。Interval需要一个时间下界(lower bound)和上界(upper bound),如果我们将input1和input2进行Interval Join,input1中的某个元素为input1.element1,时间戳为input1.element1.ts,那么一个Interval就是[input1.element1.ts + lower bound, input1.element1.ts + upper bound],input2中落在这个时间段内的元素将会和input1.element1组成一个数据对。

用数学公式表达为,凡是符合下面公式input1.element1.ts + lower bound <= input2.elementx.ts <=input1.element1.ts + upper bound的元素使用INNER JOIN语义,两两组合在一起。上下界可以是正数也可以是负数。

注意,目前Flink(1.9)的Interval Join只支持Event Time语义。

join

默认的时间间隔是包含上下界的,我们可以使用.lowerBoundExclusive().upperBoundExclusive来确定是否需要包含上下界。

val intervalJoinResult = input1.keyBy(_._1)
      .intervalJoin(input2.keyBy(_._1))
      .between(Time.milliseconds(-5), Time.milliseconds(10))
      .upperBoundExclusive()
      .lowerBoundExclusive()
      .process(new MyProcessFunction)

Interval Join内部是用缓存来存储所有数据的,因此需要注意缓存数据不能太大,以免对内存造成绝大压力。

union

DataStream上使用union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重。下图union对白色和深色两个数据流进行合并,生成一个数据流。

union

connect

union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  • connect只能连接两个数据流union可以连接多个数据流
  • connect所连接的两个数据流的数据类型可以不一致union所连接的两个数据流的数据类型必须一致
  • 两个DataStream经过connect之后被转化为ConnectedStreamsConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态

connect经常被应用在对一个数据流使用另外一个流进行控制处理的场景上,如下图所示。控制流可以是阈值、规则、机器学习模型或其他参数。
connect

  • 对于ConnectedStreams,我们需要重写CoMapFunction或CoFlatMapFunction。这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。
  • 在重写函数时,对于CoMapFunction,map1处理第一个流的数据,map2处理第二个流的数据;对于CoFlatMapFunction,flatMap1处理第一个流的数据,flatMap2处理第二个流的数据。
  • Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流数据的流入先后顺序,即第一个数据流有数据到达时,map1或flatMap1会被调用,第二个数据流有数据到达时,map2或flatMap2会被调用。
  • Flink允许我们将connect和keyBy或broadcast结合起来使用。

原文链接

转换关系总图

转换关系

DataStream

DataStream 是 Flink 流处理 API 中最核心的数据结构。它代表了一个运行在多个分区上的并行流。一个 DataStream 可以从 StreamExecutionEnvironment 通过env.addSource(SourceFunction) 获得。

DataStream 上的转换操作都是逐条的,比如 map(),flatMap(),filter()。DataStream 也可以执行 rebalance(再平衡,用来减轻数据倾斜)和 broadcaseted(广播)等分区转换。

KeyedStream

KeyedStream用来表示根据指定的key进行分组的数据流。一个KeyedStream可以通过调用DataStream.keyBy()来获得。而在KeyedStream上进行任何transformation都将转变回DataStream。在实现中,KeyedStream是把key的信息写入到了transformation中。每条记录只能访问所属key的状态,其上的聚合函数可以方便地操作和保存对应key的状态。

WindowedStream & AllWindowedStream

WindowedStream代表了根据key分组,并且基于WindowAssigner切分窗口的数据流。所以WindowedStream都是从KeyedStream衍生而来的。而在WindowedStream上进行任何transformation也都将转变回DataStream。

在key分组的流上进行窗口切分是比较常用的场景,也能够很好地并行化(不同的key上的窗口聚合可以分配到不同的task去处理)。不过有时候我们也需要在普通流上进行窗口的操作,这就是 AllWindowedStream。AllWindowedStream是直接在DataStream上进行windowAll(…)操作。

Flink 的窗口实现中会将到达的数据缓存在对应的窗口buffer中(一个数据可能会对应多个窗口)。当到达窗口发送的条件时(由Trigger控制),Flink 会对整个窗口中的数据进行处理。Flink 在聚合类窗口有一定的优化,即不会保存窗口中的所有值,而是每到一个元素执行一次聚合函数,最终只保存一份数据即可。

window

JoinedStreams & CoGroupedStreams

ConnectedStreams

参考链接

http://wuchong.me/

::用法

jdk8中使用了::的用法。就是把方法当做参数传到stream内部,使stream的每个元素都传入到该方法里面执行一下,双冒号运算就是Java中的[方法引用],[方法引用]的格式是:

类名::方法名

例如:

表达式:

person -> person.getAge();

使用双冒号:

Person :: getAge

stream和parallelStream

用于生成数据流

List<String> strings = Arrays.asList("abc", "", "bc", "efg", "abcd","", "jkl");
//生成顺序流
strings.stream()
//生成并行流
strings.parallelStream()

forEach

用于迭代数据流中每个数据

List<String> strings = Arrays.asList("abc", "", "bc", "efg", "abcd","", "jkl");
strings.stream().forEach(System.out::println);

map

对数据流中每个数据执行方法

List<Integer> strings = Arrays.asList(1,2,3,4,5,6,7,8);
// 每个数据加1操作
strings.stream().map(x -> x + 1).forEach(System.out::println);

distinct

去重

List<Integer> strings = Arrays.asList(1,2,3,4,4,5,5,5,6,7,8);
// 去除重复的4,5,5
strings.stream().distinct().forEach(System.out::println);

filter

过滤

List<String>strings = Arrays.asList("abc", "", "bc", "efg", "abcd","", "jkl");
// 获取空字符串的数量
long count = strings.stream().filter(string -> string.isEmpty()).count();
//获取非空数量
long count = strings.stream().filter(string -> !string.isEmpty()).count();

limit

限制流的大小

List<Integer> strings = Arrays.asList(1,2,3,4,4,5,5,5,6,7,8);
// 只会打印前4个
strings.stream().limit(4).forEach(System.out::println);

sorted

排序

List<Integer> strings = Arrays.asList(11,12,13,14,4,15,5,5,6,7,8);
// 排序打印
strings.stream().sorted().forEach(System.out::println);

Collectors

归约操作

List<String>strings = Arrays.asList("abc", "", "bc", "efg", "abcd","", "jkl");
List<String> filtered = strings.stream().filter(string -> !string.isEmpty()).collect(Collectors.toList());
 
System.out.println("筛选列表: " + filtered);
String mergedString = strings.stream().filter(string -> !string.isEmpty()).collect(Collectors.joining(", "));
System.out.println("合并字符串: " + mergedString);

summaryStatistics

统计

List<Integer> numbers = Arrays.asList(3, 2, 2, 3, 7, 3, 5);
IntSummaryStatistics stats = numbers.stream().mapToInt((x) -> x).summaryStatistics();
 
System.out.println("列表中最大的数 : " + stats.getMax());
System.out.println("列表中最小的数 : " + stats.getMin());
System.out.println("所有数之和 : " + stats.getSum());
System.out.println("平均数 : " + stats.getAverage());

参考链接

参考1

一些关键字的解释说明

.split(“\s”)

\\s表示空格、回车、换行等空白符
\\s+表示一个或多个空格、回车、换行等空白符

.split(“\w+”)

表示匹配数字和字母下划线的多个字符

tuple元组

tuple是flink中自定义的一种组合类型,类似java中Map<String,String>,只不过Map只有两个字段,相当于Tuple2

flink中tuple最多支持25个字段,不支持空字段.

复合类型有:

  • Flink Java Tuples(Flink Java API的一部分):最多25个字段,空字段不支持
  • Scala Case classes(包括Scala tuples):最多25个字段,空字段不支持
  • Row:具有任意数量字段的元组,并支持空字段
  • POJO:遵循某种Bean模式的类

tuple使用

最简单的情况是在元组的一个或多个字段上对元组进行分组:

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

元组在第一个字段(整数类型)上分组。

DataStream<Tuple3<Integer,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。

关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如:

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;

指定keyBy(0)将导致系统使用full Tuple2作为键(以Integer和Float为键)。

更多参考官网

map和flatMap

map

把数组流中的每一个值,使用所提供的函数执行一遍,一一对应。得到元素个数相同的数组流。
map

map算子对一个DataStream中的每个元素使用用户自定义的map函数进行处理,每个输入元素对应一个输出元素,最终整个数据流被转换成一个新的DataStream。输出的数据流DataStream[OUT]类型可能和输入的数据流DataStream[IN]不同。

flatMap

flat是扁平的意思。它把数组流中的每一个值,使用所提供的函数执行一遍,一一对应。得到元素相同的数组流。只不过,里面的元素也是一个子数组流。把这些子数组合并成一个数组以后,元素个数大概率会和原数组流的个数不同。
flatMap

filter

filter算子对每个元素进行过滤,过滤的过程使用一个filter函数进行逻辑判断。对于输入的每个元素,如果filter函数返回True,则保留,如果返回False,则丢弃。
filter

keyBy

绝大多数情况,我们要根据事件的某种属性或数据的某个字段进行分组,对一个分组内的数据进行处理。如下图所示,keyBy算子根据元素的形状对数据进行分组,相同形状的元素被分到了一起,可被后续算子统一处理。比如,多支股票数据流处理时,可以根据股票代号进行分组,然后对同一股票代号的数据统计其价格变动。又如,电商用户行为日志把所有用户的行为都记录了下来,如果要分析某一个用户行为,需要先按用户ID进行分组。
keyby

aggregation

常见的聚合操作有sum、max、min等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟keyBy相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。

与批处理不同,这些聚合函数是对流数据进行数据,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。

  • max算子对该字段求最大值,并将结果保存在该字段上。对于其他字段,该操作并不能保证其数值。
  • maxBy算子对该字段求最大值,maxBy与max的区别在于,maxBy同时保留其他字段的数值,即maxBy可以得到数据流中最大的元素。
  • 同样,min和minBy的区别在于,min算子对某字段求最小值,minBy返回具有最小值的元素。

其实,这些aggregation操作里已经封装了状态数据,比如,sum算子内部记录了当前的和,max算子内部记录了当前的最大值。由于内部封装了状态数据,而且状态数据并不会被清理,因此一定要避免在一个无限数据流上使用aggregation。

注意,对于一个KeyedStream,一次只能使用一个aggregation操作,无法链式使用多个。

reduce

前面几个aggregation是几个较为特殊的操作,对分组数据进行处理更为通用的方法是使用reduce算子。
reduce

上图展示了reduce算子的原理:reduce在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。

DataStream<Tuple2<String, Integer>> counts =
    // split up the lines in pairs (2-tuples) containing: (word,1)
    text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
        .keyBy(0)
        //ReduceFunction定义了reduce方法,它主要是用来将两个同类型的值操作为一个同类型的值,第一个参数为前面reduce的结果,第二参数为当前的元素
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                System.out.println("value1:"+value1.f1+";value2:"+value2.f1);
                return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
            }
        });

参考

皮皮鲁的AI星球