为降低您已有的数仓中的实时数据/历史数据导入DataFinder的ETL工作量,DataFinder提供了通过编写简单的映射文件并提交至后台任务的方式,将对应数仓中的数据接入到DataFinder数据库中。本文将详细介绍数仓数据接入的注意事项和操作步骤。
细分 | 详细说明 |
---|---|
环境与功能要求 |
如有疑问,可以联系技术人员进行咨询。 |
数据接入任务类型 | 当前版本仅支持接入数仓中的Kafka或者HDFS中的数据,其中:
|
数据格式 |
|
网络要求 | DataFinder集群与您的Kafka和HDFS在内网中网络联通。 |
实时和离线数据转换映射的配置方式是一致的,数据读写方式不同,下文中先介绍实时和离线模版,再一起介绍数据转换映射的配置方式
数据接入配置文件主要包含以下几个部分:
env { execution.parallelism = {executionParallelism} job.mode = "STREAMING" checkpoint.interval = 2000 execution.checkpoint.timeout = 6000 execution.checkpoint.data-uri = "hdfs://vpc-minibase/user/rangers/data/flink/checkpoint/" } source { Kafka { bootstrap.servers = "192.168.6.24:9192,192.168.6.25:9192" #修改为客户kafka的bootstrap.servers topic = "test_topic_input" #修改为需要接入的客户topic consumer.group = "finder_data_group" #客户需要指定消费组,需要注意不同任务消费同一个topic时消费组不能一样 kafka.config = { max.poll.records = "2000" max.poll.interval.ms = "1000" auto.offset.reset = "earliest" } start_mode = "group_offsets" format = text result_table_name = "input_source" } } transform { EventMapper { process_parallelism = {processParallelism} source_table_name = "input_source" result_table_name = "output_distination" #数据映射下文中会详细说明 ***** } } sink { kafka { topic = "sdk_origin_event" bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192" #修改为datafinder集群的bootstrap.servers format = text kafka.request.timeout.ms = 60000 source_table_name = "output_distination" kafka.config = { acks = "all" request.timeout.ms = "60000" buffer.memory = "33554432" } } }
env { execution.parallelism = {executionParallelism} job.mode = "BATCH" } source { HdfsFile { fs.defaultFS = "hdfs://192.168.1.1:9000" #修改为客户实际的fs.defaultFS path = "/user/rangers/data/event" #修改为客户实际文件路径,可以是指定具体文件,也可以指定文件目录 file_format_type = "text" #修改为实际数据格式,text(json)、parquet result_table_name = "input_source" } transform { EventMapper { process_parallelism = {processParallelism} source_table_name = "input_source" result_table_name = "output_distination" job_mode = "BATCH" #数据映射下文中会详细说明 ***** } } sink { HdfsFile { path = "{result_path}" file_format_type = "parquet" result_table_name = "output_distination" hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml" fs.defaultFS = "hdfs://vpc-minibase" } }
数据映射配置写在对应配置文件模版的transfrom部分,详细的配置指导说明如下:
transform { DataTypeConvert { #对于HDFS离线导入场景,如果原始数据的格式是 parquet 的格式,需要配置DataTypeConvert 算子。 #其他场景无需配置此算子,删除此部分即可。 process_parallelism = 6 source_table_name = "input_source" result_table_name = "json_table" }, EventMapper { process_parallelism = {processParallelism} source_table_name = "json_table" result_table_name = "output_distination" job_mode = "BATCH" #数据映射编写位置,用来详细配置来源数据的字段与去向数据的字段映射关系,配置详情请参见下表。 } }
来源数据与去向数据字段映射的配置参数说明如下:
参数类型 | 配置说明 |
---|---|
necessary_field | 用来设置几个必须要有的字段,目前包含event、app_id、local_time_ms,配置格式如下。
|
id_field | 用来设置必须要有的id相关信息,内部至少需要有一个id信息,配置格式如下。
其中target_name为接入DataFinder去向数据表中的id字段,取值枚举值为:user_uniqe_id,anonymous_id以及其他在CDP侧创建的其他口径名。 |
common_field | 公共属性,可以配置为预置公共属性(名称和官网预置公共属性一致)和自定义公共属性,目标属性名(target_name)的校验规则和正常上报的属性名规则一致,配置格式如下。
其中可配置的属性相关字段及字段说明请参见下文的参考:属性相关字段配置说明章节。 |
param_field | 用来设置事件属性,配置格式如下。
其中可配置的属性相关字段及字段说明请参见下文的参考:属性相关字段配置说明章节。 |
user_field | 用来设置用户属性。 |
exclude | 配置在数据接入时,需要移除的字段。
|
filter_mode | 用于过滤需要导入的数据,当前不支持配置多个过滤条件。配置格式如下:
|
job_mode | 任务类型,默认值为STREAMING ,离线任务需要配置为BATCH。 |
output_file_type | 中间文件输出格式,需根据来源数据类型配置 :json、parquet |
put_all_mode | 当原始数据为一条完全平铺的数据,不存在任何嵌套的情况的时候,配置该字段,可以将剩余的所有的字段全部作为事件属性或者自定义公共属性来处理。处理策略:来源数据的字段名作为属性名、字段取值作为属性取值。
|
字段名 | 说明 | 是否必须 |
---|---|---|
source_name | 原始数据中的字段名。 注意 编写配置文件的时候,排在前面的 source_name 被使用是之后,排在后面的就不可以再使用了。 | 否 |
target_name | 需要映射为DataFinder侧的字段名。 | 是 |
value | 直接为某个DataFinder侧的字段指定取值,不依赖原始数据。 | 否 |
value_type | target_name取值的数据类型,与value字段没有关系,标识当前映射关系的v数据类型,默认为string类型。 | 否 |
parse_type | value_type为map时:
| 否 |
delimiter |
| 否 |
spliter | 如果上报的数据是 a:b&c:d 这种形式,需要将其拆开为两个属性 注意
| 否 |
id_type | 多口径场景下使用,默认是finder_uid | 否 |
operation_type | 仅针对用户属性字段user_field生效,表明用户属性的处理方式: | |
value_mapping | 针对预置事件或者其它属性value值需要映射的情况
| 否 |
conditional | 用于执行满足某种条件才需要进行字段的映射:
| 否 |
conditional_target_name | 满足 conditional 的情况下,可以使用指定的 source_name 或者 target_name | 否 |
conditional_source_name | 否 |
下载任务相关脚本。
登录DataFinder的部署服务器,单击脚本下载链接下载所有任务相关脚本并解压。
运行脚本,上传配置文件,获取并记录返回的配置文件ID。
上传配置文件命令示例
bash upload_file.sh [文件名] [项目id或者app_id] # [项目id或者app_id]:如果开启了多应用就用项目名,未开启多应用则用app_id
上传成功返回示例,其中data值即为配置文件ID,请记录对应取值,后续启动任务时需要使用。
{"code":0,"message":"success","data":24}
运行脚本,部署数据接入任务。部署任务成功后会自动启动数据接入任务。
部署任务命令示例
bash deploy.sh [任务名] [项目id或者app_id] [配置文件的ID] [任务类型] # [任务名]:仅限英文小写字符串,并且不能有特殊字符 # [项目id或者app_id]:如果开启了多应用就用项目名,未开启多应用则用app_id # [配置文件的ID]:上传配置文件成功之后返回的 id # [任务类型]:实时为0,离线为2
返回结果示例,启动成功会返回一个任务ID(data值),后期查询任务状态,停止、删除任务等,都需要使用到该任务ID。
{"code":0,"message":"success","data":77}
停止任务。
离线任务执行完毕后会自动停止;需手动停止数据接入任务时执行以下命令。
bash stop.sh [任务id] [项目id或者app_id]
启动任务
bash start.sh [任务id] [项目id或者app_id]
删除任务。
运行中的任务无法删除。
bash delete.sh [任务id] [项目id或者app_id]
{ source_name=test_a target_name=test_b }
{ source_name=test_a target_name=test_b value_type=list }
{ target_name=test_b value=x }
标识将原始数据中的 action 字段映射到 Finder 的 event 字段,并且 action 的取值为 AppIn 时,event 取值 app_launch、action 的取值为 AppOut 时,event 取值 app_terminate { source_name=action target_name=event value_mapping=[ { old_value="AppIn" mapping_value="app_launch" }, { old_value="AppOut" mapping_value="app_terminate" } ] }
表示在原始数据中的 sname 字段的值为 a 时,使用 yname 映射到 event;如果 sname 的值不为 a 时,使用 xname 映射到 event;conditional 可以写多个条件,多个条件遵循且的关系 { source_name=yname target_name=event conditional=[ [ { name=sname value=a } ] ] conditional_source_name=xname }
表示将原始数据中的 test_list 根据 , split 之后作为 array 类型赋值给 Finder 侧的 new_list。 { source_name=test_list target_name=new_list value_type=list delimiter="," }
{ source_name=test_list value_type=list delimiter="_" parse_type=2 } eg: "key1": ["a", "b"] 会转变为两个属性 key1_0: a key1_1: b
{ source_name="properties" value_type=map parse_type=0 } eg: { "properties": { "a1":"a1", "c": { "c1": "c1" } } } 会转变为两个属性: { "a1":"a1", "c1": "c1" }
{ source_name="properties" value_type=map parse_type=3 } eg: { "properties": { "a1":"a1", "c": { "c1": "c1" "c2": ["a", "b"] } } } 会转变为如下属性: { "a1":"a1", "c1": "c1", "c20": "a", "c21": "b", }
{ source_name="properties" value_type=map delimiter="_" parse_type=1 } eg: { "properties": { "a1":"a1", "c": { "c1": "c1" } } } 会转变为两个属性: { "properties_a1":"a1", "properties_c_c1": "c1" }
{ source_name="properties" value_type=map delimiter="_" parse_type=2 } eg: { "properties": { "a1":"a1", "c": { "c1": "c1" "c2": ["a", "b"] } } } 会转变为如下属性: { "properties_a1":"a1", "properties_c_c1": "c1", "properties_c_c2_0": "a", "properties_c_c2_1": "b", }
为快速说明如何编写配置映射,以下具一个示例,例如,您的数据格式如下:
{ "event_n": "app_start", //事件名 "user_identity": "user_123", //用户标识 "event_time": 1712738195000, //事件发生时间 "device_os": "andriod", //设备系统 "department": "工厂", //事件公共属性 "properties": { //事件属性 "level": 10, "param_1": "audi", "param_2": "value2" } }
并且,你有以下要求:
你的映射配置需要编写如下:
transform { EventMapper { process_parallelism = 6 source_table_name = "input_source" result_table_name = "output_distination" output_file_type = "parquet" #necessary_field用来设置几个必要的字段 necessary_field=[ { source_name="event_n" target_name="event" }, #此段标识原始数据中的event_n作为事件名 { source_name="event_time" target_name="local_time_ms" },#此段标识原始数据中的event_time作为事件时间 { target_name="app_id" value=10000000 } #此段标识需要导入的app_id ], #id_field用来设置用户id信息 id_field=[ { source_name="user_identity" target_name="user_unique_id" } #此段标识将user_identity作为user_unique_id ] #common_field用来设置事件公共属性 common_field=[ { source_name="device_os" target_name="os_name" }, #此段标识将device_os设置为事件公共属性os_name { source_name="department" target_name="department" } #此段标识将department设置为事件公共属性department ] #param_field用来设置事件属性 param_field=[ { source_name="properties.level" target_name="level" }, #此段标识将properties下的level设置为事件公共属性os_name { source_name="properties.param_1" target_name="car" } #此段标识将properties下的param_1设置为事件公共属性car ] } }
以下为一个复杂的映射配置,可以参考其来配置更为复杂的处理方式,建议参考上文示例中的较为简单的示例进行使用,过于复杂的场景可能因为配置错误不符合预期
env { # You can set flink configuration here execution.parallelism = 6 job.mode = "STREAMING" checkpoint.interval = 2000 execution.checkpoint.timeout = 6000 execution.checkpoint.data-uri = "hdfs://vpc-minibase/user/rangers/data/flink/checkpoint/" } source { Kafka { topic = "test_topic_input" bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192,192.168.6.25:9192,192.168.6.34:9192" format = text result_table_name = "input_source" start_mode = "group_offsets" consumer.group = "finder_integration_group" kafka.config = { max.poll.records = "2000" max.poll.interval.ms = "100" auto.offset.reset = "earliest" } } } transform { EventMapper { process_parallelism = 6 source_table_name = "input_source" result_table_name = "output_source" necessary_field=[ { source_name=log_time target_name=local_time_ms }, { source_name=project_name target_name=app_id value_mapping=[ { old_value=biz mapping_value=10000000 }, { old_value=acrm mapping_value=10000001 } ] }, { source_name=behavior target_name=event } ] id_field=[ { source_name=user_id target_name=anonymous_id conditional=[ [ { name="props.is_login" value=true } ] ] conditional_target_name=user_unique_id } ] param_field=[ { source_name=test_map_merge value_type=map delimiter="_" parse_type=1 }, { source_name=test_map_merge_list value_type=map delimiter="_" parse_type=2 }, { source_name=test_map_flatten value_type=map parse_type=0 }, { source_name=test_map_flatten_list value_type=map parse_type=3 }, { source_name=test_flatten_list value_type=list delimiter="_" parse_type=3 }, { source_name=test_string_list target_name=new_list value_type=list delimiter="," }, { source_name=test_url_list target_name=new_list delimiter="&" spliter="=" } { source_name=properties value_type=map parse_type=0 } ] exclude=[ { conditional=[ [ { name="props.ca" value=cv } ], [ { name="props.ca" value=cvv } ] ] fields=[ "props.cb" ] }, { fields=[ "props.ca" ] } ], common_field=[ { source_name="device_id" target_name=openudid conditional=[ [ { name="props.lib" value="iOS" } ] ] conditional_target_name=vendor_id }, { source_name="props.os" target_name=os_name }, { source_name="props.os_version" target_name=os_version } ] put_all_mode=event_params } } sink { kafka { topic = "sdk_origin_event" bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192,192.168.6.25:9192,192.168.6.34:9192" format = text kafka.request.timeout.ms = 60000 source_table_name = "output_source" kafka.config = { acks = "all" request.timeout.ms = "60000" buffer.memory = "33554432" } } }
{ "log_time": "2024-04-29 06:45:49", "project_name": "biz", "event": "test_event", "user_id": "asdasdasdasd", "device_id": "sadasdadasd", "props": { "is_login": true, "sdk_lib": "java", "ca": "cvvv", "cb": "cb", "cc": "cc", "asa": "123", "os": "ios", "os_version": "2.2" }, "test_map_merge": { "a": "a1", "b": { "b1": { "b11": "b11", "b12": "b12" }, "b2": 123 } }, "test_map_merge_list": { "a": { "a1": { "a11": "a11", "a12": [ "a", "b", "c" ] } } }, "test_map_flatten": { "a": "a1", "b": { "b1": { "b11": "b11", "b12": "b12" }, "b2": 123 } }, "test_map_flatten_list": { "a": { "a1": { "a11": "a11", "a12": [ "a", "b", "c" ] } } }, "test_flatten_list": [ "a", "b", "c" ], "test_string_list": "a,b,c", "test_url_list": "hhh=b&nohh=c", "pa": 123, "pb": [ 1, 2, 3 ] }
{ "server_time": 1714360805, "time": 1714344349, "event_date": "2024-04-29", "app_name": "app_66b7", "app_id": 10000000, "user": { "user_id": "asdasdasdasd", "ssid": "1729391053003443521", "web_id": "7360215241029852429" }, "params": { "device_model": "unknown", "timezone": "1000", "platform": "web", "os_version": "2.2", "network_carrier": "", "device_brand": "unknown", "os_name": "ios", "is_login": "已登录", "ad_id": 0, "width": 0, "height": 0, "app_version": null, "app_channel": null, "browser": null, "resolution": null, "language": null, "loc_city_id": "None", "ab_version": [], "mp_platform_app_version": null, "a11": "a11", "b12": "b12", "a120": "a", "b11": "b11", "a121": "b", "lib": "java", "type": "track", "test_map_merge_b_b1_b12": "b12", "test_map_merge_b_b1_b11": "b11", "test_flatten_list_2": "c", "test_flatten_list_1": "b", "test_flatten_list_0": "a", "a122": "c", "cb": "cb", "cc": "cc", "a": "a1", "is_login_id": "true", "test_map_merge_a": "a1", "hhh": "b", "asa": "123", "test_map_merge_list_a_a1_a12_2": "c", "test_map_merge_list_a_a1_a12_1": "b", "nohh": "c", "test_map_merge_list_a_a1_a12_0": "a", "test_map_merge_list_a_a1_a11": "a11", "b2": 123, "pa": 123, "test_map_merge_b_b2": 123, "pb": [ "1", "2", "3" ], "new_list": [ "a", "b", "c" ] }, "items": {}, "local_time_ms": 1714344349000, "user_id": "asdasdasdasd", "ssid": "1729391053003443521", "web_id": "7360215241029852429" }