You need to enable JavaScript to run this app.
导航

Query SDK

最近更新时间2024.04.23 14:32:48

首次发布时间2022.02.24 10:16:51

1. 简介

LAS Query Java SDK 帮助 LAS 用户更加轻松地通过 Java 语言使用 LAS 查询服务,目前主要功能包括 任务提交/取消、任务信息获取、结果获取、上传资源等。
本文提供了上述功能的示例代码,方便您参考使用。

2. 概念说明

系统概念

  • Endpoint:表示 LAS 对外服务的 API 域名
公网 Endpoint内网 Endpoint
las.volcengineapi.comopen.volcengineapi.com
  • Region:表示 LAS 的数据中心所在的物理区域

目前 LAS 支持的地域如下表所示:

Region(中文名称)Region
华北-北京cn-beijing
华东-上海cn-shanghai
华南-广州cn-guangzhou
  • Access Key / Secret Access Key:访问火山引擎 API 的密钥;用户可以通过火山引擎的 密钥管理 页面获取到 Access Key 和 Secret Access Key

内部概念

  • Schema:一个可以包含 数据表、资源、UDF 等的集合空间概念

  • Resource:表示资源,目前分为 Jar、File、ZIP、PyFile 四种类型

  • Task:定义某次任务的执行信息,包括 查询 SQL、执行方式(同步/异步)、任务名、参数等信息

  • Job:表示某次 Task 执行生成的任务实例

  • Result:表示某次 Job 的运行结果

  • ResultSchema:运行结果的 Schema 信息

  • Record:表示运行结果的结果集中的一行记录

3. 安装 SDK

要求:

  • Java 1.8 或更高版本

  • Maven

通过在 Maven 项目中增加依赖项的方式安装 SDK,在 pom.xml 中加入相应依赖即可。

<repository>
	<id>bytedance-public</id>
	<name>bytedance maven</name>
	<url>https://artifact.bytedance.com/repository/releases</url>
</repository>
<dependency>
  <groupId>com.volcengine</groupId>
  <artifactId>las-sdk-core</artifactId>
  <version>1.0.2.0-public</version>
</dependency>
4. 快速入门

4.1 初始化客户端

LAS SDK 目前仅提供一种静态初始化客户端的方式,通过配置 endpoint,region,Access Key,Secret Access Key 进行初始化:

String endpoint = "https://las.volcengineapi.com"; // 按实际需求填写
String region = "cn-beijing"; // 按实际需求填写
String accessKey = "Your Access Key"; // 用户ak
String secretKey  = "Your Secret Key"; // 用户sk

LASClient las = new LASClient(
    new Builder(new StaticCredentials(ak, sk))
        .endpoint(endpoint)
        .region(region)
        .build()
);

LASClient 客户端是后续调用各种 LAS 功能的入口,当前版本 LASClient 提供如下 API 接口:

4.2 第一个查询

初始化 Client 完成后,可通过执行相关 Task(目前支持 SQL,SparkJar 两种任务类型)来进行任务执行。
如下为一个进行简单 SQL 查询的例子:

try {
    String sql =  "public_sample_dataset.date_dim_1g";
    
    // 同步执行查询
    SQLTask sqlTask = new SQLTask.Builder(sql)
        .name("first query task")
        .sync(true)
        .build();
    Job job = las.execute(sqlTask);
    
    // 获取结果
    if (job.isSuccess()) {
        System.out.println("Successfully execute the sql task. The result records show as below: ");
        Result result = job.getResult();
        for (Record record : result.iterateAll()) {
            System.out.println(record);
        }
    }
}
catch (LASException ex) {
    // LASException 为 runtime 异常,无需强制 check
    System.out.println("Error in executing sql task. error = " + ex);
}
5. 更多示例

本节将以代码示例的形式展示更多 LAS 功能的使用方式。

5.1 提交/取消任务

5.1.1 提交 SQL 任务

SQLTask 是用于执行 SQL 查询任务的接口。主要提供如下四个参数:

参数类型是否必须描述使用
sqlStringYsql 语句Builder 构造器传入
nameStringN任务名;如果不指定会以 SQLTask_${current_timestamp} 的方式生成调用.name()
conf<TaskConf, String>N用于指定任务参数;默认为空调用 .addConf()
syncbooleanN同步/异步执行;true 表示同步;用户不指定情况默认为同步调用 .sync()

示例:

package com.volcengine.las.example;

import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.Record;
import com.volcengine.las.Result;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.conf.BasicTaskConf;
import com.volcengine.las.exception.LASException;
import com.volcengine.las.task.SQLTask;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

public final class ExecuteSQLTask {

  private final static String sql = "select * from minreng_test_db_2.las_table_non_index_partitioned_y where date='today' limit 10";

  public static void main(String[] args) {
    LAS client = getClientInstance();

    // 同步执行任务
    syncExecuteSQLTask(client);

    // 异步执行任务
    asyncExecuteSQLTask(client);
  }

  public static void syncExecuteSQLTask(LAS client) {
    try {      
      SQLTask syncTask = new SQLTask.Builder(sql)
          .name("sdk_test: " + System.currentTimeMillis())
          .sync(true)
          .build();
      Job job = client.execute(syncTask);
      if (job.isSuccess()) {
        System.out.println("Successfully execute the sql task. The result records:");
        Result result = job.getResult();
        for (Record record : result.iterateAll()) {
          System.out.println(record);
        }
      }
    }
    catch (LASException ex) {
      System.out.println("Error in executing sql task synchronously. error = " + ex);
    }
  }

  public static void asyncExecuteSQLTask(LAS client) {
    try {
      SQLTask asyncTask = new SQLTask.Builder(sql)
          .name("sdk_test: " + System.currentTimeMillis())
          .sync(false)
          .build();
      // 异步执行提交后立即返回
      Job job = client.execute(asyncTask);
      System.out.println("Submit the SQL task asynchronously.");

      // 异步执行提交后可进行其他逻辑处理,
      job.waitForSuccess(Duration.of(2, ChronoUnit.MINUTES));
      if (job.isSuccess()) {
        System.out.println("Successfully execute the sql task. The result records:");
        Result result = job.getResult();
        for (Record record : result.iterateAll()) {
          System.out.println(record);
        }
      }
    }
    catch (LASException ex) {
      System.out.println("Error in executing sql task asynchronously. error = " + ex);
    }
  }

  private static LAS getClientInstance() {
    StaticCredentials credentials = new StaticCredentials(
        "Your Access Key",
        "Your Secret Key"
    );
    LASClientOptions options = new LASClientOptions.Builder(credentials)
        .build();
    return new LASClient(options);
  }
}

支持的 SQL 语法请参照 LAS SQL语法
同步/异步任务提交的差异请参照 5.1.5 同步/异步执行

5.1.2 提交 SparkJar 任务

SparkJar 任务为用户提供了通过编写 Spark 应用进行定制化数据分析需求的支持。详见 Spark Jar 作业 文档。
SparkTask 是 SDK 提供 SparkJar 任务执行的接口:

参数类型是否必须描述使用
jarJarResourceInfoY任务执行时使用的 SparkJar 资源Builder 构造器第一个参数传入
mainClassStringYSpark application 的 main classBuilder 构造器第二个参数传入
mainArgsListNspark application 的 main function 参数;不传默认为 empty list调用 .mainArgs()
nameStringN任务名;如果不指定会以 SparkJarTask_${current_time} 的方式生成调用 .name()
conf<TaskConf, String>N封装 set k=v 语句,用于指定任务参数;默认为空调用 .addConf()
syncbooleanN同步/异步执行;true 表示同步;用户不指定情况默认为同步调用 .sync()

示例:

package com.volcengine.las.example;

import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.exception.LASException;
import com.volcengine.las.resource.JarResourceInfo;
import com.volcengine.las.task.SparkTask;
import com.volcengine.las.task.SparkTask.Builder;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;

public final class ExecuteSparkJarTask {

  public static void main(String[] args) {
    LAS client = getClientInstance();

    // 执行 spark jar 任务
    executeSparkJarTask(client);
  }

  public static void executeSparkJarTask(LAS client) {
    String schema = "test_sunxh";
    String resourceName = "sparkjar";
    String mainClass = "SparkJar";
    List<String> mainArgs = Collections.emptyList();

    SparkTask sparkTask = new Builder(JarResourceInfo.of(schema, resourceName), mainClass)
        .name("spark jar example application")
        .mainArgs(mainArgs)
        .build();

    try {
      Job job = client.execute(sparkTask);
      job.waitFor(
          Duration.of(3, ChronoUnit.SECONDS),
          Duration.of(2, ChronoUnit.MINUTES),
          instance -> instance.getTrackingUrl() != null
      );
      System.out.println("Tracking url: " + job.getTrackingUrl());

      job.waitForFinished(Duration.of(2, ChronoUnit.MINUTES));
      System.out.println("The task executed successfully.");
    }
    catch (LASException ex) {
      System.out.println("The task failed. error = " + ex);
    }
  }

  private static LAS getClientInstance() {
    StaticCredentials credentials = new StaticCredentials(
        "Your Access Key",
        "Your Secret Key"
    );
    LASClientOptions options = new LASClientOptions.Builder(credentials)
        .build();
    return new LASClient(options);
  }
}

5.1.3 同步/异步执行

通过配置 Task 的 sync 参数进行控制:

// 同步,阻塞直至任务状态变为 成功/失败/取消 等终止状态
SQLTask syncTask = new SQLTask.Builder(sql)
    .name("sync sql task")
    .sync(true)
    .build();
Job job = client.execute(syncTask);

// 异步,立刻返回
SQLTask asyncTask = new SQLTask.Builder(sql)
    .name("async sql task")
    .sync(false)
    .build();
Job job = client.execute(syncTask);

// 后续可通过 refresh 任务状态信息检查是否完成
job.refresh();
if (job.isSuccess()) {
    // ... ...
}

5.1.4 取消任务

// 取消任务可以通过 job 实例;也可以通过 LasClient 进行取消
job.cancel();
lasClient.cancel(job);

5.2 查看任务实例相关信息

5.2.1 获取任务实例

可以根据任务 ID 进行任务实例的获取:

Job job = lasClient.getJob(jobId);

5.2.2 获取引擎侧任务执行 UI

从拿到的任务实例获取任务对应的 Spark UI / presto UI 页面:

job.getTrackingUrl();

5.2.3 等待任务

异步调用后,如果想重新同步阻塞等待任务到达某种状态,可以尝试调用 waitFor() 函数:

// 等待任务成功
job.waitForSuccess();

// 或自定义等待任务满足某种条件
job.waitFor(
    Duration.of(3, ChronoUnit.SECONDS),    // 轮询检查条件的时间间隔
    Duration.of(2, ChronoUnit.MINUTES),    // 等待超时时间
    instance -> instance.getTrackingUrl() != null    // 等待退出条件
);

5.3 获取查询结果

对 job 实例调用 getResult() 获取任务的查询结果:

package com.volcengine.las.example;

import com.volcengine.las.Field;
import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.Record;
import com.volcengine.las.Result;
import com.volcengine.las.Schema;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.conf.BasicTaskConf;
import com.volcengine.las.task.SQLTask;

public final class QueryResults {

  public static void main(String[] args) {
    LAS client = getClientInstance();

    // query results
    queryResults(client);
  }

  public static void queryResults(LAS client) {
    String sql = "SELECT * FROM public_sample_dataset.store_sales_1g";

    SQLTask syncTask = new SQLTask.Builder(sql)
        .sync(true)
        .build();
    Job job = client.execute(syncTask);

    Result result = job.getResult();
    Schema schema = result.getSchema();
    for (Record record : result.iterateAll()) {
      StringBuilder row = new StringBuilder( Row[ );
      for (Field field : schema.getFields()) {
        row.append(
            String.format("%s: %s = %s,",
                field.getName(),
                field.getType(),
                record.get(field.getName())
            )
        );
      }
      if (row.indexOf(",") > 0) {
        row.deleteCharAt(row.length()-1);
      }
      row.append("]");
      System.out.println(row);
    }
  }

  private static LAS getClientInstance() {
    StaticCredentials credentials = new StaticCredentials(
        "Your Access Key",
        "Your Secret Key"
    );
    LASClientOptions options = new LASClientOptions.Builder(credentials)
        .build();
    return new LASClient(options);
  }
}

5.4 上传资源/查看资源列表

上传资源通过 XXXResourceInfo.of()进行指定:

package com.volcengine.las.example;

import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.resource.FileResourceInfo;
import java.io.File;
import java.net.URL;
import java.util.Arrays;

public final class UploadResource {

  public static void main(String[] args) {
    LAS client = getClientInstance();

    // upload resources
    uploadResource(client);
    
    // list resources
    listResources(client);
  }

  public static void uploadResource(LAS client) {
    String path = "/path/to/your/file.txt";
    String schema = "test_scm";
    String resourceName = "test_file_1";
    String description = "测试文件";

    File file = new File(path);
    FileResourceInfo resourceRef = FileResourceInfo.of(schema, resourceName, description);
    client.upload(resourceRef, file);
    System.out.println(String.format("Success in uploading resource: %s - %s", resourceRef, file));
  }

    public static void listResources(LAS client) {
      String schema = "test_scm";
      Page<Resource> resources = client.listResources(schema);
    
      // print all pages
      for (Resource resource : resources.iterateAll()) {
        System.out.println("Resource: " + resource);
      }
    }

  private static LAS getClientInstance() {
    StaticCredentials credentials = new StaticCredentials(
        "Your Access Key",
        "Your Secret Key"
    );
    LASClientOptions options = new LASClientOptions.Builder(credentials)
        .build();
    return new LASClient(options);
  }
}

目前 LAS 支持四种资源类型:

  • File -> 调用 FileResourceInfo.of()

  • Zip -> 调用 ZipResourceInfo.of()

  • PyFlie -> 调用 PyFileResourceInfo.of()

  • Jar -> 调用 JarResourceInfo.of()

5.5 UDF 的创建和使用

UDF 的创建目前 SDK 侧只支持通过 SQLTask 进行执行。详见语法文档的 Create Function 部分。

package com.volcengine.las.example;


import com.volcengine.las.Job;
import com.volcengine.las.LAS;
import com.volcengine.las.LASClient;
import com.volcengine.las.LASClientOptions;
import com.volcengine.las.Record;
import com.volcengine.las.auth.StaticCredentials;
import com.volcengine.las.conf.BasicTaskConf;
import com.volcengine.las.task.SQLTask;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

public final class CreateUDF {

  public static void main(String[] args) {
    LAS client = getClientInstance();

    createUDF(client);

    useUDF(client);
  }

  public static void createUDF(LAS client) {
    String sql = "CREATE FUNCTION test_scm.sortstring2 AS 'com.bytedance.hive.udf.dp.SortString' using jar 'testudf.new'";
    SQLTask syncTask = new SQLTask.Builder(sql)
        .sync(true)
        .build();
    client.execute(syncTask);
  }

  public static void useUDF(LAS client) {
    String sql = "select test_scm.sortstring2(name) from testudf.test1 where date='20210928' limit 10";
    SQLTask syncTask = new SQLTask.Builder(sql)
        .sync(false)
        .build();
    Job job = client.execute(syncTask);
    job.waitForSuccess(Duration.of(2, ChronoUnit.MINUTES));
    for (Record record : job.getResult().iterateAll()) {
      System.out.println(record);
    }
  }

  private static LAS getClientInstance() {
    StaticCredentials credentials = new StaticCredentials(
        "Your Access Key",
        "Your Secret Key"
    );
    LASClientOptions options = new LASClientOptions.Builder(credentials).build();
    return new LASClient(options);
  }
}

5.6 执行异常

任务异常将会以 LAS Exception(unchecked exception,无需显式 catch)的形式进行抛出,exception message 内携带具体的执行错误信息。

6. 常见问题 FAQ

Q:执行分区表查询时,报 No partition predicate found for table 'xxx' 错误

目前 LAS 对分区表查询必须指定具体的分区条件,加上具体的分区筛选条件即可成功执行,例如:where `date` = '20200218'`

Q:为什么使用getResult()获取的查询结果有行数限制?

出于数据安全考虑,目前 SDK 的getResult()方法获取到的结果会对行数进行限制

Q:无法上传超过 2G 的资源

目前不支持超过 2G 的资源的上传

Q:连接 LAS Endpoint 超时

火山引擎内部访问建议将 https://las.volcengineapi.com 替换为 https://open.volcengineapi.com