You need to enable JavaScript to run this app.
导航
使用变量
最近更新时间:2025.09.04 11:30:15首次发布时间:2023.07.21 17:25:17
复制全文
我的收藏
有用
有用
无用
无用

当您创建了变量时,您可以在

  1. SQL 任务中创建默认的临时表或指定 InMemoryCatalog 创建表时,在 WITH 参数中直接引用变量。
  2. Flink CDC 任务中 YAML 配置文件中,支持直接引用变量。
  3. Flink JAR / Python 作业,Args 参数中填写变量。
  4. 所有 Flink 任务类型的自定义参数支持注入变量。

本文用一个简单 Demo,来演示变量的使用方式。

变量优先级说明

指定域变量优先级高于全局变量。

  • 只引用全局变量,则只会在全局变量中查找变量。
  • 只引用指定域变量,先从您指定的作用域中查找变量,再从全局变量中查找变量。如果变量不存在则运行报错。
  • 同时引用全局变量和指定域变量,先从您指定的变量作用域中查找变量,再从全局变量中查找变量。如果变量不存在则运行报错。

示例 Demo

本文模拟一个读取 MySQL 数据的场景,希望将 MySQL 的用户名和密码以变量的形式呈现,不以明文展示。请提前完成变量的创建,具体操作请参见创建变量
示例中 MySQL 的用户名和密码以变量的形式呈现。在 SQL 任务中引用变量时格式为${secret_values.variable_name},其中secret_values.为固定前缀,variable_name为您创建的变量名称。

  • ${secret_values.username}:引用全局变量 username 变量。
  • ${secret_values.password}:引用全局变量 password 变量。
  • ${secret_values.tablename}:引用指定域变量 tablename ** 变量。注意:同时需要在参数配置中指定正确的变量作用域。如下图所示:

注意

引用指定域变量时,需要选择SQL编辑框右侧参数配置 -> 变量作用域 -> 指定域 对话框中选择正确的作用域。如果指定的作用域和全局变量中都没有找到引用的变量,任务运行时将报错。
Image

CREATE TEMPORARY TABLE
  mysql_source (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time timestamp,
    PRIMARY KEY (order_id) NOT ENFORCED
  )
WITH
  (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:30294/doc_test',            
    'username' = '${secret_values.username}',   //引用全局变量-username变量。
    'password' = '${secret_values.password}',   //引用全局变量-password变量。
    'table-name' = '${secret_values.tablename}'  //引用指定域变量-tablename变量。
  );


create table
  print_sink (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time timestamp
  )
WITH
  ('connector' = 'print');


insert into
  print_sink
select
  *
from
  mysql_source;

本节举一个 Flink JAR 作业的示例(Python 作业类似),在 Flink 作业的 Task 中,需要访问 MySQL 获取一些数据,所以我们一般最佳实践是使用环境变量来注入 MySQL 的用户名和密码。而用户名密码作为敏感信息,不应该以明文的形式在代码中出现,所以我们需要使用变量来替代。

步骤一:创建加密变量
在变量中,创建一个名为 mysql-passwd 的变量,并将密码作为变量值。
Image

步骤二:创建 Flink JAR 任务,设置环境变量
在创建 Flink JAR 作业后,在自定义参数中设置容器环境变量

# JobManager 如下设置 
# containerized.master.env.<环境变量名>
containerized.master.env.MYSQL_PASSWD: ${secret_values.mysql-passwd}
# TaskManager 如下设置 
# containerized.taskmanager.env.<环境变量名>
containerized.taskmanager.env.MYSQL_PASSWD: ${secret_values.mysql-passwd}

Image

步骤三:在代码中读取环境变量名,以获取注入变量

// 使用 RichMapFunction 读取环境变量
DataStream<String> resultStream = inputStream.map(new RichMapFunction<String, String>() {
    private transient String mysqlPassword;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 在 open 方法中读取环境变量
        mysqlPassword = System.getenv("MYSQL_PASSWD");
        
        if (mysqlPassword == null) {
            throw new RuntimeException("Environment variable MYSQL_PASSWD not found");
        }
    }

    @Override
    public String map(String value) throws Exception {
        return String.format("Task: %s, MySQL Password: %s", value, mysqlPassword);
    }
});

本节举一个例子,可以通过变量来批量控制不同任务的分支或者逻辑。比如我们可以在 Flink JAR 作业或者 Flink Python 作业中,设置 main args (支持 KV 和文本两种模式),然后在代码中读取参数,通过不同的参数选择不同的计算逻辑。这种模式常见于 A/B 实验,小流量发布等生产 Flink 任务切换场景。

步骤一:创建 Flink JAR 作业,填写 main args 参数,在 value 中我们可以使用已经定义好的参数
Image

步骤二:在 JAVA 作业中使用 ParameterTool 解析 version 参数,通过不同变量控制不同的处理逻辑

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VersionedFlinkJob {

    public static void main(String[] args) throws Exception {
        // 设置流处理环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 解析命令行参数
        ParameterTool params = ParameterTool.fromArgs(args);
        String version = params.get("version", "v1"); // 默认使用v1版本
        
        // 创建示例数据流
        DataStream<String> text = env.fromElements(
            "Hello", "World", "Flink", "Example"
        );
        
        // 根据版本参数选择不同的处理逻辑
        DataStream<String> processed;
        
        switch (version) {
            case "v1":
                ...
                break;
            case "v2":
                ...
                break;
            default:
                ...
                break;
        }
        // 输出结果
        processed.print();
        // 执行任务
        env.execute("Versioned Flink Job - " + version);
    }
}

在 Flink CDC 的 YAML 配置中,我们可以在配置中引用变量,这样可以方便的在不同环境中切换不同的配置。
Image