You need to enable JavaScript to run this app.
导航
客户数据接入【私有化】
最近更新时间:2024.05.29 16:21:46首次发布时间:2022.06.07 14:31:53

火山引擎增长分析为降低客户实时数据接入及历史数据导入中ETL的工作量,提供了通过编写简单的映射文件并提交后台任务的方式,将客户数据接入到增长分析数据库中。下文中将详细介绍适用范围及适用方式

适用范围说明

支持将客户自定义格式的事件或用户数据导入datafinder

组件要求

当前版本仅支持数据在客户kafka或者客户hdfs,kafka中数据实时持续消费,hdfs中数据一次性导入。

数据格式要求

数据必须为json格式,其中hdfs中的离线数据可以压缩

网络要求

datafinder集群可以内网访问到客户kafka和hdfs,例如kafka可以通过broker.server访问(192.168..:9092);hdfs可以通过fs.defaultFS路径访问(hdfs://192.168..:9000/path)

配置文件说明

实时和离线数据转换映射的配置方式是一致的,数据读写方式不同,下文中先介绍实时和离线模版,再一起介绍数据转换映射映射的配置方式

实时模版-接入kafka数据

注意:仅能修改模版中标黄处标蓝处为注释并在实际使用时删除。

env {
      execution.parallelism = 3
      job.mode = "STREAMING"
      checkpoint.interval = 2000
      execution.checkpoint.timeout = 6000
      execution.checkpoint.data-uri = "hdfs://vpc-minibase/user/rangers/data/flink/checkpoint/" 
    }
    source {
      Kafka {
        bootstrap.servers = "192.168.6.24:9192,192.168.6.25:9192" #修改为客户kafka的bootstrap.servers,填写2个即可
        topic = "test_topic_input" #修改为需要接入的客户topic
        consumer.group = "finder_data_group"   #客户需要指定消费组,需要注意不同任务消费同一个topic时消费组不能一样    
        kafka.config = {
          max.poll.records = "2000"
          max.poll.interval.ms = "1000"
          auto.offset.reset = "earliest"
        }
        start_mode = "group_offsets"
        format = text
        result_table_name = "input_source"
      }  
    }
    
    transform {
      EventMapper {
        process_parallelism = 6
        source_table_name = "input_source"
        result_table_name = "output_distination"
        #数据映射下文中会详细说明
        *****
      }
    }
    
    sink {
      kafka {
          topic = "sdk_origin_event"
          bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192" #修改为datafinder集群的bootstrap.servers
          format = text
          kafka.request.timeout.ms = 60000
          source_table_name = "output_distination"
          kafka.config = {
            acks = "all"
            request.timeout.ms = "60000"
            buffer.memory = "33554432"
          }
      }
    }

离线模版-接入hdfs数据

注意:仅能修改模版中标黄处标蓝处为注释并在实际使用时删除。

env {
      execution.parallelism = 6
      job.mode = "BATCH"
    }
    source {
      source {
        HdfsFile {
            fs.defaultFS = "hdfs://192.168.1.1:9000" #修改为客户实际的fs.defaultFS
            path = "/user/rangers/data/event" #修改为客户实际文件路径,可以是指定具体文件,也可以指定文件目录
            file_format_type = "text"
            result_table_name = "input_source"
        } 
    }
    
    transform {
      EventMapper {
        process_parallelism = 6
        source_table_name = "input_source"
        result_table_name = "output_distination"
        #数据映射下文中会详细说明
        *****
      }
    }
    
    sink {
       HdfsFile {
        path = "/user/rangers/data/integration/"
        file_format_type = "text"
        result_table_name = "output_distination"
        hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
        fs.defaultFS = "hdfs://vpc-minibase"
      }
    }

数据映射配置

数据映射配置写在模版的transfrom下:

transform {
      EventMapper {
        process_parallelism = 6
        source_table_name = "input_source"
        result_table_name = "output_distination"
       数据映射编写位置
      }
    }

具体示例

为快速说明如何编写配置映射,以下具一个示例,例如,您的数据格式如下:

{
  "event_n": "app_start", //事件名
  "user_identity": "user_123", //用户标识
  "event_time": 1712738195000, //事件发生时间
  "device_os": "andriod", //设备系统
  "department": "工厂", //事件公共属性
  "properties": { //事件属性
    "level": 10,
    "param_1": "audi",
    "param_2": "value2"
  }
}

并且,你有以下要求:

  1. 将该topic数据导入到10000000这个app中
  2. 将device_os这个字段映射为datafinder的预置公共属性os_name中
  3. 将department映射为一个事件公共属性
  4. 将properties下的level导入为一个一般事件属性
  5. 将properties下的param_1导入为一个一般事件属性,并且属性名映射为car

你的映射配置需要编写如下:

transform {
      EventMapper {
        process_parallelism = 6
        source_table_name = "input_source"
        result_table_name = "output_distination"
        
        #necessary_field用来设置几个必要的字段
        necessary_field=[ 
            {
                source_name="event_n"
                target_name="event"
            }, #此段标识原始数据中的event_n作为事件名
            {
                source_name="event_time"
                target_name="local_time_ms"
            },#此段标识原始数据中的event_time作为事件时间
            {
                target_name="app_id"
                value=10000000
            } #此段标识需要导入的app_id
        ],
        
        #id_field用来设置用户id信息
        id_field=[ 
            {
                source_name="user_identity"
                target_name="user_unique_id"               
            } #此段标识将user_identity作为user_unique_id
        ]
        
        #id_field用来设置事件公共属性
        common_field=[ 
            {
                source_name="device_os"
                target_name="os_name"
            }, #此段标识将device_os设置为事件公共属性os_name
            {
                source_name="department"
                target_name="department"
            } #此段标识将department设置为事件公共属性department
        ]
        
        #id_field用来设置事件属性
         param_field=[
            {
                source_name="properties.level"
                target_name="level"
            }, #此段标识将properties下的level设置为事件公共属性os_name
            {
                source_name="properties.param_1"
                target_name="car"
            }  #此段标识将properties下的param_1设置为事件公共属性car
         ]
      }
   }

外层字段说明

上文示例中的necessary_field、id_field这些字段就是外层字段

字段类型

说明

necessary_field

必须要有的字段,目前包含event_name,app_id,local_time_ms

id_field

必须要有的类型,内部至少需要有一个id信息,可以是user_unique_id和anonymous_id

common_field

公共属性

param_field

事件属性

user_field

用户属性

exclude_field

原始数据中需要移除的字段

filter_mode

用于过滤自己需要的数据
filter_mode {
filter={ a=c
}
filter_on_true=true
}
filter_on_true = true 表示收到的数据中存在一个属性 a ,并且他的值等于 c,那么就将其过滤掉
filter_on_true = false 表示收到的数据中存在一个属性 a ,并且他的等于 c,那么就不将其过滤掉,否则将其过滤掉

put_all_mode

当原始数据为一条完全平铺的数据,不存在任何嵌套的情况的时候,配置该字段,可以将剩余的所有的字段全部作为事件属性或者自定义公共属性来处理,。
可选值为
自定义事件公共属性:custom_common_params、
事件属性:event_params

finder_common_param

list 类型,用于标识 Finder 公共属性:
finder_common_param=[
"platform",
"app_name",
"app_id",
"carrier",
"os_name",
"os_version",
"app_version",
"sdk_version",
"package",
"language",
"ab_version",
"client_ip",
"app_channel",
"app_version_minor",
"network_type",
"creative_id",
"ad_id",
"access",
"device_brand",
"device_model",
"browser",
"browser_version",
"campaign_id",
"app_region",
"app_language",
"resolution",
"region",
"timezone",
"utm_campaign",
"utm_source",
"utm_medium",
"utm_content",
"utm_term",
"user_agent",
"platform",
"device_manufacturer",
"height",
"width",
"cpu_abi",
"vendor_id",
"openudid",
"referrer",
"referrer_host",
"idfv",
"idfa",
"oaid",
"gaid",
"caid",
]

属性描述字段说明

上文中类似source_name、target_name、value这些就是属性描述字段

字段名

说明

是否必须

source_name

原始数据中的字段名

否(未指定value的情况下,一定需要)

target_name

需要映射为Finder侧的字段名

value

直接为某个字段指定值,不依赖原始数据

value_type

value类型,与value字段没有关系,标识当前映射关系的value类型,默认为string类型

parse_type

value_type为map时可用,
0: 将该map下的属性都作为事件属性
1: 需要拼接key,将拼接好的key作为事件属性
比如:

"key":{
 "k1": "v1",
 "k2":"v2"
}

以"_"作为拼接符:
key_k1="v1"
key_k2="v2"

delimiter

  1. 需要多层key作为输出的名字的时候指定拼接符,配合 value_type=map 使用
  2. 需要将一个字符串类型转换为 list 类型,比如 a,b,c -> [a, b, c] 时指定分隔符,配合 value_type=list 使用
  3. 用于配合 spliter 来拆分 kv 对

spliter

如果上报的数据是 a:b&c:d这种形式,需要将其拆开为两个属性
a=b
c=d
则需要指定 delimiter = &、spliter = :,使用这种方式拆分的属性只能是 string 类型。
spliter 不为空 && delimiter不为空的时候,就会执行该逻辑。
spliter 可以和 delimiter 一样。

id_type

多口径场景下使用,默认是finder_uid

operation_type

针对用户属性生效:
set
setonce
unset
append
remove

value_mapping

针对预置事件或者其它属性value值需要映射的情况
比如:

表示将原始数据里的pv这个值映射为Finder的predefine_pageview事件

value_mapping={
    pv=predefine_pageview
}

conditional

用于执行满足某种条件才需要进行字段的映射:
conditional={
event=a
}

conditional_target_name

满足 conditional 的情况下,可以使用指定的 source_name 或者 target_name

conditional_source_name

附录中会给出一个复杂的映射配置,可以参考其来配置更为复杂的处理方式,建议参考上文示例中的较为简单的示例进行使用,过于复杂的场景可能因为配置错误不符合预期

配置文件使用

下载以下脚本

上传配置

bash upload_file.sh [文件名] [项目id或者app_id]
# [项目id或者app_id] 如果开启了多应用就用项目名,为开启多应用则用app_id

配置上传成功会返回配置文件的ID,启动任务时需要用到

{"code":0,"message":"success","data":24}

部署任务
部署任务后会自动启动

bash deploy.sh [任务名] [项目id或者app_id]  [配置文件的ID] [任务类型]

#[任务名]:仅限英文小写字符串,并不能有特殊字符
#[任务类型]:实时为0,离线为2

如果启动成功会返回一个任务id,后期查询任务状态,停止、删除任务等,都需要使用到该任务id

{"code":0,"message":"success","data":77}

停止任务

bash stop.sh [任务id] [项目id或者app_id] 

启动任务

bash start.sh [任务id] [项目id或者app_id] 

删除任务
任务在运行中时无法删除

bash stop.sh  [任务id] [项目id或者app_id] 

附录
  1. 一个复杂的映射配置
env {
      # You can set flink configuration here
      execution.parallelism = 6
      job.mode = "STREAMING"
      checkpoint.interval = 2000
      execution.checkpoint.timeout = 6000
      execution.checkpoint.data-uri = "hdfs://vpc-minibase/user/rangers/data/flink/checkpoint/" 
    }
    source {
      Kafka {
        topic = "test_topic_input"
        bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192,192.168.6.25:9192,192.168.6.34:9192"
        format = text
        result_table_name = "fake"
        start_mode = "group_offsets"
        consumer.group = "finder_seatunnel_group"
        kafka.config = {
          max.poll.records = "2000"
          max.poll.interval.ms = "100"
          auto.offset.reset = "earliest"
        }
      }  
    }
    
    transform {
      EventMapper {
        process_parallelism = 18
        source_table_name = "fake"
        result_table_name = "fake1"
        filter_mode={
            is_sa=true
        }
        exclude_field=[
            "properties.$app_state",
            "properties.$is_first_day",
            "properties.$is_first_time",
            "properties.$receive_time",
            "properties.$wifi",
            "properties.$province",
            "properties.$viewport_height",
            "properties.$viewport_position",
            "properties.$viewport_width",
            "properties.$item_join",
            "properties.$receive_time",
            "properties.$lib_plugin_version",
            "properties.$ios_install_source",
            "properties.$channel_device_info",
            "properties.$ios_install_disable_callback",
            "properties.$is_channel_callback_event",
            "properties.$channel_extra_information"
        ],
        necessary_field=[
            {
                source_name=type
                target_name=event
                conditional=[
                    {
                        name=type
                        value=track
                    }
                ]    
                conditional_source_name=event
                value_mapping=[
                    {
                        old_value="$AppStart"
                        mapping_value="app_launch"
                    },
                    {
                        old_value="$AppInstall"
                        mapping_value="app_launch"
                    },
                    {
                        old_value="$AppEnd"
                        mapping_value="app_terminate"
                    },
                    {
                        old_value="$AppViewScreen"
                        mapping_value="app_pageview"
                    },
                    {
                        old_value="$AppClick"
                        mapping_value="app_click"
                    },
                    {
                        old_value="$AppInstall"
                        mapping_value="properties.$activation"
                    },
                    {
                        old_value="AppCrash"
                        mapping_value="properties.$crash"
                    },
                    {
                        old_value="$MPLaunch"
                        mapping_value="app_launch"
                    },
                    {
                        old_value="$MPShow"
                        mapping_value="app_launch"
                    },
                    {
                        old_value="$MPShare"
                        mapping_value="on_share"
                    },
                    {
                        old_value="$MPViewScreen"
                        mapping_value="predefine_pageview"
                    },
                    {
                        old_value="$MPHide"
                        mapping_value="app_terminate"
                    },
                    {
                        old_value="$MPClick"
                        mapping_value="bav2b_click"
                    },
                    {
                        old_value="$MPAddFavorites"
                        mapping_value="on_addtofavorites"
                    },
                    {
                        old_value="$MPPageLeave"
                        mapping_value="predefine_pageview_hide"
                    },
                    {
                        old_value="$pageview"
                        mapping_value="predefine_pageview"
                    },
                    {
                        old_value="$WebClick"
                        mapping_value="bav2b_click"
                    },
                    {
                        old_value="profile_set"
                        mapping_value="__profile_set"
                    }
                    {
                        old_value="profile_set_once"
                        mapping_value="__profile_set_once"
                    },
                    {
                        old_value="profile_increment"
                        mapping_value="__profile_increment"
                    }
                    {
                        old_value="profile_append"
                        mapping_value="__profile_append"
                    },
                    {
                        old_value="profile_unset"
                        mapping_value="__profile_unset"
                    }
                    {
                        old_value="item_set"
                        mapping_value="__item_set"
                    },
                    {
                        old_value="item_delete"
                        mapping_value="__item_unset"
                    }
                ]
            },
            {
                source_name=time
                target_name=local_time_ms
                value_type=long
            },
            {
                source_name=recv_time
                target_name=server_time
                value_type=long
            },
            {
                source_name=project
                target_name=app_id
                value_mapping=[
                    {
                        old_value=ebiz_test
                        mapping_value=10000001
                    }
                ]          
            }
        ],
        common_field=[
            {
                source_name=time_free
                target_name=__is_history
            },
            {
                source_name="$device_id"
                target_name=openudid
                conditional={
                    name="properties.$lib"
                    value='iOS'
                }
                conditional_target_name=vendor_id
            },
            {
                source_name="properties.$app_crashed_reason"
                target_name=app_crashed_reason
            },
            {
                source_name="identities.$identity_idfv"
                target_name=vendor_id
            },
            {
                source_name="identities.$identity_idfv"
                target_name=idfv
            },
            {
                source_name="identities.$identity_idfa"
                target_name=idfa
            },
            {
                source_name="identities.$identity_android_id"
                target_name=openudid
            },
            {
                source_name="identities.$identity_caid"
                target_name=caid
            },
            {
                source_name="identities.$identity_gaid"
                target_name=gaid
            },
            {
                source_name="identities.$identity_gaid"
                target_name=oaid
            },
            {
                source_name="properties.$app_version"
                target_name=app_version
            },
            {
                source_name="properties.$timezone_offset"
                target_name=timezone_offset
            },
            {
                source_name="properties.$user_agent"
                target_name=user_agent
            },
            {
                source_name="properties.$source_package_name"
                target_name=package
            },
            {
                source_name="properties.$model"
                target_name=device_model
            },
            {
                source_name="properties.$brand"
                target_name=device_brand
            },
            {
                source_name="properties.$manufacturer"
                target_name=device_manufacturer
            },
            {
                source_name="properties.$os"
                target_name=os_name
            },
            {
                source_name="properties.$os_version"
                target_name=os_version
            },
            {
                source_name="properties.$screen_height"
                target_name=height
            },{
                source_name="properties.$screen_width"
                target_name=width
            },
            {
                source_name="properties.$browser"
                target_name=browser
            },
            {
                source_name="properties.$browser_version"
                target_name=browser_version
            },
            {
                source_name="properties.$network_type"
                target_name=access
            },
            {
                source_name="properties.$latest_utm_source"
                target_name=utm_source
            },
            {
                source_name="properties.$latest_utm_medium"
                target_name=utm_medium
            },
            {
                source_name="properties.$latest_utm_term"
                target_name=utm_term
            },
            {
                source_name="properties.$latest_utm_content"
                target_name=utm_content
            },
            {
                source_name="properties.$latest_utm_campaign"
                target_name=utm_campaign
            },
            {
                source_name="properties.$channel_name"
                target_name=channel
            },
            {
                source_name="properties.$channel_ad_id"
                target_name=ad_id
            },
            {
                source_name="properties.$channel_campaign_id"
                target_name=campaign_id
            },
            {
                source_name="properties.$ip"
                target_name=client_ip
            },
            {
                source_name="properties.$referrer"
                target_name=referrer
            },
            {
                source_name="properties.$referrer_host"
                target_name=referrer_host
            },
            {
                source_name="properties.$screen_orientation"
                target_name=screen_orientation
            },
            {
                source_name="properties.$lib"
                target_name=sdk_lib
            },
            {
                source_name="properties.$lib_version"
                target_name=sdk_version
            },
        ],
        param_field=[
            {
                source_name="properties.$resume_from_background"
                target_name=is_background
            },
            {
                source_name="properties.$event_duration"
                target_name=session_duration
            },
            {
                source_name="properties.$url_path"
                target_name=path
            },
            {
                source_name="properties.$latest_share_distinct_id"
                target_name=query_from_user_unique_id
            },
            {
                source_name="properties.$latest_share_depth"
                target_name=query_share_depth
            },
                    {
                source_name="properties.$latest_share_distinct_id"
                target_name=query_from_user_unique_id
            },
            {
                source_name="properties.$latest_share_depth"
                target_name=query_share_depth
            },
            {
                source_name="properties.$test_list"
                value_type=list
                delimiter=","
            },
            {
                source_name="properties.$test_url"
                delimiter=","
                spliter=","
            },
            {
                source_name="properties.$test_url1"
                delimiter="&"
                spliter="="
            },
            {
                source_name="properties"
                value_type=map
                parse_type=0
            }
        ],
        id_field=[
            {
                source_name=distinct_id
                target_name=anonymous_id
                conditional=[
                    {
                        name="properties.$is_login_id"
                        value=true
                    }
                ]    
                conditional_target_name=user_unique_id
            }
    
        ]
      }
    }
    
sink {
      kafka {
          topic = "test_topic_output"
          bootstrap.servers = "192.168.6.23:9192,192.168.6.24:9192,192.168.6.25:9192,192.168.6.34:9192"
          format = text
          kafka.request.timeout.ms = 60000
          source_table_name = "fake1"
          kafka.config = {
            acks = "all"
            request.timeout.ms = "60000"
            buffer.memory = "33554432"
          }
      }
    }