You need to enable JavaScript to run this app.
导航
RocketMQ
最近更新时间:2025.05.07 13:42:44首次发布时间:2025.05.07 13:42:44
我的收藏
有用
有用
无用
无用

RocketMQ 连接器提供从 RocketMQ Topic 中消费和写入数据的能力,支持做数据源表和结果表。您可以创建 source 流从 RocketMQ Topic 中获取数据,作为作业的输入数据;也可以通过 RocketMQ 结果表将作业输出数据写入到 RocketMQ Topic 中。

注意事项

  • RocketMQ 连接器目前仅支持在 Flink 1.16-volcano 及以上引擎版本中使用
  • 当前暂不支持 MessageQueue 自动发现,如果 RocketMQ 有扩容,请在扩容后停止作业后再次启动作业,启动作业选择从上次状态恢复(从下次成功 checkpoint 后重启可以减少重复)
  • 注意请打开 checkpoint,否则数据消费没法保证不丢不重

DDL 定义

用作数据源(Source)

create table rmq_src (
    `id` BIGINT,
    `name` VARCHAR
) with (
    -- connector 名称,固定为 rocketmq
    'connector' = 'rocketmq',  
    -- 填写 RMQ 产品页面的访问地址
    'endPoint' = 'http://rocketmq-cnng535a1c59df3d.rocketmq.ivolces.com:9876',
    -- 填写访问的 topic
    'topic' = 'flink-rmq-test-topic',
    -- 填写 group id,可以在 RMQ 产品页面注册好
    'group' = 'GID-flink_rmq_connector_test_group',
    -- 填写有读权限的 AK/SK,在 RMQ 产品页面的秘钥管理那边可以查到
    'accessId' = 'xxx',
    'accessKey' = 'xxx',
    -- 填写 Flink 内置的 format
    'format' = 'json',
    -- 上游没有数据可供消费时,source 的休眠时间
    'pullIntervalMs' = '500'
);

create table print_sink (
    `id` BIGINT,
    `name` VARCHAR
) with (
    'connector' = 'print'
);

insert into print_sink
select * from rmq_src;

用作数据目的(Sink)

create table src (
    `id` BIGINT,
    `name` VARCHAR,
    proc_time as proctime()
) with (
    'connector' = 'datagen',
    'rows-per-second' = '3',
    'fields.id.min' = '1',
    'fields.id.max' = '20',
    'fields.name.length' = '3'
);

create table rmq_sink (
    `id` BIGINT,
    `name` VARCHAR
) with (
    -- connector 名称,固定为 rocketmq
    'connector' = 'rocketmq',  
    -- 填写 RMQ 产品页面的访问地址
    'endPoint' = 'http://rocketmq-cnng535a1c59df3d.rocketmq.ivolces.com:9876',
    -- 填写访问的 topic
    'topic' = 'flink-rmq-test-topic',
    -- 填写 group id,可以在 RMQ 产品页面注册好
    'group' = 'GID-flink_rmq_connector_test_group',
    -- 填写有读权限的 AK SK,在 RMQ 产品页面的秘钥管理那边可以查到
    'accessId' = 'xxx',
    'accessKey' = 'xxx',
    -- 写入失败重试次数 默认 10次
    'retryTimes' = '10',
    -- 写入失败重试间隔 默认 5s
    'sleepTimeMs' = '5000',
    -- 写入格式 json
    'format' = 'json'
);

insert into rmq_sink
select id, name from src;

WITH 参数

通用

参数

是否必填

默认值

数据类型

描述

connector

String

指定使用的连接器,此处是 rocketmq 连接器。

endPoint

String

消息队列 RocketMQ 版产品页面的访问地址,比如 http://rocketmq-xxx.rocketmq.ivolces.com:9876

topic

String

访问的 topic 名称

group

String

Consumer 或 Producer 的组名。

accessId

String

填写有读权限的 AK,可以在消息队列 RocketMQ 版产品页面的秘钥管理里查看

accessKey

String

填写有读权限的 SK,可以在消息队列 RocketMQ 版产品页面的秘钥管理里查看

format

String

指定使用的 Flink format

源表独有

参数

是否必填

默认值

数据类型

描述

pullIntervalMs

Int

上游没有数据可供消费时,source的休眠时间。单位为毫秒。目前没有限流机制,无法设置读取RocketMQ的速率。

结果表独有

参数

是否必填

默认值

数据类型

描述

retryTimes

10

Int

写入的重试次数。

sleepTimeMs

5000

Long

重试间隔时间。