您可以通过开源的 MySQL JDBC 驱动程序连接到 ByteHouse 云数仓版。
JDK 版本:Java v1.8.0_261 或更高版本 (需要 TLS v1.3 支持)。
MySQL JDBC Driver 版本: 推荐使用 v8.3.0 及以上版本。
请单击前往 MySQL Connector 页面,下载 MySQL JDBC Driver v8.3.0。
通过 Maven 方式使用 MySQL JDBC 驱动的示例如下:
<dependency> <groupId>com.mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.3.0</version> </dependency>
ByteHouse 支持通过 IAM 用户或数据库用户连接 MySQL JDBC Driver。IAM 用户与数据库用户二者差异说明如下,您可按需选择。
更多 IAM 用户和数据库用户的介绍请参见以下文档:
参数 | 配置说明 |
|---|---|
host | 配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看对应信息。详情请参见步骤二:配置网络信息。 |
port | 配置为固定值 3306。 |
user & password | 使用 API Key 作为 user 和 password。获取 API Key 请参见获取 API Key。
|
database | 配置为连接 ByteHouse 数据库名称。您可登录 ByteHouse 控制台,在数据库页面查看并复制数据库。 |
virtual_warehouse_id | 可选配置,配置为计算组名。如果您需要指定计算组进行查询和写入,可配置该参数。 |
参数 | 配置说明 |
|---|---|
host | 配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制域名信息,详情请参见步骤二:配置网络信息。 |
port | 固定为 3306。 |
user & password |
|
database | 配置为连接 ByteHouse 数据库名称。您可登录 ByteHouse控制台,在数据库页面查看并复制数据库。 |
virtual_warehouse_id | 可选配置,配置为计算组名。如果您需要指定计算组进行查询和写入,可配置该参数。 |
您可以使用以下代码连接至 ByteHouse,并开始使用标准语句开发 ByteHouse,用于查询、写入和读取数据。
keepAlive,如需使用该功能,需新建 KeepAliveMysqlSocketFactory 类,可参考配置 KeepAlive 章节配置,该功能需使用 JDK11。可参考下面代码连接至 ByteHouse,使用时注意替换连接语句中的 {Host}、{Password}、{User}、{Database}、{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息。
import java.sql.*; import java.util.Properties; import java.util.UUID; public class MysqlJDBC { public static void main(String[] args) { String host = "{Host}"; int port = 3306; String virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"; String password = "{Password}"; String user = "{User}"; String database = "{Database}"; // 通过 properties 设置一些链接的基本参数 String url = String.format("jdbc:mysql://%s:%d", host, port); Properties properties = new Properties(); properties.setProperty("useSSl", "true"); properties.setProperty("verifyServerCertificate", "false"); properties.setProperty("enabledTLSProtocols", "TLSv1.2"); properties.setProperty("user", user); properties.setProperty("password", password); properties.setProperty("database", database); // 建议添加以下参数,可提高性能 // 默认情况下,MySQL 的 prearestatement 在执行 insert batch 时,会将批量数据拆分为单条并通过 insert 语句逐行执行, // 添加以下参数后,驱动会将批量数据拼接成一条 SQL 语句并执行,提高查询效率 properties.setProperty("rewriteBatchedStatements", "true"); properties.setProperty("useServerPrepStmts", "false"); Connection conn = null; Statement initStatement = null; try { conn = DriverManager.getConnection(url, properties); initStatement = conn.createStatement(); initStatement.execute(String.format("SET virtual_warehouse='%s'", virtual_warehouse_id)); // 指定特定计算组 } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (initStatement != null) initStatement.close(); if (conn != null) conn.close(); } catch (SQLException e) { e.printStackTrace(); } } }
import java.sql.*; import java.util.Properties; import java.util.UUID; public class CKJDBC { public static void main(String[] args) { String host = "{Host}"; int port = 3306; String virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"; String password = "{Password}"; String user = "{User}"; String database = "{Database}"; String url = String.format("jdbc:mysql://%s:%d/?useSSl=true&verifyServerCertificate=false&enabledTLSProtocols=TLSv1.2&user=%s&password=%s&database=%s&rewriteBatchedStatements=true&useServerPrepStmts=false", // 建议使用 rewriteBatchedStatements=true&useServerPrepStmts=false 来提高批量插入效率 host, port, user, password, database); Connection conn = null; Statement initStatement = null; try { conn = DriverManager.getConnection(url, properties); initStatement = conn.createStatement(); initStatement.execute(String.format("SET virtual_warehouse='%s'", virtual_warehouse_id)); // 指定特定计算组 } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (initStatement != null) initStatement.close(); if (conn != null) conn.close(); } catch (SQLException e) { e.printStackTrace(); } } }
如需使用 keepAlive 功能,请使用 JDK 11,执行以下代码配置并启用。
继承 MySQL 的 StandardSocketFactory 类。
package com.bytehouse.factory; import com.mysql.cj.conf.PropertySet; import com.mysql.cj.protocol.StandardSocketFactory; import jdk.net.ExtendedSocketOptions; import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; public class KeepAliveMysqlSocketFactory extends StandardSocketFactory { private static final Logger LOG = Logger.getLogger(KeepAliveMysqlSocketFactory.class.getName()); // TCP Keep-Alive parameter(unit:second) private static final int KEEPALIVE_IDLE = 60; private static final int KEEPALIVE_INTERVAL = 30; private static final int KEEPALIVE_COUNT = 60; @Override public Socket createSocket(PropertySet props) { Socket sock = super.createSocket(props); configureKeepAlive(sock); return sock; } private void configureKeepAlive(Socket sock) { try { // 启用 Keep-Alive sock.setKeepAlive(true); // 设置 TCP_KEEPIDLE (例如,60 秒) sock.setOption(ExtendedSocketOptions.TCP_KEEPIDLE, KEEPALIVE_IDLE); // 设置 TCP_KEEPINTVL (例如, 30 秒) sock.setOption(ExtendedSocketOptions.TCP_KEEPINTERVAL, KEEPALIVE_INTERVAL); // 设置 TCP_KEEPCNT (例如, 5 probes) sock.setOption(ExtendedSocketOptions.TCP_KEEPCOUNT, KEEPALIVE_COUNT); LOG.info("TCP keep-alive options set successfully."); } catch (Exception e) { LOG.log(Level.WARNING, "can not configure Keep-Alive", e); } } }
在连接 URL 中增加自定义参数 Factory。
jdbc:mysql://<host>:3306/<database>?socketFactory=com.bytehouse.factory.KeepAliveMysqlSocketFactory
与 ByteHouse 建立连接后,您可以使用以下代码创建数据库、表。
public static void dropDatabase(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String dropDbQuery = "Drop DATABASE IF EXISTS bhjdbctest"; stmt.execute(dropDbQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } } public static void createDatabase(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String createDbQuery = "CREATE DATABASE IF NOT EXISTS bhjdbctest"; stmt.execute(createDbQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } } public static void createTable(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String createTableQuery = "CREATE TABLE IF NOT EXISTS bhjdbctest.orders\n (" + "OrderID String, OrderName String, OrderPriority Int8)" + " engine = CnchMergeTree() partition by OrderID order by OrderID"; stmt.execute(createTableQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } }
您可以使用以下代码写入数据。
public static void insertTable(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String insertQuery = "INSERT INTO bhjdbctest.orders VALUES ('54895','Apple',12)"; stmt.executeUpdate(insertQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } }
使用 PreparedStatement 批量数据写入。
public static void insertBatch(Connection connection) { PreparedStatement pstmt = null; String insertQuery = "INSERT INTO bhjdbctest.orders (OrderID, OrderName, OrderPriority) VALUES (?,'Apple',?)"; try { pstmt = connection.prepareStatement(insertQuery); int insertBatchSize = 10; for (int i = 0; i < insertBatchSize; i++) { pstmt.setString(1, "ID" + i); pstmt.setInt(2, i); pstmt.addBatch(); } pstmt.executeBatch(); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (pstmt != null) pstmt.close(); } catch (SQLException e) { e.printStackTrace(); } } }
您可以使用以下代码查询数据。
public static void selectTable(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String selectTableQuery = "SELECT * FROM bhjdbctest.orders"; ResultSet rs = stmt.executeQuery(selectTableQuery); ResultSetMetaData rsmd = rs.getMetaData(); int columnsNumber = rsmd.getColumnCount(); while (rs.next()) { for (int i = 1; i <= columnsNumber; i++) { if (i > 1) System.out.print(", "); String columnValue = rs.getString(i); System.out.print(columnValue); } System.out.println(); } } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } } }
如果您在使用过程中遇到 Unsupported or unrecognized SSL message 报错,这是 Java 版本过低导致的,Java 1.8.0_261 **** 以下版本会出现此报错。
您可通过以下方式解决:
如果您在使用过程中遇到 DB::Exception: No local available worker group for vw-xxxx 报错,这是计算组尚未启动导致的。
您可通过以下方式解决:
登录 ByteHouse 云数仓版控制台,在计算组页面,查看您使用的计算组是否在 正在运行的状态。
如果您在使用过程中遇到 Exception: unknown error: Code: UNAUTHENTICATED 报错,这可能是 user与password 配置错误导致的。
您可通过以下方式解决:
再次确认您配置的user与password是否正确。
xxx.yyy doesn't exist如果您在使用过程中遇到 DB::Exception: Database xxx.yyy doesn't exist 报错,这可能是数据库名称配置错误导致的。
您可通过以下方式解决:
请确认连接信息中配置的数据库名称是否正确。
如果您在使用过程中遇到 Caused by: javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate) 报错,这可能是 TLS 版本不兼容导致的。
您可通过以下方式解决:
请在连接 URL 后面增加一个参数:enabledTLSProtocols=TLSv1.2,TLSv1.3。
举例如下:
String url = String.format("jdbc:mysql://%s:%d/%s?enabledTLSProtocols=TLSv1.2,TLSv1.3", host, port, database);