本文介绍如何在 Python 开发环境连接并访问 ByteHouse 云数仓。
需要 Python 3.12 或更高版本的支持。
推荐使用 PyMySQL Driver 最新版本 v1.1.2。
可以通过如下命令,获取最新版本的 PyMySQL。
python3 -m pip install PyMySQL
ByteHouse 支持通过 IAM 用户或数据库用户连接 PyMySQL。IAM 用户与数据库用户二者差异说明如下,您可按需选择。
更多 IAM 用户和数据库用户的介绍请参见以下文档:
请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:
参数 | 使用 IAM 用户连接 |
|---|---|
host | 配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息。 |
port | 配置为固定值 3306。 |
user & password |
|
database | 配置为连接 ByteHouse 的数据库名称。 |
请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户的方式连接到 ByteHouse。
通用参数说明如下:
参数 | 使用数据库用户连接 |
|---|---|
host | 配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息。 |
port | 配置为固定值 3306。 |
user & password |
|
database | 配置为连接 ByteHouse 的数据库名称。 |
本章节介绍通过 PyMySQL 程序连接 ByteHouse 的基本用法,您可以在程序 Github 主页 获取最新的文档和发布版本信息。使用时注意替换连接语句中的 {Host}、{Password}、{User}、{Database}、{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息。
keepAlive,如需使用该功能,需配置相关参数,可参考配置 KeepAlive 章节配置。可参考下面样例设置 ByteHouse 连接信息。
host = "{Host}" port = 3306 password = "{Password}" user = "{User}" database = "{Database}" virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}" db_config = { 'host': host, 'port': port, 'user': user, 'password': password, 'database': database, 'charset': 'utf8mb4', }
创建每一个连接后,设置计算组配置。
connection.cursor().execute(f"set virtual_warehouse = '{virtual_warehouse_id}'")
如在连接级别(connection)需使用 keepAlive 功能,执行以下配置并启用。
def get_conn(): conn = pymysql.connect( host=bytehouse_host, user=bytehouse_username, port=3306, password=bytehouse_password, database=bytehouse_db, connect_timeout=10 ) return conn sock = mysql_conn._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) if sys.platform == "darwin": # macOS uses TCP_KEEPALIVE for idle time sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 60) else: # Linux and others sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 60)
import uuid from datetime import datetime import pymysql host = "{Host}" port = 3306 password = "{Password}" user = "{User}" database = "{Database}" virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}" db_config = { 'host': host, # 填写 HOST 地址 'port': port, 'user': user, 'password': password, # API key 密钥 'database': database, # 数据库名称 'charset': 'utf8mb4', } try: connection = pymysql.connect(**db_config) connection.cursor().execute(f"set virtual_warehouse = '{virtual_warehouse_id}'") with connection: with connection.cursor() as client: client.execute("DROP DATABASE IF EXISTS bhpythontest") client.execute('DROP DATABASE IF EXISTS bhpythontest') client.execute('CREATE DATABASE IF NOT EXISTS bhpythontest') client.execute( """ CREATE TABLE IF NOT EXISTS bhpythontest.example ( Col1 UInt8 , Col2 String , Col3 FixedString(3) , Col4 UUID , Col5 Map(String, UInt8) , Col6 Array(String) , Col7 Tuple(String, UInt8, Array(Map(String, String))) KV , Col8 DateTime ) Engine = CnchMergeTree() ORDER BY tuple() """) # 准备插入数据 values_list = [] for i in range(1): value = ( 42, # uint8(42) 'ClickHouse', 'Inc', str(uuid.uuid4()), "{'key': 1}", # Map(String, UInt8) "['Q', 'W', 'E', 'R', 'T', 'Y']", # Array(String) "('String Value', 5, [{'key': 'value'},{'key': 'value'},{'key': 'value'}])", # Tuple datetime.now().strftime('%Y-%m-%d %H:%M:%S') ) values_list.append(value) # 批量插入 insert_sql = "INSERT INTO bhpythontest.example (Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)" client.executemany(insert_sql, values_list) print("数据插入成功,插入了1000条记录") # 查询一条记录 select_sql = "SELECT * FROM bhpythontest.example LIMIT 1" result = client.execute(select_sql) result = client.fetchone() if result: print("查询结果:") print(f"Col1: {result[0]}") print(f"Col2: {result[1]}") print(f"Col3: {result[2]}") print(f"Col4: {result[3]}") print(f"Col5: {result[4]}") print(f"Col6: {result[5]}") print(f"Col7: {result[6]}") print(f"Col8: {result[7]}") # 删除表 drop_sql = "DROP TABLE IF EXISTS bhpythontest.example" client.execute(drop_sql) print("表已删除") client.close() except Exception as e: print(f"操作失败: {e}")