asyncio aiohttp API请求中携带Token设置请求头的正确方法及ClientConnectorCertificateError问题排查
解决aiohttp添加Bearer认证后出现的ClientConnectorCertificateError及正确请求头设置
Hey Ericp, let's tackle your issues one by step:
1. 先搞定ClientConnectorCertificateError错误
这个报错本质是SSL证书验证失败,常见原因和对应解决方案:
- 目标API用了自签名证书或不被系统默认CA信任的证书:
- 临时调试可以禁用SSL验证(注意:生产环境绝对不推荐,存在安全风险),在创建
ClientSession时添加TCPConnector配置:async with aiohttp.ClientSession(headers=headers, connector=aiohttp.TCPConnector(ssl=False)) as session: - 生产环境请指定可信的CA证书文件,保证请求安全性:
import ssl ssl_context = ssl.create_default_context(cafile="/path/to/your/ca-cert.pem") async with aiohttp.ClientSession(headers=headers, connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
- 临时调试可以禁用SSL验证(注意:生产环境绝对不推荐,存在安全风险),在创建
- 也可能是网络代理导致证书链无法正常验证,检查代理配置是否正确。
2. 正确设置Bearer Token请求头的姿势
你的代码思路没问题,但有几个优化点让代码更规范高效:
- 全局请求头复用:你已经在
ClientSession初始化时传入了headers,那么session.get()里就不需要重复传headers=headers,Session会自动为所有请求带上全局头 - 更简洁的字符串拼接:用f-string替代
format(),可读性更强 - 确保token有效性:确认
token变量是完整的、无多余空格/换行的有效字符串
修正后的完整代码
import aiohttp import asyncio import time import json # 别忘了导入json模块! token = "your-actual-token-here" # 替换成你的真实Token file_name_list = ["your-contract-id-1", "your-contract-id-2"] # 替换成你的实际合约列表 # 全局请求头,包含Bearer认证 headers = { 'accept': 'application/json', 'Authorization': f'Bearer {token}' } async def get_cch(session, url): async with session.get(url) as resp: # 无需重复传headers,Session会自动携带 # 先检查响应状态码,避免后续json解析失败 if resp.status != 200: print(f"Request failed for {url}, status code: {resp.status}") return None cch = await resp.json() contract_id = cch['data']['contractId'] # 用with语句操作文件,自动关闭文件句柄 with open(f'{contract_id}.json', 'w') as f: f.write(json.dumps(cch)) return contract_id # 返回有意义的信息,方便后续追踪 async def main(): # 添加SSL处理,这里用临时禁用示例,生产环境请替换为CA证书方式 connector = aiohttp.TCPConnector(ssl=False) async with aiohttp.ClientSession(headers=headers, connector=connector) as session: tasks = [] start_date = '2018-01-01' end_date = '2018-01-05' # 优化循环写法,直接遍历列表元素,不用索引取值 for contract in file_name_list: url = (f'https://apinergia.somenergia.coop/cch/{contract}' f'?tariff=None&type=tg_cchfact&from_={start_date}&to_={end_date}&limit=30000') tasks.append(asyncio.create_task(get_cch(session, url))) # 用create_task替代ensure_future,更符合现代asyncio规范 original_cch = await asyncio.gather(*tasks) # 打印成功保存的合约ID for contract_id in original_cch: if contract_id: print(f"Saved data for contract: {contract_id}") if __name__ == "__main__": start_time = time.time() asyncio.run(main()) # 用asyncio.run启动主协程,替代直接await main(),兼容性更好 print(f"--- {time.time() - start_time} seconds ---")
额外小提示
- 建议处理响应状态码,如果API返回4xx/5xx错误,提前捕获避免json解析报错
- 如果有大量文件写入操作,推荐使用
aiofiles库做异步文件写入,避免阻塞事件循环 - 注意API的请求频率限制,并行请求不要过多,可能会被限流,可以用
asyncio.Semaphore控制并发数
内容的提问来源于stack exchange,提问作者Ericp




