You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Connector 参考
RocketMQ
复制全文
下载 pdf
RocketMQ

RocketMQ 连接器提供从 RocketMQ Topic 中消费和写入数据的能力,支持做数据源表和结果表。您可以创建 source 流从 RocketMQ Topic 中获取数据,作为作业的输入数据;也可以通过 RocketMQ 结果表将作业输出数据写入到 RocketMQ Topic 中。

注意事项

  • RocketMQ 连接器目前仅支持在 Flink 1.16-volcano 及以上引擎版本中使用
  • 当前暂不支持 MessageQueue 自动发现,如果 RocketMQ 有扩容,请在扩容后停止作业后再次启动作业,启动作业选择从上次状态恢复(从下次成功 checkpoint 后重启可以减少重复)
  • 注意请打开 checkpoint,否则数据消费没法保证不丢不重

DDL 定义

用作数据源(Source)

create table rmq_src (
    `id` BIGINT,
    `name` VARCHAR
) with (
    -- connector 名称,固定为 rocketmq
    'connector' = 'rocketmq',  
    -- 填写 RMQ 产品页面的访问地址
    'endPoint' = 'http://rocketmq-cnng535a1c59df3d.rocketmq.ivolces.com:9876',
    -- 填写访问的 topic
    'topic' = 'flink-rmq-test-topic',
    -- 填写 group id,可以在 RMQ 产品页面注册好
    'group' = 'GID-flink_rmq_connector_test_group',
    -- 填写有读权限的 AK/SK,在 RMQ 产品页面的秘钥管理那边可以查到
    'accessId' = 'xxx',
    'accessKey' = 'xxx',
    -- 填写 Flink 内置的 format
    'format' = 'json',
    -- 上游没有数据可供消费时,source 的休眠时间
    'pullIntervalMs' = '500'
);

create table print_sink (
    `id` BIGINT,
    `name` VARCHAR
) with (
    'connector' = 'print'
);

insert into print_sink
select * from rmq_src;

用作数据目的(Sink)

create table src (
    `id` BIGINT,
    `name` VARCHAR,
    proc_time as proctime()
) with (
    'connector' = 'datagen',
    'rows-per-second' = '3',
    'fields.id.min' = '1',
    'fields.id.max' = '20',
    'fields.name.length' = '3'
);

create table rmq_sink (
    `id` BIGINT,
    `name` VARCHAR
) with (
    -- connector 名称,固定为 rocketmq
    'connector' = 'rocketmq',  
    -- 填写 RMQ 产品页面的访问地址
    'endPoint' = 'http://rocketmq-cnng535a1c59df3d.rocketmq.ivolces.com:9876',
    -- 填写访问的 topic
    'topic' = 'flink-rmq-test-topic',
    -- 填写 group id,可以在 RMQ 产品页面注册好
    'group' = 'GID-flink_rmq_connector_test_group',
    -- 填写有读权限的 AK SK,在 RMQ 产品页面的秘钥管理那边可以查到
    'accessId' = 'xxx',
    'accessKey' = 'xxx',
    -- 写入失败重试次数 默认 10次
    'retryTimes' = '10',
    -- 写入失败重试间隔 默认 5s
    'sleepTimeMs' = '5000',
    -- 写入格式 json
    'format' = 'json'
);

insert into rmq_sink
select id, name from src;

WITH 参数

通用

参数

是否必填

默认值

数据类型

描述

connector

String

指定使用的连接器,此处是 rocketmq 连接器。

endPoint

String

消息队列 RocketMQ 版产品页面的访问地址,比如 http://rocketmq-xxx.rocketmq.ivolces.com:9876

topic

String

访问的 topic 名称

group

String

Consumer 或 Producer 的组名。

accessId

String

填写有读权限的 AK,可以在消息队列 RocketMQ 版产品页面的秘钥管理里查看

accessKey

String

填写有读权限的 SK,可以在消息队列 RocketMQ 版产品页面的秘钥管理里查看

format

String

指定使用的 Flink format

源表独有

参数

是否必填

默认值

数据类型

描述

pullIntervalMs

Int

上游没有数据可供消费时,source的休眠时间。单位为毫秒。目前没有限流机制,无法设置读取RocketMQ的速率。

结果表独有

参数

是否必填

默认值

数据类型

描述

retryTimes

10

Int

写入的重试次数。

sleepTimeMs

5000

Long

重试间隔时间。

最近更新时间:2025.05.07 13:42:44
这个页面对您有帮助吗?
有用
有用
无用
无用