You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Apache Flink中连接两个DataStream并实现数值除法操作?

嘿,这个场景我刚好在项目里遇到过,给你两种在Flink里的实现思路,都能完美解决你的需求:

方案一:使用Connected DataStreams

因为你的second流基本只有一个键值对,我们可以利用Connected DataStreams的CoFlatMap来缓存这个常量值,再对one流的每个元素做除法操作。具体逻辑很直接:

  1. 先把onesecond两个流连接起来;
  2. CoFlatMapFunction里维护一个变量,专门存储second传来的常量值;
  3. 处理one流的每个元素时,直接用元素的数值除以缓存好的常量。

代码示例大概是这样:

// 连接两个流
ConnectedStreams<Tuple2<String, Double>, Tuple2<String, Double>> connectedStreams = one.connect(second);

// 处理连接后的流,完成除法计算
DataStream<Tuple2<String, Double>> resultStream = connectedStreams
    .flatMap(new CoFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {
        // 缓存second流的常量值
        private Double constantValue = null;

        @Override
        public void flatMap1(Tuple2<String, Double> value, Collector<Tuple2<String, Double>> out) throws Exception {
            // 只有当常量值已缓存且不为0时,才处理one流的元素
            if (constantValue != null && constantValue != 0) {
                Double dividedValue = value.f1 / constantValue;
                out.collect(new Tuple2<>(value.f0, dividedValue));
            }
        }

        @Override
        public void flatMap2(Tuple2<String, Double> value, Collector<Tuple2<String, Double>> out) throws Exception {
            // 缓存second流的元素值(因为只有一个,直接覆盖即可)
            constantValue = value.f1;
        }
    });

这里要注意:如果second流的元素延迟到达,one流的早期元素会被丢弃。如果业务需要等待常量值到位后再处理,可以结合TimerService加个简单的等待逻辑。

方案二:使用Broadcast State(更推荐生产环境)

如果你的Flink是分布式部署的,Broadcast State会是更优的选择——它能把second流的常量值广播到所有并行任务的状态中,确保每个处理one流的Task都能拿到这个值,而且后续如果second流有更新(哪怕很少),也能自动同步到所有任务。

实现步骤如下:

  1. 定义一个广播状态描述器,用来存储second的常量值;
  2. second流转换成广播流;
  3. one流和广播流连接,在ProcessFunction中读取广播状态,完成除法操作。

代码示例:

// 定义广播状态描述器,用固定字符串作为键,Double存储常量值
MapStateDescriptor<String, Double> broadcastStateDescriptor = new MapStateDescriptor<>(
    "ConstantBroadcastState",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO
);

// 将second流转换成广播流
BroadcastStream<Tuple2<String, Double>> broadcastStream = second
    .broadcast(broadcastStateDescriptor);

// 连接one流和广播流,处理每个元素
DataStream<Tuple2<String, Double>> resultStream = one
    .connect(broadcastStream)
    .process(new BroadcastProcessFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {
        @Override
        public void processElement(Tuple2<String, Double> value, ReadOnlyContext ctx, Collector<Tuple2<String, Double>> out) throws Exception {
            // 从广播状态中取出常量值
            ReadOnlyBroadcastState<String, Double> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
            Double constantValue = broadcastState.get("fixed_constant_key"); // 自定义固定键名
            if (constantValue != null && constantValue != 0) {
                Double dividedValue = value.f1 / constantValue;
                out.collect(new Tuple2<>(value.f0, dividedValue));
            }
        }

        @Override
        public void processBroadcastElement(Tuple2<String, Double> value, Context ctx, Collector<Tuple2<String, Double>> out) throws Exception {
            // 更新广播状态,存入second流的常量值
            BroadcastState<String, Double> broadcastState = ctx.getBroadcastState(broadcastStateDescriptor);
            broadcastState.put("fixed_constant_key", value.f1);
        }
    });

两种方案的适用场景:

  • Connected DataStreams实现简单,适合本地测试或者单任务场景,但分布式环境下每个并行任务的缓存是独立的,如果second流只发送一次,可能会有部分任务拿不到值;
  • Broadcast State是分布式友好的,能保证所有并行任务同步拿到常量值,更适合生产环境。

内容的提问来源于stack exchange,提问作者ChackM

火山引擎 最新活动