Agent Execution
Execute agent tasks and manage agent lifecycle in Python applications.

Overview
Agent execution involves running agents in various contexts, from simple one-off tasks to long-running services. The 0G AI SDK provides flexible execution patterns to suit different application needs.

Execution Patterns
Simple Execution
Basic agent execution for straightforward tasks:

```python
import asyncio
from zg_ai_sdk import create_agent

async def simple_execution():
    agent = await create_agent({
        'name': 'Task Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-memory',
        'private_key': 'your-private-key'
    })
    await agent.init()

    # Execute a single task
    result = await agent.ask('Explain quantum computing in simple terms')
    return result

# Run the task
result = asyncio.run(simple_execution())
print(result)
```
Batch Execution
Execute multiple tasks efficiently:

```python
async def batch_execution(tasks):
    agent = await create_agent({
        'name': 'Batch Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'batch-memory',
        'private_key': 'your-private-key'
    })
    await agent.init()

    results = []
    for i, task in enumerate(tasks):
        print(f"Processing task {i+1}/{len(tasks)}: {task[:50]}...")
        result = await agent.ask(task)
        results.append({'task': task, 'result': result})
    return results
```
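The loop above processes tasks one at a time. If the provider can handle parallel requests, wall-clock time can be reduced by running tasks concurrently with `asyncio.gather`. The sketch below is illustrative, not part of the SDK: `batch_execution_concurrent` is a hypothetical helper that gives each worker its own agent instance and caps concurrency with a semaphore, reusing the configuration values from the example above.

```python
import asyncio
from zg_ai_sdk import create_agent

async def batch_execution_concurrent(tasks, max_concurrency=3):
    # Limit how many tasks run at once (hypothetical helper; tune for your provider)
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(task):
        async with semaphore:
            # Each worker uses its own agent instance to avoid sharing one connection
            agent = await create_agent({
                'name': 'Batch Worker',
                'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
                'memory_bucket': 'batch-memory',
                'private_key': 'your-private-key'
            })
            await agent.init()
            result = await agent.ask(task)
            return {'task': task, 'result': result}

    # Run all tasks, at most max_concurrency at a time
    return await asyncio.gather(*(run_one(task) for task in tasks))
```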
Streaming Execution
Real-time execution with streaming responses:

```python
async def streaming_execution(prompt):
    agent = await create_agent({
        'name': 'Streaming Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'streaming-memory',
        'private_key': 'your-private-key'
    })
    await agent.init()

    def handle_chunk(chunk: str):
        print(chunk, end='', flush=True)

    result = await agent.stream_chat(prompt, handle_chunk)
    return result
```
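Calling the streaming helper works like any other coroutine; the prompt below is only illustrative.

```python
import asyncio

# Chunks are printed as they arrive; the full response is also returned
full_response = asyncio.run(streaming_execution('Summarize the benefits of decentralized AI'))
print()  # newline after the streamed output
```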
Examples
```python
import asyncio
from datetime import datetime
from zg_ai_sdk import create_agent

class TaskAutomator:
    def __init__(self, agent_config):
        self.agent_config = agent_config
        self.agent = None
        self.task_history = []

    async def initialize(self):
        """Initialize the agent"""
        self.agent = await create_agent(self.agent_config)
        await self.agent.init()

        # Set up system prompt for task automation
        self.agent.set_system_prompt('''
        You are a task automation assistant. When given tasks:
        1. Break down complex tasks into steps
        2. Provide clear, actionable instructions
        3. Estimate time requirements when possible
        4. Identify potential issues or dependencies
        ''')

    async def execute_task(self, task_description, context=None):
        """Execute a single task"""
        if not self.agent:
            await self.initialize()

        # Add context if provided
        full_prompt = task_description
        if context:
            full_prompt = f"Context: {context}\n\nTask: {task_description}"

        # Execute task
        start_time = datetime.now()
        result = await self.agent.chat_with_context(full_prompt)
        end_time = datetime.now()

        # Record task execution
        task_record = {
            'task': task_description,
            'context': context,
            'result': result,
            'start_time': start_time,
            'end_time': end_time,
            'duration': (end_time - start_time).total_seconds()
        }
        self.task_history.append(task_record)

        # Store in agent memory
        await self.agent.remember(
            f'task_{len(self.task_history)}',
            task_record
        )
        return task_record

    async def execute_workflow(self, tasks):
        """Execute a series of related tasks"""
        if not self.agent:
            await self.initialize()

        workflow_results = []
        context = ""
        for i, task in enumerate(tasks):
            print(f"Executing step {i+1}: {task}")

            # Use previous results as context
            result = await self.execute_task(task, context)
            workflow_results.append(result)

            # Build context for the next task
            context += f"Previous step result: {result['result'][:200]}...\n"
        return workflow_results

    def get_task_summary(self):
        """Get a summary of executed tasks"""
        if not self.task_history:
            return "No tasks executed yet."

        total_tasks = len(self.task_history)
        total_time = sum(task['duration'] for task in self.task_history)
        avg_time = total_time / total_tasks

        return {
            'total_tasks': total_tasks,
            'total_time': total_time,
            'average_time': avg_time,
            'recent_tasks': [task['task'] for task in self.task_history[-5:]]
        }

async def main():
    config = {
        'name': 'Task Automator',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-automation',
        'private_key': 'your-private-key'
    }
    automator = TaskAutomator(config)

    # Execute a single task
    task_result = await automator.execute_task(
        "Create a project plan for building a web application"
    )
    print("Task completed in", task_result['duration'], "seconds")

    # Execute a workflow
    workflow_tasks = [
        "Define project requirements and scope",
        "Design system architecture",
        "Create development timeline",
        "Identify potential risks and mitigation strategies"
    ]
    workflow_results = await automator.execute_workflow(workflow_tasks)

    # Get summary
    summary = automator.get_task_summary()
    print("Workflow Summary:", summary)

asyncio.run(main())
```
Execution Contexts
Web Application Integration
```python
from datetime import datetime

from fastapi import FastAPI, BackgroundTasks
from zg_ai_sdk import create_agent

app = FastAPI()
agent = None

@app.on_event("startup")
async def startup_event():
    global agent
    agent = await create_agent({
        'name': 'Web Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'web-memory',
        'private_key': 'your-private-key'
    })
    await agent.init()

@app.post("/ask")
async def ask_agent(question: str):
    response = await agent.ask(question)
    return {"response": response}

@app.post("/ask-async")
async def ask_agent_async(question: str, background_tasks: BackgroundTasks):
    task_id = f"task_{datetime.now().timestamp()}"
    background_tasks.add_task(process_async_task, task_id, question)
    return {"task_id": task_id, "status": "processing"}

async def process_async_task(task_id: str, question: str):
    result = await agent.ask(question)
    # Store the result somewhere (database, cache, etc.)
    await store_result(task_id, result)
```
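`store_result` is left undefined above. A minimal in-memory version, together with a hypothetical polling endpoint, might look like the sketch below; a real deployment would use a database or cache instead of a module-level dictionary.

```python
# Sketch only: in-memory result store keyed by task_id (not suitable for multi-process deployments)
results_store: dict[str, str] = {}

async def store_result(task_id: str, result: str):
    results_store[task_id] = result

@app.get("/result/{task_id}")
async def get_result(task_id: str):
    # Hypothetical endpoint for polling the status of an async task
    if task_id not in results_store:
        return {"task_id": task_id, "status": "processing"}
    return {"task_id": task_id, "status": "done", "result": results_store[task_id]}
```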
CLI Application
```python
import click
import asyncio
from zg_ai_sdk import create_agent

@click.command()
@click.option('--question', '-q', help='Question to ask the agent')
@click.option('--stream', '-s', is_flag=True, help='Stream the response')
@click.option('--config', '-c', help='Agent configuration file')
def cli_agent(question, stream, config):
    """CLI interface for the 0G AI Agent"""
    asyncio.run(run_cli_agent(question, stream, config))

async def run_cli_agent(question, stream, config_file):
    # Load configuration
    config = load_config(config_file) if config_file else get_default_config()

    # Create and initialize the agent
    agent = await create_agent(config)
    await agent.init()

    if stream:
        def print_chunk(chunk):
            print(chunk, end='', flush=True)

        await agent.stream_chat(question, print_chunk)
        print()  # New line after streaming
    else:
        response = await agent.ask(question)
        print(response)

if __name__ == '__main__':
    cli_agent()
```
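`load_config` and `get_default_config` are not provided by the SDK; one plausible implementation reads a JSON file and falls back to environment variables. The function bodies and environment variable names below are illustrative assumptions.

```python
import json
import os

def load_config(path):
    """Load agent configuration from a JSON file (sketch)."""
    with open(path) as f:
        return json.load(f)

def get_default_config():
    """Build a default configuration from environment variables (names are illustrative)."""
    return {
        'name': 'CLI Agent',
        'provider_address': os.environ['ZG_PROVIDER_ADDRESS'],
        'memory_bucket': os.environ.get('ZG_MEMORY_BUCKET', 'cli-memory'),
        'private_key': os.environ['ZG_PRIVATE_KEY'],
    }
```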
Performance Optimization
Connection Pooling
```python
import asyncio
from zg_ai_sdk import create_agent

class AgentPool:
    def __init__(self, config, pool_size=5):
        self.config = config
        self.pool_size = pool_size
        self.available_agents = asyncio.Queue()
        self.busy_agents = set()

    async def initialize_pool(self):
        """Initialize the agent pool"""
        for i in range(self.pool_size):
            agent = await create_agent({
                **self.config,
                'name': f"{self.config['name']} Pool {i}",
                'memory_bucket': f"{self.config['memory_bucket']}-pool-{i}"
            })
            await agent.init()
            await self.available_agents.put(agent)

    async def get_agent(self):
        """Get an available agent from the pool"""
        agent = await self.available_agents.get()
        self.busy_agents.add(agent)
        return agent

    async def return_agent(self, agent):
        """Return an agent to the pool"""
        self.busy_agents.discard(agent)
        await self.available_agents.put(agent)
```
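Callers should return an agent to the pool even when a request fails. A `try/finally` wrapper keeps that guarantee; `ask_via_pool` below is a hypothetical helper, not part of the SDK.

```python
# Usage sketch: acquire an agent, ask a question, and always return the agent to the pool
async def ask_via_pool(pool: AgentPool, question: str) -> str:
    agent = await pool.get_agent()
    try:
        return await agent.ask(question)
    finally:
        await pool.return_agent(agent)

# pool = AgentPool(config, pool_size=5)
# await pool.initialize_pool()
# answer = await ask_via_pool(pool, "What is 0G Storage?")
```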
Caching Results
```python
from functools import wraps
import hashlib
import json
import time

def cache_agent_results(cache_ttl=300):
    """Decorator to cache agent results"""
    cache = {}

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Create a cache key from the call arguments
            cache_key = hashlib.md5(
                json.dumps(str(args) + str(kwargs)).encode()
            ).hexdigest()

            # Check the cache
            if cache_key in cache:
                result, timestamp = cache[cache_key]
                if time.time() - timestamp < cache_ttl:
                    return result

            # Execute the function
            result = await func(*args, **kwargs)

            # Cache the result
            cache[cache_key] = (result, time.time())
            return result
        return wrapper
    return decorator

@cache_agent_results(cache_ttl=600)  # 10-minute cache
async def cached_agent_ask(agent, question):
    return await agent.ask(question)
```
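With the decorator in place, identical questions asked within the TTL are served from memory instead of reaching the provider again. Note that the key is built from `str(args)`, so cache hits only occur for the same agent instance. The snippet below is a usage sketch that assumes an already initialized agent.

```python
async def demo(agent):
    # The first call reaches the provider; the identical second call (within the TTL) is a cache hit
    first = await cached_agent_ask(agent, "Explain erasure coding in one paragraph")
    second = await cached_agent_ask(agent, "Explain erasure coding in one paragraph")
    return first == second  # True, since the cached response object is returned
```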
Monitoring and Logging
```python
import logging
from datetime import datetime

class AgentExecutionMonitor:
    def __init__(self):
        self.logger = logging.getLogger('agent_execution')
        self.execution_log = []

    def log_execution_start(self, agent_name, task):
        """Log execution start"""
        log_entry = {
            'agent': agent_name,
            'task': task[:100],  # Truncate long tasks
            'start_time': datetime.now(),
            'status': 'started'
        }
        self.execution_log.append(log_entry)
        self.logger.info(f"Agent {agent_name} started task: {task[:50]}...")
        return len(self.execution_log) - 1  # Return the log index

    def log_execution_end(self, log_index, result=None, error=None):
        """Log execution end"""
        if log_index < len(self.execution_log):
            log_entry = self.execution_log[log_index]
            log_entry['end_time'] = datetime.now()
            log_entry['duration'] = (log_entry['end_time'] - log_entry['start_time']).total_seconds()

            if error:
                log_entry['status'] = 'failed'
                log_entry['error'] = str(error)
                self.logger.error(f"Agent {log_entry['agent']} failed: {error}")
            else:
                log_entry['status'] = 'completed'
                log_entry['result_length'] = len(result) if result else 0
                self.logger.info(f"Agent {log_entry['agent']} completed in {log_entry['duration']:.2f}s")
```
Best Practices
- Resource Management: Always properly initialize and clean up agents
- Error Handling: Implement comprehensive error handling and retry logic (see the retry sketch after this list)
- Performance: Use connection pooling and caching for high-throughput applications
- Monitoring: Log execution metrics and monitor agent performance
- Scalability: Design for horizontal scaling with multiple agent instances
- Security: Secure private keys and API credentials properly
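For the error-handling point above, a simple retry wrapper with exponential backoff usually covers transient provider or network failures. `ask_with_retry` is a hypothetical helper and the attempt counts and delays are illustrative.

```python
import asyncio

async def ask_with_retry(agent, question: str, max_attempts: int = 3, base_delay: float = 1.0):
    """Retry an agent call with exponential backoff (sketch; parameters are illustrative)."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await agent.ask(question)
        except Exception as error:
            if attempt == max_attempts:
                raise
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({error}); retrying in {delay:.0f}s")
            await asyncio.sleep(delay)
```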