RocketMQ 连接器提供从 RocketMQ Topic 中消费和写入数据的能力,支持做数据源表和结果表。您可以创建 source 流从 RocketMQ Topic 中获取数据,作为作业的输入数据;也可以通过 RocketMQ 结果表将作业输出数据写入到 RocketMQ Topic 中。
MessageQueue
自动发现,如果 RocketMQ 有扩容,请在扩容后停止作业后再次启动作业,启动作业选择从上次状态恢复(从下次成功 checkpoint 后重启可以减少重复)create table rmq_src ( `id` BIGINT, `name` VARCHAR ) with ( -- connector 名称,固定为 rocketmq 'connector' = 'rocketmq', -- 填写 RMQ 产品页面的访问地址 'endPoint' = 'http://rocketmq-cnng535a1c59df3d.rocketmq.ivolces.com:9876', -- 填写访问的 topic 'topic' = 'flink-rmq-test-topic', -- 填写 group id,可以在 RMQ 产品页面注册好 'group' = 'GID-flink_rmq_connector_test_group', -- 填写有读权限的 AK/SK,在 RMQ 产品页面的秘钥管理那边可以查到 'accessId' = 'xxx', 'accessKey' = 'xxx', -- 填写 Flink 内置的 format 'format' = 'json', -- 上游没有数据可供消费时,source 的休眠时间 'pullIntervalMs' = '500' ); create table print_sink ( `id` BIGINT, `name` VARCHAR ) with ( 'connector' = 'print' ); insert into print_sink select * from rmq_src;
create table src ( `id` BIGINT, `name` VARCHAR, proc_time as proctime() ) with ( 'connector' = 'datagen', 'rows-per-second' = '3', 'fields.id.min' = '1', 'fields.id.max' = '20', 'fields.name.length' = '3' ); create table rmq_sink ( `id` BIGINT, `name` VARCHAR ) with ( -- connector 名称,固定为 rocketmq 'connector' = 'rocketmq', -- 填写 RMQ 产品页面的访问地址 'endPoint' = 'http://rocketmq-cnng535a1c59df3d.rocketmq.ivolces.com:9876', -- 填写访问的 topic 'topic' = 'flink-rmq-test-topic', -- 填写 group id,可以在 RMQ 产品页面注册好 'group' = 'GID-flink_rmq_connector_test_group', -- 填写有读权限的 AK SK,在 RMQ 产品页面的秘钥管理那边可以查到 'accessId' = 'xxx', 'accessKey' = 'xxx', -- 写入失败重试次数 默认 10次 'retryTimes' = '10', -- 写入失败重试间隔 默认 5s 'sleepTimeMs' = '5000', -- 写入格式 json 'format' = 'json' ); insert into rmq_sink select id, name from src;
参数 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | 无 | String | 指定使用的连接器,此处是 rocketmq 连接器。 |
endPoint | 是 | 无 | String | 消息队列 RocketMQ 版产品页面的访问地址,比如 http://rocketmq-xxx.rocketmq.ivolces.com:9876 |
topic | 是 | 无 | String | 访问的 topic 名称 |
group | 是 | 无 | String | Consumer 或 Producer 的组名。 |
accessId | 是 | 无 | String | 填写有读权限的 AK,可以在消息队列 RocketMQ 版产品页面的秘钥管理里查看 |
accessKey | 是 | 无 | String | 填写有读权限的 SK,可以在消息队列 RocketMQ 版产品页面的秘钥管理里查看 |
format | 是 | 无 | String | 指定使用的 Flink format |
参数 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
pullIntervalMs | 是 | 无 | Int | 上游没有数据可供消费时,source的休眠时间。单位为毫秒。目前没有限流机制,无法设置读取RocketMQ的速率。 |
参数 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
retryTimes | 否 | 10 | Int | 写入的重试次数。 |
sleepTimeMs | 否 | 5000 | Long | 重试间隔时间。 |