You need to enable JavaScript to run this app.
ByteHouse云数仓版

ByteHouse云数仓版

复制全文
Python
PyMySQL
复制全文
PyMySQL

本文介绍如何在 Python 开发环境连接并访问 ByteHouse 云数仓。

环境要求

需要 Python 3.12 或更高版本的支持。

推荐版本

推荐使用 PyMySQL Driver 最新版本 v1.1.2

使用限制
  • 暂不支持设置 query ID。
  • 暂不支持在连接层面设置计算组。
  • 不支持 ByteHouse 的 JSONB 和 Bitmap64 的数据类型。
  • 当前 ByteHouse MySQL 协议不支持 ComPrepare 协议。
  • 如果您在使用过程中遇到其他未知限制,请联系 ByteHouse 团队处理。

安装驱动

从 PyPI 安装

可以通过如下命令,获取最新版本的 PyMySQL。

python3 -m pip install PyMySQL

获取 ByteHouse 连接信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 PyMySQL。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

使用 IAM 用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:

参数

使用 IAM 用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为固定值 3306。

user & password

  • user:固定配置为 bytehouse
  • password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理 > 连接信息中获取API Key。详情请参见获取 API Key

database

配置为连接 ByteHouse 的数据库名称。

使用数据库用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户的方式连接到 ByteHouse。
通用参数说明如下:

参数

使用数据库用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为固定值 3306。

user & password

  • user 配置为 {accountID_or_accountName}::{username}[::{envID}],详情请参见步骤三:获取 ByteHouse 连接串信息
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库的用户名,可在 ByteHouse 控制台 > 权限管理 > 用户 > 查看数据库用户名
    • {envID}:可选配置,数据库所在的环境名称。如果使用 default 环境,可不配置;如需使用其他环境,需指定环境名称,配置时无需添加[]。您可登录 ByteHouse 控制台,在租户管理 > 基本信息 > 当前环境中获取。
      使用示例如下:
      • 配置环境 ID:21xxxxxxxx::demouser::demoenv
      • 不配置环境 ID:21xxxxxxxx::demouser
  • password:可联系管理员获取数据库账号的密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

database

配置为连接 ByteHouse 的数据库名称。

基本用法

本章节介绍通过 PyMySQL 程序连接 ByteHouse 的基本用法,您可以在程序 Github 主页 获取最新的文档和发布版本信息。使用时注意替换连接语句中的 {Host}{Password}{User}{Database}{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息

  • 超时时间配置:connect_timeout 默认为 10s、read_timeout 默认为 none、write_timeout 默认为 none。
  • 默认不支持 keepAlive,如需使用该功能,需配置相关参数,可参考配置 KeepAlive 章节配置。

连接 ByteHouse

可参考下面样例设置 ByteHouse 连接信息。

host = "{Host}"
port = 3306
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

db_config = {
    'host': host,  
    'port': port,
    'user': user,
    'password': password,  
    'database': database,  
    'charset': 'utf8mb4',
}

设置特定计算组

创建每一个连接后,设置计算组配置。

connection.cursor().execute(f"set virtual_warehouse = '{virtual_warehouse_id}'")

配置 KeepAlive

如在连接级别(connection)需使用 keepAlive 功能,执行以下配置并启用。

def get_conn():
    conn = pymysql.connect(
        host=bytehouse_host,
        user=bytehouse_username,
        port=3306,
        password=bytehouse_password,
        database=bytehouse_db,
        connect_timeout=10
    )
    return conn
sock = mysql_conn._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if sys.platform == "darwin":
    # macOS uses TCP_KEEPALIVE for idle time
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, 60)
else:
    # Linux and others
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 60)

连接与查询完整示例

import uuid
from datetime import datetime

import pymysql

host = "{Host}"
port = 3306
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

db_config = {
    'host': host,  # 填写 HOST 地址
    'port': port,
    'user': user,
    'password': password,  # API key 密钥
    'database': database,  # 数据库名称
    'charset': 'utf8mb4',
}
try:
    connection = pymysql.connect(**db_config)
    connection.cursor().execute(f"set virtual_warehouse = '{virtual_warehouse_id}'")
    with connection:
        with connection.cursor() as client:
            client.execute("DROP DATABASE IF EXISTS bhpythontest")
            client.execute('DROP DATABASE IF EXISTS bhpythontest')
            client.execute('CREATE DATABASE IF NOT EXISTS bhpythontest')
            client.execute(
            """
            CREATE TABLE IF NOT EXISTS bhpythontest.example (
                    Col1 UInt8
                , Col2 String
                , Col3 FixedString(3)
                , Col4 UUID
                , Col5 Map(String, UInt8)
                , Col6 Array(String)
                , Col7 Tuple(String, UInt8, Array(Map(String, String))) KV
                , Col8 DateTime
            ) Engine = CnchMergeTree() ORDER BY tuple()
            """)

            # 准备插入数据
            values_list = []
            for i in range(1):
                value = (
                    42,  # uint8(42)
                    'ClickHouse',
                    'Inc',
                    str(uuid.uuid4()),
                    "{'key': 1}",  # Map(String, UInt8)
                    "['Q', 'W', 'E', 'R', 'T', 'Y']",  # Array(String)
                    "('String Value', 5, [{'key': 'value'},{'key': 'value'},{'key': 'value'}])",  # Tuple
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                )
                values_list.append(value)

            # 批量插入
            insert_sql = "INSERT INTO bhpythontest.example (Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8) VALUES(%s, %s, %s, %s, %s, %s, %s, %s)"
            client.executemany(insert_sql, values_list)
            print("数据插入成功,插入了1000条记录")

            # 查询一条记录
            select_sql = "SELECT * FROM bhpythontest.example LIMIT 1"
            result = client.execute(select_sql)
            result = client.fetchone()

            if result:
                print("查询结果:")
                print(f"Col1: {result[0]}")
                print(f"Col2: {result[1]}")
                print(f"Col3: {result[2]}")
                print(f"Col4: {result[3]}")
                print(f"Col5: {result[4]}")
                print(f"Col6: {result[5]}")
                print(f"Col7: {result[6]}")
                print(f"Col8: {result[7]}")

            # 删除表
            drop_sql = "DROP TABLE IF EXISTS bhpythontest.example"
            client.execute(drop_sql)
            print("表已删除")
            client.close()
except Exception as e:
    print(f"操作失败: {e}")

最近更新时间:2025.10.22 19:18:53
这个页面对您有帮助吗?
有用
有用
无用
无用