Logstash 是一款开源的数据收集引擎,具备实时管道处理能力,能够同时从多个来源采集数据、转换数据,然后将数据发送到 Elasticsearch 中。本文介绍如何通过 Logstash 进行跨集群的数据迁移。
火山引擎云搜索服务支持 ES 集群和 OpenSearch 集群,请根据您的集群类型选择下载安装正确版本的 Logstash。
解压后的文件目录如下:
├── logstash-7.10.0 │ ├── bin │ ├── config │ ├── CONTRIBUTORS │ ├── data │ ├── Gemfile │ ├── Gemfile.lock │ ├── jdk │ ├── JDK_VERSION │ ├── lib │ ├── LICENSE.txt │ ├── logs │ ├── logstash-core │ ├── logstash-core-plugin-api │ ├── modules │ ├── NOTICE.TXT │ ├── tools │ └── vendor └── migrate_index_meta.py 其中config目录有相关示例配置: config/ ├── examples │ ├── migrate_6_to_6or7_example.conf │ ├── migrate_7_to_7_example.conf │ └── migrate_8_to_7_example.conf ├── jvm.options ├── log4j2.properties ├── logstash-sample.conf ├── logstash.yml ├── pipelines.yml └── startup.options
在配置数据迁移过程中,需要使用到云搜索服务集群的配置信息,包括实例访问地址、证书等信息。
迁移数据前,首先迁移索引的settings
、mapping
、aliases
元信息。您可以直接使用从 Logstash 安装包解压得到的migrate_index_meta.py
脚本(也可以复制以下示例代码),然后使用 Python 3 进行元数据迁移。
说明
number_of_replicas
为 0 和refresh_interval
为 60s。default_index_settings
配置项,配置项输入为 json 字符串。default_index_settings
会和原索引 settings 合并,并覆盖原索引 settings 已有的同一配置项,当default_index_settings
的配置项值为 null 时,会在同步索引时删除 settings 的此配置项。 python migrate_index_meta.py -h
。元数据迁移脚本代码示例:
import json import requests import argparse import copy import collections.abc def deep_update_pop_none(d, u): for k, v in u.items(): if isinstance(v, collections.abc.Mapping): d[k] = deep_update_pop_none(d.get(k, {}), v) else: if v is None: d.pop(k, None) else: d[k] = v return d def get_cluster_version(host, username="", password=""): req = requests.get(host, auth=(username, password), verify=False, timeout=10) if req.status_code != 200: raise RuntimeError("get cluster version failed, status_code: %s, 详细信息: %s" % (req.status_code, req.text)) resp = req.json() is_os = False if resp['version'].get("distribution") == "opensearch": is_os = True major_version = int(resp["version"]["number"].split(".")[0]) return is_os, major_version def get_source_indices(host, target, username="", password=""): url = host.strip("/") + "/_cat/indices/" + target headers = {"Content-Type": "application/x-www-form-urlencoded"} try: req = requests.get(url, auth=(username, password), headers=headers, verify=False, timeout=10) except Exception as e: raise RuntimeError("_cat indices failed, url: %s, err: %s" % (url, e)) if req.status_code != 200: raise RuntimeError("_cat indices failed, status_code: %s, 详细信息: %s" % (req.status_code, req.text)) indices_result = req.text.strip() index_text_list = indices_result.split("\n") index_list = [] for index in index_text_list: if index.find("open") > 0: index_list.append(index.split()[2]) continue print("source_index: %s 已关闭,不做迁移" % index) return index_list def get_index_meta(index_name, host, username="", password=""): url = host.strip("/") + "/" + index_name req = requests.get(url, auth=(username, password), verify=False, timeout=10) if req.status_code != 200: raise RuntimeError("get index: %s meta failed, 详细信息: %s" % (index_name, req.text)) resp = req.json() settings = resp[index_name]["settings"] aliases = resp[index_name]["aliases"] mappings = resp[index_name]["mappings"] print("source_index: %s \nsettings: %s\nmapping: %s\naliases: %s" % (index_name, settings, mappings, aliases)) target_settings = copy.deepcopy(settings) target_settings["index"]["refresh_interval"] = default_refresh_interval target_settings["index"]["number_of_replicas"] = default_replicas target_settings["index"].pop("version", None) target_settings["index"].pop("uuid", None) target_settings["index"].pop("creation_date", None) target_settings["index"].pop("provided_name", None) if "allocation" in target_settings["index"].get("routing", {}): target_settings["index"]["routing"].pop("allocation", None) if default_index_settings is not None: target_settings = deep_update_pop_none(target_settings, default_index_settings) index_meta = { "settings": target_settings, "mappings": mappings, "aliases": aliases } return index_meta # 保持mapping type兼容 # https://www.elastic.co/cn/blog/strings-are-dead-long-live-strings def transfer_mapping_field(field): if "properties" in field: for _field in field["properties"].keys(): field["properties"][_field] = transfer_mapping_field(field["properties"][_field]) return field if field["type"] == "string": if field.get("index") == "not_analyzed": return {"type": "keyword"} if field.get("analyzer") == "keyword": return {"type": "keyword"} return {"type": "text"} return field # 保持mapping兼容 def transfer_mapping(target_index_name, mappings, target_cluster_version): if target_cluster_version >= 7: # 去除index type return {target_index_name: list(mappings.values())[0]} if not keep_source_index_type: # 重命名index type return {target_index_name: {rename_target_index_type: list(mappings.values())[0]}} return {target_index_name: mappings} # 如果源集群和目标集群版本都大于等于7,则mapping不变 # 如果源集群等于6,目标集群大于等于7,则去除indexType,其余不变 # 如果源集群和目标集群都等于6,则mapping的不变 # 如果源集群小于6,且indexType大于1,则根据auto_split_index_type判断是否拆分为多个索引 def parse_mappings(source_index_name, mappings, source_cluster_version, target_cluster_version): # 保持mapping field type版本兼容、去除ES2&5中mappings除_all属性 mappings.pop("_all", None) if source_cluster_version >= 7: for field_name, field in mappings.get('properties', {}).items(): mappings['properties'][field_name] = transfer_mapping_field(field) else: for _type, _mappings in mappings.items(): for field_name, field in _mappings.get('properties', {}).items(): mappings[_type].pop("_all", None) mappings[_type]['properties'][field_name] = transfer_mapping_field(field) # 源集群为6或7 if source_cluster_version >= 7: return {source_index_name: mappings} elif source_cluster_version >= 6: return transfer_mapping(source_index_name, mappings, target_cluster_version) # 源集群小于6, 且只有一个index type if len(mappings) == 1: return transfer_mapping(source_index_name, mappings, target_cluster_version) # 过滤获取需要迁移的index type migrate_mappings = {} if not is_migrate_all_index_type: if migrate_index_type_list is None: raise RuntimeError("当is_migrate_all_index_type为False且源集群版本为`%s`时,migrate_index_type_list必须设置" % source_cluster_version) for _type, mapping in mappings.items(): # 判断index type是否需要迁移 if _type not in migrate_index_type_list: print("source_index: %s,_type: %s 无需迁移" % (source_index_name, _type)) continue migrate_mappings[_type] = mapping else: migrate_mappings = mappings if len(migrate_mappings) == 0: raise RuntimeError("is_migrate_all_index_type: %s, migrate_index_type_list: %s,源索引:%s 未转化得到有效目标" "索引mappings" % (is_migrate_all_index_type, migrate_index_type_list, source_index_name)) # 源集群小于6, 过滤后只有一个index type if len(migrate_mappings) == 1: return transfer_mapping(source_index_name, migrate_mappings, target_cluster_version) new_index_mappings = {} # 源集群小于6, 有多个index type if not is_auto_split_index_type: assert RuntimeError( "source_index: %s 有多个`_type`,请手动处理或设置`auto_split_index_type`为True" % source_index_name) for _type, mapping in migrate_mappings.items(): target_index_name = split_index_name_pattern % (source_index_name, _type) print("source_index: %s _type: %s 尝试迁移到 target_index: %s" % (source_index_name, _type, target_index_name)) new_index_mapping = transfer_mapping(target_index_name, {_type: mapping}, target_cluster_version) new_index_mappings.update(new_index_mapping) return new_index_mappings def generate_target_index_meta(source_index_name, source_cluster_version, target_cluster_version): index_meta = get_index_meta(source_index_name, source_cluster_host, source_cluster_user_name, source_cluster_password) target_index_mappings = parse_mappings( source_index_name, index_meta["mappings"], source_cluster_version, target_cluster_version) target_index_meta = {} for target_index_name, mappings in target_index_mappings.items(): target_index_meta[target_index_name] = { "settings": index_meta["settings"], "mappings": mappings, "aliases": index_meta["aliases"] } return target_index_meta def create_target_index(target_index_name, index_meta): url = target_cluster_host.strip("/") + "/" + target_index_name exists = requests.head(url, auth=(target_cluster_user_name, target_cluster_password), verify=False, timeout=10) if exists.status_code == 200: print("target_index: %s 已存在,跳过创建" % target_index_name) return if exists.status_code != 404: raise RuntimeError("target_index: %s exists status_code: %s 详细信息: \n%s" % (target_index_name, exists.status_code, exists.text)) print("target_index: %s index_meta: \n%s" % (target_index_name, index_meta)) url = target_cluster_host.strip("/") + "/" + target_index_name req = requests.put(url, json=index_meta, auth=(target_cluster_user_name, target_cluster_password), verify=False, timeout=30) if req.status_code != 200: raise RuntimeError("target_index: %s 创建失败,status_code: %s 详细信息: \n%s" % (target_index_name, req.status_code, req.text)) print("target_index: %s 创建成功" % target_index_name) # 不支持集群版本降级 # 不支持目标集群版本小于6的迁移 def run(): source_is_os, source_cluster_version = get_cluster_version(source_cluster_host, source_cluster_user_name, source_cluster_password) target_is_os, target_cluster_version = get_cluster_version(target_cluster_host, target_cluster_user_name, target_cluster_password) is_same_core = source_is_os == target_is_os # 判断大版本 if (target_cluster_version < source_cluster_version) and is_same_core: if target_cluster_version < 7: raise RuntimeError( "不支持版本降级,源集群版本:%s, 目标集群版本:%s" % (source_cluster_version, target_cluster_version)) if not target_is_os and target_cluster_version < 6: raise RuntimeError("不支持迁移到6以下的ES大版本,目标集群版本:%s" % target_cluster_version) _migrate_index_list = [] if is_migrate_all_index: _migrate_index_list = get_source_indices(source_cluster_host, "_all", source_cluster_user_name, source_cluster_password) print("本次尝试迁移源集群所有索引") else: print("本次尝试迁移源集群指定索引:%s" % migrate_index_list) for index in migrate_index_list: _migrate_index_list.extend(get_source_indices( source_cluster_host, index, source_cluster_user_name, source_cluster_password)) for index in set(_migrate_index_list): if index.startswith("."): print("%s 可能是系统索引,不会重新创建,请单独处理~" % index) elif exclude_index_list and index in exclude_index_list: print("%s 在exclude_index_list中,跳过" % index) else: target_index_meta = generate_target_index_meta(index, source_cluster_version, target_cluster_version) for target_index_name, index_meta in target_index_meta.items(): create_target_index(target_index_name, index_meta) print("-" * 80) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--source_cluster_host', help="源集群地址") parser.add_argument('--source_cluster_user_name', default=None, help="源集群用户名,用作basic_auth") parser.add_argument('--source_cluster_password', default=None, help="源集群密码,用作basic_auth") parser.add_argument('--target_cluster_host', help="目标集群地址") parser.add_argument('--target_cluster_user_name', default=None, help="目标集群用户名,用作basic_auth") parser.add_argument('--target_cluster_password', default=None, help="目标集群密码,用作basic_auth") parser.add_argument('--default_replicas', default=0, help="目标索引默认副本数,默认为0,提高迁移速度") parser.add_argument('--default_refresh_interval', default='60s', help="目标索引默认refresh_interval,默认为60s") parser.add_argument('--default_index_settings', default=None, help="默认index settings,会和已有的index settings做合并,适用于期望统一更改settings的场景,值是json") parser.add_argument('--migrate_index_list', default=None, help="待迁移源索引名,多个时用`,`分割,支持通配符,如test*表示所有以test开头的索引") parser.add_argument('--is_migrate_all_index', default=False, help="是否迁移全部源索引,为True时migrate_index_list无效") parser.add_argument('--exclude_index_list', default=None, help="迁移时,需排除的源索引名,多个时用`,`分割") parser.add_argument('--migrate_index_type_list', default=None, help="源索引有多个待迁移索引type时,指定待迁移type,多个时用`,`分割。如源索引只有1个索引type,则此配置无效,直接迁移") parser.add_argument('--is_migrate_all_index_type', default=False, help="源索引有多个索引type时,是否迁移所有索引type,为True时migrate_index_type_list无效") parser.add_argument('--is_auto_split_index_type', default=False, help="源索引有多个待迁移索引type时,是否自动拆分不同type到不同目标索引", ) parser.add_argument('--split_index_name_pattern', default="%s-index_type-%s", help="源索引有多个待迁移索引type时且拆分到不同目标索引时,目标索引名pattern") parser.add_argument('--rename_target_index_type', default="_doc", help="当目标集群为ES6时,强制指定index type,默认为_doc") parser.add_argument('--keep_source_index_type', default=False, help="当目标集群为ES6时,是否保留原index type,默认为false") parse_bool = lambda a: a == 1 or a == "1" or a == "true" or a == "true" args = parser.parse_args() source_cluster_host = args.source_cluster_host source_cluster_user_name = args.source_cluster_user_name source_cluster_password = args.source_cluster_password target_cluster_host = args.target_cluster_host target_cluster_user_name = args.target_cluster_user_name target_cluster_password = args.target_cluster_password default_replicas = args.default_replicas default_refresh_interval = args.default_refresh_interval default_index_settings = args.default_index_settings if default_index_settings is not None: default_index_settings = json.loads(default_index_settings) migrate_index_list = args.migrate_index_list if migrate_index_list is not None: migrate_index_list = migrate_index_list.strip().split(",") exclude_index_list = args.exclude_index_list if exclude_index_list is not None: exclude_index_list = exclude_index_list.strip().split(",") is_migrate_all_index = parse_bool(args.is_migrate_all_index) migrate_index_type_list = args.migrate_index_type_list if migrate_index_type_list is not None: migrate_index_type_list = migrate_index_type_list.strip().split(",") is_migrate_all_index_type = parse_bool(args.is_migrate_all_index_type) is_auto_split_index_type = parse_bool(args.is_auto_split_index_type) split_index_name_pattern = args.split_index_name_pattern rename_target_index_type = args.rename_target_index_type keep_source_index_type = args.keep_source_index_type run()
进入 Logstash 解压目录,在./config/
目录下创建 migrate_es.conf
配置文件,然后按照后续场景配置迁移文件。
ES 2 和 ES 5 版本支持单个索引设置多个_type
,ES 6 版本仅支持单个索引设置一个_type
,ES 7 以上版本和 Opensearch 不支持配置索引_type
。
索引支持单个_type
和不支持_type
,在迁移中可视为同一种模式。本文分别介绍以下 4 种迁移场景;
说明
Logstash 会自动在每个文档中添加@version
和@timestamp
字段,如果想要删除可参考Deleting @Version and @Timestamp。
ES 6 版本中,单个索引 mapping 只有一个 _type
,ES 7 以后版本 mapping 不支持设置_type
,故索引迁移可以一对一完成。
执行以下命令,将迁移源集群所有索引的元信息到目标集群,并重命名所有目标索引_type
为_doc
。
说明
ES7 虽然 mapping 不再支持_type
,写入文档时通过 URL PATH 依旧可以设置文档的_type
为_doc
,但不建议使用非 _doc 以外的 _type。
python migrate_index_meta.py --source_cluster_host=源集群地址 --target_cluster_host=目标集群地址 --is_migrate_all_index=true --rename_target_index_type=_doc
下列配置迁移源索引的文档到目标索引,目标索引名不变,目标索引_type
为_doc
。
input { elasticsearch { # 源es集群endpoint。 # 注意原始endpoint为https时,endpoint仅保留「域名:port」不要包括https:// hosts => "源集群链接" # 源集群basicAuth鉴权信息,如果有 user => "源集群用户名" password => "源集群密码" # 支持通配符,"*" 表示所有索引,如果索引多数据量大可以分开配置 # "*,-.*" 表示非"."开头的所有索引,"."开头索引通常是系统索引,无需同步 index => "源索引名" query => '{ "sort": [ "_doc" ] }' size => 1000 # scroll session保持时间 scroll => "5m" docinfo => true # 源es集群为https集群时,设置为true ssl => false # 源es集群为https集群时,设置证书路径 #ca_file => "ca.cer证书路径" # ES6以后支持配置,不超过源索引shard数,加速scroll slices => 1 } } # 删除logStash自动添加的字段,可选 #filter { # mutate { # remove_field => ["@version", "@timestamp"] # } #} output { elasticsearch { # 目标es集群endpoint hosts => "目标集群链接" # 目标集群basicAuth鉴权信息,如果有 user => "目标集群用户名" password => "目标集群密码" # 和源索引名保持一致 index => "%{[@metadata][_index]}" # # 全部文档迁移到 _doc document_type => "_doc" document_id => "%{[@metadata][_id]}" # 是否仅用`hosts`配置项数组中的https链接进行请求 ssl => false # 当使用https链接时,是否验证ES服务端证书 ssl_certificate_verification => false # ssl_certificate_verification为true时,验证服务端ca的证书路径,.cer 或 .pem格式 # cacert => "ca证书路径" } }
Logstash 配置文件在config
目录下,文件名为migrate_es.conf
,执行以下命令,启动 Logstash。
./bin/logstash -f ./config/migrate_es.conf
说明
执行以下命令,查询集群中全部索引的信息,包括索引的健康情况、状态、名称、UUID、分片数、副本数、文档数量等信息。
GET /_cat/indices?v
您可以通过对比源集群和目标集群的索引和文档数,来判断迁移进度。当索引数相同、文档数趋近相同时,可视为数据迁移完成。
说明
_type
,如果迁移时源索引指定了_type
或者将不同_type
拆分到了不同的目标索引,则需分_type
查看索引迁移进度。/{索引名}/{_type}/_count
查看指定索引指定_type
的文档数。如果源索引需要增量更新,需注意以下事项:
待完成存量数据迁移后,更新 Logstash 配置文件migrate_es.conf
中的input.query
配置项,设置查询迁移起始时间之后文档,然后重复执行步骤五~步骤六(重新启动 Logstash,并查询迁移进度)。
input { elasticsearch { # 源es集群endpoint hosts => "源集群链接" # 源集群basicAuth鉴权信息,如果有 user => "源集群用户名" password => "源集群密码" # 支持通配符,"*" 表示所有索引,如果索引多数据量大可以分开配置 # "*,-.*" 表示非"."开头的所有索引,"."开头索引通常是系统索引,无需同步 index => "源索引名" # 按时间范围查询增量数据,以下配置表示查询最近10分钟的数据。 # 下述中@timestamp为区分新老数据的字段,迁移时可以根据实际索引字段灵活调整,查询语句同理 query => '{"query":{"range":{"@timestamp":{"gte":"now-10m","lte":"now/m"}}}}' size => 1000 # 是否使用slice scroll加速迁移,值不超过单索引shard数,ES 6以后版本才支持配置 # slices => 4 # scroll session保持时间 scroll => "5m" docinfo => true ssl => false } }
当数据迁移完成后,您可以修改业务代码指向火山引擎的云搜索集群,进行测试和修改,符合预期后,迁移完成。
"type" : "illegal_argument_exception", "reason" : "unknown setting"
索引 setting 迁移时,可能由于跨 ES 集群版本有不兼容配置项,也可能是来自友商 ES 自研功能多增的配置项在火山引擎云搜索上不支持,导致报错。
解决方案:
通过迁移脚本的default_index_settings
参数,将不支持的配置项设置为null
,这样迁移 settings 时会删除对应不支持的配置项。
例如以下示例会将原索引已有配置{"index": {"merge": {"policy": {"inactive_merge_enabled": true}}}}
在迁移时删除。
python migrate_index_meta.py --source_cluster_host=源集群地址 --target_cluster_host=目标集群地址 --migrate_index_type_list=t_1 --is_migrate_all_index=true --rename_target_index_type=_doc --default_index_settings='{"index": {"merge": {"policy": {"inactive_merge_enabled": null}}}}'
同时,此参数也可以覆盖原有的 settings,新增 settings,使用方式就是将 null 配置为具体的值。
ES output 需加上对应的 routing 配置,可参考开源文档Routing。
@timestamp
字段是示例,用作区分新老数据。
迁移时可以根据实际索引字段灵活调整,可以是其他的 date 类型字段,也可以是 id 编号这样的 number 字段。同时,查询语句也做相应调整。