You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Flink DataStream执行flatMap增量转换后未输出预期值,求排查

我写了一段Flink代码,想要把列表里的每个元素自增1后打印出来,但实际输出还是原来的元素。代码如下:

List<Integer> lst = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .collect(Collectors.toList()); 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
DataStream<Integer> dataStream = env.fromCollection(lst); 
dataStream.flatMap(new FlatMapFunction<Integer, Integer>() { 
    @Override 
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception { 
        collector.collect(new Integer(integer.intValue() + 1)); 
    } 
}); 
dataStream.print(); 
env.execute("Executing list");

执行后得到的输出是:

10> 4 
9> 3 
8> 2 
12> 6 
11> 5 
4> 10 
1> 7 
2> 8 
3> 9 
7> 1

我原本期望每个元素自增1后输出,推测错误可能出在FlatMap函数或打印环节,请问为何DataStream未打印转换后的结果?


嗨,这个问题其实是Flink数据流处理里很常见的小疏忽——你没有把flatMap转换后的数据流赋值给变量,而是直接对原始的dataStream调用了print()方法

Flink的算子(比如flatMapmap这类)都是无状态且返回新数据流的,它们不会修改原始的数据流对象。你的代码里,dataStream.flatMap(...)执行后生成了一个全新的DataStream<Integer>,但你没有保存这个新数据流,反而还是用最初的dataStream去打印,自然输出的是未处理的原始数据。

解决方法很简单,两种方式都能搞定:

方案一:把转换后的数据流赋值给新变量

List<Integer> lst = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).collect(Collectors.toList()); 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
DataStream<Integer> dataStream = env.fromCollection(lst); 
// 保存flatMap处理后的新数据流
DataStream<Integer> transformedStream = dataStream.flatMap(new FlatMapFunction<Integer, Integer>() { 
    @Override 
    public void flatMap(Integer integer, Collector<Integer> collector) throws Exception { 
        collector.collect(integer + 1); // 这里可以简化写法,自动装箱会处理类型转换
    } 
}); 
transformedStream.print(); // 打印转换后的结果
env.execute("Executing list");

方案二:直接链式调用算子

List<Integer> lst = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).collect(Collectors.toList()); 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
env.fromCollection(lst)
   .flatMap(new FlatMapFunction<Integer, Integer>() { 
       @Override 
       public void flatMap(Integer integer, Collector<Integer> collector) throws Exception { 
           collector.collect(integer + 1); 
       } 
   })
   .print(); // 转换后直接链式调用print
env.execute("Executing list");

另外提个小细节:代码里new Integer(integer.intValue() + 1)完全可以简化成integer + 1,Java的自动装箱机制已经帮你处理了基本类型和包装类的转换,这样代码会更简洁~

修改后再运行,你就能看到每个元素自增1后的正确输出啦!

内容的提问来源于stack exchange,提问作者hyvbug

火山引擎 最新活动