You need to enable JavaScript to run this app.
导航
Java SDK 场景实践
最近更新时间:2025.08.26 20:15:36首次发布时间:2025.08.26 20:15:36
复制全文
我的收藏
有用
有用
无用
无用

使用自定义的 local_time_ms

Datafinder 分析事件发生时间,是根据上报事件在客户端的发生时间(即local_time_ms)来计算的。举个例子:如果一个用户在2022-09-01 23:58:00 发生了购买事件,但是这个事件由于网络延迟或者其他原因导致 Datafinder 实际在2022-09-02 00:03:00 才接受到这个事件,那么该事件在分析的时候,该事件还是被算是在2022-09-01发生的时间,而不是在2022-09-02发生的事件。
使用接口:

/**
  * 功能描述: 异步发送事件
  *
  *  @param  appId        应用id
  *  @param  custom       用户自定义公共参数
  *  @param  eventName    事件名称
  *  @param  eventParams  事件参数
  *  @param  userUniqueId 用户uuid
  *  @param  localTimeMs  事件发生时间,毫秒
  *  @return:  void
  *  @date:  2020/8/26 12:24
  */
void sendEvent(String userUniqueId, int appId, Map<String, Object> custom, String eventName, Map<String, Object> eventParams,long localTimeMs);

/**
  * 功能描述: 发送单条事件
  *
  *  @param  header      事件的公共属性,可以通过调用HeaderV3.Builder().build()构建一个header
  *  @param  eventName   事件名
  *  @param  eventParams 事件参数
  *  @param  localTimeMs  事件发生时间,毫秒
  *  @return:  void
  *  @date:  2020/9/28 22:00
  */
void sendEvent(Header header, String eventName, Map<String, Object> eventParams,long localTimeMs);

高性能场景

如果用户已经有自己的埋点,或者自己的服务端服务特别多,为了避免每个服务都集成DataFinder SDK,业务可以把自己的业务埋点都统一上报到一个kafka topic中去,然后新起一个集成了SDK的服务,来消费业务埋点信息,使用 SDK 提供的API把埋点信息上报到 DataFinder。流程如下:
Image

这种情况下,用户一般都是会把埋点的所有信息,包括埋点发生时间,上报到kafka。如果在使用服务端SDK的时候没有设置local_time_ms的话,事件发生时间会认为是SDK处理的时间,这个时间一般跟埋点的发生时间是有差异的。当系统繁忙,kafaka topic lag 的时候,这种差异就会更大,从而导致 DataFinder 上看到的同时段的事件量同预期相差比较大。

查看上报时延
  1. 服务端是上报到applog服务,可以在服务的granafa上进行看:

    http://{IP}:{port}/d/applog/applog?refresh=1m&orgId=1
    
  2. 可以通过curl的方式来获取平均时延。
    domain 需要替换成真实的值;app_id、user_unique_id 也替换成真实的app_id、user_unique_id。

  3. 可以多次执行以下命令:

    curl -o /dev/null -s -w 'Total: %{time_total}s\n'   --location --request POST '${domain}/sdk/log' \
    --header 'User-Agent: DataRangers Java SDK' \
    --header 'Content-Type: application/json' \
    --data '{
        "app_type": "app",
        "_format_name": "datarangers_svc_app_log_v3_server_sdk_1.5.6",
        "app_id": 10000000,
        "header": {
            "aid": 10000000,
            "custom": {
                "client": "fdaf",
                "__sdk_platform": "datarangers_sdk_1.5.6-release"
            },
            "device_id": 0,
            "timezone": 8,
            "tz_offset": 28800,
            "tz_name": "Asia/Shanghai",
            "user_unique_id": "dkqfn5ku9zk8",
            "app_id": 10000000
        },
        "user_unique_id": "xxxx",
        "event_v3": [
            {
                "event": "补贴查询",
                "params": {
                    "cardNum": "cardNum",
                    "year": "2021"
                },
                "local_time_ms": 1637734994778,
                "datetime": "2021-12-15 14:23:14"
            }
        ]
    }'
    
  4. 更多的curl方式:

    curl -w "@curl-format.txt" -o /dev/null -s  --location --request POST '${domain}/sdk/log' \
    --header 'User-Agent: DataRangers Java SDK' \
    --header 'Content-Type: application/json' \
    --data '{
        "app_type": "app",
        "_format_name": "datarangers_svc_app_log_v3_server_sdk_1.5.6",
        "app_id": {替换成真实的app_id},
        "header": {
            "custom": {
                "client": "fdaf",
                "__sdk_platform": "datarangers_sdk_1.5.6-release"
            },
            "device_id": 0,
            "timezone": 8,
            "tz_offset": 28800,
            "tz_name": "Asia/Shanghai",
            "user_unique_id": "{替换成真实的user_unique_id}",
            "app_id": {替换成真实的app_id}
        },
        "user_unique_id": "xxx",
        "event_v3": [
            {
                "event": "补贴查询",
                "params": {
                    "cardNum": "cardNum",
                    "year": "2021"
                },
                "local_time_ms": 1637734994778,
                "datetime": "2021-12-15 14:23:14"
            }
        ]
    }'
    

    curl-format.txt的文件内容是:

    time_namelookup:  %{time_namelookup}s\n
                time_connect:  %{time_connect}s\n
         time_appconnect:  %{time_appconnect}s\n
        time_pretransfer:  %{time_pretransfer}s\n
           time_redirect:  %{time_redirect}s\n
      time_starttransfer:  %{time_starttransfer}s\n
                         ----------\n
              time_total:  %{time_total}s\n
    

如何处理发送失败的数据

发送失败的数据默认是会写到 eventSavePath (默认是logs/)目录下的error-datarangers.log文件下,下面介绍下一般处理方式:

  1. 如果可以容忍部分失败的话,就不处理。
  2. 自定义失败处理逻辑,比如将失败的消息发送再发送到另外的kafka-topic中,当系统恢复了之后再进行消费补偿。
  3. 配合提供的logagent(仅私有化提供,联系客户经理来获取)来监听失败的文件,进行失败的补上报。

Spring 注入callback:

@Configuration
public class SdkConfiguration {

//  @Bean
  public Callback callback() {
    return new Callback() {

      @Override
      public void onFailed(FailedData failedData) {
      // handle  failed data
      // cause   exception是异常信息
      // message 是上报的完整报文
        System.out.println(failedData.getCause());
        System.out.println(failedData.getException());
        System.out.println(failedData.getMessage());
      }
    };
  }
}

或者在初始化collector的时候设置一个:

EventCollector appEventCollector =  new AppEventCollector("app", dataRangersSDKConfigPropertiesInfo, new Callback() {
  @Override
  public void onFailed(FailedData failedData) {
    // handle the failed data
  }
})

上报性能调试

如果发现上报的qps不满足需求,可以处理:

  1. 扩服务app_log的服务。
  2. 调试sdk的相关配置:
    1. 增加 threadCount 的数量,可以根据上报时延,简单快速估算下(并不一定是线性关系): 1s/时延 * threadCount * batchSize
    2. 使用批量上报的方式。
  3. 联系 DataFiner 运维人员进行扩容。