加载 EMR Serverless Presto JDBC 驱动 。
Class.forName("com.facebook.presto.jdbc.PrestoDriver");
创建 EMR Serverless Presto Connection。
// las_queue_name 默认值为 public-queue-presto 即 Presto 公共队列 String url = "jdbc:presto://${endpoint}:8080/${Catalog}/${Schema}?region=cn-beijing&sessionProperties=presto_queue_name:${your-queue-name};compute_group_name:${your-compute-group-name}" Properties properties = new Properties(); properties.setProperty("user", ${User}); properties.setProperty("password", ${Password}); Connection connection = DriverManager.getConnection(url, properties);
JDBC 连接串参数说明。
参数 | 必填 | 说明 |
|---|---|---|
JDBC 服务地址 | 是 | EMR Serverless Presto JDBC 服务: |
Catalog | 是 | Catalog 名称,可以使用 hive、hudi、iceberg 和 delta |
Schema | 是 | 数据库名称 |
region | 是 | EMR Serverless Presto 服务所在地域, 目前支持 cn-beijing、cn-shanghai、cn-guangzhou、ap-southeast-1,请根据实际情况填写 |
user | 是 | 火山引擎 API 密钥管理中的 Access Key ID |
password | 是 | 火山引擎 API 密钥管理中的 Secret Access Key |
presto_queue_name | 否 | EMR Serverless Presto 所在队列名称, 默认值为公共队列: public-queue-presto |
compute_group_name | 否 | EMR Serverless Presto 所在计算组名称 |
查询数据完整代码示例。
import com.facebook.presto.jdbc.PrestoStatement; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.util.Properties; public void demo() { try { Class.forName("com.facebook.presto.jdbc.PrestoDriver"); } catch (ClassNotFoundException e) { System.out.println("Failed to load Presto JDCB driver..."); return; } final String ak = "火山引擎 API 密钥管理中的 Access Key ID"; final String sk = "火山引擎 API 密钥管理中的 Secret Access Key"; final String jdbcUrl = "jdbc:presto://${endpoint}:8080/hive/serverless_presto_schema?region=cn-beijing"; Properties properties = new Properties(); properties.setProperty("user", ak); properties.setProperty("password", sk); try (PrestoConnection connection = (PrestoConnection) DriverManager.getConnection(url, properties)) { ResultSet resultSet = connection.createStatement().executeQuery("select name from serverless_presto_table"); while (resultSet.next()) { System.out.println(resultSet.getString(1)); } } catch (Exception e) { e.printStackTrace(); } }
编译执行代码。
说明
以上代码编译以后,可以将 Jar 包提交到服务器上执行。
# -*- coding: utf-8 -*- import requests from pyhive import presto if __name__ == "__main__": ak = 'xxxxxxxx' sk = 'xxxxxxxx' request_session = requests.Session() request_session.headers.update({ 'X-Presto-Extra-Credential': f"access_key_id={ak},secret_access_key={sk}" # 设置认证信息 }) presto_config = { 'presto_queue_name': 'xxx', 'compute_group_name': 'xxx' } host = 'xxxxx' conn = presto.connect(host, port='8080', username = 'xxx', session_props=presto_config, requests_session=request_session) cur = conn.cursor() query = 'select 1' cur.execute(query) result = cur.fetchall() for row in result: print(row)
import prestodb ak = 'xxx' sk = 'xxxx' if __name__ == '__main__': headers = {'X-Presto-Extra-Credential': f"access_key_id={ak},secret_access_key={sk}"} conn=prestodb.dbapi.connect( host='xxx', port=8080, user='xxx', catalog='xxx', schema='xxx', http_headers=headers, session_properties={ 'presto_queue_name': 'xxxx', 'compute_group_name': 'xxxx' } ) cur = conn.cursor() cur.execute('select 1') rows = cur.fetchall() for row in result: print(row)
package main import ( "database/sql" "fmt" "log" "net/http" "github.com/prestodb/presto-go-client/presto" ) type prestoRT struct { base http.RoundTripper } func (r *prestoRT) RoundTrip(req *http.Request) (*http.Response, error) { req.Header.Set("X-Presto-User", "abc") req.Header.Set( "X-Presto-Extra-Credential", "access_key_id=您的accesskey,secret_access_key=您的secretKey", ) req.Header.Add("X-Presto-Session", "presto_queue_name=您的队列名称") req.Header.Add("X-Presto-Session", "compute_group_name=您的计算组名称") return r.base.RoundTrip(req) } func main() { httpClient := &http.Client{ Transport: &prestoRT{ base: http.DefaultTransport, }, } dsn := "http://xxxx:8080?catalog=xxx&schema=xxx&custom_client=my_custom_client" presto.RegisterCustomClient("my_custom_client", httpClient) db, err := sql.Open("presto", dsn) if err != nil { log.Fatalf("open presto failed: %v", err) } defer db.Close() rows, err := db.Query("SHOW CATALOGS") if err != nil { log.Fatalf("query failed: %v", err) } defer rows.Close() for rows.Next() { var catalog string if err := rows.Scan(&catalog); err != nil { log.Fatalf("scan failed: %v", err) } fmt.Println("catalog =", catalog) } }
使用流程:
database/sql接口操作Presto:示例中执行了SHOW CATALOGS查询并打印返回结果您需根据实际情况替换修改参数:accesskey、secretKey、队列名称、计算组名称、Presto服务地址、catalog、schema。