flink中returns函数使用


Java8出来之后,lambda表达式由于简单易读,在流式计算中的使用开始变得普遍。

同样,Flink也支持lambda表达式,例如我们改写一下wordcount样例

DataSource<String> lines = env.fromElements(
    "Apache Flink is a community-driven open source framework for distributed big data analytics,",
    "like Hadoop and Spark. The core of Apache Flink is a distributed streaming dataflow engine written",
            ...
);
lines.flatMap(new FlatMapFunction<String, Object>() {
    @Override
    public void flatMap(String line, Collector< Object> out) throws Exception {
        for (String word : line.split("\\W+")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}).groupBy(0).sum(1).print();

这段代码很简单,先把每一行按空格拆分成若干单词,并将每个单词和数字1组成一个Tuple,然后把所有Tuple按照单词聚合,计算出每个单词的出现次数

尝试用lambda表达式来替换FlatMapFunction,代码如下

lines.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
}).groupBy(0).sum(1).print();

但当运行这段代码时,会抛出如下异常:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1653)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1639)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:573)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:188)
    at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:266)
    at TestFlink.main(TestFlink.java:21)

这是因为Flink在用户自定义的函数中会使用泛型来创建serializer,当我们使用匿名函数时,类型信息会被保留。但Lambda表达式并不是匿名函数,所以javac编译的时候并不会把泛型保存到class文件里。

解决办法有两种:

第一种办法在异常中已经提示,使用Eclipse JDT编译器会保留对lambda表达式来说必要的类型信息。在Maven中使用Eclipse JDT编译器,只需要在把下面的插件加入到pom.xml中

<plugins>
    <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <compilerId>jdt</compilerId>
        </configuration>
        <dependencies>
            <dependency>
                <groupId>org.eclipse.tycho</groupId>
                <artifactId>tycho-compiler-jdt</artifactId>
                <version>0.21.0</version>
            </dependency>
        </dependencies>
    </plugin>
</plugins>

另一种办法是,使用Flink提供的returns方法来指定flatMap的返回类型,

text.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
}).returns((TypeInformation) TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class)).groupBy(0).sum(1)
        .print();
text.flatMap((line, out) -> {
    for (String word : line.split("\\W+")) {
        out.collect(new Tuple2<>(word, 1));
    }
}).returns(Types.TUPLE(Types.STRING, Types.INT)).groupBy(0).sum(1)
        .print();

returns函数接收TypeInformation类型的参数,这里我们创建TupleTypeInfo来指定Tuple的参数类型。


文章作者: wuzhiyong
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wuzhiyong !
评论
 上一篇
flink常用算子及代码 flink常用算子及代码
以下列举下常用的算子,用到的代码例子都是Flink监听9000端口做为数据源。以下方法可以启动一个9000的socket端口服务。 Linux平台上可以使用 bash nc -lk 9000 如果是 Windows 平台,可以通过 http
2020-03-19
下一篇 
flink多数据流转换操作:join、union、connect flink多数据流转换操作:join、union、connect
操作概览 本文主要讲flink中多条数据流转换操作:join、union、connect join批处理经常要解决的问题是将两个数据源做关联Join操作。比如,很多手机APP都有一个用户数据源User,同时APP会记录用户的行为,我们称之为
2020-03-18
  目录