You need to enable JavaScript to run this app.
文档中心
E-MapReduce

E-MapReduce

复制全文
下载 pdf
Presto
连接 Presto
复制全文
下载 pdf
连接 Presto

Java JDBC

环境准备

连接 Presto 队列

  1. 加载 EMR Serverless Presto JDBC 驱动。

    // Load the Presto JDBC driver class by its fully-qualified name.
    Class.forName("com.facebook.presto.jdbc.PrestoDriver");
    
  2. 创建 EMR Serverless Presto Connection。

    // presto_queue_name defaults to public-queue-presto, i.e. the Presto public queue.
    // Replace every ${...} placeholder (endpoint, Catalog, Schema, queue name, compute group name) with real values.
    String url = "jdbc:presto://${endpoint}:8080/${Catalog}/${Schema}?region=cn-beijing&sessionProperties=presto_queue_name:${your-queue-name};compute_group_name:${your-compute-group-name}";
    
    // ${User} / ${Password} are placeholders for the Access Key ID / Secret Access Key strings.
    Properties properties = new Properties();
    properties.setProperty("user", ${User});
    properties.setProperty("password", ${Password});
    Connection connection = DriverManager.getConnection(url, properties);
    
  3. JDBC 连接串参数说明。

    参数

    必填

    说明

    JDBC 服务地址

    EMR Serverless Presto JDBC 服务:
    北京:101.126.3.218:8080
    上海:180.184.140.244:8080
    广州:180.184.214.17:8080
    柔佛:101.47.30.220:8080

    Catalog

    Catalog 名称,可以使用 hive、hudi、iceberg 和 delta

    Schema

    数据库名称

    region

    EMR Serverless Presto 服务所在地域, 目前支持 cn-beijing、cn-shanghai、cn-guangzhou、ap-southeast-1,请根据实际情况填写

    user

    火山引擎 API 密钥管理中的 Access Key ID

    password

    火山引擎 API 密钥管理中的 Secret Access Key

    presto_queue_name

    EMR Serverless Presto 所在队列名称, 默认值为公共队列: public-queue-presto

    compute_group_name

    EMR Serverless Presto 所在计算组名称

  4. 查询数据完整代码示例。

    import com.facebook.presto.jdbc.PrestoStatement;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.Properties;
     
    /**
     * Loads the Presto JDBC driver, opens a connection with the access key pair
     * as user/password, runs a sample query and prints the first column of
     * every returned row.
     *
     * Returns early if the driver class cannot be loaded; any other failure is
     * reported via its stack trace.
     */
    public void demo() {
        try {
            Class.forName("com.facebook.presto.jdbc.PrestoDriver");
        } catch (ClassNotFoundException e) {
            System.out.println("Failed to load Presto JDBC driver...");
            return;
        }

        final String ak = "火山引擎 API 密钥管理中的 Access Key ID";
        final String sk = "火山引擎 API 密钥管理中的 Secret Access Key";
        final String jdbcUrl = "jdbc:presto://${endpoint}:8080/hive/serverless_presto_schema?region=cn-beijing";
        Properties properties = new Properties();
        properties.setProperty("user", ak);
        properties.setProperty("password", sk);
        // try-with-resources closes the result set, statement and connection in reverse order.
        // The standard java.sql interfaces are sufficient here, so no driver-specific cast is needed.
        try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
             java.sql.Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select name from serverless_presto_table")) {
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
  5. 编译执行代码。

    说明

    以上代码编译以后,可以将 Jar 包提交到服务器上执行。

Python JDBC

方式1:Pyhive

# -*- coding: utf-8 -*-
import requests
from pyhive import presto

if __name__ == "__main__":
    # Access key pair placeholders; replace with real credentials.
    ak = 'xxxxxxxx'
    sk = 'xxxxxxxx'

    # HTTP session whose default headers carry the authentication information.
    http_session = requests.Session()
    http_session.headers['X-Presto-Extra-Credential'] = f"access_key_id={ak},secret_access_key={sk}"

    # Session properties selecting the queue and compute group.
    session_properties = dict(presto_queue_name='xxx', compute_group_name='xxx')

    host = 'xxxxx'
    connection = presto.connect(
        host,
        port='8080',
        username='xxx',
        session_props=session_properties,
        requests_session=http_session,
    )

    # Run a trivial query and print each returned row.
    cursor = connection.cursor()
    cursor.execute('select 1')
    for record in cursor.fetchall():
        print(record)

方式2:Prestodb

import prestodb

ak = 'xxx'
sk = 'xxxx'

if __name__ == '__main__':
    # The access key pair is sent in the X-Presto-Extra-Credential header.
    headers = {'X-Presto-Extra-Credential':  f"access_key_id={ak},secret_access_key={sk}"}
    conn = prestodb.dbapi.connect(
        host='xxx',
        port=8080,
        user='xxx',
        catalog='xxx',
        schema='xxx',
        http_headers=headers,
        # Session properties selecting the queue and compute group.
        session_properties={
            'presto_queue_name': 'xxxx',
            'compute_group_name': 'xxxx'
        }
    )
    cur = conn.cursor()
    cur.execute('select 1')
    rows = cur.fetchall()
    # Iterate over the fetched rows (the name must match the variable assigned above).
    for row in rows:
        print(row)

Go Client
package main

import (
        "database/sql"
        "fmt"
        "log"
        "net/http"

        "github.com/prestodb/presto-go-client/presto"
)

type prestoRT struct {
        base http.RoundTripper
}

func (r *prestoRT) RoundTrip(req *http.Request) (*http.Response, error) {
        req.Header.Set("X-Presto-User", "abc")
        req.Header.Set(
                "X-Presto-Extra-Credential",
                "access_key_id=您的accesskey,secret_access_key=您的secretKey",
        )
        req.Header.Add("X-Presto-Session", "presto_queue_name=您的队列名称")
        req.Header.Add("X-Presto-Session", "compute_group_name=您的计算组名称")

        return r.base.RoundTrip(req)
}

// main registers an HTTP client that uses prestoRT as its transport, opens a
// Presto connection through database/sql, runs SHOW CATALOGS and prints each
// catalog name. Any failure terminates the program via log.Fatalf.
func main() {
        httpClient := &http.Client{
                Transport: &prestoRT{
                        base: http.DefaultTransport,
                },
        }

        // custom_client in the DSN refers to the name registered below.
        dsn := "http://xxxx:8080?catalog=xxx&schema=xxx&custom_client=my_custom_client"

        if err := presto.RegisterCustomClient("my_custom_client", httpClient); err != nil {
                log.Fatalf("register custom client failed: %v", err)
        }
        db, err := sql.Open("presto", dsn)
        if err != nil {
                log.Fatalf("open presto failed: %v", err)
        }
        defer db.Close()

        rows, err := db.Query("SHOW CATALOGS")
        if err != nil {
                log.Fatalf("query failed: %v", err)
        }
        defer rows.Close()

        for rows.Next() {
                var catalog string
                if err := rows.Scan(&catalog); err != nil {
                        log.Fatalf("scan failed: %v", err)
                }
                fmt.Println("catalog =", catalog)
        }
        // rows.Next returns false both at end of data and on error; distinguish the two.
        if err := rows.Err(); err != nil {
                log.Fatalf("iterate rows failed: %v", err)
        }
}

使用流程:

  1. 基于自定义拦截器创建HTTP客户端,注册到Presto客户端列表
  2. 构造DSN连接串:指定Presto服务地址、默认catalog、默认schema,关联自定义HTTP客户端
  3. 用标准Go database/sql接口操作Presto:示例中执行了SHOW CATALOGS查询并打印返回结果

您需根据实际情况替换修改参数:accesskey、secretKey、队列名称、计算组名称、Presto服务地址、catalog、schema。

最近更新时间:2026.04.17 11:25:58
这个页面对您有帮助吗?
有用
有用
无用
无用