You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

启用Kerberos的Cloudera集群:HBase与SAS集成读写问题

解决方案:Kerberized Cloudera HBase集群与SAS服务器的Python集成(带过滤条件读写)

我碰到过不少类似的场景——在Kerberized的Cloudera集群里用Python通过Thrift访问HBase时,高级过滤条件的支持确实是个痛点,Thrift 1.x和基于它的HappyBase都对SingleColumnValueFilter这类过滤器支持有限。下面给你几个经过验证的解决方案,按推荐程度排序:

方案1:切换到HBase REST API + Python Requests(最易实现)

HBase REST API对Kerberos认证支持完善,并且完整覆盖了所有HBase过滤规则,包括你需要的SingleColumnValueFilter。具体步骤如下:

  1. 在Cloudera集群启用HBase REST服务
    登录Cloudera Manager,找到HBase服务,进入配置页,启用HBase REST角色,并确保Kerberos认证配置正确(服务主体、密钥表等)。

  2. 在SAS服务器安装依赖库
    打开终端执行:

    pip install requests requests-kerberos
    
  3. 编写带过滤条件的Python代码
    示例代码实现SingleColumnValueFilter过滤查询:

    import requests
    from requests_kerberos import HTTPKerberosAuth, OPTIONAL
    
    # 替换为你的HBase REST服务地址和表名
    HBASE_REST_URL = "http://hbase-rest-node.example.com:8080/my_hbase_table/scanner"
    
    # 构造XML格式的过滤条件(SingleColumnValueFilter示例)
    filter_payload = """
    <Scanner batch="100">
      <filter>
        <SingleColumnValueFilter>
          <column>my_cf:my_column</column>
          <comparator>
            <BinaryComparator>
              <value>target_value</value>
            </BinaryComparator>
          </comparator>
          <compareOp>EQUAL</compareOp>
          <filterIfMissing>true</filterIfMissing>
        </SingleColumnValueFilter>
      </filter>
    </Scanner>
    """
    
    # 发送请求创建扫描器(Kerberos认证)
    create_scanner_resp = requests.post(
        HBASE_REST_URL,
        data=filter_payload,
        headers={"Content-Type": "application/xml"},
        auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL)
    )
    create_scanner_resp.raise_for_status()
    
    # 获取扫描器ID,循环读取结果
    scanner_id = create_scanner_resp.headers["Location"].split("/")[-1]
    scanner_url = f"{HBASE_REST_URL}/{scanner_id}"
    
    try:
        while True:
            result_resp = requests.get(scanner_url, auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL))
            if not result_resp.content:
                break
            # 处理返回的XML结果(可根据需求解析为字典或其他格式)
            print(result_resp.text)
    finally:
        # 关闭扫描器释放资源
        requests.delete(scanner_url, auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL))
    

    注意:确保SAS服务器已配置Kerberos客户端,能通过kinit获取到访问HBase REST服务的有效票据。

方案2:使用HBase Thrift 2.x客户端(性能更优)

Cloudera集群支持HBase Thrift 2.x版本,相比老旧的Thrift 1.x,它提供了完整的HBase API支持,包括所有过滤条件。

  1. 启用HBase Thrift2服务
    在Cloudera Manager中找到HBase服务,启用Thrift2角色,配置Kerberos认证(服务主体、SASL配置等)。

  2. 生成Thrift2 Python客户端代码

    • 从对应版本的HBase源码中获取hbase.thrift文件(比如Cloudera CDP 7.x对应HBase 2.x)
    • 使用Thrift编译器生成Python客户端代码:
      thrift --gen py hbase.thrift
      
    • 将生成的gen-py目录中的代码复制到SAS服务器的Python项目中。
  3. 编写Thrift2客户端代码
    示例代码实现带SingleColumnValueFilter的查询:

    from thrift import Thrift
    from thrift.transport import TSocket, TTransport
    from thrift.protocol import TBinaryProtocol
    from hbase import Hbase
    from hbase.ttypes import *
    
    # 替换为你的集群信息
    THRIFT2_HOST = "hbase-thrift2-node.example.com"
    THRIFT2_PORT = 9090
    HBASE_SERVICE_PRINCIPAL = "hbase/hbase-thrift2-node.example.com@EXAMPLE.COM"
    CLIENT_PRINCIPAL = "sas_service_account@EXAMPLE.COM"
    KRB5_CONF_PATH = "/etc/krb5.conf"
    
    # 初始化Kerberos认证的Thrift传输
    transport = TSocket.TSocket(THRIFT2_HOST, THRIFT2_PORT)
    transport = TTransport.TSaslClientTransport(
        transport,
        HBASE_SERVICE_PRINCIPAL,
        'GSSAPI',
        CLIENT_PRINCIPAL,
        KRB5_CONF_PATH
    )
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Hbase.Client(protocol)
    
    try:
        transport.open()
    
        # 构造SingleColumnValueFilter
        filter_obj = SingleColumnValueFilter(
            family=b'my_cf',
            qualifier=b'my_column',
            op=CompareOp.EQUAL,
            comparator=BinaryComparator(value=b'target_value'),
            filterIfMissing=True
        )
    
        # 打开扫描器并读取数据
        scanner_id = client.scannerOpenWithFilter(
            tableName=b'my_hbase_table',
            filter=filter_obj
        )
    
        while True:
            rows = client.scannerGet(scanner_id)
            if not rows:
                break
            for row in rows:
                print(f"Row Key: {row.row.decode('utf-8')}")
                for col_name, cell in row.columns.items():
                    print(f"  Column: {col_name.decode('utf-8')}, Value: {cell.value.decode('utf-8')}")
    
        # 关闭扫描器
        client.scannerClose(scanner_id)
    except Thrift.TException as tx:
        print(f"Thrift Error: {tx.message}")
    finally:
        transport.close()
    

    注意:确保生成的客户端代码与集群HBase版本完全匹配,避免兼容性问题。

方案3:自定义内部代理服务(适用于网络受限场景)

如果SAS服务器无法直接访问集群的REST或Thrift服务,可以在集群内部部署一个轻量的Python代理服务(比如用Flask/FastAPI):

  • 代理服务运行在集群节点上,使用PySpark或HBase Java客户端(通过JPype调用)直接访问HBase,这两种方式都完整支持所有过滤条件。
  • SAS服务器通过HTTP请求向代理服务传递过滤规则和读写指令,代理服务处理后返回结果。

示例PySpark代码片段(读取带过滤的HBase表):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("HBaseProxyService") \
    .config("spark.hadoop.hbase.zookeeper.quorum", "zk-node1.example.com,zk-node2.example.com") \
    .getOrCreate()

# 读取HBase表并应用过滤条件
hbase_df = spark.read.format("org.apache.hadoop.hbase.spark") \
    .option("hbase.table", "my_hbase_table") \
    .option("hbase.columns.mapping", "key STRING, my_cf:my_column STRING") \
    .load() \
    .filter(col("my_cf:my_column") == "target_value")

# 将结果转为字典列表返回给SAS服务器
result = [row.asDict() for row in hbase_df.collect()]

内容的提问来源于stack exchange,提问作者Srini Ravi

火山引擎 最新活动