You need to enable JavaScript to run this app.
导航
创建自定义函数
最近更新时间:2025.08.06 17:13:37首次发布时间:2022.11.11 17:16:16
复制全文
我的收藏
有用
有用
无用
无用

在 SQL 任务开发中,如果系统预置的内部函数无法满足业务需求时,可以选择创建自定义函数解决问题。

背景信息

UDF(User Defined Function)函数,即用户自定义函数。
系统已经内置了SUBSTRINGREPLACE等字符串操作函数,以及MAXMINAVG等聚合函数;但是内置函数也会出现无法支撑现实业务的时候,此时您可以选择自行编写代码逻辑来创建自定义函数,从而实现现实业务。

前提条件

您在创建 UDF 函数前,请自行完成编写代码,并构建 JAR 包。

  • 一般是由开发人员(Project_Dev)管理 UDF 函数,请确保操作者已被添加为项目成员并为其关联角色,请参见添加项目成员
  • UDF 函数使用的 JAR 包,文件名称要符合控制台要求,只能由大小写字母、数字、下划线(_)、短横线(-)和英文句号(.)组成。

UDF 代码编写

POM 依赖

首先,在您的 Maven 项目中添加必要的 Flink 依赖,version 可以根据任务选择的 Flink 引擎填写对应的版本

  1. Flink 1.16 版本建议选择 1.16.3
  2. Flink 1.17 版本建议选择 1.17.2:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

标量函数

ScalarFunction (标量函数):标量函数将零个、一个或多个值映射到一个新值。

import org.apache.flink.table.functions.ScalarFunction;

public class MyScalarFunction extends ScalarFunction {
    
    // 必须实现 eval 方法
    public String eval(String input) {
        if (input == null) {
            return null;
        }
        return input.toUpperCase();
    }
    
    // 可以重载 eval 方法
    public Integer eval(Integer a, Integer b) {
        return a + b;
    }
}

表函数

TableFunction (表函数):表函数返回零个、一个或多个行作为输出。

import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class MyTableFunction extends TableFunction<Row> {
    
    public void eval(String str) {
        if (str == null || str.isEmpty()) {
            return;
        }
        
        for (String s : str.split(",")) {
            Row row = new Row(2);
            row.setField(0, s);
            row.setField(1, s.length());
            collect(row);
        }
    }
}

聚合函数

AggregateFunction (聚合函数):聚合函数对多行数据进行聚合计算。

import org.apache.flink.table.functions.AggregateFunction;

public class MyAggregateFunction extends AggregateFunction<Double, MyAccumulator> {
    
    // 创建累加器
    @Override
    public MyAccumulator createAccumulator() {
        return new MyAccumulator();
    }
    
    // 获取最终结果
    @Override
    public Double getValue(MyAccumulator accumulator) {
        if (accumulator.count == 0) {
            return null;
        }
        return accumulator.sum / accumulator.count;
    }
    
    // 累加方法
    public void accumulate(MyAccumulator accumulator, Double value) {
        if (value != null) {
            accumulator.sum += value;
            accumulator.count++;
        }
    }
    
    // 可选的 retract 方法(用于有界 OVER 窗口)
    public void retract(MyAccumulator accumulator, Double value) {
        if (value != null) {
            accumulator.sum -= value;
            accumulator.count--;
        }
    }
    
    // 可选的 merge 方法(用于会话窗口和分组聚合)
    public void merge(MyAccumulator accumulator, Iterable<MyAccumulator> iterable) {
        for (MyAccumulator other : iterable) {
            accumulator.sum += other.sum;
            accumulator.count += other.count;
        }
    }
    
    // 可选的 reset 方法
    public void resetAccumulator(MyAccumulator accumulator) {
        accumulator.sum = 0.0;
        accumulator.count = 0L;
    }
    
    // 自定义累加器类
    public static class MyAccumulator {
        public double sum = 0.0;
        public long count = 0L;
    }
}

注册和使用 UDF

编译 UDF JAR 包上传依赖文件

在作业开发 - 右侧配置参数侧栏 - 依赖文件,上传并且选择 UDF JAR 包。

在 SQL 客户端中注册 UDF

在 SQL 中使用如下方式注册和引用 UDF 函数:

-- 注册函数
CREATE FUNCTION my_scalar_func AS 'com.example.udf.MyScalarFunction';

-- 使用函数
SELECT my_scalar_func(name) FROM source_table;