You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Connector 参考
Redis
复制全文
下载 pdf
Redis

Redis 连接器提供了对 Redis 缓存数据库的写入能力,支持做数据结果表和维表。
使用 Redis 连接器做数据结果表和维表时,有不同的扩展优势:

类型

优势

结果表

  • String 类型数据支持按照指定格式序列化写入。
  • 写入支持设置数据的过期时间。
  • 支持 batch 写入。

维表

  • String 类型数据支持按照指定格式反序列化读出。
  • 支持开启缓存。
  • 支持延迟 join。

DDL 定义

CREATE TABLE redis_sink (
  key VARCHAR PRIMARY KEY NOT ENFORCED,
  val VARCHAR
) WITH (
  'connector' = 'redis',
  'redis-mode' = 'single-node',
  'host' = '172.0.0.1',
  'port' = '6379',
  'value-type' = 'string'
);

WITH参数

通用参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Redis 连接器。

value-type

string

Enum

Redis 数据库支持的数据类型。取值如下:

  • String:基本的字符串(string)类型。
    • 结果表支持setsetexincrbyincrbyfloat命令。
    • 维表支持get命令。
  • Hash:哈希类型。
    • 结果表支持hmsethincrbyhincrbyfloat命令。
    • 维表支持hmget命令。
  • List:列表类型。
    • 结果表支持lpush命令。
    • 维表支持lrange命令。
  • Set:集合类型。
    • 结果表支持sadd命令。
    • 维表支持smembers命令。
  • zset:有序集合(Sorted Set)类型。
    • 结果表支持zadd命令。
    • 维表支持zrange命令。

format

(none)

String

数据类型为 String 的时候,序列化和反序列化的格式,取值为:json、pb 等。

说明

如果配置 format ,则无需配置 value-type。

redis-mode

single-node

Enum

连接 Redis 数据库的模式,可选类型:

  • single-node (单点模式)
  • cluster (集群模式)
  • sentinel (sentinel 高可用模式)

host

(单点模式必须)

172.0.0.1

String

单点模式下的 Redis Server 地址,如172.0.0.1

port

(单点模式必须)

6379

String

单点模式下的 Redis Server 连接端口,默认值为6379。

cluster.nodes

(集群模式必须)

(none)

String

集群模式的 host 列表,为分号分隔的 host 和 ip 列表,host 和 ip 之间用分号(;)间隔。例如:“127.0..:6379;10.133..:6379;10.248..:6379”

master.name

(sentinel模式必须)

(none)

String

sentinel 模式的 mater 名称。

sentinels.info

(sentinel模式必须)

(none)

String

sentinel 模式的 sentinel 信息,分号分隔的列表。

db

0

Integer

连接的 Redis 数据库。

password

(none)

String

Redis 数据库登录密码。默认值为空,表示不进行权限验证。
如果 Redis 集群需要鉴权认证,则需要填写密码。

connection.timeout

2 seconds

Duration

连接超时时间。

connection.socket.timeout

2 seconds

Duration

socket 超时时间。

connection.max-retries

5

Integer

连接失败最大重试次数。

connection.max-total-num

5

Integer

最大连接数。

connection.max-idle-num

5

Integer

最大空闲连接数。

connection.min-idle-num

5

Integer

最小空闲连接数。

rate-limit-num

(none)

Integer

作业的限速配置。
例如,该参数设置为 1000000,表示整个算子的总 OPS 不超过 100W。

说明

限速会在各个 subtask 之间平均分配额度,考虑到数据均衡和 quota 计算整除,限速一定不要设置太小。

结果表参数

参数

是否必选

默认值

数据类型

描述

sink.mode

insert

Enum

数据写入模式。取值如下:

  • insert (默认)
  • incr (增量写入模式)

说明

incr 增量写入模式用于支持 String 和 Hash 的 incrby 相关场景使用,其他场景写入请使用默认值。

sink.record.ttl

(none)

Duration

写入数据的过期时间,默认为永久保存。
设置时,支持填写以下单位:

  • DAYS:比1 d1 day
  • HOURS:比如1 h1 hour
  • MINUTES:比如1 min1 minute
  • SECONDS:比如1 s1 sec1 second
  • MILLISECONDS:比如1 ms1 milli1 millisecond
  • MICROSECONDS:比如1 µs1 micro1 microsecond
  • NANOSECONDS:比如1 ns1 nano1 nanosecond

说明

设置时如果不写单位,则默认为毫秒(milliseconds);单位不区分大小写,数字与单位间的空格可省略。

sink.max-retries

5

Integer

写入失败的最大重试次数。

value.format.skip-key

true

Boolean

序列化写入的时候 value 中是否包含主键。

  • true:表示不包含主键。
  • false:包含主键。

sink.ignore-delete

true

Boolean

是否过滤掉上游 retract 消息。

sink.ignore-null

false

Boolean

是否过滤掉 null 值写入。

sink.buffer-flush.max-rows

50

Integer

Batch 写入记录数:用于确定在写入 Redis 之前进行攒批操作的最大记录数。当记录数超过此配置值时,会将数据写入 Redis。可设为 0 来禁用此攒批写入功能。

sink.buffer-flush.interval

2s

Duration

Batch 写入间隔时间:用于确定异步线程向 Redis 写入数据的时间间隔。当达到该间隔时,数据会被写入 Redis。可设为 0 来禁用此定时写入功能。

维表参数

参数

是否必选

默认值

数据类型

描述

lookup.cache.max-rows

(none)

Integer

维表缓存的最大数据条数。

说明

缓存默认关闭,如需开启,lookup.cache.max-rowslookup.cache.ttl两个参数均需要配置。

lookup.cache.ttl

(none)

Duration

维表缓存的过期时间。

说明

缓存默认关闭,如需开启,lookup.cache.max-rowslookup.cache.ttl两个参数均需要配置。

lookup.max-retries

3

Integer

维表查询失败的最大重试次数。

lookup.cache-null-value

true

Boolean

是否缓存 null 数据,仅在开启缓存时才生效。

lookup.later-join-retry-times

0

Duration

延迟 Join 的间隔时间。

lookup.later-join-latency

1

Integer

延迟 Join 的重试次数。

lookup.enable-input-keyby

false

Boolean

上游数据是否按照 Join 的 key 进行 hash 下发到 Join 算子。

示例代码

单点模式

create table datagen_source (
    key varchar,
    val varchar
) with (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.key.length' = '2',
    'fields.val.length' = '5'
 );
 
 create table redis_sink (
     key varchar PRIMARY KEY NOT ENFORCED,
     val varchar
 ) with (
     'connector' = 'redis',
     'redis-mode' = 'single-node',
     'host' = '172.0.0.1',
     'port' = '6379',
     'password' = 'Pw**5',
     'value-type' = 'string'
);

insert into redis_sink
select * from datagen_source;

集群模式

create table datagen_source (
    key varchar,
    val varchar
) with (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.key.length' = '2',
    'fields.val.length' = '5'
 );
 
 create table redis_sink (
     key varchar PRIMARY KEY NOT ENFORCED,
     val varchar
 ) with (
     'connector' = 'redis',
     'redis-mode' = 'cluster',
     'cluster.nodes' = 'redis-cnl**ymz.redis.ivolces.com:6379',
     'password' = 'Pw**5',
     'value-type' = 'string'
);

insert into redis_sink
select * from datagen_source;

String 数据类型

  • 指定 Format

    create table redis_tb (
      id int primary key not enforced,
      name varchar,
      age int,
      gender boolean,
      description varchar
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'format' = 'json'
    )
    
  • 增量写入

    create table redis_tb (
      id int primary key not enforced,
      val long
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'value-type' = 'string',
      'sink.mode' = 'incr'
    )
    

Hash 数据类型

  • 普通读写

    create table redis_tb (
      key int primary key not enforced,
      field1 varchar,
      field2 int,
      field3 boolean
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'value-type' = 'hash'
    )
    
  • 增量写入

    create table redis_tb (
      key int primary key not enforced,
      field1 varchar,
      val long,
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'value-type' = 'hash',
      'sink.mode' = 'incr'
    )
    

List 和 Set 数据类型

普通读写

  • item 字段用于写入。
  • vals 字段用于维表读。
create table redis_tb (
  key int primary key not enforced,
  item varchar,
  vals array<varchar>
) with (
  'connector' = 'redis',
  'redis-mode' = 'single-node',
  'host' = '172.0.0.1',
  'port' = '6379',
  'password' = 'Pw**5',
  'value-type' = 'list'
)

ZSet 数据类型

普通读写

  • score 和 item 字段用于写入。
  • vals 字段用于维表读。
create table redis_tb (
  key int primary key not enforced,
  score double,
  item varchar,
  vals array<varchar>
) with (
  'connector' = 'redis',
  'redis-mode' = 'single-node',
  'host' = '172.0.0.1',
  'port' = '6379',
  'password' = 'Pw**5',
  'value-type' = 'zset'
)
最近更新时间:2024.09.03 20:33:29
这个页面对您有帮助吗?
有用
有用
无用
无用