启用Kerberos的Cloudera集群:HBase与SAS集成读写问题
我碰到过不少类似的场景——在Kerberized的Cloudera集群里用Python通过Thrift访问HBase时,高级过滤条件的支持确实是个痛点,Thrift 1.x和基于它的HappyBase都对SingleColumnValueFilter这类过滤器支持有限。下面给你几个经过验证的解决方案,按推荐程度排序:
方案1:切换到HBase REST API + Python Requests(最易实现)
HBase REST API对Kerberos认证支持完善,并且完整覆盖了所有HBase过滤规则,包括你需要的SingleColumnValueFilter。具体步骤如下:
在Cloudera集群启用HBase REST服务
登录Cloudera Manager,找到HBase服务,进入配置页,启用HBase REST角色,并确保Kerberos认证配置正确(服务主体、密钥表等)。在SAS服务器安装依赖库
打开终端执行:pip install requests requests-kerberos编写带过滤条件的Python代码
示例代码实现SingleColumnValueFilter过滤查询:import requests from requests_kerberos import HTTPKerberosAuth, OPTIONAL # 替换为你的HBase REST服务地址和表名 HBASE_REST_URL = "http://hbase-rest-node.example.com:8080/my_hbase_table/scanner" # 构造XML格式的过滤条件(SingleColumnValueFilter示例) filter_payload = """ <Scanner batch="100"> <filter> <SingleColumnValueFilter> <column>my_cf:my_column</column> <comparator> <BinaryComparator> <value>target_value</value> </BinaryComparator> </comparator> <compareOp>EQUAL</compareOp> <filterIfMissing>true</filterIfMissing> </SingleColumnValueFilter> </filter> </Scanner> """ # 发送请求创建扫描器(Kerberos认证) create_scanner_resp = requests.post( HBASE_REST_URL, data=filter_payload, headers={"Content-Type": "application/xml"}, auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL) ) create_scanner_resp.raise_for_status() # 获取扫描器ID,循环读取结果 scanner_id = create_scanner_resp.headers["Location"].split("/")[-1] scanner_url = f"{HBASE_REST_URL}/{scanner_id}" try: while True: result_resp = requests.get(scanner_url, auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL)) if not result_resp.content: break # 处理返回的XML结果(可根据需求解析为字典或其他格式) print(result_resp.text) finally: # 关闭扫描器释放资源 requests.delete(scanner_url, auth=HTTPKerberosAuth(mutual_authentication=OPTIONAL))注意:确保SAS服务器已配置Kerberos客户端,能通过
kinit获取到访问HBase REST服务的有效票据。
方案2:使用HBase Thrift 2.x客户端(性能更优)
Cloudera集群支持HBase Thrift 2.x版本,相比老旧的Thrift 1.x,它提供了完整的HBase API支持,包括所有过滤条件。
启用HBase Thrift2服务
在Cloudera Manager中找到HBase服务,启用Thrift2角色,配置Kerberos认证(服务主体、SASL配置等)。生成Thrift2 Python客户端代码
- 从对应版本的HBase源码中获取
hbase.thrift文件(比如Cloudera CDP 7.x对应HBase 2.x) - 使用Thrift编译器生成Python客户端代码:
thrift --gen py hbase.thrift - 将生成的
gen-py目录中的代码复制到SAS服务器的Python项目中。
- 从对应版本的HBase源码中获取
编写Thrift2客户端代码
示例代码实现带SingleColumnValueFilter的查询:from thrift import Thrift from thrift.transport import TSocket, TTransport from thrift.protocol import TBinaryProtocol from hbase import Hbase from hbase.ttypes import * # 替换为你的集群信息 THRIFT2_HOST = "hbase-thrift2-node.example.com" THRIFT2_PORT = 9090 HBASE_SERVICE_PRINCIPAL = "hbase/hbase-thrift2-node.example.com@EXAMPLE.COM" CLIENT_PRINCIPAL = "sas_service_account@EXAMPLE.COM" KRB5_CONF_PATH = "/etc/krb5.conf" # 初始化Kerberos认证的Thrift传输 transport = TSocket.TSocket(THRIFT2_HOST, THRIFT2_PORT) transport = TTransport.TSaslClientTransport( transport, HBASE_SERVICE_PRINCIPAL, 'GSSAPI', CLIENT_PRINCIPAL, KRB5_CONF_PATH ) protocol = TBinaryProtocol.TBinaryProtocol(transport) client = Hbase.Client(protocol) try: transport.open() # 构造SingleColumnValueFilter filter_obj = SingleColumnValueFilter( family=b'my_cf', qualifier=b'my_column', op=CompareOp.EQUAL, comparator=BinaryComparator(value=b'target_value'), filterIfMissing=True ) # 打开扫描器并读取数据 scanner_id = client.scannerOpenWithFilter( tableName=b'my_hbase_table', filter=filter_obj ) while True: rows = client.scannerGet(scanner_id) if not rows: break for row in rows: print(f"Row Key: {row.row.decode('utf-8')}") for col_name, cell in row.columns.items(): print(f" Column: {col_name.decode('utf-8')}, Value: {cell.value.decode('utf-8')}") # 关闭扫描器 client.scannerClose(scanner_id) except Thrift.TException as tx: print(f"Thrift Error: {tx.message}") finally: transport.close()注意:确保生成的客户端代码与集群HBase版本完全匹配,避免兼容性问题。
方案3:自定义内部代理服务(适用于网络受限场景)
如果SAS服务器无法直接访问集群的REST或Thrift服务,可以在集群内部部署一个轻量的Python代理服务(比如用Flask/FastAPI):
- 代理服务运行在集群节点上,使用PySpark或HBase Java客户端(通过JPype调用)直接访问HBase,这两种方式都完整支持所有过滤条件。
- SAS服务器通过HTTP请求向代理服务传递过滤规则和读写指令,代理服务处理后返回结果。
示例PySpark代码片段(读取带过滤的HBase表):
from pyspark.sql import SparkSession from pyspark.sql.functions import col spark = SparkSession.builder \ .appName("HBaseProxyService") \ .config("spark.hadoop.hbase.zookeeper.quorum", "zk-node1.example.com,zk-node2.example.com") \ .getOrCreate() # 读取HBase表并应用过滤条件 hbase_df = spark.read.format("org.apache.hadoop.hbase.spark") \ .option("hbase.table", "my_hbase_table") \ .option("hbase.columns.mapping", "key STRING, my_cf:my_column STRING") \ .load() \ .filter(col("my_cf:my_column") == "target_value") # 将结果转为字典列表返回给SAS服务器 result = [row.asDict() for row in hbase_df.collect()]
内容的提问来源于stack exchange,提问作者Srini Ravi




