You need to enable JavaScript to run this app.
导航

Kafka消息订阅及推送

最近更新时间2024.03.28 11:53:22

首次发布时间2024.03.28 11:53:22

1. 功能概述

VeCDP产品提供强大的开放能力,支持通过内置Kafka对外输出的VeCDP系统内的数据资产。用户可以通过监测Kafka消息,及时了解标签、分群等数据变更,赋能更多企业业务系统。

2. 消息订阅配置说明

topic规范

cdp的kafka topic是按集团拆分的,topic格式如下:

cdp_dataAsset_orgId_${org_id}

截止到1.21,如果想使用cdp的消息总线消费事件,cdp只会建一个默认的集团topic cdp_dataAsset_orgId_1。如果默认集团id不为1,或者新增集团需要重新手动建立新的topic

消费 topic

目前支持kafka消息推送,以集团粒度进行消息的分发,部署时默认构建topic为 cdp_dataAsset_orgId_1 ,该topic仅支持消费集团id为1,客户可在终端进入kafka目录通过以下命令进行消费调试

//消费kafka
/opt/tiger/kafka_2.11-2.1.1/bin/kafka-console-consumer.sh --bootstrap-server $(sd config kafka_vpc) --from-beginning --topic cdp_dataAsset_orgId_1

新建topic

如果客户需要在其他集团下进行消息推送,需联系前场构建新的topic,以集团2为例,具体步骤如下

  • 新建topic名称:cdp_dataAsset_orgId_2,2代表集团id
//创建kafka
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181/kafka_vpc_lf  --replication-factor 1 --partitions 1 --topic cdp_dataAsset_orgId_${org_id}

//修改环境变量
DATA_ASSET_KAFKA_TOPIC="cdp_dataAsset_orgId_1,cdp_dataAsset_orgId_${org_id}"

3. 元数据格式规范

说明

  1. Kafka全部以标准json格式发送,key(属性)采用蛇形命名法。
  2. 下表规范了字段是否必填,所有消息都会默认遵守。
  3. 所有字段的数据类型首字母大写,由于json可表达的格式有限,所以对字段格式做了收敛,可选枚举值:String,Long,Double,Bool,Object(非必要不用,主要用于占位),Array[T]
  4. resource_type 是数据资产分类,全大写,非数据资产(比如资产输出任务)可以没有该字段。
  5. 所有消息强制向前兼容。
  6. 只允许 可选类型 向 必填类型 转换,不许 必填类型 向 可选类型 转换,如果有类似的需求只能新增字段。
  7. 说明文档里面没有说明的属性,不建议用户使用,可忽略。
  8. 不建议用枚举值承接所有属性。

属性

属性名称

是否必填

说明

demo

全体

事件名称

_event_name

事件发生时间

_event_timestamp

所属项目ID

project_id

一般都会有,但是对于一些集团粒度的消息是没有的,比如idm相关

主体ID

subject_id

按规范是必填字段,不满足的消息会单独标注

资源ID

id

标签目录ID,标签ID,分群ID

资源名称(最新)

name

资源类型

resource_type

SEGMENT 分群
TAG 标签
TARGETID 一个主体非baseid
BASEID 一个主体下的baseid

增删

被谁操作

creator,updater

被谁操作

updater

变更字段前后值

changes

  1. 建议规范,并非强制规范

"changes": [
{"field_name": "name", "data_type":"String", "before": "seg_1", "after": "seg_2"},
{"field_name": "description", "data_type":"String", "before": "seg description before value", "after": "desc after value"}
]

变更字段

changes[index].field_name

变更字段-前

changes[index].before

变更字段-后

changes[index].after

变更字段类型

changes[index].data_type

String,Long,Double,Bool

3.1 标签

序号

事件名称

事件说明

属性

属性展示名

属性类型

是否必填

属性值含义或示例

1

cdp.label.domain.create

新建目录

_event_name

事件名称

String

cdp.label.domain.create

_event_timestamp

变更时间

Long

1684117946617

project_id

所属项目ID

Long

1

subject_id

主体ID

Long

1

resource_type

资源类型

String

TAG

id

目录的ID

Long

1

name

目录名称

String

test_name

domain_parent_id

目录父节点ID

Long

1

2

cdp.label.domain.delete

删除目录

_event_name

事件名称

String

cdp.label.domain.delete

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

TAG

id

目录的ID

Long

name

目录的显示名

String

3

cdp.label.domain.move

移动目录

_event_name

事件名称

String

cdp.label.domain.move

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

TAG

id

目录的ID

Long

name

目录的名称

String

domain_parent_id

目录的ID(最新)

Long

changes

信息变更

Array[Object]

4

cdp.label.domain.rename

重命名目录

_event_name

事件名称

String

cdp.label.domain.rename

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

TAG

id

目录的ID

Long

domain_parent_id

目录的ID

Long

name

目录的显示名称(最新)

String

changes

信息变更

Array[Object]

5

cdp.label.label.create

新建标签

_event_name

事件名称

String

cdp.label.label.create

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

TAG

id

目录的ID

Long

name

标签名称

String

create_type

标签创建方式

String

rule(规则)
import(导入)
manual(人工)
logic (排序)
combine (运算)
ml_model(机器学习模型)
etl_model(数据清洗模型)
hive_sql(hive sql标签)
clickhouse_sql (ch sql标签)
multi_stage(多阶段)
rfm (rfm)
preference(偏好)

data_type_name

标签数据类型

String

bigint, array_bigint,double, array_double,date, array_date,datetime, array_datetime,String, array_String

domain_id

所属目录ID

Long

domain_name

所属目录名称

String

creator

标签创建人

String

6

cdp.label.label.delete

删除标签
{"id":11,"name":"dasddasdas","creator":"admin","_event_id":"31771d6a-8795-41da-9906-bc00f29b06ef","_event_name":"cdp.label.label.create","_event_source":"vpc-profile-meta-db59ccd44-pmn72","_event_timestamp":1697011027068,"_traffic_group":"","project_id":2,"subject_id":1,"resource_type":"LABEL","create_type":"rule","data_type_name":"string","domain_id":-2}

_event_name

事件名称

String

cdp.label.label.delete

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

TAG

updater

变更操作人

String

id

目录的ID

Long

name

标签名称

String

7

cdp.label.label.modify

修改标签

_event_name

事件名称

String

cdp.label.label.modify

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

TAG

updater

变更操作人

String

id

目录的ID

Long

name

标签名称

String

changes

信息变更

Array[Object]

"changes": [
{"field_name": "name", "data_type":"String", "before": "seg_1", "after": "seg_2"},
{"field_name": "description", "data_type":"String", "before": "seg description before value", "after": "desc after value"}
]

8

cdp.label.instance.status

标签任务状态变更

_event_name

事件名称

String

cdp.label.instance.status

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

TAG

id

标签ID

Long

name

标签名称

String

task_result

任务结果

"task_result": {
"status": "Failed",
"err_msg": "error message",
"task_time": "2023-08-28 00:00:00"
}
}

status

状态结果

Long

Sucess成功Failed失败

err_msg

报错信息

String

task_time

任务更新时间

String

3.2 标签快照

序号

事件名称

事件说明

属性

属性展示名

属性类型

是否必填

属性值含义或示例

1

cdp.label.label.snapshot

标签快照,定时按租户发送

_event_name

事件名称

String

cdp.label.label.snapshot

_event_timestamp

变更时间

Long

1684117946617

project_id

所属项目ID

Long

1

subject_id

主体ID

Long

1

resource_type

资源类型

String

TAG

domain_id

目录的ID

Long

1

domain_name

目录名称

String

test_name

compute_type

是否是实时标签

String

可选值:offline, realtime

data_type_name

标签数据类型

String

可选值:
bigint, array_bigint
double, array_double
date, array_date
datetime, array_datetime
String, array_String

entity_id

实体id

Long

status

标签状态

Long

状态机:0-正常,1-删除,其他-不可用

latest_pdate

标签最后一次更新日期

String

标签初次跑的时候没有值

is_mau_tag

是否是全量标签

Bool

id

标签id

Long

name

标签名

String

owner

标签创建者

String

updater

最后一次更新人

String

update_time

最后一次更新时间

String

"2022-11-09 17:13:20" # 实际发送的时候不会有双引号

create_time

创建时间

String

"2022-11-09 17:13:20" # 实际发送的时候不会有双引号

3.3 分群

序号

事件名称

事件说明

属性名称

属性展示名

属性类型

是否必填

属性值含义或示例

1

cdp.seg.segment.create

新建分群

_event_name

事件名称

String

B列

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

SEGMENT

id

目录的ID

Long

name

分群名称

String

id_type

保存ID类型

String

seg_type

分群创建方式

String

私域类型
Conditional,规则分群
Uploaded, 上传分群
ABI,ABI分群
Lookalike,Lookalike分群(旧版)
ChildSeg,子包分群
SubjectTrans,主体转换分群
FeatureRecommendation,标签推荐导出分群
InsightExport,旧版洞察导出分群
InsightExportV2,新版洞察导出分群
Finder,Finder分群
PrivateLookalike,私域lookalike分群
Model, 私域模型分群
SqlExport,Sql导出分群
RealtimeConditional,实时规则分群
PrivateCluster,聚类模型分群
PrivateClusterChild,聚类模型子包分群
ManualRealtime,人工分群
公域类型
PublicConditional,公域规则分群
PublicModelResult,公域模型分群
PublicLookalike,公域lookalike分群
PrivateTransToPublic,私转公分群
PublicCluster,公域聚类模型分群
PublicClusterChild,公域聚类模型子包分群

creator

分群创建人

String

owner

2

cdp.seg.segment.delete

删除分群

_event_name

事件名称

String

B列

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

SEGMENT

updater

变更操作人

String

id

分群ID

Long

name

分群名称

String

3

cdp.seg.segment.modify

修改分群

_event_name

事件名称

String

B列

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

SEGMENT

updater

变更操作人

String

id

分群ID

Long

name

分群名称

String

changes

信息变更

Array[Object]

"changes": [
{"field_name": "name", "data_type":"String", "before": "seg_1", "after": "seg_2"},
{"field_name": "description", "data_type":"String", "before": "seg description before value", "after": "desc after value"}
]

4

cdp.seg.segment.task

分群任务状态变更

_event_name

事件名称

String

B列

_event_timestamp

变更时间

Long

project_id

所属项目ID

Long

subject_id

主体ID

Long

resource_type

资源类型

String

SEGMENT

id

分群包ID

Long

name

分群名称

String

task_result

任务结果

Object

"task_result": {
"status": "Failed",
"err_msg": "error message",
"task_time": "2023-08-28 00:00:00"
}
}

status

状态结果

String

Sucess成功Failed失败

err_msg

报错信息

String

task_time

任务更新时间

String

task_id

任务id

Long

3.4 资产输出任务

事件名称

事件说明

属性名称

属性展示名

属性类型

属性值含义或示例

cdp.asset.data.task

_event_name

事件类型

String

cdp.asset.data.task

_event_timestamp

变更时间

long

id

任务id

Long

name

任务名称

String

project_id

项目id

Long

org_id

集团id

Long

type

输出类型

String

version_id

版本号

Long

status

状态

String

path

路径

String

c_date

业务日期

String

create_time

创建时间

String

end_time

结束时间

String

extra

附加信息,目前有schemaInfo

Object

"extra": {
"schema_info": [
{
"asset_id": "1",
"asset_name": "baseid",
"asset_type": "BASEID",
"asset_type_name": "基准Id",
"col_name": "base_id",
"col_type": "string"
}
]
}

schema_info

asset_id

资产Id

asset_name

资产名称

asset_type

资产类型

asset_type_name

资产类型中文名

col_name

输出列名

col_type

输出类型