该功能主要为了方便用户将LAS计算结果以 CSV/Parquet 格式写入至LasFs/TOS,方便后续直接通过LasFs/TOS进行数据处理。
通过在 SQL语句 或 SDK conf 中指定下列参数进行开启:
参数名 | 参数取值 | 说明 | 是否必须 | 默认值 |
---|---|---|---|---|
las.query.result.save.enabled | true / false | 是否开启结果写入LasFs | Y | 默认不开启该功能 |
las.query.result.save.mode | overwrite / errorifexists | 写入模式
| N | 默认errorifexists |
las.query.result.save.path | lasfs:/path/to/result/dir/ | 结果保存目标路径 | Y | - |
las.query.result.format | csv / parquet | 结果格式 | N | 默认parquet |
las.query.result.partition.num | 正数 | 结果集的分片数 | N | 不设置默认为最后一次shuffle的partition数 |
说明
该功能会存储任务的全量结果集
开启该功能则无法通过SDK getResult()
接口获取结果
通过控制台 SQL查询 提交
set las.query.result.save.enabled=true; set las.query.result.save.path=lasfs:/public/test_results_save/; set las.query.result.format=csv; set las.query.result.save.mode=overwrite; set las.query.result.partition.num=10; SELECT * FROM `public_sample_dataset`.`store_1g` limit 100000;
set las.query.result.save.enabled=true; set las.query.result.format=csv; set las.query.result.save.mode=overwrite; set las.query.result.partition.num=10; set las.query.result.save.path=tos://test_bucket/tos_test/test1; set las.query.result.save.credentials.ak=${tos桶对应账号的AK}; set las.query.result.save.credentials.sk=${tos桶对应账号的SK}; SELECT * FROM `public_sample_dataset`.`store_1g` limit 10;
通过Query SDK提交:
import com.volcengine.las.Job; import com.volcengine.las.LAS; import com.volcengine.las.LASClient; import com.volcengine.las.LASClientOptions; import com.volcengine.las.auth.StaticCredentials; import com.volcengine.las.exception.LASException; import com.volcengine.las.task.SQLTask; public final class Test { public static void main(String[] args) { try { LAS client = getClientInstance(); String sql = "SELECT * FROM `public_sample_dataset`.`store_1g` limit 100000;"; SQLTask syncTask = new SQLTask.Builder(sql) .name("Task results stored to LasFs test") .addConf("las.query.result.save.enabled", "true") .addConf("las.query.result.save.path", "lasfs:/public/test_results_save/") .addConf("las.query.result.format", "csv") .addConf("las.query.result.save.mode", "overwrite") .addConf("las.query.result.partition.num", "10") .sync(true) .build(); Job job = client.execute(syncTask); } catch (LASException ex) { ex.printStackTrace(); } } private static LAS getClientInstance() { StaticCredentials credentials = new StaticCredentials( "${user ak}", "${user sk}" ); LASClientOptions options = new LASClientOptions.Builder(credentials) .build(); return new LASClient(options); } }