You need to enable JavaScript to run this app.
导航
Elasticsearch
最近更新时间:2024.10.23 19:54:09首次发布时间:2022.09.08 17:27:42

Elasticsearch 连接器提供了对 Elasticsearch 数据分析引擎的写入能力,仅支持做数据结果表。流式计算 Flink 版支持 Elasticsearch-6 和 Elasticsearch-7 两个版本,部分配置存在差异,请注意区分。

DDL 定义

CREATE TABLE elasticsearch_sink (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  
)  WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-type' = '<yourDocumentType>',
  'username' ='<yourUsername>',
  'password' ='<yourPassword>'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Elasticsearch-6 或 Elasticsearch-7 连接器。
连接器版本与集群版本需要保持一致,以避免出现不兼容问题。

hosts

(none)

String

Elasticsearch 主机地址,支持 HTTP 和 HTTPS 协议。

index

(none)

String

索引目录。支持静态索引和动态索引两种方式。

  • 静态索引:静态索引取值必须是纯字符串,如myIndex,所有数据都被写入到 myIndex 索引下。
  • 动态索引:使用{field_name}引用记录中的字段值以动态生成目标索引。

您还可以使用 {field_name|date_format_string} 将TIMESTAMP、DATE 和 TIME 类型的字段值转换为 date_format_string 指定的格式。
例如,设置为 {log_ts|yyyy-MM-dd},则 log_ts 字段值为 2022-06-14 11:45:55 的记录将被写入 2022-06-14 索引。

document-type

Elasticsearch-6 中必选

(none)

String

文档类型。
Elasticsearch-7 中不支持。

document-id.key-delimiter

_

String

文档 ID 的分隔符,默认为_
如果设置了主键,则是主键的分隔符。例如,最终写入 Elasticsearch 的 _id 是 "1_2_3"。

failure-handler

fail

String

Elasticsearch 请求失败时的故障处理策略。取值如下:

  • fail:默认值,如果请求失败,则作业失败。
  • ignore:忽略失败并删除请求。
  • retry-rejected:重新写入该记录。
  • custom class name:用于使用 ActionRequestFailureHandler 子类进行故障处理。

sink.flush-on-checkpoint

true

Boolean

是否在 Checkpoint 时执行 Flush。
如果禁用该功能,在进行Checkpoint时,连接器将不等待所有请求完成。

sink.bulk-flush.max-actions

1000

Integer

批量写入的最大条数。 设置为0时,表示禁用批量写入功能。

sink.bulk-flush.max-size

2mb

MemorySize

批量写入时的最大缓存量,单位是 MB。设置为0时,表示禁用批量写入功能。

sink.bulk-flush.interval

1s

Duration

批量写入时的刷新间隔。

sink.bulk-flush.backoff.strategy

DISABLED

String

如果由于临时请求错误导致 Flush 操作失败,则可以指定重试策略。取值如下:

  • DISABLED(默认值):不执行重试,Flush 操作失败后保持状态。
  • CONSTANT:等待 sink.bulk-flush.backoff.delay 设置的时间后重试。
  • EXPONENTIAL:指数回退,即每次回退等待时间指数递增。

sink.bulk-flush.backoff.max-retries

8

Integer

批量写入失败,最大的重试次数。

sink.bulk-flush.backoff.delay

50ms

Duration

批量写入失败,重试的时间间隔(对于 CONSTANT 重试策略);也可理解为重试的间隔时间基数(对于 EXPONENTIAL 重试策略)。

connection.max-retry-timeout

(none)

Duration

重试请求的最大超时时长。

connection.path-prefix

(none)

String

添加到每个 REST 请求中的前缀字符串,一般不设置。

format

json

String

指定输出的格式,默认是内置的json格式。

示例代码

  • Elasticsearch-6 结果表

    create table orders (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '100',
        'fields.order_status.length' = '3',
        'fields.order_id.min' = '1',
        'fields.order_id.max' = '10000',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'
      );
    
    
    create table es_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp
      )
    WITH
      (
        'connector' = 'elasticsearch-6',
        'hosts' = 'https://elasticsearch-wu****l8e.escloud.ivolces.com:9200',
        'index' = 'test_orders',
        'document-type' = 'doc',
        'username' = 'admin', 
        'password' = 'cd****456' 
      );
    
    insert into es_sink
    select * from orders;
    
  • Elasticsearch-7 结果表

    create table orders (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.order_status.length' = '3',
        'fields.order_id.min' = '1',
        'fields.order_id.max' = '10000',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'
      );
    
    
    create table es_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp
      )
    WITH
      (
        'connector' = 'elasticsearch-7',
        'hosts' = 'https://elasticsearch-wu****l8e.escloud.ivolces.com:9200',
        'index' = 'test_orders',
        'username' = 'admin', 
        'password' = 'cd****456' 
      );
    
    insert into es_sink
    select * from orders;