在 SQL 任务开发中,如果系统预置的内部函数无法满足业务需求时,可以选择创建自定义函数解决问题。
UDF(User Defined Function)函数,即用户自定义函数。
系统已经内置了SUBSTRING、REPLACE等字符串操作函数,以及MAX、MIN、AVG等聚合函数;但是内置函数也会出现无法支撑现实业务的时候,此时您可以选择自行编写代码逻辑来创建自定义函数,从而实现现实业务。
您在创建 UDF 函数前,请自行完成编写代码,并构建 JAR 包。
首先,在您的 Maven 项目中添加必要的 Flink 依赖,version 可以根据任务选择的 Flink 引擎填写对应的版本
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
ScalarFunction (标量函数):标量函数将零个、一个或多个值映射到一个新值。
import org.apache.flink.table.functions.ScalarFunction; public class MyScalarFunction extends ScalarFunction { // 必须实现 eval 方法 public String eval(String input) { if (input == null) { return null; } return input.toUpperCase(); } // 可以重载 eval 方法 public Integer eval(Integer a, Integer b) { return a + b; } }
TableFunction (表函数):表函数返回零个、一个或多个行作为输出。
import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; public class MyTableFunction extends TableFunction<Row> { public void eval(String str) { if (str == null || str.isEmpty()) { return; } for (String s : str.split(",")) { Row row = new Row(2); row.setField(0, s); row.setField(1, s.length()); collect(row); } } }
AggregateFunction (聚合函数):聚合函数对多行数据进行聚合计算。
import org.apache.flink.table.functions.AggregateFunction; public class MyAggregateFunction extends AggregateFunction<Double, MyAccumulator> { // 创建累加器 @Override public MyAccumulator createAccumulator() { return new MyAccumulator(); } // 获取最终结果 @Override public Double getValue(MyAccumulator accumulator) { if (accumulator.count == 0) { return null; } return accumulator.sum / accumulator.count; } // 累加方法 public void accumulate(MyAccumulator accumulator, Double value) { if (value != null) { accumulator.sum += value; accumulator.count++; } } // 可选的 retract 方法(用于有界 OVER 窗口) public void retract(MyAccumulator accumulator, Double value) { if (value != null) { accumulator.sum -= value; accumulator.count--; } } // 可选的 merge 方法(用于会话窗口和分组聚合) public void merge(MyAccumulator accumulator, Iterable<MyAccumulator> iterable) { for (MyAccumulator other : iterable) { accumulator.sum += other.sum; accumulator.count += other.count; } } // 可选的 reset 方法 public void resetAccumulator(MyAccumulator accumulator) { accumulator.sum = 0.0; accumulator.count = 0L; } // 自定义累加器类 public static class MyAccumulator { public double sum = 0.0; public long count = 0L; } }
在作业开发 - 右侧配置参数侧栏 - 依赖文件,上传并且选择 UDF JAR 包。
在 SQL 中使用如下方式注册和引用 UDF 函数:
-- 注册函数 CREATE FUNCTION my_scalar_func AS 'com.example.udf.MyScalarFunction'; -- 使用函数 SELECT my_scalar_func(name) FROM source_table;