You need to enable JavaScript to run this app.
导航

Python 程序通过 Thrift2 地址访问 HBase 实例

最近更新时间2024.01.02 16:20:33

首次发布时间2022.12.27 16:12:46

表格数据库 HBase 版默认提供了 ZK 连接地址,同时也支持 Thrift 多语言访问,Thrift 是 HBase 标准版实例中的一种服务组件,基于 Apache Thrift(多语言支持的通信框架)开发。本文介绍基于 Python 程序通过 Thrift2 地址访问 HBase 实例的操作步骤。

前提条件

  • 如需通过私网地址访问 HBase 实例,需同时满足如下要求:
    • 已购 ECS 服务器与 HBase 实例在相同私有网络 VPC 下。ECS 服务器的购买方法,请参见购买云服务器
    • 已将 ECS 服务器的 IP 地址添加至 HBase 中的白名单中。白名单设置方法,请参见编辑白名单
  • 如需通过公网地址访问 HBase 实例,需确保运行 Python 工具的设备 IP 地址已加入 HBase 实例的白名单中。白名单设置方法,请参见编辑白名单
  • 已在 ECS 实例或本地设备上安装 Python 程序,建议使用 Python 3.x.x 版本。您可以通过 python version 命令检查当前 Python 的版本。
  • 已在 ECS 实例或本地设备上安装 Thrift 服务,建议使用 0.14.2 或以上版本的 Thrift 服务。关于 Thrift 服务的更多详情,请参见 Apache Thrift

操作步骤

  1. 获取 HBase 实例的 Thrift2 连接地址。
    连接地址查看方法,请参见查看连接地址

    说明

    表格数据库 HBase 版默认未开通 Thrift2 地址,您需要先申请 Thrift2 连接地址,申请方法,请参见申请 Thrift2 连接地址

  2. 在 Python 程序中下载并安装 HBase Thrift 模块。
    1. 下载已编译好的 Thrift 文件(文件中包含了可供 Python 访问的 HBase Thrift 模块),并将其保存在已安装了 Thrift 服务的 ECS 实例或本地设备上。

      # 解压下载的 Thrift 文件
      tar -zxvf gen-py.tar.gz
      # 进入 gen-py 目录
      cd gen-py
      
    2. 配置 Python 环境变量,来引用 HBase Thrift 模块。将上述 gen-py 目录下的 hbase 中的文件移动到 Python 的安装目录中。
      命令如下。

      mv hbase/ /<ECS 实例或本地设备上 Python 的安装目录>/Python/3.8/lib/python/site-packages/
      
    3. 安装 HBase Thrift 模块。
      命令如下。

      pip install thrift
      
  3. 在 Python 程序中使用 Thrift2 地址访问 HBase 实例。
    代码如下。
    # encoding:utf-8
    # !/usr/bin/env python3
    
    import random
    
    # 通过 TTransport、TSocket 和 TBinaryProtocol 开启一个 Thrift2 连接。
    from thrift.transport import TTransport
    from thrift.transport import TSocket
    from thrift.protocol import TBinaryProtocol
    
    # 来自thrift --gen py hbase.thrift
    from hbase import THBaseService
    from hbase.ttypes import TPut, TColumnValue, TGet, TNamespaceDescriptor, TTableDescriptor, TColumnFamilyDescriptor, \
        TTableName, TScan
    
    
    class Demo:
        def __init__(self, host, port):
            socket = TSocket.TSocket(host=host, port=port)
            self.transport = TTransport.TFramedTransport(socket)
            protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
    
            self.client = THBaseService.Client(protocol)
            self.transport.open()
    
        def __del__(self):
            self.transport.close()
    
        def get_ns_list(self):
            return [ns.name for ns in self.client.listNamespaceDescriptors()]
    
        def namespace_exists(self, namespace):
            return namespace in self.get_ns_list()
    
        def create_ns(self, namespace):
            if not self.namespace_exists(namespace):
                self.client.createNamespace(TNamespaceDescriptor(name=namespace))
    
        def create_table(self, table_name, column_family, namespace="ns"):
            if not self.namespace_exists(namespace):
                self.client.createNamespace(TNamespaceDescriptor(name=namespace))
            if not self.table_exists(namespace, table_name):
                _table_name = TTableName(
                    ns=namespace.encode("utf8"), qualifier=table_name.encode("utf8")
                )
                self.client.createTable(
                    TTableDescriptor(
                        tableName=_table_name,
                        columns=[
                            TColumnFamilyDescriptor(name=column_family.encode("utf8"))
                        ],
                    ),
                    None,
                )
    
        def table_exists(self, namespace, table_name):
            namespace = namespace.encode("utf8")
            table_name = table_name.encode("utf8")
            _table_name = TTableName(ns=namespace, qualifier=table_name)
            return self.client.tableExists(_table_name)
    
        def put(self, namespace, table_name, row_key, family, qualifier, value):
            row_key, family = row_key.encode("utf8"), family.encode("utf8")
            qualifier, value = qualifier.encode("utf8"), value.encode("utf8")
            table_in_bytes = self._table_in_bytes(namespace, table_name)
            put = TPut(
                row=row_key,
                columnValues=[
                    TColumnValue(family=family, qualifier=qualifier, value=value)
                ],
            )
            self.client.put(table_in_bytes, put)
    
        def get(self, namespace, table_name, row_key):
            row_key = row_key.encode("utf8")
            table_in_bytes = self._table_in_bytes(namespace, table_name)
            get = TGet(row=row_key)
            result = self.client.get(table_in_bytes, get)
            return result
    
        def scan(self, namespace, table_name, start_row, stop_row):
            start_row, stop_row = start_row.encode("utf8"), stop_row.encode("utf8")
            table_in_bytes = self._table_in_bytes(namespace, table_name)
            scan = TScan(startRow=start_row, stopRow=stop_row)
            caching = 2
            results = []
            while True:
                last_result = None
                current_results = self.client.getScannerResults(
                    table_in_bytes, scan, caching
                )
                for result in current_results:
                    results.append(result)
                    last_result = result
                if last_result is None:
                    break
                else:
                    next_start_row = self._create_closest_row_after(last_result.row)
                    scan = TScan(startRow=next_start_row, stopRow=stop_row)
            return results
    
        def batch_put(self, namespace, table_name):
            """
            puts = [TPut(row="row2".encode("utf8"), columnValues=[TColumnValue(family="f".encode("utf8"),
            qualifier="q1".encode("utf8"), value="value2".encode("utf8"))]), ...]
            """
            puts = [TPut(row="row1".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                                                               qualifier="q1".encode("utf8"),
                                                                               value="value1".encode("utf8"))]),
                    TPut(row="row2".encode("utf8"), columnValues=[TColumnValue(family="cf1".encode("utf8"),
                                                                               qualifier="q2".encode("utf8"),
                                                                               value="value2".encode("utf8"))])]
            table_in_bytes = self._table_in_bytes(namespace, table_name)
            self.client.putMultiple(table=table_in_bytes, tputs=puts)
    
        def batch_get(self, namespace, table_name):
            """
            gets = [TGet(row="row2".encode("utf8")), ...]
            """
            gets = [TGet(row="row1".encode("utf8")), TGet(row="row2".encode("utf8"))]
            table_in_bytes = self._table_in_bytes(namespace, table_name)
            self.client.getMultiple(table_in_bytes, gets)
    
        def get_table_descriptor(self, ns, table):
            """
            res struct:
            {
                "cf1": {"attr1": value1, "attr2": value2, ....},
                "cf2": {"attr1": value1, "attr2": value2, ....},
                .....
            }
            """
            table = table.encode("utf8")
            ns = ns.encode("utf8")
            table = TTableName(ns=ns, qualifier=table)
            info = self.client.getTableDescriptor(table)
            res = {}
            if info and info.columns:
                for item in info.columns:
                    attributes = item.attributes
                    new_attr = {}
                    for key, value in attributes.items():
                        new_attr[self._bytes_to_str(key)] = self._bytes_to_str(value)
                    res[self._bytes_to_str(item.name)] = new_attr
    
            table_attr = {}
            for key, value in info.attributes.items():
                table_attr[self._bytes_to_str(key)] = self._bytes_to_str(value)
    
            return table_attr, res
    
        def delete(self, namespace, table):
            namespace, table = namespace.encode("utf8"), table.encode("utf8")
            # disable
            self.client.disableTable(TTableName(ns=namespace, qualifier=table))
            # drop
            self.client.deleteTable(TTableName(ns=namespace, qualifier=table))
    
        @staticmethod
        def _bytes_to_str(byte_str):
            return bytes.decode(byte_str)
    
        @staticmethod
        def _table_in_bytes(namespace, table_name):
            return "{namespace}:{table_name}".format(
                namespace=namespace, table_name=table_name
            ).encode("utf8")
    
        @staticmethod
        def _create_closest_row_after(row):
            array = bytearray(row)
            array.append(0x00)
            return bytes(array)
    
    
    def unit_test():
        client = Demo(host="xxx", port=9090)
        name_space = "ns6"
        table = "table6"
        column = "cf1"
        client.create_table(table, column, name_space)
        print(client.table_exists(name_space, table))
        for i in range(10):
            row_key = "row" + str(i)
            client.put(name_space, table, row_key, column, "name", "handsome_boy" + str(i))
        # client.batch_put(name_space, table)
        # scan
        start_row = "row0"
        stop_row = "row8"
        result = client.scan(name_space, table, start_row, stop_row)
        print(result)
        result = client.get(name_space, table, "row4")
        # client.batch_get(name_space, table)
        # result = client.get_table_descriptor(name_space, table)
        # client.delete(name_space, table)
        # result = client.table_exists(name_space, table)
        print(result)
    
    
    if __name__ == "__main__":
        unit_test()