You need to enable JavaScript to run this app.
导航

使用 Logstash 跨集群迁移数据

最近更新时间2024.01.08 11:53:49

首次发布时间2021.11.02 17:52:23

Logstash 是一款开源的数据收集引擎,具备实时管道处理能力,能够同时从多个来源采集数据、转换数据,然后将数据发送到 Elasticsearch 中。本文介绍如何通过 Logstash 进行跨集群的数据迁移。

数据迁移流程图

图片

注意事项

  • 在进行数据迁移前,请确保源集群、Logstash 服务、目的集群网络互通。
  • 火山引擎云搜索服务暂时还未支持 Logstash 服务,需要手动下载并安装 Logstash,然后配置迁移任务。

步骤一:安装 Logstash

火山引擎云搜索服务支持 ES 集群和 OpenSearch 集群,请根据您的集群类型选择下载安装正确版本的 Logstash。如何安装 Logstash,请参见开源文档installing-logstash

  • 在 ES 集群间进行数据迁移,建议下载安装Logstash 7.10.2,该版本可以适配 ES 2、ES 5、ES 6、ES 7 版本之间的数据迁移。
  • 在 OpenSearch 集群间进行数据迁移,建议下载安装Logstash 8.10.0
    在 OpenSearch 集群安装 Logstash 服务,还需要安装logstash-input-opensearchlogstash-output-opensearch插件。安装命令如下:
    bin/logstash-plugin install logstash-input-opensearch
    bin/logstash-plugin install logstash-output-opensearch
    

步骤二:获取云搜索服务集群配置信息

在配置数据迁移过程中,需要使用到云搜索服务集群的配置信息,包括实例访问地址、证书等信息。

  1. 登录云搜索服务控制台
  2. 在顶部导航栏,选择目标实例所在的地域。
  3. 实例列表页面,单击目标实例名称。
  4. 在实例详情页面的服务访问区域,获取实例公网访问地址,然后下载 HTTPS 证书。
    如果还未给实例绑定公网地址,请参见开启实例公网访问
    图片

步骤三:获取索引元数据迁移脚本

迁移数据前,首先迁移索引的settingsmappingaliases元信息。
本文提供一个 Python 脚本代码,可以将其命名为migrate_index_meta.py,需使用 Python 3 执行脚本。

说明

  • 以下示例支持 HTTPS,但不校验 ESCloud 服务端的 CA 证书。
  • 迁移时索引默认number_of_replicas为 0 和refresh_interval为 60s。
  • 如需更新索引的其他 settings,可使用脚本的 default_index_settings配置项,配置项输入为 json 字符串。default_index_settings会和原索引 settings 合并,并覆盖原索引 settings 已有的同一配置项,当default_index_settings的配置项值为 null 时,会在同步索引时删除 settings 的此配置项。
    更多配置可参考 python migrate_index_meta.py -h
import json

import requests
import argparse
import copy
import collections.abc


def deep_update_pop_none(d, u):
    for k, v in u.items():
        if isinstance(v, collections.abc.Mapping):
            d[k] = deep_update_pop_none(d.get(k, {}), v)
        else:
            if v is None:
                d.pop(k, None)
            else:
                d[k] = v
    return d


def get_cluster_version(host, username="", password=""):
    req = requests.get(host, auth=(username, password), verify=False, timeout=10)
    if req.status_code != 200:
        raise RuntimeError("get cluster version failed, status_code: %s, 详细信息: %s" % (req.status_code, req.text))

    resp = req.json()
    is_os = False
    if resp['version'].get("distribution") == "opensearch":
        is_os = True
    major_version = int(resp["version"]["number"].split(".")[0])
    return is_os, major_version


def get_source_indices(host, target, username="", password=""):
    url = host.strip("/") + "/_cat/indices/" + target
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    try:
        req = requests.get(url, auth=(username, password), headers=headers, verify=False, timeout=10)
    except Exception as e:
        raise RuntimeError("_cat indices failed, url: %s, err: %s" % (url, e))
    if req.status_code != 200:
        raise RuntimeError("_cat indices failed, status_code: %s, 详细信息: %s" % (req.status_code, req.text))

    indices_result = req.text.strip()
    index_text_list = indices_result.split("\n")
    index_list = []
    for index in index_text_list:
        if index.find("open") > 0:
            index_list.append(index.split()[2])
            continue

        print("source_index: %s 已关闭,不做迁移" % index)

    return index_list


def get_index_meta(index_name, host, username="", password=""):
    url = host.strip("/") + "/" + index_name
    req = requests.get(url, auth=(username, password), verify=False, timeout=10)
    if req.status_code != 200:
        raise RuntimeError("get index: %s meta failed, 详细信息: %s" % (index_name, req.text))

    resp = req.json()

    settings = resp[index_name]["settings"]
    aliases = resp[index_name]["aliases"]
    mappings = resp[index_name]["mappings"]

    print("source_index: %s \nsettings: %s\nmapping: %s\naliases: %s" % (index_name, settings, mappings, aliases))

    target_settings = copy.deepcopy(settings)
    target_settings["index"]["refresh_interval"] = default_refresh_interval
    target_settings["index"]["number_of_replicas"] = default_replicas
    target_settings["index"].pop("version", None)
    target_settings["index"].pop("uuid", None)
    target_settings["index"].pop("creation_date", None)
    target_settings["index"].pop("provided_name", None)

    if default_index_settings is not None:
        target_settings = deep_update_pop_none(target_settings, default_index_settings)

    index_meta = {
        "settings": target_settings,
        "mappings": mappings,
        "aliases": aliases

    }

    return index_meta


# 保持mapping type兼容。
# https://www.elastic.co/cn/blog/strings-are-dead-long-live-strings
def transfer_mapping_field(field):
    if "properties" in field:
        for _field in field["properties"].keys():
            field["properties"][_field] = transfer_mapping_field(field["properties"][_field])

        return field

    if field["type"] == "string":
        if field.get("index") == "not_analyzed":
            return {"type": "keyword"}
        if field.get("analyzer") == "keyword":
            return {"type": "keyword"}
        return {"type": "text"}

    return field


# 保持mapping兼容。
def transfer_mapping(target_index_name, mappings, target_cluster_version):
    if target_cluster_version >= 7:
        # 去除index type。
        return {target_index_name: list(mappings.values())[0]}

    if not keep_source_index_type:
        # 重命名index type。
        return {target_index_name: {rename_target_index_type: list(mappings.values())[0]}}

    return {target_index_name: mappings}


# 如果源集群和目标集群版本都大于等于7,则mapping不变。
# 如果源集群等于6,目标集群大于等于7,则去除indexType,其余不变。
# 如果源集群和目标集群都等于6,则mapping的不变。
# 如果源集群小于6,且indexType大于1,则根据auto_split_index_type判断是否拆分为多个索引。
def parse_mappings(source_index_name, mappings, source_cluster_version, target_cluster_version):
    # 保持mapping field type版本兼容、去除ES2&5中mappings除_all属性。
    mappings.pop("_all", None)
    if source_cluster_version >= 7:
        for field_name, field in mappings.get('properties', {}).items():
            mappings['properties'][field_name] = transfer_mapping_field(field)
    else:
        for _type, _mappings in mappings.items():
            for field_name, field in _mappings.get('properties', {}).items():
                mappings[_type].pop("_all", None)
                mappings[_type]['properties'][field_name] = transfer_mapping_field(field)

    # 源集群为6或7。
    if source_cluster_version >= 7:
        return {source_index_name: mappings}
    elif source_cluster_version >= 6:
        return transfer_mapping(source_index_name, mappings, target_cluster_version)

    # 源集群小于6, 且只有一个index type。
    if len(mappings) == 1:
        return transfer_mapping(source_index_name, mappings, target_cluster_version)

    # 过滤获取需要迁移的index type。
    migrate_mappings = {}
    if not is_migrate_all_index_type:
        if migrate_index_type_list is None:
            raise RuntimeError("当is_migrate_all_index_type为False且源集群版本为`%s`时,migrate_index_type_list必须设置"
                               % source_cluster_version)
        for _type, mapping in mappings.items():
            # 判断index type是否需要迁移。
            if _type not in migrate_index_type_list:
                print("source_index: %s,_type: %s 无需迁移" % (source_index_name, _type))
                continue
            migrate_mappings[_type] = mapping
    else:
        migrate_mappings = mappings

    if len(migrate_mappings) == 0:
        raise RuntimeError("is_migrate_all_index_type: %s, migrate_index_type_list: %s,源索引:%s 未转化得到有效目标"
                           "索引mappings" % (is_migrate_all_index_type, migrate_index_type_list, source_index_name))

    # 源集群小于6, 过滤后只有一个index type。
    if len(migrate_mappings) == 1:
        return transfer_mapping(source_index_name, migrate_mappings, target_cluster_version)

    new_index_mappings = {}
    # 源集群小于6, 有多个index type。
    if not is_auto_split_index_type:
        assert RuntimeError(
            "source_index: %s 有多个`_type`,请手动处理或设置`auto_split_index_type`为True" % source_index_name)

    for _type, mapping in migrate_mappings.items():
        target_index_name = split_index_name_pattern % (source_index_name, _type)
        print("source_index: %s _type: %s 尝试迁移到 target_index: %s" % (source_index_name, _type, target_index_name))
        new_index_mapping = transfer_mapping(target_index_name, {_type: mapping}, target_cluster_version)
        new_index_mappings.update(new_index_mapping)

    return new_index_mappings


def generate_target_index_meta(source_index_name, source_cluster_version, target_cluster_version):
    index_meta = get_index_meta(source_index_name, source_cluster_host, source_cluster_user_name,
                                source_cluster_password)

    target_index_mappings = parse_mappings(
        source_index_name, index_meta["mappings"], source_cluster_version, target_cluster_version)
    target_index_meta = {}

    for target_index_name, mappings in target_index_mappings.items():
        target_index_meta[target_index_name] = {
            "settings": index_meta["settings"],
            "mappings": mappings,
            "aliases": index_meta["aliases"]
        }

    return target_index_meta


def create_target_index(target_index_name, index_meta):
    url = target_cluster_host.strip("/") + "/" + target_index_name
    exists = requests.head(url, auth=(target_cluster_user_name, target_cluster_password), verify=False, timeout=10)

    if exists.status_code == 200:
        print("target_index: %s 已存在,跳过创建" % target_index_name)
        return

    if exists.status_code != 404:
        raise RuntimeError("target_index: %s exists status_code: %s 详细信息: \n%s" %
                           (target_index_name, exists.status_code, exists.text))

    print("target_index: %s index_meta: \n%s" % (target_index_name, index_meta))
    url = target_cluster_host.strip("/") + "/" + target_index_name
    req = requests.put(url, json=index_meta,
                       auth=(target_cluster_user_name, target_cluster_password), verify=False, timeout=10)
    if req.status_code != 200:
        raise RuntimeError("target_index: %s 创建失败,status_code: %s 详细信息: \n%s" %
                           (target_index_name, req.status_code, req.text))
    print("target_index: %s 创建成功" % target_index_name)


# 不支持集群版本降级。
# 不支持目标集群版本小于6的迁移。
def run():
    source_is_os, source_cluster_version = get_cluster_version(source_cluster_host, source_cluster_user_name, source_cluster_password)
    target_is_os, target_cluster_version = get_cluster_version(target_cluster_host, target_cluster_user_name, target_cluster_password)
    is_same_core = source_is_os == target_is_os
    # 判断大版本。
    if (target_cluster_version < source_cluster_version) and is_same_core:
        raise RuntimeError(
            "不支持版本降级,源集群版本:%s, 目标集群版本:%s" % (source_cluster_version, target_cluster_version))
    if not target_is_os and target_cluster_version < 6:
        raise RuntimeError("不支持迁移到6以下的ES大版本,目标集群版本:%s" % target_cluster_version)

    _migrate_index_list = []
    if is_migrate_all_index:
        _migrate_index_list = get_source_indices(source_cluster_host, "_all", source_cluster_user_name,
                                                 source_cluster_password)
        print("本次尝试迁移源集群所有索引")
    else:
        print("本次尝试迁移源集群指定索引:%s" % migrate_index_list)
        for index in migrate_index_list:
            _migrate_index_list.extend(get_source_indices(
                source_cluster_host, index, source_cluster_user_name, source_cluster_password))

    for index in set(_migrate_index_list):
        if index.startswith("."):
            print("%s 可能是系统索引,不会重新创建,请单独处理~" % index)
        elif exclude_index_list and index in exclude_index_list:
            print("%s 在exclude_index_list中,跳过" % index)
        else:
            target_index_meta = generate_target_index_meta(index, source_cluster_version, target_cluster_version)
            for target_index_name, index_meta in target_index_meta.items():
                create_target_index(target_index_name, index_meta)
        print("-" * 80)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--source_cluster_host', help="源集群地址")
    parser.add_argument('--source_cluster_user_name', default=None, help="源集群用户名,用作basic_auth")
    parser.add_argument('--source_cluster_password', default=None, help="源集群密码,用作basic_auth")
    parser.add_argument('--target_cluster_host', help="目标集群地址")
    parser.add_argument('--target_cluster_user_name', default=None, help="目标集群用户名,用作basic_auth")
    parser.add_argument('--target_cluster_password', default=None, help="目标集群密码,用作basic_auth")
    parser.add_argument('--default_replicas', default=0, help="目标索引默认副本数,默认为0,提高迁移速度")
    parser.add_argument('--default_refresh_interval', default='60s', help="目标索引默认refresh_interval,默认为60s")
    parser.add_argument('--default_index_settings', default=None,
                        help="默认index settings,会和已有的index settings做合并,适用于期望统一更改settings的场景,值是json")
    parser.add_argument('--migrate_index_list', default=None,
                        help="待迁移源索引名,多个时用`,`分割,支持通配符,如test*表示所有以test开头的索引")
    parser.add_argument('--is_migrate_all_index', default=False,
                        help="是否迁移全部源索引,为True时migrate_index_list无效")
    parser.add_argument('--exclude_index_list', default=None,
                        help="迁移时,需排除的源索引名,多个时用`,`分割")
    parser.add_argument('--migrate_index_type_list', default=None,
                        help="源索引有多个待迁移索引type时,指定待迁移type,多个时用`,`分割。如源索引只有1个索引type,则此配置无效,直接迁移")
    parser.add_argument('--is_migrate_all_index_type', default=False,
                        help="源索引有多个索引type时,是否迁移所有索引type,为True时migrate_index_type_list无效")
    parser.add_argument('--is_auto_split_index_type', default=False,
                        help="源索引有多个待迁移索引type时,是否自动拆分不同type到不同目标索引",
                        )
    parser.add_argument('--split_index_name_pattern', default="%s-index_type-%s",
                        help="源索引有多个待迁移索引type时且拆分到不同目标索引时,目标索引名pattern")
    parser.add_argument('--rename_target_index_type', default="_doc",
                        help="当目标集群为ES6时,强制指定index type,默认为_doc")
    parser.add_argument('--keep_source_index_type', default=False,
                        help="当目标集群为ES6时,是否保留原index type,默认为false")

    parse_bool = lambda a: a == 1 or a == "1" or a == "true" or a == "true"
    args = parser.parse_args()
    source_cluster_host = args.source_cluster_host
    source_cluster_user_name = args.source_cluster_user_name
    source_cluster_password = args.source_cluster_password
    target_cluster_host = args.target_cluster_host
    target_cluster_user_name = args.target_cluster_user_name
    target_cluster_password = args.target_cluster_password
    default_replicas = args.default_replicas
    default_refresh_interval = args.default_refresh_interval
    default_index_settings = args.default_index_settings
    if default_index_settings is not None:
        default_index_settings = json.loads(default_index_settings)
    migrate_index_list = args.migrate_index_list
    if migrate_index_list is not None:
        migrate_index_list = migrate_index_list.strip().split(",")
    exclude_index_list = args.exclude_index_list
    if exclude_index_list is not None:
        exclude_index_list = exclude_index_list.strip().split(",")
    is_migrate_all_index = parse_bool(args.is_migrate_all_index)
    migrate_index_type_list = args.migrate_index_type_list
    if migrate_index_type_list is not None:
        migrate_index_type_list = migrate_index_type_list.strip().split(",")

    is_migrate_all_index_type = parse_bool(args.is_migrate_all_index_type)
    is_auto_split_index_type = parse_bool(args.is_auto_split_index_type)
    split_index_name_pattern = args.split_index_name_pattern
    rename_target_index_type = args.rename_target_index_type
    keep_source_index_type = args.keep_source_index_type

    run()

步骤四:迁移元数据并配置 Logstash

进入 Logstash 安装目录,在./config/ 目录下创建 migrate_es.conf配置文件,然后按照后续场景配置迁移文件。
ES 2 和 ES 5 版本支持单个索引设置多个_type,ES 6 版本仅支持单个索引设置一个_type,ES 7 以上版本不支持配置索引_type
索引支持单个_type和不支持_type,在迁移中可视为同一种模式,所以本文分为两种迁移场景分别介绍数据迁移配置方法: ES 6->ES 6&7,ES 7->ES 7ES 2&5 -> ES 6&7

说明

Logstash 会自动在每个文档中添加@version@timestamp字段,如果想要删除可参考Deleting @Version and @Timestamp

ES 6 版本中,单个索引 mapping 只有一个 _type,ES 7 以后版本 mapping 不支持设置_type,故索引迁移可以一对一完成。

迁移元数据

执行以下命令,将迁移源集群所有索引的元信息到目标集群,并重命名所有目标索引_type_doc

说明

ES7 虽然 mapping 不再支持_type,写入文档时依旧可以设置文档的_type_doc,但不建议使用非 _doc 以外的 _type。

python migrate_index_meta.py --source_cluster_host=源集群地址 --target_cluster_host=目标集群地址 --is_migrate_all_index=true --rename_target_index_type=_doc

配置 Logstash

配置 migrate_es.conf文件为以下内容。迁移源索引的文档到目标索引,目标索引名不变,目标索引_type_doc

input {
  elasticsearch {
   # 源es集群endpoint。
    hosts => "源集群链接"
    # 源集群basicAuth鉴权信息,如果有。
    user => "源集群用户名"
    password => "源集群密码"
    # 支持通配符,* 表示所有索引,如果索引多数据量大可以分开配置。
    index => "源索引名"
    query => '{ "sort": [ "_doc" ] }'
    size => 1000
    # scroll session保持时间
    scroll => "5m"
    docinfo => true
    # 是否仅用`hosts`配置项数组中的https链接进行请求。
    ssl => false
    # ES6以后支持配置,不超过源索引shard数,加速scroll。
    slices => 1
  }
}

# 删除logStash自动添加的字段,可选。
#filter {
#    mutate {
#       remove_field => ["@version", "@timestamp"]
#   }
#}

output {
  elasticsearch {
   # 目标es集群endpoint。
    hosts => "目标集群链接"
    # 目标集群basicAuth鉴权信息,如果有。
    user => "目标集群用户名"
    password => "目标集群密码"
    # 和源索引名保持一致。
    index => "%{[@metadata][_index]}"
    # # 全部文档迁移到 _doc。
    document_type => "_doc"
    document_id => "%{[@metadata][_id]}"
    # 是否仅用`hosts`配置项数组中的https链接进行请求。
    ssl => false
    # 当使用https链接时,是否验证ES服务端证书。
    ssl_certificate_verification => false
    # ssl_certificate_verification为true时,验证服务端ca的证书路径,.cer 或 .pem格式。
    # cacert => "ca证书路径"。
  }
}

步骤五:启动 Logstash

上述步骤创建的 Logstash 配置文件在config目录下,文件名为migrate_es.conf,执行以下命令,启动 Logstash。

./bin/logstash -f ./config/migrate_es.conf

说明

  • 如不同索引迁移策略不同,需配置不同的配置文件分别迁移。
  • 数据迁移通常时间会比较长,建议开启后台进程执行迁移。

步骤六:查看迁移进度

执行以下命令,查询集群中全部索引的信息,包括索引的健康情况、状态、名称、UUID、分片数、副本数、文档数量等信息。

GET /_cat/indices?v

您可以通过对比源集群和目标集群的索引和文档数,来判断迁移进度。当索引数相同、文档数趋近相同时,可视为数据迁移完成

说明

因为 ES 2、ES 5单个索引支持多个_type,如果迁移时源索引指定了_type或者将不同_type拆分到了不同的目标索引,则需分_type 查看索引迁移进度。
您可以在源集群通过/{索引名}/{_type}/_count查看指定索引指定_type的文档数。

步骤七:(可选)迁移增量数据

如果源索引需要增量更新,需注意以下事项:

  • 文档有时间字段,标记文档最后更新时间。
  • 迁移前记录迁移开始时间,用作后续增量迁移起始时间。

待完成存量数据迁移后,更新 Logstash 配置文件migrate_es.conf中的input.query配置项,设置查询迁移起始时间之后文档,然后重复执行步骤五~步骤六(重新启动 Logstash,并查询迁移进度)。

input {
  elasticsearch {
    # 源es集群endpoint。
    hosts => "源集群链接"
    # 源集群basicAuth鉴权信息,如果有。
    user => "源集群用户名"
    password => "源集群密码"
    # 支持通配符,* 表示所有索引,如果索引多数据量大可以分开配置。
    index => "源索引名"
    # 按时间范围查询增量数据,以下配置表示查询最近10分钟的数据。
    # 下述中@timestamp为区分新老数据的字段,迁移时可以根据实际索引字段灵活调整,查询语句同理。
    query => '{"query":{"range":{"@timestamp":{"gte":"now-10m","lte":"now/m"}}}}'
    size => 1000
    # 是否使用slice scroll加速迁移,值不超过单索引shard数,ES 6以后版本才支持配置。
    # slices => 4
    # scroll session保持时间
    scroll => "5m"
    docinfo => true
    ssl => false
  }
}

常见问题

1. 迁移报错 "type" : "illegal_argument_exception", "reason" : "unknown setting"

索引 setting 迁移时,可能由于跨 ES 集群版本有不兼容配置项,也可能是来自友商 ES 自研功能配置项在火山引擎云搜索上不支持,导致报错。
解决方案
通过迁移脚本的default_index_settings参数,将不支持的配置项设置为null,这样迁移 settings 时会删除对应不支持的配置项。
例如以下示例会将原索引已有配置{"index": {"merge": {"policy": {"inactive_merge_enabled": true}}}}在迁移时删除。

python migrate_index_meta.py --source_cluster_host=源集群地址 --target_cluster_host=目标集群地址 --migrate_index_type_list=t_1 --is_migrate_all_index=true --rename_target_index_type=_doc --default_index_settings='{"index": {"merge": {"policy": {"inactive_merge_enabled": null}}}}'

同时,此参数也可以覆盖原有的 settings,新增 settings,使用方式就是将 null 配置为具体的值。

2.当原始数据有 routing 时

ES output 需加上对应的 routing 配置,可参考开源文档Routing

3.增量迁移时没有 @timestamp 字段

@timestamp字段是示例,用作区分新老数据。
迁移时可以根据实际索引字段灵活调整,可以是其他的 date 类型字段,也可以是 id 编号这样的 number 字段。同时,查询语句也做相应调整。