使用Google ADK Runner与Session Service时API密钥缺失错误排查
问题排查与解决:Google ADK Runner/Session Service 缺失API密钥错误
问题原因
你之前通过adk create配置的API密钥,在使用adk web运行普通Agent时会被工具自动读取加载,但当你手动实例化Agent并配合Runner、Session Service使用时,必须显式给Agent传入API密钥参数——这种自定义代码场景下,ADK不会自动读取配置文件里的密钥,导致调用Google AI API时凭证缺失,触发错误。
解决方法
有两种可靠的修复方式:
方式1:直接在Agent初始化时传入API密钥
在创建root_agent时,添加api_key参数,填入你通过adk create配置的密钥:
root_agent = Agent( name="agent_47", model="gemini-1.5-flash-latest", description="Agent that provides which method to fry a food item", instruction="""You are an agent that master at frying things. you understand from the user that which item that, that they want to fry and gave instruction about it. """, api_key="你的Google AI API密钥" # 填入实际密钥 )
方式2:从环境变量读取API密钥(推荐)
为避免硬编码密钥,更安全的做法是从环境变量读取(adk create会自动把密钥存入GOOGLE_API_KEY环境变量):
import os root_agent = Agent( name="agent_47", model="gemini-1.5-flash-latest", description="Agent that provides which method to fry a food item", instruction="""You are an agent that master at frying things. you understand from the user that which item that, that they want to fry and gave instruction about it. """, api_key=os.getenv("GOOGLE_API_KEY") # 从环境变量读取密钥 )
修改后的完整代码
import asyncio import os # 新增环境变量模块导入 from google.adk.runners import Runner from google.adk.agents import Agent from google.adk.sessions import InMemorySessionService from google.genai import types root_agent = Agent( name="agent_47", model="gemini-1.5-flash-latest", description="Agent that provides which method to fry a food item", instruction="""You are an agent that master at frying things. you understand from the user that which item that, that they want to fry and gave instruction about it. """, api_key=os.getenv("GOOGLE_API_KEY") # 新增API密钥参数 ) APP_NAME = "Fryer" USER_ID = "U_1" SESSION_ID = "session_1" session_service = InMemorySessionService() async def init_session(app_name:str,user_id:str,session_id:str) -> InMemorySessionService: session = await session_service.create_session( app_name=app_name, user_id=user_id, session_id=session_id ) print(f"session created: App='{app_name}', User='{user_id}', Session='{session_id}'") return session session = asyncio.run(init_session(APP_NAME,USER_ID,SESSION_ID)) runner = Runner( agent=root_agent, app_name=APP_NAME, session_service=session_service ) async def call_agent_async(query: str,runner,user_id,session_id): print(f"\n>>> User Query: {query}") content = types.Content(role='user',parts=[types.Part(text=query)]) final_response_text = "agent did not produce a final response" async for event in runner.run_async(user_id=user_id, session_id=session_id,new_message=content): print(f"[Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}") if event.is_final_response(): if event.content and event.content.parts: final_response_text = event.content.parts[0].text elif event.actions and event.actions.escalate: final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}" break print(f"<<<Agent Response: {final_response_text}") async def run_conversation(): await call_agent_async("chicken", runner=runner, user_id=USER_ID, session_id=SESSION_ID) if __name__ == "__main__": try: asyncio.run(run_conversation()) except Exception as e: print(f"an error occurred: {e}")
内容的提问来源于stack exchange,提问作者trueharmonyalan




