您可以通过开源的 ClickHouse JDBC 驱动程序连接到 ByteHouse 云数仓版,连接后进行库表操作、数据写入查询等。本文为您介绍 ClickHouse JDBC 驱动连接 ByteHouse 云数仓版的主要操作流程和操作要点。
支持 Java 1.8.0_261 或更高版本 (需要 TLS v1.3 支持)。
应用程序代码直连 ByteHouse 时,推荐下载 ClickHouse JDBC v0.4.6,Java 环境建议使用 OpenJDK 11。
如果您通过其他框架(如 DataX)连接 ByteHouse,Driver 版本适配规则如下:
下载 ClickHouse JDBC 基础驱动:
驱动 | 已验证版本/注意事项 |
|---|---|
ClickHouse JDBC 基础驱动 | 0.4.6,驱动下载链接 |
Java | 已验证版本:OpenJDK 11 |
通过 Maven 方式使用 ClickHouse JDBC 驱动的示例如下:
<!-- https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc --> <dependency> <groupId>com.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.4.6</version> </dependency>
通过 Gradle 方式使用 ClickHouse JDBC 驱动的示例如下:
// https://mvnrepository.com/artifact/com.clickhouse/clickhouse-jdbc implementation 'com.clickhouse:clickhouse-jdbc:0.4.6'
ByteHouse 支持通过 IAM 用户或数据库用户连接 ClickHouse JDBC Driver。IAM 用户与数据库用户二者差异说明如下,您可按需选择。
更多 IAM 用户和数据库用户的介绍请参见以下文档:
请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:
参数 | 配置说明 |
|---|---|
host | 配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制域名信息,详情请参见步骤二:配置网络信息。 |
port | 固定为 8123。 |
{DATABASE} | ByteHouse 数据库名称。您可登录 ByteHouse 控制台,在数据库页面查看并复制数据库。 |
user & password | 登录 ByteHouse 数据库的用户名和密码。
|
virtual_warehoue_id | 可选配置,配置为计算组名。如果您需要指定计算组进行查询和写入,可配置该参数。 |
ssl | 开启安全协议 SSL。 注意 ByteHouse 需要加密认证,从而保护数据安全,因此您需打开 SSL。 |
参数 | 配置说明 |
|---|---|
host | 配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的租户管理 > 基本信息 > 网络信息中查看并复制域名信息,详情请参见步骤二:配置网络信息。 |
port | 固定为 8123。 |
{DATABASE} | ByteHouse 数据库名称。您可登录 ByteHouse控制台,在数据库页面查看并复制数据库。 |
user & password | 登录 ByteHouse 数据库的用户名和密码。
|
virtual_warehoue_id | 可选配置,配置为计算组名。如果您需要指定计算组进行查询和写入,可配置该参数。 |
ssl | 开启安全协议 SSL。 注意 ByteHouse 需要加密认证,从而保护数据安全,因此您需打开 SSL。 |
您可以使用以下代码连接至 ByteHouse,并开始使用标准语句开发 ByteHouse,用于查询、写入和读取数据。
keepAlive,如需使用该功能,需新建 KeepAliveClickhouseSocketFactory 类,可参考配置 KeepAlive 章节配置,该功能需使用 JDK 11。可参考下面代码连接至 ByteHouse,使用时注意替换连接语句中的 {Host}、{Password}、{User}、{Database}、{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息。
import com.clickhouse.jdbc.ClickHouseDriver; import java.sql.*; import java.util.Properties; import java.util.UUID; public class CKJDBC { public static void main(String[] args) { String host = "{Host}"; int port = 8123; String password = "{API_KEY}"; String user = "{User}"; String database = "{Database}"; String virtual_warehoue_id = "{VIRTUAL_WAREHOUSE_ID}"; // 通过 properties 设置一些链接的基本参数 String url = String.format("jdbc:clickhouse://%s:%d", host, port); Properties properties = new Properties(); // 可以按需设置不同的计算组 properties.setProperty("custom_http_params", String.format("virtual_warehouse=%s",virtual_warehoue_id)); properties.setProperty("ssl", "true"); properties.setProperty("compress", "0"); properties.setProperty("user", user); properties.setProperty("password", password); properties.setProperty("database", database); // 按照 Java JDBC 标准接口创建 connection // try (Connection conn = DriverManager.getConnection(url, properties)) { // 按照 Clickhouse Driver 自身接口创建 connection Connection conn = null; Statement initStatement = null; try { conn = new ClickHouseDriver().connect(url, properties); dropDatabaseWithQueryId(conn); dropDatabase(conn); createDatabase(conn); createTable(conn); insertTable(conn); insertBatch(conn); selectTable(conn); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (initStatement != null) initStatement.close(); if (conn != null) conn.close(); } catch (SQLException e) { e.printStackTrace(); } } }
import com.clickhouse.jdbc.ClickHouseDriver; import java.sql.*; import java.util.Properties; import java.util.UUID; public class CKJDBC { public static void main(String[] args) { String host = "{Host}"; int port = 8123; String password = "{API_KEY}"; String user = "{User}"; String database = "{Database}"; String virtual_warehoue_id = "{VIRTUAL_WAREHOUSE_ID}"; String url = String.format("jdbc:clickhouse://%s:%d/?ssl=true&compress=0&user=%s&password=%s&database=%s&custom_http_params=virtual_warehouse=%s", host, port, user, password, database, virtual_warehoue_id); // 按照 Java JDBC 标准接口创建 connection // try (Connection conn = DriverManager.getConnection(url, properties)) { // 按照 Clickhouse Driver 自身接口创建 connection Connection conn = null; Statement initStatement = null; try { conn = new ClickHouseDriver().connect(url, properties); dropDatabaseWithQueryId(conn); dropDatabase(conn); createDatabase(conn); createTable(conn); insertTable(conn); insertBatch(conn); selectTable(conn); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (initStatement != null) initStatement.close(); if (conn != null) conn.close(); } catch (SQLException e) { e.printStackTrace(); } } }
如需使用 keepAlive 功能,请使用 JDK 11,执行以下代码配置并启用。
新建 KeepAliveClickhouseSocketFactory 类。
package com.bytehouse.factory; import com.clickhouse.client.AbstractSocketClient; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseSocketFactory; import com.clickhouse.client.ClickHouseSslContextProvider; import com.clickhouse.client.config.ClickHouseSslMode; import com.clickhouse.data.ClickHouseUtils; import jdk.net.ExtendedSocketOptions; import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; import org.apache.hc.client5.http.ssl.DefaultHostnameVerifier; import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.ssl.SSLContexts; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; import java.io.IOException; import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; public class KeepAliveClickhouseSocketFactory implements ClickHouseSocketFactory { private static final Logger LOG = Logger.getLogger(KeepAliveClickhouseSocketFactory.class.getName()); static final KeepAliveClickhouseSocketFactory instance = new KeepAliveClickhouseSocketFactory(); // TCP Keep-Alive 参数(单位:秒) private static final int KEEPALIVE_IDLE = 60; private static final int KEEPALIVE_INTERVAL = 30; private static final int KEEPALIVE_COUNT = 60; @Override public <T> T create(ClickHouseConfig config, Class<T> clazz) throws IOException, UnsupportedOperationException { if (config == null || clazz == null) { throw new IllegalArgumentException("Non-null configuration and class are required"); } else if (SSLConnectionSocketFactory.class.equals(clazz)) { return clazz.cast(new SSLSocketFactory(config)); } else if (PlainConnectionSocketFactory.class.equals(clazz)) { return clazz.cast(new SocketFactory(config)); } throw new UnsupportedOperationException(ClickHouseUtils.format("Class %s is not supported", clazz)); } private static void configureKeepAlive(Socket sock) { try { // 启用 Keep-Alive sock.setKeepAlive(true); // 设置 TCP_KEEPIDLE (例如 60 秒) sock.setOption(ExtendedSocketOptions.TCP_KEEPIDLE, KEEPALIVE_IDLE); // 设置 TCP_KEEPINTVL (例如 30 秒) sock.setOption(ExtendedSocketOptions.TCP_KEEPINTERVAL, KEEPALIVE_INTERVAL); // 设置 TCP_KEEPCNT (例如 5 probes) sock.setOption(ExtendedSocketOptions.TCP_KEEPCOUNT, KEEPALIVE_COUNT); LOG.info("TCP keep-alive options set successfully."); } catch (Exception e) { LOG.log(Level.WARNING, "can not configure Keep-Alive", e); } } @Override public boolean supports(Class<?> clazz) { return PlainConnectionSocketFactory.class.equals(clazz) || SSLConnectionSocketFactory.class.equals(clazz); } public KeepAliveClickhouseSocketFactory() { } static class SSLSocketFactory extends SSLConnectionSocketFactory { private final ClickHouseConfig config; private SSLSocketFactory(ClickHouseConfig config) throws SSLException { super(ClickHouseSslContextProvider.getProvider().getSslContext(SSLContext.class, config) .orElse(SSLContexts.createDefault()), config.getSslMode() == ClickHouseSslMode.STRICT ? new DefaultHostnameVerifier() : (hostname, session) -> true); // NOSONAR this.config = config; } @Override public Socket createSocket(HttpContext context) throws IOException { Socket sock = AbstractSocketClient.setSocketOptions(config, new Socket()); configureKeepAlive(sock); return sock; } public static SSLSocketFactory create(ClickHouseConfig config) throws SSLException { return new SSLSocketFactory(config); } } static class SocketFactory extends PlainConnectionSocketFactory { private final ClickHouseConfig config; private SocketFactory(ClickHouseConfig config) { this.config = config; } @Override public Socket createSocket(final HttpContext context) throws IOException { Socket sock = AbstractSocketClient.setSocketOptions(config, new Socket()); configureKeepAlive(sock); return sock; } public static SocketFactory create(ClickHouseConfig config) { return new SocketFactory(config); } } }
在连接 URL 中增加自定义参数 Factory 和 Enable socket_keepalive,设置为 APACHE_HTTP_CLIENT 的 http_connection_provider。
jdbc:clickhouse://<host>:8123/<database>?custom_socket_factory=com.bytehouse.factory.KeepAliveClickhouseSocketFactory&socket_keepalive=true&http_connection_provider=APACHE_HTTP_CLIENT
与 ByteHouse 建立连接后,您可以使用以下代码创建数据库、表。
public static void dropDatabase(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String dropDbQuery = "Drop DATABASE IF EXISTS bhjdbctest"; stmt.execute(dropDbQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } } public static void createDatabase(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String createDbQuery = "CREATE DATABASE IF NOT EXISTS bhjdbctest"; stmt.execute(createDbQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } } public static void createTable(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String createTableQuery = "CREATE TABLE IF NOT EXISTS bhjdbctest.orders\n (" + "OrderID String, OrderName String, OrderPriority Int8)" + " engine = CnchMergeTree() partition by OrderID order by OrderID"; stmt.execute(createTableQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } }
您可以使用以下代码写入数据。
public static void insertTable(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String insertQuery = "INSERT INTO bhjdbctest.orders VALUES ('54895','Apple',12)"; stmt.executeUpdate(insertQuery); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } }
使用 PreparedStatement 批量数据写入。
public static void insertBatch(Connection connection) { PreparedStatement pstmt = null; String insertQuery = "INSERT INTO bhjdbctest.orders (OrderID, OrderName, OrderPriority) VALUES (?,'Apple',?)"; try { pstmt = connection.prepareStatement(insertQuery); int insertBatchSize = 10; for (int i = 0; i < insertBatchSize; i++) { pstmt.setString(1, "ID" + i); pstmt.setInt(2, i); pstmt.addBatch(); } pstmt.executeBatch(); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (pstmt != null) pstmt.close(); } catch (SQLException e) { e.printStackTrace(); } } }
您可以使用以下代码查询数据。
public static void selectTable(Connection connection) { Statement stmt = null; try { stmt = connection.createStatement(); String selectTableQuery = "SELECT * FROM bhjdbctest.orders"; ResultSet rs = stmt.executeQuery(selectTableQuery); ResultSetMetaData rsmd = rs.getMetaData(); int columnsNumber = rsmd.getColumnCount(); while (rs.next()) { for (int i = 1; i <= columnsNumber; i++) { if (i > 1) System.out.print(", "); String columnValue = rs.getString(i); System.out.print(columnValue); } System.out.println(); } } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } } }
设置 query ID 有助于追踪查询执行情况,方便后续调试或管理查询任务。
如果您需要为某条查询语句设置 query ID,可参考以下代码,添加 stmt.unwrap(ClickHouseRequest.class).query(dropDbQuery,String.format("customized_%s", UUID.randomUUID())).execute(); 代码。
public static void dropDatabaseWithQueryId(Connection connection) { ClickHouseStatement stmt = null; try { stmt = (ClickHouseStatement)connection.createStatement(); String dropDbQuery = "Drop DATABASE IF EXISTS bhjdbctest"; stmt.unwrap(ClickHouseRequest.class).query(dropDbQuery,String.format("customized_%s", UUID.randomUUID())).execute(); } catch (SQLException ex) { ex.printStackTrace(); } finally { try { if (stmt != null) stmt.close(); } catch (SQLException e) { e.printStackTrace(); } } }
如果您在使用过程中遇到 Unsupported or unrecognized SSL message 报错,这是 Java 版本过低导致的,Java 1.8.0_261 **** 以下版本会出现此报错。
您可通过以下方式解决:
如果您在使用过程中遇到 DB::Exception: No local available worker group for vw-xxxx 报错,这是计算组尚未启动导致的。
您可通过以下方式解决:
登录 ByteHouse 云数仓版控制台,在计算组页面,查看您使用的计算组是否在 正在运行的状态。
如果您在使用过程中遇到 Exception: unknown error: Code: UNAUTHENTICATED 报错,这可能是 user与password 配置错误导致的。
您可通过以下方式解决:
再次确认您配置的user与password是否正确。
xxx.yyy doesn't exist如果您在使用过程中遇到 DB::Exception: Database xxx.yyy doesn't exist 报错,这可能是数据库名称配置错误导致的。
您可通过以下方式解决:
请确认连接信息中配置的数据库名称是否正确。
如果您在使用 DBeaver ClickHouse driver 连接 ByteHouse 过程中遇到以下报错:
We apologize, but this driver only works with ClickHouse servers 20.7 and above. Please consider to upgrade your server to a more recent version.
您可按照在连接串或连接参数中设置 server_time_zone 和 server_version 参数值解决该问题。
server_time_zone:设置为 Asia/Shanghai。如果您处于其他时区,可根据实际情况设置为对应的时区。server_version:设置为 21.8.7.1。