You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
自定义函数 Demo
Aggregation Functions
复制全文
下载 pdf
Aggregation Functions

Aggregation function 是将一行或多行数据聚合为一个标量值。

如何实现

  1. pom.xml中添加flink-table-common的依赖。version 可以根据任务选择的 Flink 引擎填写对应的版本

    1. Flink 1.16 版本建议选择 1.16.3
    2. Flink 1.17 版本建议选择 1.17.2
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    
  2. 代码实现。
    通过继承org.apache.flink.table.functions.AggregateFunction实现,必需实现以下三个方法:

    • createAccumulator():生成一个累加器,用来存放聚合状态。

      说明

      创建 accumulator 时,所用的比较器 comparator 不能为 lambda 表达式,最好将 comparator 放在类的成员变量中,因为构造出的对象无法序列化,会导致 Flink 任务失败。

    • accumulate():为每个输入行调用 accumulate() 方法来更新累加器。
    • getValue():处理完所有行后,将调用 getValue() 方法来计算并返回最终结果。
      除以上三个必需的办法外,以下几种方法只在某些情况需要实现。
    • retract():对于有界的 OVER 窗口是必需的。
    • merge():对于很多批处理聚合和会话窗口聚合是必需的。
    • resetAccumulator():对于很多批处理聚合是必需的。
    • getResultType():如果结果类型不是基本类型或简单的 POJO 对象,则是必需的。
    • getAccumulatorType():如果累加器类型不是基本类型或简单的 POJO 对象,则是必需的。

示例 Demo

本文提供一个简单的 Aggregation function 的示例,作用是统计 sum 和 count。

/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    // 初始化累加器。
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    // 获取结果。
    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }
    
    // 指明返回值类型,这个例子中返回的是 Long 类型,其实可以不写这个方法,Flink 能自动推断。
    // 此处为了说明该方法的使用,因此加上了。
    // 1. 对于基础类型或 POJO 类型,比如 Integer, Long, String 等类型,其实不需要写这个方法。
    // 2. 对于复杂类型,比如 array, map, row 等类型,必须写这个方法。
    // 3. getAccumulatorType 方法也类似,但是是返回 Accumulator 的类型。
    @Override
        public TypeInformation<Row[]> getResultType() {
            // 本例中是 Long。
                return Types.LONG;
        
        // 以下是几个例子:
        // 1. Map<String, Integer>
        // Types.MAP(Types.STRING, Types.INT)
        // 2. String[], Object 类型的数组
        // Types.OBJECT_ARRAY(Types.STRING)
        // 3. int[] 或 Integer[], 原始类型数组
        // Types.PRIMITIVE_ARRAY(Types.INT);
        // 4. Row 类型, 比如有两列,name varchar, id int.
        // Types.ROW_NAMED(new String[]{"name", "id"}, new TypeInformation[]{Types.STRING, Types.INT})
        // 5. 更多常见的类型见  org.apache.flink.api.common.typeinfo.Types
        }

        // 累加某条消息
    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

        // 撤回某条消息
    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }
    
    // 多累加器合并,例如 session window 就会用到
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }
    
    // 重置累加器
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}

如何使用

如何使用自定义的 Aggregation function,请参见创建自定义函数

最近更新时间:2025.08.06 17:31:38
这个页面对您有帮助吗?
有用
有用
无用
无用