Last updated: 2023.09.01 17:47:35
First published: 2022.02.24 10:16:51
The LAS Query Java SDK makes it easy for LAS users to work with the LAS query service from Java. Its main features currently include task submission/cancellation, task information retrieval, result retrieval, and resource upload.
This document provides sample code for the features above for your reference.
System Concepts
Endpoint: the API domain through which LAS serves requests
Region: the physical region where the LAS data center is located
The regions and API domains currently supported by LAS are listed below:
Region (display name) | Region | Endpoint |
---|---|---|
North China - Beijing | cn-beijing | las.volcengineapi.com |
Internal Concepts
Schema: a collection space that can contain data tables, resources, UDFs, etc.
Resource: a resource; currently one of four types: Jar, File, ZIP, or PyFile
Task: defines the execution details of a task, including the query SQL, execution mode (synchronous/asynchronous), task name, parameters, etc.
Job: the task instance produced by one execution of a Task
Result: the output of one Job run
ResultSchema: the schema information of a result
Record: one row in a result set
Requirements:
Java 1.8 or later
Maven
Install the SDK by adding the corresponding dependency to your Maven project's pom.xml:
```xml
<repository>
  <id>bytedance-public</id>
  <name>bytedance maven</name>
  <url>https://artifact.bytedance.com/repository/releases</url>
</repository>
```

```xml
<dependency>
  <groupId>com.volcengine</groupId>
  <artifactId>las-sdk-core</artifactId>
  <version>1.0.1.0-public</version>
</dependency>
```
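For orientation, the two fragments above belong in the `<repositories>` and `<dependencies>` sections of pom.xml respectively; a minimal combined sketch (your project's own coordinates omitted):

```xml
<project>
  <!-- ... your project's groupId / artifactId / version ... -->
  <repositories>
    <repository>
      <id>bytedance-public</id>
      <name>bytedance maven</name>
      <url>https://artifact.bytedance.com/repository/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>com.volcengine</groupId>
      <artifactId>las-sdk-core</artifactId>
      <version>1.0.1.0-public</version>
    </dependency>
  </dependencies>
</project>
```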
The LAS SDK currently supports only static client initialization, configured with the endpoint, region, Access Key, and Secret Access Key:
```java
String endpoint = "https://las.volcengineapi.com"; // default endpoint
String region = "cn-beijing";                      // default region
String accessKey = "Your Access Key";              // your Access Key
String secretKey = "Your Secret Key";              // your Secret Key
LASClient las = new LASClient(
    new Builder(new StaticCredentials(accessKey, secretKey))
        .endpoint(endpoint)
        .region(region)
        .build()
);
```
LASClient
The client is the entry point for all subsequent LAS operations; the APIs offered by the current version of LASClient are described below.
Once the client is initialized, you can run tasks by executing the corresponding Task (two task types are currently supported: SQL and SparkJar).
Here is a simple SQL query example:
```java
try {
    String sql = "select * from public_sample_dataset.date_dim_1g limit 10";
    // Execute the query synchronously
    SQLTask sqlTask = new SQLTask.Builder(sql)
        .name("first query task")
        .sync(true)
        .build();
    Job job = las.execute(sqlTask);
    // Fetch the result
    if (job.isSuccess()) {
        System.out.println("Successfully execute the sql task. The result records show as below: ");
        Result result = job.getResult();
        for (Record record : result.iterateAll()) {
            System.out.println(record);
        }
    }
} catch (LASException ex) {
    // LASException is a runtime exception, so catching it is optional
    System.out.println("Error in executing sql task. error = " + ex);
}
```
This section demonstrates more LAS features through code samples.
SQLTask is the interface for executing SQL query tasks. It takes four main parameters:
Parameter | Type | Required | Description | How to set |
---|---|---|---|---|
sql | String | Y | The SQL statement | Passed to the Builder constructor |
name | String | N | Task name; defaults to SQLTask_${current_timestamp} if not specified | Call .name() |
conf | Map<TaskConf, String> | N | Task parameters; empty by default | Call .addConf() |
sync | boolean | N | Synchronous/asynchronous execution; true means synchronous; defaults to synchronous | Call .sync() |
Example:
```java
package com.volcengine.las.example;

import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.Record;
import com.volcengine.las.Result;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.exception.LASException;
import com.volcengine.las.task.SQLTask;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public final class ExecuteSQLTask {

    private final static String sql =
        "select * from minreng_test_db_2.las_table_non_index_partitioned_y where date='today' limit 10";

    public static void main(String[] args) {
        LAS client = getClientInstance();
        // Execute the task synchronously
        syncExecuteSQLTask(client);
        // Execute the task asynchronously
        asyncExecuteSQLTask(client);
    }

    public static void syncExecuteSQLTask(LAS client) {
        try {
            SQLTask syncTask = new SQLTask.Builder(sql)
                .name("sdk_test: " + System.currentTimeMillis())
                .sync(true)
                .build();
            Job job = client.execute(syncTask);
            if (job.isSuccess()) {
                System.out.println("Successfully execute the sql task. The result records:");
                Result result = job.getResult();
                for (Record record : result.iterateAll()) {
                    System.out.println(record);
                }
            }
        } catch (LASException ex) {
            System.out.println("Error in executing sql task synchronously. error = " + ex);
        }
    }

    public static void asyncExecuteSQLTask(LAS client) {
        try {
            SQLTask asyncTask = new SQLTask.Builder(sql)
                .name("sdk_test: " + System.currentTimeMillis())
                .sync(false)
                .build();
            // An asynchronous execute returns immediately after submission
            Job job = client.execute(asyncTask);
            System.out.println("Submit the SQL task asynchronously.");
            // Other work can happen here before waiting on the job
            job.waitForSuccess(Duration.of(2, ChronoUnit.MINUTES));
            if (job.isSuccess()) {
                System.out.println("Successfully execute the sql task. The result records:");
                Result result = job.getResult();
                for (Record record : result.iterateAll()) {
                    System.out.println(record);
                }
            }
        } catch (LASException ex) {
            System.out.println("Error in executing sql task asynchronously. error = " + ex);
        }
    }

    private static LAS getClientInstance() {
        StaticCredentials credentials = new StaticCredentials(
            "Your Access Key",
            "Your Secret Key"
        );
        LASClientOptions options = new LASClientOptions.Builder(credentials)
            .build();
        return new LASClient(options);
    }
}
```
For the supported SQL syntax, see LAS SQL Syntax.
For the difference between synchronous and asynchronous submission, see 5.1.5 Synchronous/Asynchronous Execution.
SparkJar tasks let users write Spark applications to cover custom data-analysis needs. See the Spark Jar Jobs documentation for details.
SparkTask is the SDK interface for executing SparkJar tasks:
Parameter | Type | Required | Description | How to set |
---|---|---|---|---|
jar | JarResourceInfo | Y | The SparkJar resource used by the task | First argument of the Builder constructor |
mainClass | String | Y | The main class of the Spark application | Second argument of the Builder constructor |
mainArgs | List<String> | N | Arguments for the Spark application's main function; defaults to an empty list | Call .mainArgs() |
name | String | N | Task name; defaults to SparkJarTask_${current_time} if not specified | Call .name() |
conf | Map<TaskConf, String> | N | Wraps set k=v statements to specify task parameters; empty by default | Call .addConf() |
sync | boolean | N | Synchronous/asynchronous execution; true means synchronous; defaults to synchronous | Call .sync() |
Example:
```java
package com.volcengine.las.example;

import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.exception.LASException;
import com.volcengine.las.resource.JarResourceInfo;
import com.volcengine.las.task.SparkTask;
import com.volcengine.las.task.SparkTask.Builder;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;

public final class ExecuteSparkJarTask {

    public static void main(String[] args) {
        LAS client = getClientInstance();
        // Execute the Spark Jar task
        executeSparkJarTask(client);
    }

    public static void executeSparkJarTask(LAS client) {
        String schema = "test_sunxh";
        String resourceName = "sparkjar";
        String mainClass = "SparkJar";
        List<String> mainArgs = Collections.emptyList();
        SparkTask sparkTask = new Builder(JarResourceInfo.of(schema, resourceName), mainClass)
            .name("spark jar example application")
            .mainArgs(mainArgs)
            .build();
        try {
            Job job = client.execute(sparkTask);
            // Wait until the tracking URL becomes available
            job.waitFor(
                Duration.of(3, ChronoUnit.SECONDS),
                Duration.of(2, ChronoUnit.MINUTES),
                instance -> instance.getTrackingUrl() != null
            );
            System.out.println("Tracking url: " + job.getTrackingUrl());
            job.waitForFinished(Duration.of(2, ChronoUnit.MINUTES));
            System.out.println("The task executed successfully.");
        } catch (LASException ex) {
            System.out.println("The task failed. error = " + ex);
        }
    }

    private static LAS getClientInstance() {
        StaticCredentials credentials = new StaticCredentials(
            "Your Access Key",
            "Your Secret Key"
        );
        LASClientOptions options = new LASClientOptions.Builder(credentials)
            .build();
        return new LASClient(options);
    }
}
```
This is controlled by the Task's sync parameter:
```java
// Synchronous: blocks until the job reaches a terminal state (success/failure/cancelled)
SQLTask syncTask = new SQLTask.Builder(sql)
    .name("sync sql task")
    .sync(true)
    .build();
Job syncJob = client.execute(syncTask);

// Asynchronous: returns immediately
SQLTask asyncTask = new SQLTask.Builder(sql)
    .name("async sql task")
    .sync(false)
    .build();
Job asyncJob = client.execute(asyncTask);
// Later, refresh the job's state to check whether it has finished
asyncJob.refresh();
if (asyncJob.isSuccess()) {
    // ...
}
```
```java
// A job can be cancelled through the job instance, or through the LASClient
job.cancel();
lasClient.cancel(job);
```
A task instance can be fetched by its task ID:

```java
Job job = lasClient.getJob(jobId);
```
From the job instance you can get the URL of the corresponding Spark UI / Presto UI page:

```java
job.getTrackingUrl();
```
After an asynchronous call, if you want to block again until the job reaches a given state, call the waitFor() family of methods:

```java
// Wait for the job to succeed
job.waitForSuccess();

// Or wait until a custom condition is satisfied
job.waitFor(
    Duration.of(3, ChronoUnit.SECONDS),            // polling interval
    Duration.of(2, ChronoUnit.MINUTES),            // wait timeout
    instance -> instance.getTrackingUrl() != null  // exit condition
);
```
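Under the hood, waitFor() amounts to poll-until-condition with an interval and a timeout. If you need the same pattern in your own orchestration code, a minimal self-contained sketch (illustrative only, not part of the LAS SDK) looks like:

```java
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

public final class PollUntil {

    // Polls `state` every `interval` until `done` accepts the value or
    // `timeout` elapses; returns true if the condition was met in time.
    public static <T> boolean poll(Supplier<T> state, Predicate<T> done,
                                   Duration interval, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (done.test(state.get())) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Condition becomes true after ~50 ms, well within the timeout
        boolean ok = poll(() -> System.currentTimeMillis() - start, t -> t >= 50,
                Duration.ofMillis(10), Duration.ofSeconds(2));
        System.out.println(ok); // prints "true"
    }
}
```

The SDK's waitFor(interval, timeout, condition) follows the same shape, with the job instance playing the role of the polled state.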
Call getResult() on the job instance to fetch the task's query result:
```java
package com.volcengine.las.example;

import com.volcengine.las.Field;
import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.Record;
import com.volcengine.las.Result;
import com.volcengine.las.Schema;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.task.SQLTask;

public final class QueryResults {

    public static void main(String[] args) {
        LAS client = getClientInstance();
        // Query results
        queryResults(client);
    }

    public static void queryResults(LAS client) {
        String sql = "SELECT * FROM public_sample_dataset.store_sales_1g";
        SQLTask syncTask = new SQLTask.Builder(sql)
            .sync(true)
            .build();
        Job job = client.execute(syncTask);
        Result result = job.getResult();
        Schema schema = result.getSchema();
        for (Record record : result.iterateAll()) {
            StringBuilder row = new StringBuilder("Row[");
            for (Field field : schema.getFields()) {
                row.append(String.format("%s: %s = %s,",
                    field.getName(),
                    field.getType(),
                    record.get(field.getName())
                ));
            }
            // Drop the trailing comma if any field was appended
            if (row.indexOf(",") > 0) {
                row.deleteCharAt(row.length() - 1);
            }
            row.append("]");
            System.out.println(row);
        }
    }

    private static LAS getClientInstance() {
        StaticCredentials credentials = new StaticCredentials(
            "Your Access Key",
            "Your Secret Key"
        );
        LASClientOptions options = new LASClientOptions.Builder(credentials)
            .build();
        return new LASClient(options);
    }
}
```
The resource to upload is specified via the corresponding XXXResourceInfo.of() factory:
```java
package com.volcengine.las.example;

import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
// Page and Resource are assumed here to live in the SDK's root package
import com.volcengine.las.Page;
import com.volcengine.las.Resource;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.resource.FileResourceInfo;

import java.io.File;

public final class UploadResource {

    public static void main(String[] args) {
        LAS client = getClientInstance();
        // Upload a resource
        uploadResource(client);
        // List resources
        listResources(client);
    }

    public static void uploadResource(LAS client) {
        String path = "/path/to/your/file.txt";
        String schema = "test_scm";
        String resourceName = "test_file_1";
        String description = "test file";
        File file = new File(path);
        FileResourceInfo resourceRef = FileResourceInfo.of(schema, resourceName, description);
        client.upload(resourceRef, file);
        System.out.println(String.format("Success in uploading resource: %s - %s", resourceRef, file));
    }

    public static void listResources(LAS client) {
        String schema = "test_scm";
        Page<Resource> resources = client.listResources(schema);
        // Print all pages
        for (Resource resource : resources.iterateAll()) {
            System.out.println("Resource: " + resource);
        }
    }

    private static LAS getClientInstance() {
        StaticCredentials credentials = new StaticCredentials(
            "Your Access Key",
            "Your Secret Key"
        );
        LASClientOptions options = new LASClientOptions.Builder(credentials)
            .build();
        return new LASClient(options);
    }
}
```
LAS currently supports four resource types:
File -> call FileResourceInfo.of()
Zip -> call ZipResourceInfo.of()
PyFile -> call PyFileResourceInfo.of()
Jar -> call JarResourceInfo.of()
On the SDK side, UDF creation is currently supported only through a SQLTask. See the Create Function section of the syntax documentation.
```java
package com.volcengine.las.example;

import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.Record;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.task.SQLTask;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

public final class CreateUDF {

    public static void main(String[] args) {
        LAS client = getClientInstance();
        createUDF(client);
        useUDF(client);
    }

    public static void createUDF(LAS client) {
        String sql = "CREATE FUNCTION test_scm.sortstring2 AS 'com.bytedance.hive.udf.dp.SortString' using jar 'testudf.new'";
        SQLTask syncTask = new SQLTask.Builder(sql)
            .sync(true)
            .build();
        client.execute(syncTask);
    }

    public static void useUDF(LAS client) {
        String sql = "select test_scm.sortstring2(name) from testudf.test1 where date='20210928' limit 10";
        SQLTask asyncTask = new SQLTask.Builder(sql)
            .sync(false)
            .build();
        Job job = client.execute(asyncTask);
        job.waitForSuccess(Duration.of(2, ChronoUnit.MINUTES));
        for (Record record : job.getResult().iterateAll()) {
            System.out.println(record);
        }
    }

    private static LAS getClientInstance() {
        StaticCredentials credentials = new StaticCredentials(
            "Your Access Key",
            "Your Secret Key"
        );
        LASClientOptions options = new LASClientOptions.Builder(credentials).build();
        return new LASClient(options);
    }
}
```
Task failures are thrown as a LASException (an unchecked exception, so catching it is not mandatory); the exception message carries the concrete execution error.
Q: A query on a partitioned table fails with the error No partition predicate found for table 'xxx'
LAS currently requires every query on a partitioned table to specify a concrete partition filter. Add one and the query will succeed, for example: where `date` = '20200218'
Q: Why is the number of rows returned by getResult() limited?
For data-security reasons, the SDK currently caps the number of rows returned by getResult(). To obtain the full result set, write the result into a separate table and then fetch it with LAS Tunnel.
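The workaround can be sketched as follows, assuming the target table name is yours to choose and that LAS SQL accepts a CREATE TABLE ... AS SELECT form (otherwise create the table first and INSERT into it); the schema and table names below are placeholders:

```java
public final class MaterializeResult {

    // Builds a CTAS statement so the full result lands in a table
    // instead of the row-limited getResult() response.
    static String ctas(String targetTable, String query) {
        return "CREATE TABLE " + targetTable + " AS " + query;
    }

    public static void main(String[] args) {
        String sql = ctas("my_schema.full_result",
                "SELECT * FROM public_sample_dataset.store_sales_1g");
        System.out.println(sql);
        // Submit `sql` through a synchronous SQLTask, then read
        // my_schema.full_result with LAS Tunnel to retrieve all rows.
    }
}
```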
Q: Uploading a resource larger than 2 GB fails
Resources larger than 2 GB are currently not supported.
Q: Connecting to the LAS endpoint times out
When accessing from inside Volcengine, replace https://las.volcengineapi.com with https://open.volcengineapi.com.