当您创建了变量时,您可以在
本文用一个简单 Demo,来演示变量的使用方式。
指定域变量优先级高于全局变量。
本文模拟一个读取 MySQL 数据的场景,希望将 MySQL 的用户名和密码以变量的形式呈现,不以明文展示。请提前完成变量的创建,具体操作请参见创建变量。
示例中 MySQL 的用户名和密码以变量的形式呈现。在 SQL 任务中引用变量时格式为${secret_values.variable_name},其中secret_values.为固定前缀,variable_name为您创建的变量名称。
${secret_values.username}:引用全局变量 username 变量。${secret_values.password}:引用全局变量 password 变量。${secret_values.tablename}:引用指定域变量 tablename ** 变量。注意:同时需要在参数配置中指定正确的变量作用域。如下图所示:注意
引用指定域变量时,需要选择SQL编辑框右侧参数配置 -> 变量作用域 -> 指定域 对话框中选择正确的作用域。如果指定的作用域和全局变量中都没有找到引用的变量,任务运行时将报错。
CREATE TEMPORARY TABLE mysql_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:30294/doc_test', 'username' = '${secret_values.username}', //引用全局变量-username变量。 'password' = '${secret_values.password}', //引用全局变量-password变量。 'table-name' = '${secret_values.tablename}' //引用指定域变量-tablename变量。 ); create table print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ('connector' = 'print'); insert into print_sink select * from mysql_source;
本节举一个 Flink JAR 作业的示例(Python 作业类似),在 Flink 作业的 Task 中,需要访问 MySQL 获取一些数据,所以我们一般最佳实践是使用环境变量来注入 MySQL 的用户名和密码。而用户名密码作为敏感信息,不应该以明文的形式在代码中出现,所以我们需要使用变量来替代。
步骤一:创建加密变量
在变量中,创建一个名为 mysql-passwd 的变量,并将密码作为变量值。
步骤二:创建 Flink JAR 任务,设置环境变量
在创建 Flink JAR 作业后,在自定义参数中设置容器环境变量
# JobManager 如下设置 # containerized.master.env.<环境变量名> containerized.master.env.MYSQL_PASSWD: ${secret_values.mysql-passwd} # TaskManager 如下设置 # containerized.taskmanager.env.<环境变量名> containerized.taskmanager.env.MYSQL_PASSWD: ${secret_values.mysql-passwd}
步骤三:在代码中读取环境变量名,以获取注入变量
// 使用 RichMapFunction 读取环境变量 DataStream<String> resultStream = inputStream.map(new RichMapFunction<String, String>() { private transient String mysqlPassword; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 在 open 方法中读取环境变量 mysqlPassword = System.getenv("MYSQL_PASSWD"); if (mysqlPassword == null) { throw new RuntimeException("Environment variable MYSQL_PASSWD not found"); } } @Override public String map(String value) throws Exception { return String.format("Task: %s, MySQL Password: %s", value, mysqlPassword); } });
本节举一个例子,可以通过变量来批量控制不同任务的分支或者逻辑。比如我们可以在 Flink JAR 作业或者 Flink Python 作业中,设置 main args (支持 KV 和文本两种模式),然后在代码中读取参数,通过不同的参数选择不同的计算逻辑。这种模式常见于 A/B 实验,小流量发布等生产 Flink 任务切换场景。
步骤一:创建 Flink JAR 作业,填写 main args 参数,在 value 中我们可以使用已经定义好的参数
步骤二:在 JAVA 作业中使用 ParameterTool 解析 version 参数,通过不同变量控制不同的处理逻辑
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class VersionedFlinkJob { public static void main(String[] args) throws Exception { // 设置流处理环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 解析命令行参数 ParameterTool params = ParameterTool.fromArgs(args); String version = params.get("version", "v1"); // 默认使用v1版本 // 创建示例数据流 DataStream<String> text = env.fromElements( "Hello", "World", "Flink", "Example" ); // 根据版本参数选择不同的处理逻辑 DataStream<String> processed; switch (version) { case "v1": ... break; case "v2": ... break; default: ... break; } // 输出结果 processed.print(); // 执行任务 env.execute("Versioned Flink Job - " + version); } }
在 Flink CDC 的 YAML 配置中,我们可以在配置中引用变量,这样可以方便的在不同环境中切换不同的配置。