You need to enable JavaScript to run this app.
导航
PyMySQL
最近更新时间:2025.10.22 19:18:53首次发布时间:2024.08.01 20:24:07
复制全文
我的收藏
有用
有用
无用
无用

本文介绍如何在 Python 开发环境连接并访问 ByteHouse 云数仓。

环境要求

需要 Python 3.12 或更高版本的支持。

推荐版本

推荐使用 PyMySQL Driver 最新版本 v1.1.2

使用限制
  • 暂不支持设置 query ID。
  • 暂不支持在连接层面设置计算组。
  • 不支持 ByteHouse 的 JSONB 和 Bitmap64 的数据类型。
  • 当前 ByteHouse MySQL 协议不支持 ComPrepare 协议。
  • 如果您在使用过程中遇到其他未知限制,请联系 ByteHouse 团队处理。

安装驱动

从 PyPI 安装

可以通过如下命令,获取最新版本的 PyMySQL。

python3 -m pip install PyMySQL

获取 ByteHouse 连接信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 PyMySQL。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

使用 IAM 用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:

参数

使用 IAM 用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为固定值 3306。

user & password

  • user:固定配置为 bytehouse
  • password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理 > 连接信息中获取API Key。详情请参见获取 API Key

database

配置为连接 ByteHouse 的数据库名称。

使用数据库用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户的方式连接到 ByteHouse。
通用参数说明如下:

参数

使用数据库用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为固定值 3306。

user & password

  • user 配置为 {accountID_or_accountName}::{username}[::{envID}],详情请参见步骤三:获取 ByteHouse 连接串信息
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库的用户名,可在 ByteHouse 控制台 > 权限管理 > 用户 > 查看数据库用户名
    • {envID}:可选配置,数据库所在的环境名称。如果使用 default 环境,可不配置;如需使用其他环境,需指定环境名称,配置时无需添加[]。您可登录 ByteHouse 控制台,在租户管理 > 基本信息 > 当前环境中获取。
      使用示例如下:
      • 配置环境 ID:21xxxxxxxx::demouser::demoenv
      • 不配置环境 ID:21xxxxxxxx::demouser
  • password:可联系管理员获取数据库账号的密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

database

配置为连接 ByteHouse 的数据库名称。

基本用法

本章节介绍通过 PyMySQL 程序连接 ByteHouse 的基本用法,您可以在程序 Github 主页 获取最新的文档和发布版本信息。使用时注意替换连接语句中的 {Host}{Password}{User}{Database}{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息

  • 超时时间配置:connect_timeout 默认为 10s、read_timeout 默认为 none、write_timeout 默认为 none。
  • 默认不支持 keepAlive,如需使用该功能,需配置相关参数,可参考配置 KeepAlive 章节配置。

连接 ByteHouse

可参考下面样例设置 ByteHouse 连接信息。

host = "{Host}"
port = 3306
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

db_config = {
    'host': host,  
    'port': port,
    'user': user,
    'password': password,  
    'database': database,  
    'charset': 'utf8mb4',
}

设置特定计算组

创建每一个连接后,设置计算组配置。

connection.cursor().execute(f"set virtual_warehouse = '{virtual_warehouse_id}'")

配置 KeepAlive

如在连接级别(connection)需使用 keepAlive 功能,执行以下配置并启用。

def get_conn():
    conn = pymysql.connect(
        host=bytehouse_host,
        user=bytehouse_username,
        port=3306,
        password=bytehouse_password,
        database=bytehouse_db,
        connect_timeout=10
    )
    return conn
sock = mysql_conn._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if sys.platform == "darwin":
    # macOS uses TCP_KEEPALIVE for idle time
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 60)
else:
    # Linux and others
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 60)

连接与查询完整示例

import uuid
from datetime import datetime

import pymysql

host = "{Host}"
port = 3306
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

db_config = {
    'host': host,  # 填写 HOST 地址
    'port': port,
    'user': user,
    'password': password,  # API key 密钥
    'database': database,  # 数据库名称
    'charset': 'utf8mb4',
}
try:
    connection = pymysql.connect(**db_config)
    connection.cursor().execute(f"set virtual_warehouse = '{virtual_warehouse_id}'")
    with connection:
        with connection.cursor() as client:
            client.execute("DROP DATABASE IF EXISTS bhpythontest")
            client.execute('DROP DATABASE IF EXISTS bhpythontest')
            client.execute('CREATE DATABASE IF NOT EXISTS bhpythontest')
            client.execute(
            """
            CREATE TABLE IF NOT EXISTS bhpythontest.example (
                    Col1 UInt8
                , Col2 String
                , Col3 FixedString(3)
                , Col4 UUID
                , Col5 Map(String, UInt8)
                , Col6 Array(String)
                , Col7 Tuple(String, UInt8, Array(Map(String, String))) KV
                , Col8 DateTime
            ) Engine = CnchMergeTree() ORDER BY tuple()
            """)

            # 准备插入数据
            values_list = []
            for i in range(1):
                value = (
                    42,  # uint8(42)
                    'ClickHouse',
                    'Inc',
                    str(uuid.uuid4()),
                    "{'key': 1}",  # Map(String, UInt8)
                    "['Q', 'W', 'E', 'R', 'T', 'Y']",  # Array(String)
                    "('String Value', 5, [{'key': 'value'},{'key': 'value'},{'key': 'value'}])",  # Tuple
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                )
                values_list.append(value)

            # 批量插入
            insert_sql = "INSERT INTO bhpythontest.example (Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)"
            client.executemany(insert_sql, values_list)
            print("数据插入成功,插入了1000条记录")

            # 查询一条记录
            select_sql = "SELECT * FROM bhpythontest.example LIMIT 1"
            result = client.execute(select_sql)
            result = client.fetchone()

            if result:
                print("查询结果:")
                print(f"Col1: {result[0]}")
                print(f"Col2: {result[1]}")
                print(f"Col3: {result[2]}")
                print(f"Col4: {result[3]}")
                print(f"Col5: {result[4]}")
                print(f"Col6: {result[5]}")
                print(f"Col7: {result[6]}")
                print(f"Col8: {result[7]}")

            # 删除表
            drop_sql = "DROP TABLE IF EXISTS bhpythontest.example"
            client.execute(drop_sql)
            print("表已删除")
            client.close()
except Exception as e:
    print(f"操作失败: {e}")