You need to enable JavaScript to run this app.
导航

Kafka消息订阅及推送

最近更新时间2023.11.23 15:57:24

首次发布时间2023.09.15 22:16:58

1. 功能概述

VeCDP产品提供强大的开放能力,支持通过内置Kafka对外输出的VeCDP系统内的数据资产。用户可以通过监测Kafka消息,及时了解标签、分群等数据变更,赋能更多企业业务系统。

2. 消息订阅配置说明

topic规范

cdp的kafka topic是按集团拆分的,topic格式如下:

cdp_dataAsset_orgId_${org_id}

截止到1.21,如果想使用cdp的消息总线消费事件,cdp只会建一个默认的集团topic `cdp_dataAsset_orgId_1`。如果默认集团id不为1,或者新增集团需要重新手动建立新的topic

消费 topic

目前支持kafka消息推送,以集团粒度进行消息的分发,部署时默认构建topic为 cdp_dataAsset_orgId_1 ,该topic仅支持消费集团id为1,客户可在终端进入kafka目录通过以下命令进行消费调试

//消费kafka
/opt/tiger/kafka_2.11-2.1.1/bin/kafka-console-consumer.sh --bootstrap-server $(sd config kafka_vpc) --from-beginning --topic cdp_dataAsset_orgId_1

新建topic

如果客户需要在其他集团下进行消息推送,需联系前场构建新的topic,以集团2为例,具体步骤如下

  • 新建topic名称:cdp_dataAsset_orgId_2,2代表集团id

//创建kafka
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181/kafka_vpc_lf  --replication-factor 1 --partitions 1 --topic cdp_dataAsset_orgId_${org_id}

//修改环境变量
DATA_ASSET_KAFKA_TOPIC="cdp_dataAsset_orgId_1,cdp_dataAsset_orgId_${org_id}"
3. 元数据格式规范

说明

  1. Kafka全部以标准json格式发送,key(属性)采用蛇形命名法。

  2. 下表规范了字段是否必填,所有消息都会默认遵守。

  3. 所有字段的数据类型首字母大写,由于json可表达的格式有限,所以对字段格式做了收敛,可选枚举值:String,Long,Double,Bool,Object(非必要不用,主要用于占位),Array[T]

  4. resource_type 是数据资产分类,全大写,非数据资产(比如资产输出任务)可以没有该字段。

  5. 所有消息强制向前兼容。

  6. 只允许 可选类型 向 必填类型 转换,不许 必填类型 向 可选类型 转换,如果有类似的需求只能新增字段。

  7. 说明文档里面没有说明的属性,不建议用户使用,可忽略。

  8. 不建议用枚举值承接所有属性。

属性属性名称是否必填说明demo
全体事件名称_event_name
事件发生时间_event_timestamp
所属项目IDproject_id一般都会有,但是对于一些集团粒度的消息是没有的,比如idm相关
主体IDsubject_id按规范是必填字段,不满足的消息会单独标注
资源IDid标签目录ID,标签ID,分群ID
资源名称(最新)name

资源类型

resource_type

SEGMENT 分群
TAG 标签
TARGETID 一个主体非baseid
BASEID 一个主体下的baseid

增删被谁操作creator,updater
被谁操作updater

变更字段前后值

changes

1. 建议规范,并非强制规范

"changes": [
{"field_name": "name", "data_type":"String", "before": "seg_1", "after": "seg_2"},
{"field_name": "description", "data_type":"String", "before": "seg description before value", "after": "desc after value"}
]

变更字段changes[index].field_name
变更字段-前changes[index].before
变更字段-后changes[index].after
变更字段类型changes[index].data_typeString,Long,Double,Bool

3.1 标签

序号事件名称事件说明属性属性展示名属性类型是否必填属性值含义或示例
1cdp.label.domain.create新建目录_event_name事件名称Stringcdp.label.domain.create
_event_timestamp变更时间Long1684117946617
project_id所属项目IDLong1
subject_id主体IDLong1
resource_type资源类型StringTAG
id目录的IDLong1
name目录名称Stringtest_name
domain_parent_id目录父节点IDLong1
2cdp.label.domain.delete删除目录_event_name事件名称Stringcdp.label.domain.delete
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringTAG
id目录的IDLong
name目录的显示名String
3cdp.label.domain.move移动目录_event_name事件名称Stringcdp.label.domain.move
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringTAG
id目录的IDLong
name目录的名称String
domain_parent_id目录的ID(最新)Long
changes信息变更Array[Object]
4cdp.label.domain.rename重命名目录_event_name事件名称Stringcdp.label.domain.rename
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringTAG
id目录的IDLong
domain_parent_id目录的IDLong
name目录的显示名称(最新)String
changes信息变更Array[Object]
5cdp.label.label.create新建标签_event_name事件名称Stringcdp.label.label.create
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringTAG
id目录的IDLong
name标签名称String

create_type

标签创建方式

String

rule(规则)
import(导入)
manual(人工)
logic (排序)
combine (运算)
ml_model(机器学习模型)
etl_model(数据清洗模型)
hive_sql(hive sql标签)
clickhouse_sql (ch sql标签)
multi_stage(多阶段)
rfm (rfm)
preference(偏好)

data_type_name标签数据类型Stringbigint, array_bigint,double, array_double,date, array_date,datetime, array_datetime,String, array_String
domain_id所属目录IDLong
domain_name所属目录名称String
creator标签创建人String

6

cdp.label.label.delete

删除标签
{"id":11,"name":"dasddasdas","creator":"admin","_event_id":"31771d6a-8795-41da-9906-bc00f29b06ef","_event_name":"cdp.label.label.create","_event_source":"vpc-profile-meta-db59ccd44-pmn72","_event_timestamp":1697011027068,"_traffic_group":"","project_id":2,"subject_id":1,"resource_type":"LABEL","create_type":"rule","data_type_name":"string","domain_id":-2}

_event_name

事件名称

String

cdp.label.label.delete

_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringTAG
updater变更操作人String
id目录的IDLong
name标签名称String
7cdp.label.label.modify修改标签_event_name事件名称Stringcdp.label.label.modify
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringTAG
updater变更操作人String
id目录的IDLong
name标签名称String

changes

信息变更

Array[Object]

"changes": [
{"field_name": "name", "data_type":"String", "before": "seg_1", "after": "seg_2"},
{"field_name": "description", "data_type":"String", "before": "seg description before value", "after": "desc after value"}
]

8cdp.label.instance.status标签任务状态变更_event_name事件名称Stringcdp.label.instance.status
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringTAG
id标签IDLong
name标签名称String

task_result

任务结果

"task_result": {
"status": "Failed",
"err_msg": "error message",
"task_time": "2023-08-28 00:00:00"
}
}

status状态结果LongSucess成功Failed失败
err_msg报错信息String
task_time任务更新时间String

3.2 标签快照

序号事件名称事件说明属性属性展示名属性类型是否必填属性值含义或示例
1cdp.label.label.snapshot标签快照,定时按租户发送_event_name事件名称Stringcdp.label.label.snapshot
_event_timestamp变更时间Long1684117946617
project_id所属项目IDLong1
subject_id主体IDLong1
resource_type资源类型StringTAG
domain_id目录的IDLong1
domain_name目录名称Stringtest_name
compute_type是否是实时标签String可选值:offline, realtime

data_type_name

标签数据类型

String

可选值:
bigint, array_bigint
double, array_double
date, array_date
datetime, array_datetime
String, array_String

entity_id实体idLong
status标签状态Long状态机:0-正常,1-删除,其他-不可用
latest_pdate标签最后一次更新日期String标签初次跑的时候没有值
is_mau_tag是否是全量标签Bool
id标签idLong
name标签名String
owner标签创建者String
updater最后一次更新人String
update_time最后一次更新时间String"2022-11-09 17:13:20" # 实际发送的时候不会有双引号
create_time创建时间String"2022-11-09 17:13:20" # 实际发送的时候不会有双引号

3.3 分群

序号事件名称事件说明属性名称属性展示名属性类型是否必填属性值含义或示例
1cdp.seg.segment.create新建分群_event_name事件名称StringB列
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringSEGMENT
id目录的IDLong
name分群名称String
id_type保存ID类型String

seg_type

分群创建方式

String

私域类型
Conditional,规则分群
Uploaded, 上传分群
ABI,ABI分群
Lookalike,Lookalike分群(旧版)
ChildSeg,子包分群
SubjectTrans,主体转换分群
FeatureRecommendation,标签推荐导出分群
InsightExport,旧版洞察导出分群
InsightExportV2,新版洞察导出分群
Finder,Finder分群
PrivateLookalike,私域lookalike分群
Model, 私域模型分群
SqlExport,Sql导出分群
RealtimeConditional,实时规则分群
PrivateCluster,聚类模型分群
PrivateClusterChild,聚类模型子包分群
ManualRealtime,人工分群
公域类型
PublicConditional,公域规则分群
PublicModelResult,公域模型分群
PublicLookalike,公域lookalike分群
PrivateTransToPublic,私转公分群
PublicCluster,公域聚类模型分群
PublicClusterChild,公域聚类模型子包分群

creator分群创建人Stringowner
2cdp.seg.segment.delete删除分群_event_name事件名称StringB列
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringSEGMENT
updater变更操作人String
id分群IDLong
name分群名称String
3cdp.seg.segment.modify修改分群_event_name事件名称StringB列
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringSEGMENT
updater变更操作人String
id分群IDLong
name分群名称String

changes

信息变更

Array[Object]

"changes": [
{"field_name": "name", "data_type":"String", "before": "seg_1", "after": "seg_2"},
{"field_name": "description", "data_type":"String", "before": "seg description before value", "after": "desc after value"}
]

4cdp.seg.segment.task分群任务状态变更_event_name事件名称StringB列
_event_timestamp变更时间Long
project_id所属项目IDLong
subject_id主体IDLong
resource_type资源类型StringSEGMENT
id分群包IDLong
name分群名称String

task_result

任务结果

Object

"task_result": {
"status": "Failed",
"err_msg": "error message",
"task_time": "2023-08-28 00:00:00"
}
}

status状态结果StringSucess成功Failed失败
err_msg报错信息String
task_time任务更新时间String
task_id任务idLong

3.4 资产输出任务

事件名称事件说明属性名称属性展示名属性类型属性值含义或示例
cdp.asset.data.task_event_name事件类型Stringcdp.asset.data.task
_event_timestamp变更时间long
id任务idLong
name任务名称String
project_id项目idLong
org_id集团idLong
type输出类型String
version_id版本号Long
status状态String
path路径String
c_date业务日期String
create_time创建时间String
end_time结束时间String

extra

附加信息,目前有schemaInfo

Object

"extra": {
"schema_info": [
{
"asset_id": "1",
"asset_name": "baseid",
"asset_type": "BASEID",
"asset_type_name": "基准Id",
"col_name": "base_id",
"col_type": "string"
}
]
}

schema_info
asset_id资产Id
asset_name资产名称
asset_type资产类型
asset_type_name资产类型中文名
col_name输出列名
col_type输出类型