The following lists the commonly used operators. The code examples all use Flink listening on port 9000 as the data source. A socket service on port 9000 can be started as follows.
On Linux you can use:
nc -lk 9000
On Windows, you can install ncat from https://nmap.org/ncat/ and then run:
ncat -lk 9000
map
map can be understood as a mapping: it applies a transformation to each element and maps it to another element.
Example 1:
package operators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MapDemo {
private static int index = 1;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
// Note: the static counter is only predictable in a single-JVM local run; it is not parallel-safe.
DataStream<String> result = textStream.map(s -> (index++) + ". You entered: " + s);
result.print();
env.execute();
}
}
Example 2:
package com.bigdata.flink.dataStreamMapOperator;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class DataStreamMapOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
new UserAction("userID1", 1293984000, "click", "productID1", 10),
new UserAction("userID2", 1293984001, "browse", "productID2", 8),
new UserAction("userID1", 1293984002, "click", "productID1", 10)
));
SingleOutputStreamOperator<UserAction> result = source.map(new MapFunction<UserAction, UserAction>() {
@Override
public UserAction map(UserAction value) throws Exception {
int newPrice = value.getProductPrice() * 8;
return new UserAction(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
}
});
result.print();
env.execute();
}
}
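The UserAction bean referenced by this and later examples is not shown in the article; the following is a minimal sketch consistent with the constructor and getters used above (field names and types are assumptions inferred from the calls):
package com.bigdata.flink.beans;
// Hypothetical reconstruction of the UserAction bean used in the examples.
// The no-arg constructor and getters/setters make it a valid Flink POJO,
// which field-name-based APIs such as keyBy(fieldName) require.
// eventTime is kept as int to match the literals in the examples; a real
// implementation would likely use long.
public class UserAction {
    private String userID;
    private int eventTime;
    private String eventType;
    private String productID;
    private int productPrice;
    public UserAction() {}
    public UserAction(String userID, int eventTime, String eventType, String productID, int productPrice) {
        this.userID = userID;
        this.eventTime = eventTime;
        this.eventType = eventType;
        this.productID = productID;
        this.productPrice = productPrice;
    }
    public String getUserID() { return userID; }
    public void setUserID(String userID) { this.userID = userID; }
    public int getEventTime() { return eventTime; }
    public void setEventTime(int eventTime) { this.eventTime = eventTime; }
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    public String getProductID() { return productID; }
    public void setProductID(String productID) { this.productID = productID; }
    public int getProductPrice() { return productPrice; }
    public void setProductPrice(int productPrice) { this.productPrice = productPrice; }
    @Override
    public String toString() {
        return "UserAction{" + userID + "," + eventTime + "," + eventType + "," + productID + "," + productPrice + "}";
    }
}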
flatmap
flatmap can be understood as flattening: each input element can be mapped to zero, one, or many output elements.
Example 1 (splits each input line into its individual characters):
package operators;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlatMapDemo {
private static int index1 = 1;
private static int index2 = 1;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> result = textStream.flatMap((String s, Collector<String> collector) -> {
for (String str : s.split("")) {
collector.collect(str);
}
})
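// Java type erasure hides the lambda's output type, so an explicit type hint is needed: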
.returns(Types.STRING);
result.print();
env.execute();
}
}
Example 2:
package com.bigdata.flink.dataStreamFlatMapOperator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class DataStreamFlatMapOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env
.fromElements(
"You jump I jump",
"Life was like a box of chocolates"
);
SingleOutputStreamOperator<String> result = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
if(value.contains("chocolates")){
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
}
});
result.print();
env.execute();
}
}
filter
filter selects elements: only the elements for which the predicate returns true are kept.
Example 1:
package operators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FilterDemo {
private static int index = 1;
public static void main(String[] args) throws Exception {
//1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. Define the source: listen for socket messages on port 9000
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
//3. filter: keep only non-empty lines
DataStream<String> result = textStream.filter(line->!line.trim().equals(""));
//4. Print sink
result.print();
//5. Start execution
env.execute();
}
}
Example 2:
package com.bigdata.flink.dataStreamFilterOperator;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class DataStreamFilterOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
new UserAction("userID1", 1293984000, "click", "productID1", 10),
new UserAction("userID2", 1293984001, "browse", "productID2", 8),
new UserAction("userID1", 1293984002, "click", "productID1", 10)
));
SingleOutputStreamOperator<UserAction> result = source.filter(new FilterFunction<UserAction>() {
@Override
public boolean filter(UserAction value) throws Exception {
return value.getUserID().equals("userID1");
}
});
result.print();
env.execute();
}
}
keyBy
keyBy repartitions the data by the specified key: records with the same key are routed to the same partition.
Note:
- The partitioning outcome is strongly tied to the parallelism of the operator downstream of keyBy. If the downstream operator has a parallelism of 1, everything ends up in one partition no matter how the data is keyed.
- For POJO types, keyBy can partition by field name via keyBy(fieldName); see the sketch after this list.
- For Tuple types, keyBy can partition by field position via keyBy(fieldPosition).
- For other types, as above, keyBy can partition via keyBy(new KeySelector {...}).
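As a sketch of the field-name variant, assuming the UserAction POJO sketched earlier and the field-name keyBy(String) API of older Flink releases (later deprecated in favor of KeySelector):
package operators;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyByFieldNameDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keyBy(fieldName): partition a POJO stream by its userID field.
        // This requires UserAction to satisfy Flink's POJO rules.
        KeyedStream<UserAction, Tuple> keyed = env
                .fromElements(
                        new UserAction("userID1", 1293984000, "click", "productID1", 10),
                        new UserAction("userID2", 1293984001, "browse", "productID2", 8))
                .keyBy("userID");
        keyed.print();
        env.execute();
    }
}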
Example 1:
package operators;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.concurrent.TimeUnit;
public class KeyByDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
DataStream<Tuple2<String, Integer>> result = textStream
.map(line -> Tuple2.of(line.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.of(10, TimeUnit.SECONDS))
.sum(1);
result.print();
env.execute();
}
}
Example 2:
package com.bigdata.flink.dataStreamKeyByOperator;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class DataStreamKeyByOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
new UserAction("userID1", 1293984000, "click", "productID1", 10),
new UserAction("userID2", 1293984001, "browse", "productID2", 8),
new UserAction("userID1", 1293984002, "click", "productID1", 10)
));
KeyedStream<UserAction, String> result = source.keyBy(new KeySelector<UserAction, String>() {
@Override
public String getKey(UserAction value) throws Exception {
return value.getUserID();
}
});
result.print().setParallelism(3);
env.execute();
}
}
reduce
reduce is a rolling merge operation; it turns a KeyedStream into a DataStream. Unlike fold, the output type of reduce must be the same as its input type.
Reduce: performs a rolling aggregation based on a ReduceFunction and emits the result of each rolling aggregation downstream.
Note: Reduce emits the result of every rolling aggregation.
Example 1:
package operators;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.concurrent.TimeUnit;
public class ReduceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
DataStream<Tuple2<String, Integer>> result = textStream
.map(line -> Tuple2.of(line.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.of(10, TimeUnit.SECONDS))
.reduce((Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));
result.print();
env.execute();
}
}
Example 2:
package com.bigdata.flink.dataStreamReduceOperator;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class DataStreamReduceOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
new UserAction("userID1", 1293984000, "click", "productID1", 10),
new UserAction("userID2", 1293984001, "browse", "productID2", 8),
new UserAction("userID2", 1293984002, "browse", "productID2", 8),
new UserAction("userID2", 1293984003, "browse", "productID2", 8),
new UserAction("userID1", 1293984002, "click", "productID1", 10),
new UserAction("userID1", 1293984003, "click", "productID3", 10),
new UserAction("userID1", 1293984004, "click", "productID1", 10)
));
KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
@Override
public String getKey(UserAction value) throws Exception {
return value.getUserID();
}
});
SingleOutputStreamOperator<UserAction> result = keyedStream.reduce(new ReduceFunction<UserAction>() {
@Override
public UserAction reduce(UserAction value1, UserAction value2) throws Exception {
int newProductPrice = value1.getProductPrice() + value2.getProductPrice();
return new UserAction(value1.getUserID(), -1, "", "", newProductPrice);
}
});
result.print();
env.execute();
}
}
fold
Performs a rolling fold based on an initial value and a FoldFunction, and emits the result of each rolling fold downstream.
Note: Fold emits the result of every rolling fold. (fold and FoldFunction have since been deprecated in favor of aggregate and removed in newer Flink releases.)
Example 1:
package operators;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.concurrent.TimeUnit;
public class FoldDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> result = textStream
.map(line -> Tuple2.of(line.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.timeWindow(Time.of(10, TimeUnit.SECONDS))
.fold("结果:",(String current, Tuple2<String, Integer> t2) -> current+t2.f0+",");
result.print();
env.execute();
}
}
Example 2:
package com.bigdata.flink.dataStreamFoldOperator;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
public class DataStreamFoldOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
new UserAction("userID1", 1293984000, "click", "productID1", 10),
new UserAction("userID2", 1293984001, "browse", "productID2", 8),
new UserAction("userID2", 1293984002, "browse", "productID2", 8),
new UserAction("userID2", 1293984003, "browse", "productID2", 8),
new UserAction("userID1", 1293984002, "click", "productID1", 10),
new UserAction("userID1", 1293984003, "click", "productID3", 10),
new UserAction("userID1", 1293984004, "click", "productID1", 10)
));
KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
@Override
public String getKey(UserAction value) throws Exception {
return value.getUserID();
}
});
SingleOutputStreamOperator<String> result = keyedStream.fold("viewed products and prices:", new FoldFunction<UserAction, String>() {
@Override
public String fold(String accumulator, UserAction value) throws Exception {
if(accumulator.startsWith("userID")){
return accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
}else {
return value.getUserID()+" " +accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
}
}
});
result.print();
env.execute();
}
}
Aggregate
Aggregate performs a rolling aggregation on a KeyedStream by the specified field and emits each rolling aggregation result. The built-in aggregations are: sum, min, minBy, max, maxBy.
Note:
- The difference between max(field) and maxBy(field): maxBy returns the element whose field value is the maximum, while max writes the maximum field value into the first element and returns that first element; min and minBy behave analogously (illustrated after the example below).
- The Aggregate operators emit the result of every rolling aggregation.
Example:
package com.bigdata.flink.dataStreamAggregateOperator;
import com.bigdata.flink.beans.UserActionLogPOJO;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class DataStreamAggregateOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<UserActionLogPOJO> userActionLogs = new ArrayList<>();
UserActionLogPOJO userActionLog1 = new UserActionLogPOJO();
userActionLog1.setUserID("userID1");
userActionLog1.setProductID("productID3");
userActionLog1.setProductPrice(10);
userActionLogs.add(userActionLog1);
UserActionLogPOJO userActionLog2 = new UserActionLogPOJO();
userActionLog2.setUserID("userID2");
userActionLog2.setProductPrice(10);
userActionLogs.add(userActionLog2);
UserActionLogPOJO userActionLog3 = new UserActionLogPOJO();
userActionLog3.setUserID("userID1");
userActionLog3.setProductID("productID5");
userActionLog3.setProductPrice(30);
userActionLogs.add(userActionLog3);
DataStreamSource<UserActionLogPOJO> source = env.fromCollection(userActionLogs);
KeyedStream<UserActionLogPOJO, String> keyedStream = source.keyBy(new KeySelector<UserActionLogPOJO, String>() {
@Override
public String getKey(UserActionLogPOJO value) throws Exception {
return value.getUserID();
}
});
// max: keeps the first element per key and only updates its productPrice field to the maximum
keyedStream.max("productPrice").print();
// maxBy: emits the entire element that carries the maximum productPrice
keyedStream.maxBy("productPrice").print();
env.execute();
}
}
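With the data above, max("productPrice") for userID1 should first emit the productID3/10 record and then a record that still carries productID3 but with productPrice updated to 30, while maxBy emits the full productID5/30 record. The UserActionLogPOJO bean is also not shown in the article; a minimal sketch consistent with the setters used above (field names assumed):
package com.bigdata.flink.beans;
// Hypothetical reconstruction of UserActionLogPOJO (not shown in the original).
// Field-name aggregations like max("productPrice") require Flink's POJO rules:
// public class, public no-arg constructor, and getters/setters for each field.
public class UserActionLogPOJO {
    private String userID;
    private String productID;
    private int productPrice;
    public String getUserID() { return userID; }
    public void setUserID(String userID) { this.userID = userID; }
    public String getProductID() { return productID; }
    public void setProductID(String productID) { this.productID = productID; }
    public int getProductPrice() { return productPrice; }
    public void setProductPrice(int productPrice) { this.productPrice = productPrice; }
    @Override
    public String toString() {
        return "UserActionLogPOJO{" + userID + "," + productID + "," + productPrice + "}";
    }
}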
union
union merges multiple streams into a single stream so that the merged stream can be processed uniformly; it is a horizontal concatenation of the streams.
All participating streams must have the same element type.
Example:
package operators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class UnionDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
DataStream<String> textStream9002 = env.socketTextStream("localhost", 9002, "\n");
DataStream<String> mapStream9000=textStream9000.map(s->"from port 9000: "+s);
DataStream<String> mapStream9001=textStream9001.map(s->"from port 9001: "+s);
DataStream<String> mapStream9002=textStream9002.map(s->"from port 9002: "+s);
DataStream<String> result = mapStream9000.union(mapStream9001,mapStream9002);
result.print();
env.execute();
}
}
join
Joins two streams on a specified key. For DataStreams this is a window join: only elements that share a key and fall into the same window are matched.
Example:
package operators;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowJoinDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
DataStream<Tuple2<String,String>> mapStream9000=textStream9000
.map(new MapFunction<String, Tuple2<String,String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
return Tuple2.of(s,"来自9000端口:"+s);
}
});
DataStream<Tuple2<String,String>> mapStream9001=textStream9001
.map(new MapFunction<String, Tuple2<String,String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
return Tuple2.of(s,"来自9001端口:"+s);
}
});
DataStream<String> result = mapStream9000.join(mapStream9001)
.where(t1->t1.getField(0)).equalTo(t2->t2.getField(0))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply((t1,t2)->t1.getField(1)+"|"+t2.getField(1))
;
result.print();
env.execute();
}
}
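To see any output from this demo, the same line has to be typed into both the port-9000 and port-9001 sessions within the same 10-second processing-time window; unmatched elements are silently dropped, which is what distinguishes join from the coGroup operator below.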
coGroup
Associates two streams by key within a window, like join, but elements that find no match in the other stream are retained as well (similar to a full outer join).
Example:
package operators;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class CoGroupDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
DataStream<Tuple2<String, String>> mapStream9000 = textStream9000
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
return Tuple2.of(s, "来自9000端口:" + s);
}
});
DataStream<Tuple2<String, String>> mapStream9001 = textStream9001
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
return Tuple2.of(s, "来自9001端口:" + s);
}
});
DataStream<String> result = mapStream9000.coGroup(mapStream9001)
.where(t1 -> t1.getField(0)).equalTo(t2 -> t2.getField(0))
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<String> collector) throws Exception {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("来自9000的stream:");
for (Tuple2<String, String> item : iterable) {
stringBuffer.append(item.f1 + ",");
}
stringBuffer.append("来自9001的stream:");
for (Tuple2<String, String> item : iterable1) {
stringBuffer.append(item.f1 + ",");
}
collector.collect(stringBuffer.toString());
}
});
result.print();
env.execute();
}
}
connect
Reference: https://www.jianshu.com/p/5b0574d466f8
Connects two streams "vertically" while keeping them logically separate. The connect operation on DataStream creates a ConnectedStreams (or BroadcastConnectedStream), which carries two generic type parameters, so the two DataStreams' elements are not required to be of the same type.
Example:
package operators;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ConnectDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream9000 = env.socketTextStream("localhost", 9000, "\n");
DataStream<String> textStream9001 = env.socketTextStream("localhost", 9001, "\n");
DataStream<Integer> intStream = textStream9000.filter(s -> isNumeric(s)).map(s -> Integer.valueOf(s));
SingleOutputStreamOperator<Tuple2<Integer, String>> result = intStream.connect(textStream9001)
.map(new CoMapFunction<Integer, String, Tuple2<Integer, String>>() {
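// map1 handles elements from the first stream (Integer); map2 handles the second (String).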
@Override
public Tuple2<Integer, String> map1(Integer value) throws Exception {
return Tuple2.of(value, "");
}
@Override
public Tuple2<Integer, String> map2(String value) throws Exception {
return Tuple2.of(null, value);
}
});
result.print();
env.execute();
}
private static boolean isNumeric(String str) {
Pattern pattern = Pattern.compile("[0-9]*");
Matcher isNum = pattern.matcher(str);
if (!isNum.matches()) {
return false;
}
return true;
}
}
split
Reference: https://cloud.tencent.com/developer/article/1382892
Splits one stream into multiple streams. (split/select were deprecated and later removed in newer Flink releases in favor of side outputs; see the sketch after this example.)
package operators;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SplitDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
SplitStream<Tuple2<String, Integer>> result = textStream
.map(line -> Tuple2.of(line.trim(), 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.split(t -> {
List<String> list = new ArrayList<>();
if (isNumeric(t.f0)) {
list.add("num");
} else {
list.add("str");
}
return list;
});
DataStream<Tuple2<String, Integer>> strSplitStream = result.select("str")
.map(t -> Tuple2.of("字符串:" + t.f0, t.f1))
.returns(Types.TUPLE(Types.STRING,Types.INT));
DataStream<Tuple2<String, Integer>> intSplitStream = result.select("num")
.map(t -> Tuple2.of("数字:" + t.f0, t.f1))
.returns(Types.TUPLE(Types.STRING,Types.INT));
strSplitStream.print();
intSplitStream.print();
env.execute();
}
private static boolean isNumeric(String str) {
Pattern pattern = Pattern.compile("[0-9]*");
Matcher isNum = pattern.matcher(str);
if (!isNum.matches()) {
return false;
}
return true;
}
}
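For reference, here is a rough equivalent of the demo above using side outputs instead of the deprecated split/select; this is a sketch assuming a Flink version that provides ProcessFunction and OutputTag, and names like STR_TAG are illustrative:
package operators;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class SideOutputDemo {
    // Anonymous subclass so the OutputTag retains its type information.
    private static final OutputTag<String> STR_TAG = new OutputTag<String>("str") {};
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        SingleOutputStreamOperator<String> numStream = textStream
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        // Numeric lines go to the main output, everything else to the side output.
                        if (value.matches("[0-9]+")) {
                            out.collect("number: " + value);
                        } else {
                            ctx.output(STR_TAG, "string: " + value);
                        }
                    }
                });
        numStream.print();
        numStream.getSideOutput(STR_TAG).print();
        env.execute();
    }
}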