Documentation Index
Fetch the complete documentation index at: https://agno-v2-ab-home-page-updates-5-16.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Create a Python file
run_agents.py
import asyncio
import json
import httpx
from agno.client import AgentOSClient
from agno.run.agent import RunCompletedEvent, RunContentEvent
async def run_agent_non_streaming():
"""Execute a non-streaming agent run."""
print("=" * 60)
print("Non-Streaming Agent Run")
print("=" * 60)
client = AgentOSClient(base_url="http://localhost:7777")
config = await client.aget_config()
if not config.agents:
print("No agents available")
return
agent_id = config.agents[0].id
print(f"Running agent: {agent_id}")
result = await client.run_agent(
agent_id=agent_id,
message="What is 2 + 2? Explain your answer briefly.",
)
print(f"\nRun ID: {result.run_id}")
print(f"Content: {result.content}")
print(f"Tokens: {result.metrics.total_tokens if result.metrics else 'N/A'}")
async def run_agent_streaming():
"""Execute a streaming agent run."""
print("\n" + "=" * 60)
print("Streaming Agent Run")
print("=" * 60)
client = AgentOSClient(base_url="http://localhost:7777")
config = await client.aget_config()
if not config.agents:
print("No agents available")
return
agent_id = config.agents[0].id
print(f"Streaming from agent: {agent_id}")
print("\nResponse: ", end="", flush=True)
async for event in client.run_agent_stream(
agent_id=agent_id,
message="Tell me a short joke.",
):
if isinstance(event, RunContentEvent):
print(event.content, end="", flush=True)
elif isinstance(event, RunCompletedEvent):
pass
print("\n")
async def run_agent_background_resumable():
"""Start a background streaming run and reconnect via /resume."""
print("\n" + "=" * 60)
print("Background Resumable Agent Run (SSE)")
print("=" * 60)
BASE_URL = "http://localhost:7777"
async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:
agents = (await client.get("/agents")).json()
agent_id = agents[0]["id"]
# Phase 1: Start a background streaming run, disconnect after a few events
run_id = None
session_id = None
last_event_index = None
print("Starting background stream...")
async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
form_data = {
"message": "Write a detailed story about a brave knight.",
"stream": "true",
"background": "true",
}
async with client.stream("POST", f"/agents/{agent_id}/runs", data=form_data) as response:
event_count = 0
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n\n" in buffer:
event_str, buffer = buffer.split("\n\n", 1)
for line in event_str.strip().split("\n"):
if not line.startswith("data: "):
continue
data = json.loads(line[6:])
if data.get("run_id") and not run_id:
run_id = data["run_id"]
if data.get("session_id") and not session_id:
session_id = data["session_id"]
if data.get("event_index") is not None:
last_event_index = data["event_index"]
event_count += 1
print(f" [{event_count}] index={data.get('event_index')} event={data.get('event')}")
if event_count >= 5:
break
if event_count >= 5:
break
if event_count >= 5:
break
print(f"\nDisconnected after {event_count} events (last_event_index={last_event_index})")
# Phase 2: Simulate being away
await asyncio.sleep(2)
# Phase 3: Reconnect via /resume
print("\nReconnecting via /resume...")
form_data = {}
if last_event_index is not None:
form_data["last_event_index"] = str(last_event_index)
if session_id:
form_data["session_id"] = session_id
async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
async with client.stream(
"POST", f"/agents/{agent_id}/runs/{run_id}/resume", data=form_data
) as response:
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n\n" in buffer:
event_str, buffer = buffer.split("\n\n", 1)
for line in event_str.strip().split("\n"):
if not line.startswith("data: "):
continue
data = json.loads(line[6:])
event_type = data.get("event")
if event_type in ("catch_up", "replay", "subscribed"):
print(f" [META] {event_type}")
else:
print(f" [RESUME] index={data.get('event_index')} event={event_type}")
print("\nDone!")
async def main():
await run_agent_non_streaming()
await run_agent_streaming()
await run_agent_background_resumable()
if __name__ == "__main__":
asyncio.run(main())
Start an AgentOS Server
Make sure you have an AgentOS server running on port 7777. See Creating Your First OS for setup instructions.