Documentation Index
Fetch the complete documentation index at: https://0g.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
Agent Execution
Execute agent tasks and manage the agent lifecycle in Python applications.

Overview
Agent execution involves running agents in various contexts, from simple one-off tasks to long-running services. The 0G AI SDK provides flexible execution patterns to suit different application needs.

Execution Patterns
Simple Execution
Basic agent execution for straightforward tasks:

import asyncio
from zg_ai_sdk import create_agent
async def simple_execution():
    """Create a one-off agent, ask it a single question, and return the answer.

    Returns:
        Whatever ``agent.ask`` resolves to for the hard-coded prompt.
    """
    agent_settings = {
        'name': 'Task Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-memory',
        'private_key': 'your-private-key'
    }
    task_agent = await create_agent(agent_settings)
    await task_agent.init()

    # Single request/response round trip.
    return await task_agent.ask('Explain quantum computing in simple terms')
# Run the task: asyncio.run() drives the coroutine to completion on a new
# event loop and hands back its return value, which is then printed.
result = asyncio.run(simple_execution())
print(result)
Batch Execution
# Execute multiple tasks efficiently:
async def batch_execution(tasks):
    """Run every prompt in ``tasks`` one after another on a single agent.

    Args:
        tasks: Sized sequence of prompts; each item is sliced for the
            progress message and passed to ``agent.ask``.

    Returns:
        A list of ``{'task': ..., 'result': ...}`` dicts in input order.
    """
    batch_agent = await create_agent({
        'name': 'Batch Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'batch-memory',
        'private_key': 'your-private-key'
    })
    await batch_agent.init()

    outcomes = []
    # Tasks are awaited sequentially, so results keep the input order.
    for position, prompt in enumerate(tasks, start=1):
        print(f"Processing task {position}/{len(tasks)}: {prompt[:50]}...")
        answer = await batch_agent.ask(prompt)
        outcomes.append({'task': prompt, 'result': answer})
    return outcomes
Streaming Execution
# Real-time execution with streaming responses:
async def streaming_execution(prompt):
    """Stream the agent's reply to stdout chunk by chunk.

    Args:
        prompt: Text forwarded to ``agent.stream_chat``.

    Returns:
        Whatever ``agent.stream_chat`` resolves to once streaming ends.
    """
    stream_agent = await create_agent({
        'name': 'Streaming Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'streaming-memory',
        'private_key': 'your-private-key'
    })
    await stream_agent.init()

    def emit_chunk(chunk: str):
        # No trailing newline and an immediate flush, so pieces appear
        # on the terminal as they arrive.
        print(chunk, end='', flush=True)

    return await stream_agent.stream_chat(prompt, emit_chunk)
Examples
import asyncio
from datetime import datetime
from zg_ai_sdk import create_agent
class TaskAutomator:
    """Run tasks through one lazily created agent and keep a timed history.

    ``agent_config`` is passed unchanged to ``create_agent`` the first time
    an agent is needed. Every executed task is appended to
    ``task_history`` and also stored through ``agent.remember``.
    """

    def __init__(self, agent_config):
        self.agent_config = agent_config
        self.agent = None  # created on demand by initialize()
        self.task_history = []

    async def initialize(self):
        """Create the agent, initialise it, and install the system prompt."""
        system_prompt = '''
You are a task automation assistant. When given tasks:
1. Break down complex tasks into steps
2. Provide clear, actionable instructions
3. Estimate time requirements when possible
4. Identify potential issues or dependencies
'''
        self.agent = await create_agent(self.agent_config)
        await self.agent.init()
        self.agent.set_system_prompt(system_prompt)

    async def execute_task(self, task_description, context=None):
        """Run one task and return its record.

        Args:
            task_description: The task text sent to the agent.
            context: Optional text prepended to the prompt when truthy.

        Returns:
            A dict with the keys ``task``, ``context``, ``result``,
            ``start_time``, ``end_time`` and ``duration`` (seconds).
        """
        if not self.agent:
            await self.initialize()

        prompt = (
            f"Context: {context}\n\nTask: {task_description}"
            if context
            else task_description
        )

        # Wall-clock timing around the agent call only.
        started = datetime.now()
        answer = await self.agent.chat_with_context(prompt)
        finished = datetime.now()

        record = {
            'task': task_description,
            'context': context,
            'result': answer,
            'start_time': started,
            'end_time': finished,
            'duration': (finished - started).total_seconds()
        }
        self.task_history.append(record)

        # The memory key is the 1-based position in the history.
        memory_key = f'task_{len(self.task_history)}'
        await self.agent.remember(memory_key, record)
        return record

    async def execute_workflow(self, tasks):
        """Run ``tasks`` in order, feeding each step the earlier results.

        Returns:
            The list of task records, one per step, in execution order.
        """
        if not self.agent:
            await self.initialize()

        step_records = []
        carried_context = ""
        for step_number, step in enumerate(tasks, start=1):
            print(f"Executing step {step_number}: {step}")
            record = await self.execute_task(step, carried_context)
            step_records.append(record)
            # Only the first 200 characters of each result are carried on.
            snippet = record['result'][:200]
            carried_context += f"Previous step result: {snippet}...\n"
        return step_records

    def get_task_summary(self):
        """Summarise the history.

        Returns:
            The string ``"No tasks executed yet."`` when the history is
            empty; otherwise a dict with the task count, total and average
            duration, and the descriptions of the last five tasks.
        """
        history = self.task_history
        if not history:
            return "No tasks executed yet."

        count = len(history)
        elapsed = sum(entry['duration'] for entry in history)
        return {
            'total_tasks': count,
            'total_time': elapsed,
            'average_time': elapsed / count,
            'recent_tasks': [entry['task'] for entry in history[-5:]]
        }
async def main():
    """Demo: run one task, then a four-step workflow, and print a summary."""
    automator = TaskAutomator({
        'name': 'Task Automator',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-automation',
        'private_key': 'your-private-key'
    })

    # Single task first.
    single = await automator.execute_task(
        "Create a project plan for building a web application"
    )
    print("Task completed in", single['duration'], "seconds")

    # Then a workflow whose steps build on each other.
    await automator.execute_workflow([
        "Define project requirements and scope",
        "Design system architecture",
        "Create development timeline",
        "Identify potential risks and mitigation strategies"
    ])

    # The summary covers the single task and the workflow steps.
    print("Workflow Summary:", automator.get_task_summary())
# Entry point for the example: run main() to completion on a new event loop.
asyncio.run(main())
Execution Contexts
Web Application Integration
from fastapi import FastAPI, BackgroundTasks
from zg_ai_sdk import create_agent
# Application object that the route and event decorators below attach to.
app = FastAPI()
# Shared agent instance; stays None until the startup handler assigns it.
agent = None
# Startup hook: build and initialise the module-level agent once, so the
# request handlers can reuse it.
@app.on_event("startup")
async def startup_event():
    global agent
    web_agent_config = {
        'name': 'Web Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'web-memory',
        'private_key': 'your-private-key'
    }
    agent = await create_agent(web_agent_config)
    await agent.init()
# POST /ask: forward the question to the shared agent and wrap the answer.
# Documented with comments rather than a docstring so the generated API
# description stays unchanged.
@app.post("/ask")
async def ask_agent(question: str):
    return {"response": await agent.ask(question)}
# POST /ask-async: schedule the question for background processing and
# reply immediately with the id that process_async_task stores the result
# under.
@app.post("/ask-async")
async def ask_agent_async(question: str, background_tasks: BackgroundTasks):
    # Imported locally: this snippet's import block does not bring in
    # datetime, so the bare name would raise NameError on the first request.
    from datetime import datetime

    # Timestamp-based id; treat it as opaque.
    task_id = f"task_{datetime.now().timestamp()}"
    background_tasks.add_task(process_async_task, task_id, question)
    return {"task_id": task_id, "status": "processing"}
async def process_async_task(task_id: str, question: str):
    """Ask the shared agent and persist the answer under ``task_id``.

    ``store_result`` is not defined in this snippet; the application is
    expected to supply it (database, cache, etc.).
    """
    answer = await agent.ask(question)
    # Store result somewhere (database, cache, etc.)
    await store_result(task_id, answer)
CLI Application
import click
import asyncio
from zg_ai_sdk import create_agent
# Synchronous click entry point that hands off to the async implementation.
# --question is required: without it None would be passed to the agent, so
# fail fast with click's usage error instead.
@click.command()
@click.option('--question', '-q', required=True, help='Question to ask the agent')
@click.option('--stream', '-s', is_flag=True, help='Stream the response')
@click.option('--config', '-c', help='Agent configuration file')
def cli_agent(question, stream, config):
    """CLI interface for 0G AI Agent"""
    # The docstring above is also the command's --help text, so it is left
    # exactly as written.
    asyncio.run(run_cli_agent(question, stream, config))
async def run_cli_agent(question, stream, config_file):
    """Create an agent and answer ``question`` once, streamed or in one piece.

    Args:
        question: Prompt sent to the agent.
        stream: When truthy, print chunks as they arrive instead of the
            whole response at the end.
        config_file: Path handed to ``load_config``; when falsy,
            ``get_default_config()`` is used. Neither helper is defined in
            this snippet.
    """
    # Load configuration
    if config_file:
        config = load_config(config_file)
    else:
        config = get_default_config()

    # Create and initialize agent
    cli_agent_instance = await create_agent(config)
    await cli_agent_instance.init()

    if not stream:
        print(await cli_agent_instance.ask(question))
        return

    def print_chunk(chunk):
        print(chunk, end='', flush=True)

    await cli_agent_instance.stream_chat(question, print_chunk)
    print()  # New line after streaming
# Run the click command only when this file is executed as a script.
if __name__ == '__main__':
    cli_agent()
Performance Optimization
Connection Pooling
class AgentPool:
    """Pool of pre-initialised agents that are handed out one at a time.

    Idle agents wait in ``available_agents`` (an ``asyncio.Queue``);
    agents currently handed out are tracked in ``busy_agents``.

    NOTE(review): on Python versions before 3.10 an ``asyncio.Queue``
    binds to the event loop current at construction time, so build the
    pool from inside the running loop -- confirm the minimum supported
    version.
    """

    def __init__(self, config, pool_size=5):
        """Store the settings; no agents exist until ``initialize_pool``.

        Args:
            config: Base agent config; must contain ``'name'`` and
                ``'memory_bucket'``, which get a per-agent suffix.
            pool_size: Number of agents ``initialize_pool`` creates.
        """
        self.config = config
        self.pool_size = pool_size
        self.available_agents = asyncio.Queue()
        self.busy_agents = set()

    async def initialize_pool(self):
        """Create, initialise, and enqueue ``pool_size`` agents."""
        for i in range(self.pool_size):
            agent = await create_agent({
                **self.config,
                # Each pooled agent gets its own name and memory bucket.
                'name': f"{self.config['name']} Pool {i}",
                'memory_bucket': f"{self.config['memory_bucket']}-pool-{i}"
            })
            await agent.init()
            await self.available_agents.put(agent)

    async def get_agent(self):
        """Wait for an idle agent, mark it busy, and return it."""
        agent = await self.available_agents.get()
        self.busy_agents.add(agent)
        return agent

    async def return_agent(self, agent):
        """Return a checked-out agent to the pool.

        An agent that is not currently checked out is ignored. Without
        this guard a double return would enqueue the same agent twice and
        let two callers use it at once.
        """
        if agent not in self.busy_agents:
            return
        self.busy_agents.discard(agent)
        await self.available_agents.put(agent)
Caching Results
from functools import wraps
import hashlib
import json
def cache_agent_results(cache_ttl=300):
    """Decorator factory that caches an async function's results.

    Args:
        cache_ttl: Seconds a cached result stays valid.

    Returns:
        A decorator. The wrapped coroutine function returns the cached
        value when the same function was called with the same arguments
        less than ``cache_ttl`` seconds ago, and otherwise awaits the
        original and stores its result.

    The key is built from the ``repr`` of the arguments, so an argument
    whose repr embeds its memory address (the default for plain objects)
    only matches calls made with that same instance. The cache is
    unbounded; expired entries are dropped only when their key is looked
    up again.
    """
    # Imported locally: the surrounding import block does not provide
    # ``time``, which the original relied on.
    import time

    cache = {}

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Include the function identity so one decorator instance
            # applied to several functions cannot mix up their results.
            # Sort kwargs so f(a=1, b=2) and f(b=2, a=1) share an entry.
            key_source = repr((
                func.__module__,
                func.__qualname__,
                args,
                sorted(kwargs.items()),
            ))
            cache_key = hashlib.md5(key_source.encode()).hexdigest()

            entry = cache.get(cache_key)
            if entry is not None:
                cached_result, stored_at = entry
                # The monotonic clock is immune to wall-clock adjustments.
                if time.monotonic() - stored_at < cache_ttl:
                    return cached_result
                del cache[cache_key]

            result = await func(*args, **kwargs)
            cache[cache_key] = (result, time.monotonic())
            return result
        return wrapper
    return decorator
@cache_agent_results(cache_ttl=600)  # 10 minute cache
async def cached_agent_ask(agent, question):
    """Ask ``agent`` the ``question``; repeated calls are served from the cache."""
    answer = await agent.ask(question)
    return answer
Monitoring and Logging
import logging
from datetime import datetime
class AgentExecutionMonitor:
    """Record the start and end of agent executions in memory and in logs.

    Entries live in ``execution_log``; the index returned by
    ``log_execution_start`` identifies an entry when it is closed with
    ``log_execution_end``.
    """

    def __init__(self):
        self.logger = logging.getLogger('agent_execution')
        self.execution_log = []

    def log_execution_start(self, agent_name, task):
        """Append a ``started`` entry and return its index.

        Args:
            agent_name: Name recorded under ``'agent'``.
            task: Task text; only the first 100 characters are stored.

        Returns:
            The index of the new entry in ``execution_log``.
        """
        log_entry = {
            'agent': agent_name,
            'task': task[:100],  # Truncate long tasks
            'start_time': datetime.now(),
            'status': 'started'
        }
        self.execution_log.append(log_entry)
        # Lazy %-formatting: the message is built only if INFO is enabled.
        self.logger.info("Agent %s started task: %s...", agent_name, task[:50])
        return len(self.execution_log) - 1  # Return log index

    def log_execution_end(self, log_index, result=None, error=None):
        """Close the entry at ``log_index`` as completed or failed.

        Args:
            log_index: Index returned by ``log_execution_start``.
            result: Result of a successful run; its length is recorded
                (0 when the result is falsy).
            error: Any value other than ``None`` marks the run as failed
                and is stored as ``str(error)``.

        An index that does not refer to an existing entry is logged as a
        warning and otherwise ignored.
        """
        # Reject negative indices too: they would otherwise silently
        # address entries from the end of the list.
        if not 0 <= log_index < len(self.execution_log):
            self.logger.warning(
                "Ignoring execution end for unknown log index %r", log_index
            )
            return

        log_entry = self.execution_log[log_index]
        log_entry['end_time'] = datetime.now()
        log_entry['duration'] = (
            log_entry['end_time'] - log_entry['start_time']
        ).total_seconds()

        # Compare against None so a falsy error (empty message, exception
        # type defining __bool__) is still recorded as a failure.
        if error is not None:
            log_entry['status'] = 'failed'
            log_entry['error'] = str(error)
            self.logger.error("Agent %s failed: %s", log_entry['agent'], error)
        else:
            log_entry['status'] = 'completed'
            log_entry['result_length'] = len(result) if result else 0
            self.logger.info(
                "Agent %s completed in %.2fs",
                log_entry['agent'],
                log_entry['duration'],
            )
Best Practices
- Resource Management: Always properly initialize and clean up agents
- Error Handling: Implement comprehensive error handling and retry logic
- Performance: Use connection pooling and caching for high-throughput applications
- Monitoring: Log execution metrics and monitor agent performance
- Scalability: Design for horizontal scaling with multiple agent instances
- Security: Secure private keys and API credentials properly