Flink DataStream执行flatMap增量转换后未输出预期值,求排查
Why isn't my Flink DataStream printing the transformed results after flatMap?
我写了一段Flink代码,想要把列表里的每个元素自增1后打印出来,但实际输出还是原来的元素。代码如下:
List<Integer> lst = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .collect(Collectors.toList()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> dataStream = env.fromCollection(lst); dataStream.flatMap(new FlatMapFunction<Integer, Integer>() { @Override public void flatMap(Integer integer, Collector<Integer> collector) throws Exception { collector.collect(new Integer(integer.intValue() + 1)); } }); dataStream.print(); env.execute("Executing list");
执行后得到的输出是:
10> 4 9> 3 8> 2 12> 6 11> 5 4> 10 1> 7 2> 8 3> 9 7> 1
我原本期望每个元素自增1后输出,推测错误可能出在FlatMap函数或打印环节,请问为何DataStream未打印转换后的结果?
嗨,这个问题其实是Flink数据流处理里很常见的小疏忽——你没有把flatMap转换后的数据流赋值给变量,而是直接对原始的dataStream调用了print()方法。
Flink的算子(比如flatMap、map这类)都是无状态且返回新数据流的,它们不会修改原始的数据流对象。你的代码里,dataStream.flatMap(...)执行后生成了一个全新的DataStream<Integer>,但你没有保存这个新数据流,反而还是用最初的dataStream去打印,自然输出的是未处理的原始数据。
解决方法很简单,两种方式都能搞定:
方案一:把转换后的数据流赋值给新变量
List<Integer> lst = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).collect(Collectors.toList()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Integer> dataStream = env.fromCollection(lst); // 保存flatMap处理后的新数据流 DataStream<Integer> transformedStream = dataStream.flatMap(new FlatMapFunction<Integer, Integer>() { @Override public void flatMap(Integer integer, Collector<Integer> collector) throws Exception { collector.collect(integer + 1); // 这里可以简化写法,自动装箱会处理类型转换 } }); transformedStream.print(); // 打印转换后的结果 env.execute("Executing list");
方案二:直接链式调用算子
List<Integer> lst = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).collect(Collectors.toList()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.fromCollection(lst) .flatMap(new FlatMapFunction<Integer, Integer>() { @Override public void flatMap(Integer integer, Collector<Integer> collector) throws Exception { collector.collect(integer + 1); } }) .print(); // 转换后直接链式调用print env.execute("Executing list");
另外提个小细节:代码里new Integer(integer.intValue() + 1)完全可以简化成integer + 1,Java的自动装箱机制已经帮你处理了基本类型和包装类的转换,这样代码会更简洁~
修改后再运行,你就能看到每个元素自增1后的正确输出啦!
内容的提问来源于stack exchange,提问作者hyvbug




