You need to enable JavaScript to run this app.
导航

Redis

最近更新时间2024.02.02 16:34:47

首次发布时间2023.09.12 16:22:50

Redis 连接器提供了对 Redis 缓存数据库的写入能力,支持做数据结果表和维表。
使用 Redis 连接器做数据结果表和维表时,有不同的扩展优势:

类型

优势

结果表

  • String 类型数据支持按照指定格式序列化写入。
  • 写入支持设置数据的过期时间。
  • 支持 batch 写入。

维表

  • String 类型数据支持按照指定格式反序列化读出。
  • 支持开启缓存。
  • 支持延迟 join。

DDL 定义

CREATE TABLE redis_sink (
  key VARCHAR PRIMARY KEY NOT ENFORCED,
  val VARCHAR
) WITH (
  'connector' = 'redis',
  'redis-mode' = 'single-node',
  'host' = '172.0.0.1',
  'port' = '6379',
  'value-type' = 'string'
);

WITH参数

通用参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Redis 连接器。

value-type

string

Enum

Redis 数据库支持的数据类型。取值如下:

  • String:基本的字符串(string)类型。
    • 结果表支持setsetexincrbyincrbyfloat命令。
    • 维表支持get命令。
  • Hash:哈希类型。
    • 结果表支持hmsethincrbyhincrbyfloat命令。
    • 维表支持hmget命令。
  • List:列表类型。
    • 结果表支持lpush命令。
    • 维表支持lrange命令。
  • Set:集合类型。
    • 结果表支持sadd命令。
    • 维表支持smembers命令。
  • zset:有序集合(Sorted Set)类型。
    • 结果表支持zadd命令。
    • 维表支持zrange命令。

format

(none)

String

数据类型为 String 的时候,序列化和反序列化的格式,取值为:json、pb 等。

说明

如果配置 format ,则无需配置 value-type。

redis-mode

single-node

Enum

连接 Redis 数据库的模式,可选类型:

  • single-node (单点模式)
  • cluster (集群模式)
  • sentinel (sentinel 高可用模式)

host

(单点模式必须)

172.0.0.1

String

单点模式下的 Redis Server 地址,如172.0.0.1

port

(单点模式必须)

6379

String

单点模式下的 Redis Server 连接端口,默认值为6379。

cluster.nodes

(集群模式必须)

(none)

String

集群模式的 host 列表,为分号分隔的 host 和 ip 列表,host 和 ip 之间用分号(;)间隔。例如:“127.0..:6379;10.133..:6379;10.248..:6379”

master.name

(sentinel模式必须)

(none)

String

sentinel 模式的 mater 名称。

sentinels.info

(sentinel模式必须)

(none)

String

sentinel 模式的 sentinel 信息,分号分隔的列表。

db

0

Integer

连接的 Redis 数据库。

password

(none)

String

Redis 数据库登录密码。默认值为空,表示不进行权限验证。
如果 Redis 集群需要鉴权认证,则需要填写密码。

connection.timeout

2 seconds

Duration

连接超时时间。

connection.socket.timeout

2 seconds

Duration

socket 超时时间。

connection.max-retries

5

Integer

连接失败最大重试次数。

connection.max-total-num

5

Integer

最大连接数。

connection.max-idle-num

5

Integer

最大空闲连接数。

connection.min-idle-num

5

Integer

最小空闲连接数。

rate-limit-num

(none)

Integer

作业的限速配置。
例如,该参数设置为 1000000,表示整个算子的总 OPS 不超过 100W。

说明

限速会在各个 subtask 之间平均分配额度,考虑到数据均衡和 quota 计算整除,限速一定不要设置太小。

结果表参数

参数

是否必选

默认值

数据类型

描述

sink.mode

insert

Enum

数据写入模式。取值如下:

  • insert (默认)
  • incr (增量写入模式)

说明

incr 增量写入模式用于支持 String 和 Hash 的 incrby 相关场景使用,其他场景写入请使用默认值。

sink.record.ttl

(none)

Duration

写入数据的过期时间,默认为永久保存。
设置时,支持填写以下单位:

  • DAYS:比1 d1 day
  • HOURS:比如1 h1 hour
  • MINUTES:比如1 min1 minute
  • SECONDS:比如1 s1 sec1 second
  • MILLISECONDS:比如1 ms1 milli1 millisecond
  • MICROSECONDS:比如1 µs1 micro1 microsecond
  • NANOSECONDS:比如1 ns1 nano1 nanosecond

说明

设置时如果不写单位,则默认为毫秒(milliseconds);单位不区分大小写,数字与单位间的空格可省略。

sink.max-retries

5

Integer

写入失败的最大重试次数。

value.format.skip-key

true

Boolean

序列化写入的时候 value 中是否包含主键。

  • true:表示不包含主键。
  • false:包含主键。

sink.ignore-delete

true

Boolean

是否过滤掉上游 retract 消息。

sink.ignore-null

false

Boolean

是否过滤掉 null 值写入。

维表参数

参数

是否必选

默认值

数据类型

描述

lookup.cache.max-rows

(none)

Integer

维表缓存的最大数据条数。

说明

缓存默认关闭,如需开启,lookup.cache.max-rowslookup.cache.ttl两个参数均需要配置。

lookup.cache.ttl

(none)

Duration

维表缓存的过期时间。

说明

缓存默认关闭,如需开启,lookup.cache.max-rowslookup.cache.ttl两个参数均需要配置。

lookup.max-retries

3

Integer

维表查询失败的最大重试次数。

lookup.cache-null-value

true

Boolean

是否缓存 null 数据,仅在开启缓存时才生效。

lookup.later-join-retry-times

0

Duration

延迟 Join 的间隔时间。

lookup.later-join-latency

1

Integer

延迟 Join 的重试次数。

lookup.enable-input-keyby

false

Boolean

上游数据是否按照 Join 的 key 进行 hash 下发到 Join 算子。

示例代码

单点模式

create table datagen_source (
    key varchar,
    val varchar
) with (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.key.length' = '2',
    'fields.val.length' = '5'
 );
 
 create table redis_sink (
     key varchar PRIMARY KEY NOT ENFORCED,
     val varchar
 ) with (
     'connector' = 'redis',
     'redis-mode' = 'single-node',
     'host' = '172.0.0.1',
     'port' = '6379',
     'password' = 'Pw**5',
     'value-type' = 'string'
);

insert into redis_sink
select * from datagen_source;

集群模式

create table datagen_source (
    key varchar,
    val varchar
) with (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.key.length' = '2',
    'fields.val.length' = '5'
 );
 
 create table redis_sink (
     key varchar PRIMARY KEY NOT ENFORCED,
     val varchar
 ) with (
     'connector' = 'redis',
     'redis-mode' = 'cluster',
     'cluster.nodes' = 'redis-cnl**ymz.redis.ivolces.com:6379',
     'password' = 'Pw**5',
     'value-type' = 'string'
);

insert into redis_sink
select * from datagen_source;

String 数据类型

  • 指定 Format

    create table redis_tb (
      id int primary key not enforced,
      name varchar,
      age int,
      gender boolean,
      description varchar
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'format' = 'json'
    )
    
  • 增量写入

    create table redis_tb (
      id int primary key not enforced,
      val long
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'value-type' = 'string',
      'sink.mode' = 'incr'
    )
    

Hash 数据类型

  • 普通读写

    create table redis_tb (
      key int primary key not enforced,
      field1 varchar,
      field2 int,
      field3 boolean
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'value-type' = 'hash'
    )
    
  • 增量写入

    create table redis_tb (
      key int primary key not enforced,
      field1 varchar,
      val long,
    ) with (
      'connector' = 'redis',
      'redis-mode' = 'single-node',
      'host' = '172.0.0.1',
      'port' = '6379',
      'password' = 'Pw**5',
      'value-type' = 'hash',
      'sink.mode' = 'incr'
    )
    

List 和 Set 数据类型

普通读写

  • item 字段用于写入。
  • vals 字段用于维表读。
create table redis_tb (
  key int primary key not enforced,
  item varchar,
  vals array<varchar>
) with (
  'connector' = 'redis',
  'redis-mode' = 'single-node',
  'host' = '172.0.0.1',
  'port' = '6379',
  'password' = 'Pw**5',
  'value-type' = 'list'
)

ZSet 数据类型

普通读写

  • score 和 item 字段用于写入。
  • vals 字段用于维表读。
create table redis_tb (
  key int primary key not enforced,
  score double,
  item varchar,
  vals array<varchar>
) with (
  'connector' = 'redis',
  'redis-mode' = 'single-node',
  'host' = '172.0.0.1',
  'port' = '6379',
  'password' = 'Pw**5',
  'value-type' = 'zset'
)