You need to enable JavaScript to run this app.
导航

LAS SQL任务结果写入 LASFS/TOS

最近更新时间2024.04.23 19:09:33

首次发布时间2024.04.23 19:09:33

该功能主要为了方便用户将LAS计算结果以 CSV/Parquet 格式写入至LasFs/TOS,方便后续直接通过LasFs/TOS进行数据处理。

1 参数配置

通过在 SQL语句 或 SDK conf 中指定下列参数进行开启:

参数名参数取值说明是否必须默认值
las.query.result.save.enabledtrue / false是否开启结果写入LasFsY默认不开启该功能

las.query.result.save.mode

overwrite / errorifexists

写入模式

  • overwrite:覆盖该目录

  • errorifexists:目标目录存在数据则报错

N

默认errorifexists

las.query.result.save.pathlasfs:/path/to/result/dir/结果保存目标路径Y-
las.query.result.formatcsv / parquet结果格式N默认parquet
las.query.result.partition.num正数结果集的分片数N不设置默认为最后一次shuffle的partition数

说明

  • 该功能会存储任务的全量结果集

  • 开启该功能则无法通过SDK getResult() 接口获取结果

2 使用例子

  • 通过控制台 SQL查询 提交

    • 写lasfs
    set las.query.result.save.enabled=true;
    set las.query.result.save.path=lasfs:/public/test_results_save/;
    set las.query.result.format=csv;
    set las.query.result.save.mode=overwrite;
    set las.query.result.partition.num=10; 
    
    SELECT * FROM `public_sample_dataset`.`store_1g` limit 100000;
    
    • 写tos
    set las.query.result.save.enabled=true;
    set las.query.result.format=csv;
    set las.query.result.save.mode=overwrite;
    set las.query.result.partition.num=10; 
    set las.query.result.save.path=tos://test_bucket/tos_test/test1;
    set las.query.result.save.credentials.ak=${tos桶对应账号的AK};
    set las.query.result.save.credentials.sk=${tos桶对应账号的SK};
    
    SELECT * FROM `public_sample_dataset`.`store_1g` limit 10;
    
  • 通过Query SDK提交:

import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.exception.LASException;
import com.volcengine.las.task.SQLTask;

public final class Test {

  public static void main(String[] args) {
    try {
      LAS client = getClientInstance();

      String sql = "SELECT * FROM `public_sample_dataset`.`store_1g` limit 100000;";
      SQLTask syncTask = new SQLTask.Builder(sql)
          .name("Task results stored to LasFs test")
          .addConf("las.query.result.save.enabled", "true")
          .addConf("las.query.result.save.path", "lasfs:/public/test_results_save/")
          .addConf("las.query.result.format", "csv")
          .addConf("las.query.result.save.mode", "overwrite")
          .addConf("las.query.result.partition.num", "10")
          .sync(true)
          .build();
      Job job = client.execute(syncTask);
    }
    catch (LASException ex) {
      ex.printStackTrace();
    }
  }

  private static LAS getClientInstance() {
    StaticCredentials credentials = new StaticCredentials(
        "${user ak}",
        "${user sk}"
    );

    LASClientOptions options = new LASClientOptions.Builder(credentials)
        .build();

    return new LASClient(options);
  }
}

3 查看结果

  • 在LasFs获取任务结果

  • 在TOS相应的目录下查看结果文件