在Databricks中如何将SQL查询结果的列存储为变量,用作API数据调用的参数
在Databricks中如何将SQL查询结果的列存储为变量,用作API数据调用的参数
我懂你现在的需求——把SQL查出来的每一行姓名数据,动态塞进API请求里,而不是像现在这样硬写单个值对吧?在Databricks里用PySpark就能轻松实现,我给你一步步拆解怎么做:
第一步:获取SQL查询结果
首先执行你的SQL语句,把结果转换成PySpark DataFrame,这样我们就能方便地遍历每一行数据:
# 执行SQL查询,拿到姓名数据的DataFrame name_df = spark.sql(""" Select FirstName, LastName, Title From Default.Name """)
第二步:遍历数据并动态调用API
接下来我们要把API的初始化逻辑(比如创建会话、绑定服务)放在循环外面,避免重复初始化浪费资源,然后遍历每一行数据,把姓名动态替换到请求参数里:
from zeep import Client from zeep.transports import Transport from requests import Session from requests.auth import HTTPBasicAuth from pyspark.sql.types import StructType, StructField, StringType, BooleanType from pyspark.sql import Row from datetime import datetime # 先拿到SQL结果的行列表(如果数据量不大的话用collect()就行) name_rows = name_df.collect() # 初始化API客户端(只做一次,别放到循环里!) wsdl_url = "Test.xml" session = Session() session.auth = HTTPBasicAuth('abc','password') transport = Transport(session=session) client = Client(wsdl=wsdl_url, transport=transport) service = client.bind("Test", "BasicHttpBinding_TestService") # 提前定义结果的Schema,方便后续生成结果DataFrame result_schema = StructType([ StructField("FirstName", StringType(), True), StructField("SurName", StringType(), True), StructField("response", StringType(), True), StructField("ETLApplyDateTime", StringType(), True) ]) # 用来存储所有API响应结果的列表 api_results = [] current_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 遍历每一行姓名数据 for row in name_rows: first_name = row.FirstName.strip() # 去掉字符串前后的空格,避免API报错 last_name = row.LastName.strip() title = row.Title # 如果API需要用到头衔,也可以在这里取用 # 构造请求数据,把硬编码的名字换成动态变量 request_data = { 'userAuth': { 'Nickname':"Test" }, 'requestReference': 'test', 'request': { 'Subject': { 'Forename': first_name, # 动态传入FirstName 'Surname': last_name # 动态传入LastName }, 'Address': { 'AddressLine1': '123 Test Street', 'AddressLine2': '', 'AddressLine3': 'LONDON', 'Postcode': 'ABC 123' }, 'ConsentFlag': True } } try: # 发起API调用 response = service.PerformIDCheckV2(**request_data) # 把响应转换成字符串(根据实际响应结构调整,比如用json.dumps如果是JSON格式) response_str = str(response) # 把结果添加到列表里 api_results.append(Row( FirstName=first_name, SurName=last_name, response=response_str, ETLApplyDateTime=current_timestamp )) except Exception as e: # 处理API调用失败的情况,别让整个循环崩了 print(f"处理用户 {first_name} {last_name} 时API调用出错: {str(e)}") api_results.append(Row( FirstName=first_name, SurName=last_name, response=f"调用失败: {str(e)}", ETLApplyDateTime=current_timestamp )) # 把结果列表转换成DataFrame,方便后续保存或分析 final_result_df = spark.createDataFrame(api_results, schema=result_schema) # 可以显示结果或者保存到Databricks表 final_result_df.show() # final_result_df.write.mode("overwrite").saveAsTable("Default.API_Response_Results")
注意事项(划重点!)
- 如果你的SQL结果数据量很大,别用collect()!因为collect()会把所有数据拉到Driver节点,容易内存溢出。这时候改用
foreachPartition()分布式处理,每个分区在Executor上初始化API客户端,示例如下:def process_name_partition(partition): # 每个分区里初始化一次API客户端 session = Session() session.auth = HTTPBasicAuth('abc','password') transport = Transport(session=session) client = Client(wsdl=wsdl_url, transport=transport) service = client.bind("Test", "BasicHttpBinding_TestService") current_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") for row in partition: # 这里的处理逻辑和上面循环里的完全一样 first_name = row.FirstName.strip() last_name = row.LastName.strip() # 构造request_data、发起请求、处理结果... # 调用foreachPartition处理数据 name_df.foreachPartition(process_name_partition) - 注意API的并发限制,如果数据量很大,建议在循环里加个小延迟(比如
import time; time.sleep(1)),避免被API服务商限流。 - 如果地址信息也是动态的,直接把Address相关字段加到SQL查询里,然后同样动态传入request_data就行。
备注:内容来源于stack exchange,提问作者John Bryan




