You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
管理变量
使用变量
复制全文
使用变量

当您创建了变量时,您可以在

  1. SQL 任务中创建默认的临时表或指定 InMemoryCatalog 创建表时,在 WITH 参数中直接引用变量。
  2. Flink CDC 任务中 YAML 配置文件中,支持直接引用变量。
  3. Flink JAR / Python 作业,Args 参数中填写变量。
  4. 所有 Flink 任务类型的自定义参数支持注入变量。

本文用一个简单 Demo,来演示变量的使用方式。

变量优先级说明

指定域变量优先级高于全局变量。

  • 只引用全局变量,则只会在全局变量中查找变量。
  • 只引用指定域变量,先从您指定的作用域中查找变量,再从全局变量中查找变量。如果变量不存在则运行报错。
  • 同时引用全局变量和指定域变量,先从您指定的变量作用域中查找变量,再从全局变量中查找变量。如果变量不存在则运行报错。

示例 Demo

本文模拟一个读取 MySQL 数据的场景,希望将 MySQL 的用户名和密码以变量的形式呈现,不以明文展示。请提前完成变量的创建,具体操作请参见创建变量
示例中 MySQL 的用户名和密码以变量的形式呈现。在 SQL 任务中引用变量时格式为${secret_values.variable_name},其中secret_values.为固定前缀,variable_name为您创建的变量名称。

  • ${secret_values.username}:引用全局变量 username 变量。
  • ${secret_values.password}:引用全局变量 password 变量。
  • ${secret_values.tablename}:引用指定域变量 tablename ** 变量。注意:同时需要在参数配置中指定正确的变量作用域。如下图所示:

注意

引用指定域变量时,需要选择SQL编辑框右侧参数配置 -> 变量作用域 -> 指定域 对话框中选择正确的作用域。如果指定的作用域和全局变量中都没有找到引用的变量,任务运行时将报错。
Image

CREATE TEMPORARY TABLE
  mysql_source (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time timestamp,
    PRIMARY KEY (order_id) NOT ENFORCED
  )
WITH
  (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.*.*.*:30294/doc_test',            
    'username' = '${secret_values.username}',   //引用全局变量-username变量。
    'password' = '${secret_values.password}',   //引用全局变量-password变量。
    'table-name' = '${secret_values.tablename}'  //引用指定域变量-tablename变量。
  );


create table
  print_sink (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time timestamp
  )
WITH
  ('connector' = 'print');


insert into
  print_sink
select
  *
from
  mysql_source;

本节举一个 Flink JAR 作业的示例(Python 作业类似),在 Flink 作业的 Task 中,需要访问 MySQL 获取一些数据,所以我们一般最佳实践是使用环境变量来注入 MySQL 的用户名和密码。而用户名密码作为敏感信息,不应该以明文的形式在代码中出现,所以我们需要使用变量来替代。

步骤一:创建加密变量
在变量中,创建一个名为 mysql-passwd 的变量,并将密码作为变量值。
Image

步骤二:创建 Flink JAR 任务,设置环境变量
在创建 Flink JAR 作业后,在自定义参数中设置容器环境变量

# JobManager 如下设置 
# containerized.master.env.<环境变量名>
containerized.master.env.MYSQL_PASSWD: ${secret_values.mysql-passwd}
# TaskManager 如下设置 
# containerized.taskmanager.env.<环境变量名>
containerized.taskmanager.env.MYSQL_PASSWD: ${secret_values.mysql-passwd}

Image

步骤三:在代码中读取环境变量名,以获取注入变量

// 使用 RichMapFunction 读取环境变量
DataStream<String> resultStream = inputStream.map(new RichMapFunction<String, String>() {
    private transient String mysqlPassword;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 在 open 方法中读取环境变量
        mysqlPassword = System.getenv("MYSQL_PASSWD");
        
        if (mysqlPassword == null) {
            throw new RuntimeException("Environment variable MYSQL_PASSWD not found");
        }
    }

    @Override
    public String map(String value) throws Exception {
        return String.format("Task: %s, MySQL Password: %s", value, mysqlPassword);
    }
});

本节举一个例子,可以通过变量来批量控制不同任务的分支或者逻辑。比如我们可以在 Flink JAR 作业或者 Flink Python 作业中,设置 main args (支持 KV 和文本两种模式),然后在代码中读取参数,通过不同的参数选择不同的计算逻辑。这种模式常见于 A/B 实验,小流量发布等生产 Flink 任务切换场景。

步骤一:创建 Flink JAR 作业,填写 main args 参数,在 value 中我们可以使用已经定义好的参数
Image

步骤二:在 JAVA 作业中使用 ParameterTool 解析 version 参数,通过不同变量控制不同的处理逻辑

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VersionedFlinkJob {

    public static void main(String[] args) throws Exception {
        // 设置流处理环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 解析命令行参数
        ParameterTool params = ParameterTool.fromArgs(args);
        String version = params.get("version", "v1"); // 默认使用v1版本
        
        // 创建示例数据流
        DataStream<String> text = env.fromElements(
            "Hello", "World", "Flink", "Example"
        );
        
        // 根据版本参数选择不同的处理逻辑
        DataStream<String> processed;
        
        switch (version) {
            case "v1":
                ...
                break;
            case "v2":
                ...
                break;
            default:
                ...
                break;
        }
        // 输出结果
        processed.print();
        // 执行任务
        env.execute("Versioned Flink Job - " + version);
    }
}

在 Flink CDC 的 YAML 配置中,我们可以在配置中引用变量,这样可以方便的在不同环境中切换不同的配置。
Image

最近更新时间:2025.09.04 11:30:15
这个页面对您有帮助吗?
有用
有用
无用
无用