Skip to main content

Agent Execution

Execute agent tasks and manage agent lifecycle in Python applications.

Overview

Agent execution involves running agents in various contexts, from simple one-off tasks to long-running services. The 0G AI SDK provides flexible execution patterns to suit different application needs.

Execution Patterns

Simple Execution

Basic agent execution for straightforward tasks:
import asyncio
from zg_ai_sdk import create_agent

async def simple_execution():
    """Create a one-off agent, ask it a single question, and return the reply."""
    settings = {
        'name': 'Task Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-memory',
        'private_key': 'your-private-key'
    }
    agent = await create_agent(settings)
    await agent.init()

    # A single prompt is the whole job in this example.
    return await agent.ask('Explain quantum computing in simple terms')

# Run the task only when this file is executed as a script, so that
# importing it does not start an event loop or run the agent.
if __name__ == '__main__':
    result = asyncio.run(simple_execution())
    print(result)

Batch Execution

Execute multiple tasks one after another with a single shared agent:
async def batch_execution(tasks):
    """Run every prompt in ``tasks`` through one agent, in order.

    Returns a list with one ``{'task': ..., 'result': ...}`` dict per
    prompt, in the same order as the input.
    """
    settings = {
        'name': 'Batch Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'batch-memory',
        'private_key': 'your-private-key'
    }
    agent = await create_agent(settings)
    await agent.init()

    outcomes = []
    for position, prompt in enumerate(tasks, start=1):
        # The progress line shows only the first 50 characters of the prompt.
        print(f"Processing task {position}/{len(tasks)}: {prompt[:50]}...")
        answer = await agent.ask(prompt)
        outcomes.append({'task': prompt, 'result': answer})

    return outcomes

Streaming Execution

Real-time execution with streaming responses:
async def streaming_execution(prompt):
    """Send ``prompt`` to a streaming agent and echo each chunk to stdout.

    Returns whatever ``agent.stream_chat`` returns.
    """
    settings = {
        'name': 'Streaming Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'streaming-memory',
        'private_key': 'your-private-key'
    }
    agent = await create_agent(settings)
    await agent.init()

    def emit(piece: str):
        # No trailing newline, and flush so each chunk is shown immediately.
        print(piece, end='', flush=True)

    return await agent.stream_chat(prompt, emit)

Examples

import asyncio
from datetime import datetime
from zg_ai_sdk import create_agent

class TaskAutomator:
    """Run tasks through a lazily created agent and keep a timing history.

    ``agent_config`` is passed unchanged to ``create_agent``. The agent is
    created on the first ``execute_task``/``execute_workflow`` call, or
    earlier by calling ``initialize()`` explicitly.
    """

    def __init__(self, agent_config):
        self.agent_config = agent_config
        # Stays None until initialize() has completed successfully.
        self.agent = None
        # One record dict per executed task, oldest first.
        self.task_history = []

    async def initialize(self):
        """Create the agent, initialize it, and install the system prompt.

        ``self.agent`` is assigned only after every setup step succeeded.
        If any step raises, ``self.agent`` stays ``None`` so the next call
        retries the setup instead of using a half-initialized agent.
        """
        agent = await create_agent(self.agent_config)
        await agent.init()

        # Set up system prompt for task automation
        agent.set_system_prompt('''
        You are a task automation assistant. When given tasks:
        1. Break down complex tasks into steps
        2. Provide clear, actionable instructions
        3. Estimate time requirements when possible
        4. Identify potential issues or dependencies
        ''')

        self.agent = agent

    async def execute_task(self, task_description, context=None):
        """Execute a single task and return its record.

        Args:
            task_description: The prompt describing the task.
            context: Optional text prepended to the prompt; ignored when
                falsy (``None`` or an empty string).

        Returns:
            A dict with the keys ``task``, ``context``, ``result``,
            ``start_time``, ``end_time`` and ``duration`` (seconds). The
            same dict is appended to ``task_history`` and handed to
            ``agent.remember`` under the key ``task_<n>``, where ``n`` is
            the 1-based position in the history.
        """
        if not self.agent:
            await self.initialize()

        # Add context if provided
        full_prompt = task_description
        if context:
            full_prompt = f"Context: {context}\n\nTask: {task_description}"

        # Execute task, timing only the agent call itself
        start_time = datetime.now()
        result = await self.agent.chat_with_context(full_prompt)
        end_time = datetime.now()

        # Record task execution
        task_record = {
            'task': task_description,
            'context': context,
            'result': result,
            'start_time': start_time,
            'end_time': end_time,
            'duration': (end_time - start_time).total_seconds()
        }

        self.task_history.append(task_record)

        # Store in agent memory
        await self.agent.remember(
            f'task_{len(self.task_history)}',
            task_record
        )

        return task_record

    async def execute_workflow(self, tasks):
        """Execute a series of related tasks, feeding results forward.

        Each task receives the accumulated context built from the first
        200 characters of every earlier result. The first task runs
        without context.

        Returns:
            The list of task records, in execution order.
        """
        if not self.agent:
            await self.initialize()

        workflow_results = []
        context = ""

        for i, task in enumerate(tasks):
            print(f"Executing step {i+1}: {task}")

            # Use previous results as context
            result = await self.execute_task(task, context)
            workflow_results.append(result)

            # Build context for next task; slicing assumes the agent's
            # result is a string (or another sliceable value).
            context += f"Previous step result: {result['result'][:200]}...\n"

        return workflow_results

    def get_task_summary(self):
        """Summarize the executed tasks.

        Returns:
            The string ``"No tasks executed yet."`` when the history is
            empty; otherwise a dict with ``total_tasks``, ``total_time``
            and ``average_time`` (both in seconds), and ``recent_tasks``
            (descriptions of up to the last five tasks).
        """
        if not self.task_history:
            return "No tasks executed yet."

        total_tasks = len(self.task_history)
        total_time = sum(task['duration'] for task in self.task_history)
        avg_time = total_time / total_tasks

        return {
            'total_tasks': total_tasks,
            'total_time': total_time,
            'average_time': avg_time,
            'recent_tasks': [task['task'] for task in self.task_history[-5:]]
        }

async def main():
    """Demo: run one standalone task, then a four-step workflow, then print a summary."""
    automator = TaskAutomator({
        'name': 'Task Automator',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-automation',
        'private_key': 'your-private-key'
    })

    # Execute single task
    single = await automator.execute_task(
        "Create a project plan for building a web application"
    )
    print("Task completed in", single['duration'], "seconds")

    # Execute workflow; the returned records are not used here because
    # the summary below is computed from the automator's own history.
    steps = [
        "Define project requirements and scope",
        "Design system architecture",
        "Create development timeline",
        "Identify potential risks and mitigation strategies"
    ]
    await automator.execute_workflow(steps)

    # Get summary
    print("Workflow Summary:", automator.get_task_summary())

# Entry point: run the demo only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    asyncio.run(main())

Execution Contexts

Web Application Integration

from fastapi import FastAPI, BackgroundTasks
from zg_ai_sdk import create_agent

app = FastAPI()
# Module-level agent shared by the request handlers; it stays None until
# the startup hook below has created it.
agent = None

@app.on_event("startup")
async def startup_event():
    """Create and initialize the shared agent when the application starts."""
    global agent
    settings = {
        'name': 'Web Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'web-memory',
        'private_key': 'your-private-key'
    }
    # Rebinds the module-level `agent` that the route handlers read.
    agent = await create_agent(settings)
    await agent.init()

@app.post("/ask")
async def ask_agent(question: str):
    # Ask the shared agent and wait for its complete answer before
    # responding with it under the "response" key.
    return {"response": await agent.ask(question)}

@app.post("/ask-async")
async def ask_agent_async(question: str, background_tasks: BackgroundTasks):
    # Queue the question for background processing and immediately return
    # a task id the client can use to look up the result later.

    # Imported here so this handler does not depend on any import that
    # lives outside this snippet.
    import uuid

    # Random UUID rather than a timestamp: two requests arriving in the
    # same clock tick would otherwise be given the same task id.
    task_id = f"task_{uuid.uuid4().hex}"
    background_tasks.add_task(process_async_task, task_id, question)
    return {"task_id": task_id, "status": "processing"}

async def process_async_task(task_id: str, question: str):
    """Background job: ask the shared agent, then hand the answer to ``store_result``."""
    # Store result somewhere (database, cache, etc.)
    await store_result(task_id, await agent.ask(question))

CLI Application

import click
import asyncio
from zg_ai_sdk import create_agent

# `--question` is required: without it there is nothing to send to the
# agent, so click rejects the invocation with a usage error instead of
# forwarding None to the agent.
@click.command()
@click.option('--question', '-q', required=True, help='Question to ask the agent')
@click.option('--stream', '-s', is_flag=True, help='Stream the response')
@click.option('--config', '-c', help='Agent configuration file')
def cli_agent(question, stream, config):
    """CLI interface for 0G AI Agent"""
    # click commands are synchronous, so drive the async implementation
    # to completion on a fresh event loop.
    asyncio.run(run_cli_agent(question, stream, config))

async def run_cli_agent(question, stream, config_file):
    """Answer ``question`` on stdout, either streamed or as one response.

    Uses ``load_config(config_file)`` when a config file is given and
    ``get_default_config()`` otherwise.
    """
    # Load configuration
    if config_file:
        config = load_config(config_file)
    else:
        config = get_default_config()

    # Create and initialize agent
    agent = await create_agent(config)
    await agent.init()

    if not stream:
        print(await agent.ask(question))
        return

    def write_piece(piece):
        print(piece, end='', flush=True)

    await agent.stream_chat(question, write_piece)
    print()  # New line after streaming

# Invoke the click command only when this file is run as a script.
if __name__ == '__main__':
    cli_agent()

Performance Optimization

Connection Pooling

class AgentPool:
    """Pool of pre-initialized agents that are handed out one at a time.

    ``config`` is the base agent configuration; it must contain ``name``
    and ``memory_bucket``, which are suffixed per pool member.
    """

    def __init__(self, config, pool_size=5):
        self.config = config
        self.pool_size = pool_size
        # Idle agents wait here; get_agent() blocks while it is empty.
        self.available_agents = asyncio.Queue()
        # Agents currently checked out through get_agent().
        self.busy_agents = set()

    async def initialize_pool(self):
        """Create and initialize ``pool_size`` agents and mark them available."""
        for index in range(self.pool_size):
            # Each member gets its own name and memory bucket.
            agent = await create_agent({
                **self.config,
                'name': f"{self.config['name']} Pool {index}",
                'memory_bucket': f"{self.config['memory_bucket']}-pool-{index}"
            })
            await agent.init()
            await self.available_agents.put(agent)

    async def get_agent(self):
        """Wait for an available agent, mark it busy, and return it."""
        agent = await self.available_agents.get()
        self.busy_agents.add(agent)
        return agent

    async def return_agent(self, agent):
        """Return a checked-out agent to the pool.

        Agents that are not currently checked out are ignored. Re-queuing
        them would put the same agent in the queue twice and let two
        callers use it at the same time.
        """
        if agent not in self.busy_agents:
            return
        self.busy_agents.remove(agent)
        await self.available_agents.put(agent)

Caching Results

import hashlib
import json
import time
from functools import wraps

def cache_agent_results(cache_ttl=300):
    """Decorator factory that caches an async function's results.

    Args:
        cache_ttl: Seconds a cached result stays valid. A value of 0 or
            less disables reuse, so every call runs the function.

    Calls are matched by the ``repr`` of their arguments, so keyword
    order does not matter. Objects without a custom ``__repr__`` are
    effectively matched by identity. Exceptions are not cached. Entries
    are only dropped when they are looked up after expiring, so the cache
    grows with the number of distinct calls.
    """
    def decorator(func):
        # One cache per decorated function: if the same decorator object
        # wraps several functions, they must not see each other's results.
        cache = {}

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Create cache key; kwargs are sorted so that f(a=1, b=2) and
            # f(b=2, a=1) share an entry.
            raw_key = repr((args, sorted(kwargs.items())))
            cache_key = hashlib.md5(raw_key.encode()).hexdigest()

            # Check cache. The monotonic clock keeps the TTL correct even
            # if the system time is adjusted.
            cached = cache.get(cache_key)
            if cached is not None:
                result, stored_at = cached
                if time.monotonic() - stored_at < cache_ttl:
                    return result
                del cache[cache_key]  # expired

            # Execute function
            result = await func(*args, **kwargs)

            # Cache result
            cache[cache_key] = (result, time.monotonic())

            return result
        return wrapper
    return decorator

@cache_agent_results(cache_ttl=600)  # 10 minute cache
async def cached_agent_ask(agent, question):
    """Ask ``agent`` the ``question``, going through the result cache."""
    answer = await agent.ask(question)
    return answer

Monitoring and Logging

import logging
from datetime import datetime

class AgentExecutionMonitor:
    """Record the start and end of agent executions in memory and in the log.

    Entries are kept in ``execution_log``. The index returned by
    ``log_execution_start`` identifies the entry to ``log_execution_end``.
    """

    def __init__(self):
        self.logger = logging.getLogger('agent_execution')
        # One dict per execution, in start order.
        self.execution_log = []

    def log_execution_start(self, agent_name, task):
        """Log execution start.

        Args:
            agent_name: Name of the agent running the task.
            task: Task text; only the first 100 characters are stored and
                the first 50 are written to the log message.

        Returns:
            The index of the new entry in ``execution_log``.
        """
        log_entry = {
            'agent': agent_name,
            'task': task[:100],  # Truncate long tasks
            'start_time': datetime.now(),
            'status': 'started'
        }

        self.execution_log.append(log_entry)
        self.logger.info("Agent %s started task: %s...", agent_name, task[:50])

        return len(self.execution_log) - 1  # Return log index

    def log_execution_end(self, log_index, result=None, error=None):
        """Log execution end.

        Args:
            log_index: Index returned by ``log_execution_start``. Indexes
                outside the log (including negative ones) are ignored
                with a warning.
            result: Result of a successful execution. Its length is
                recorded as ``result_length``, or 0 if it has no length.
            error: The failure, if any. Any value other than ``None``
                marks the entry as failed.
        """
        # Reject negative indexes explicitly: list indexing would accept
        # them and silently update an unrelated entry.
        if not 0 <= log_index < len(self.execution_log):
            self.logger.warning(
                "Ignoring execution end for unknown log index %r", log_index
            )
            return

        log_entry = self.execution_log[log_index]
        log_entry['end_time'] = datetime.now()
        log_entry['duration'] = (
            log_entry['end_time'] - log_entry['start_time']
        ).total_seconds()

        if error is not None:
            log_entry['status'] = 'failed'
            log_entry['error'] = str(error)
            self.logger.error("Agent %s failed: %s", log_entry['agent'], error)
        else:
            log_entry['status'] = 'completed'
            # Monitoring must not raise for results without a length
            # (numbers, None, ...); record 0 for those.
            log_entry['result_length'] = (
                len(result) if hasattr(result, '__len__') else 0
            )
            self.logger.info(
                "Agent %s completed in %.2fs",
                log_entry['agent'],
                log_entry['duration'],
            )

Best Practices

  1. Resource Management: Always properly initialize and clean up agents
  2. Error Handling: Implement comprehensive error handling and retry logic
  3. Performance: Use connection pooling and caching for high-throughput applications
  4. Monitoring: Log execution metrics and monitor agent performance
  5. Scalability: Design for horizontal scaling with multiple agent instances
  6. Security: Secure private keys and API credentials properly

Next Steps