You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用Google ADK Runner与Session Service时API密钥缺失错误排查

问题排查与解决:Google ADK Runner/Session Service 缺失API密钥错误

问题原因

你之前通过adk create配置的API密钥,在使用adk web运行普通Agent时会被工具自动读取加载,但当你手动实例化Agent并配合RunnerSession Service使用时,必须显式给Agent传入API密钥参数——这种自定义代码场景下,ADK不会自动读取配置文件里的密钥,导致调用Google AI API时凭证缺失,触发错误。

解决方法

有两种可靠的修复方式:

方式1:直接在Agent初始化时传入API密钥

在创建root_agent时,添加api_key参数,填入你通过adk create配置的密钥:

root_agent = Agent(
    name="agent_47",
    model="gemini-1.5-flash-latest",
    description="Agent that provides which method to fry a food item",
    instruction="""You are an agent that master at frying things. you understand from the user that which item that,
            that they want to fry and gave instruction about it.
                 """,
    api_key="你的Google AI API密钥"  # 填入实际密钥
)

方式2:从环境变量读取API密钥(推荐)

为避免硬编码密钥,更安全的做法是从环境变量读取(adk create会自动把密钥存入GOOGLE_API_KEY环境变量):

import os

root_agent = Agent(
    name="agent_47",
    model="gemini-1.5-flash-latest",
    description="Agent that provides which method to fry a food item",
    instruction="""You are an agent that master at frying things. you understand from the user that which item that,
            that they want to fry and gave instruction about it.
                 """,
    api_key=os.getenv("GOOGLE_API_KEY")  # 从环境变量读取密钥
)

修改后的完整代码

import asyncio
import os  # 新增环境变量模块导入

from google.adk.runners import Runner
from google.adk.agents import Agent
from google.adk.sessions import InMemorySessionService
from google.genai import types

root_agent = Agent(
    name="agent_47",
    model="gemini-1.5-flash-latest",
    description="Agent that provides which method to fry a food item",
    instruction="""You are an agent that master at frying things. you understand from the user that which item that,
            that they want to fry and gave instruction about it.
                 """,
    api_key=os.getenv("GOOGLE_API_KEY")  # 新增API密钥参数
)

APP_NAME = "Fryer"
USER_ID = "U_1"
SESSION_ID = "session_1"

session_service = InMemorySessionService()

async def init_session(app_name:str,user_id:str,session_id:str) -> InMemorySessionService:
    session = await session_service.create_session(
        app_name=app_name,
        user_id=user_id,
        session_id=session_id
    )
    print(f"session created: App='{app_name}', User='{user_id}', Session='{session_id}'")
    return session

session = asyncio.run(init_session(APP_NAME,USER_ID,SESSION_ID))

runner = Runner(
    agent=root_agent,
    app_name=APP_NAME,
    session_service=session_service
)

async def call_agent_async(query: str,runner,user_id,session_id):
    print(f"\n>>> User Query: {query}")

    content = types.Content(role='user',parts=[types.Part(text=query)])

    final_response_text = "agent did not produce a final response"

    async for event in runner.run_async(user_id=user_id, session_id=session_id,new_message=content):
        print(f"[Event] Author: {event.author}, Type: {type(event).__name__}, Final: {event.is_final_response()}, Content: {event.content}")

        if event.is_final_response():
            if event.content and event.content.parts:
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:
                final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"

            break
    print(f"<<<Agent Response: {final_response_text}")

async def run_conversation():
    await call_agent_async("chicken",
                               runner=runner,
                               user_id=USER_ID,
                               session_id=SESSION_ID)

if __name__ == "__main__":
    try:
        asyncio.run(run_conversation())
    except Exception as e:
        print(f"an error occurred: {e}")

内容的提问来源于stack exchange,提问作者trueharmonyalan

火山引擎 最新活动