You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Connector 参考
Elasticsearch
复制全文
下载 pdf
Elasticsearch

Elasticsearch 连接器提供了对 Elasticsearch 数据分析引擎的写入能力,仅支持做数据结果表。流式计算 Flink 版支持 Elasticsearch-6 和 Elasticsearch-7 两个版本,部分配置存在差异,请注意区分。

DDL 定义

CREATE TABLE elasticsearch_sink (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED  
)  WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = '<yourHosts>',
  'index' = '<yourIndex>',
  'document-type' = '<yourDocumentType>',
  'username' ='<yourUsername>',
  'password' ='<yourPassword>'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 Elasticsearch-6 或 Elasticsearch-7 连接器。
连接器版本与集群版本需要保持一致,以避免出现不兼容问题。

hosts

(none)

String

Elasticsearch 主机地址,支持 HTTP 和 HTTPS 协议。

index

(none)

String

索引目录。支持静态索引和动态索引两种方式。

  • 静态索引:静态索引取值必须是纯字符串,如myIndex,所有数据都被写入到 myIndex 索引下。
  • 动态索引:使用{field_name}引用记录中的字段值以动态生成目标索引。

您还可以使用 {field_name|date_format_string} 将TIMESTAMP、DATE 和 TIME 类型的字段值转换为 date_format_string 指定的格式。
例如,设置为 {log_ts|yyyy-MM-dd},则 log_ts 字段值为 2022-06-14 11:45:55 的记录将被写入 2022-06-14 索引。

document-type

Elasticsearch-6 中必选

(none)

String

文档类型。
Elasticsearch-7 中不支持。

document-id.key-delimiter

_

String

文档 ID 的分隔符,默认为_
如果设置了主键,则是主键的分隔符。例如,最终写入 Elasticsearch 的 _id 是 "1_2_3"。

failure-handler

fail

String

Elasticsearch 请求失败时的故障处理策略。取值如下:

  • fail:默认值,如果请求失败,则作业失败。
  • ignore:忽略失败并删除请求。
  • retry-rejected:重新写入该记录。
  • custom class name:用于使用 ActionRequestFailureHandler 子类进行故障处理。

sink.flush-on-checkpoint

true

Boolean

是否在 Checkpoint 时执行 Flush。
如果禁用该功能,在进行Checkpoint时,连接器将不等待所有请求完成。

sink.bulk-flush.max-actions

1000

Integer

批量写入的最大条数。 设置为0时,表示禁用批量写入功能。

sink.bulk-flush.max-size

2mb

MemorySize

批量写入时的最大缓存量,单位是 MB。设置为0时,表示禁用批量写入功能。

sink.bulk-flush.interval

1s

Duration

批量写入时的刷新间隔。

sink.bulk-flush.backoff.strategy

DISABLED

String

如果由于临时请求错误导致 Flush 操作失败,则可以指定重试策略。取值如下:

  • DISABLED(默认值):不执行重试,Flush 操作失败后保持状态。
  • CONSTANT:等待 sink.bulk-flush.backoff.delay 设置的时间后重试。
  • EXPONENTIAL:指数回退,即每次回退等待时间指数递增。

sink.bulk-flush.backoff.max-retries

8

Integer

批量写入失败,最大的重试次数。

sink.bulk-flush.backoff.delay

50ms

Duration

批量写入失败,重试的时间间隔(对于 CONSTANT 重试策略);也可理解为重试的间隔时间基数(对于 EXPONENTIAL 重试策略)。

connection.max-retry-timeout

(none)

Duration

重试请求的最大超时时长。

connection.path-prefix

(none)

String

添加到每个 REST 请求中的前缀字符串,一般不设置。

format

json

String

指定输出的格式,默认是内置的json格式。

示例代码

  • Elasticsearch-6 结果表

    create table orders (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '100',
        'fields.order_status.length' = '3',
        'fields.order_id.min' = '1',
        'fields.order_id.max' = '10000',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'
      );
    
    
    create table es_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp
      )
    WITH
      (
        'connector' = 'elasticsearch-6',
        'hosts' = 'https://elasticsearch-wu****l8e.escloud.ivolces.com:9200',
        'index' = 'test_orders',
        'document-type' = 'doc',
        'username' = 'admin', 
        'password' = 'cd****456' 
      );
    
    insert into es_sink
    select * from orders;
    
  • Elasticsearch-7 结果表

    create table orders (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.order_status.length' = '3',
        'fields.order_id.min' = '1',
        'fields.order_id.max' = '10000',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'
      );
    
    
    create table es_sink (
        order_id bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time timestamp
      )
    WITH
      (
        'connector' = 'elasticsearch-7',
        'hosts' = 'https://elasticsearch-wu****l8e.escloud.ivolces.com:9200',
        'index' = 'test_orders',
        'username' = 'admin', 
        'password' = 'cd****456' 
      );
    
    insert into es_sink
    select * from orders;
    
最近更新时间:2024.10.23 19:54:09
这个页面对您有帮助吗?
有用
有用
无用
无用