Skip to main content

Documentation Index

Fetch the complete documentation index at: https://0g.mintlify.app/llms.txt

Use this file to discover all available pages before exploring further.

Agent Execution

Execute agent tasks and manage agent lifecycle in Python applications.

Overview

Agent execution involves running agents in various contexts, from simple one-off tasks to long-running services. The 0G AI SDK provides flexible execution patterns to suit different application needs.

Execution Patterns

Simple Execution

Basic agent execution for straightforward tasks:
import asyncio
from zg_ai_sdk import create_agent

async def simple_execution():
    agent = await create_agent({
        'name': 'Task Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-memory',
        'private_key': 'your-private-key'
    })
    
    await agent.init()
    
    # Execute single task
    result = await agent.ask('Explain quantum computing in simple terms')
    return result

# Run the task
result = asyncio.run(simple_execution())
print(result)

Batch Execution

Execute multiple tasks efficiently:
async def batch_execution(tasks):
    agent = await create_agent({
        'name': 'Batch Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'batch-memory',
        'private_key': 'your-private-key'
    })
    
    await agent.init()
    
    results = []
    for i, task in enumerate(tasks):
        print(f"Processing task {i+1}/{len(tasks)}: {task[:50]}...")
        result = await agent.ask(task)
        results.append({'task': task, 'result': result})
    
    return results

Streaming Execution

Real-time execution with streaming responses:
async def streaming_execution(prompt):
    agent = await create_agent({
        'name': 'Streaming Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'streaming-memory',
        'private_key': 'your-private-key'
    })
    
    await agent.init()
    
    def handle_chunk(chunk: str):
        print(chunk, end='', flush=True)
    
    result = await agent.stream_chat(prompt, handle_chunk)
    return result

Examples

import asyncio
from datetime import datetime
from zg_ai_sdk import create_agent

class TaskAutomator:
    def __init__(self, agent_config):
        self.agent_config = agent_config
        self.agent = None
        self.task_history = []
    
    async def initialize(self):
        """Initialize the agent"""
        self.agent = await create_agent(self.agent_config)
        await self.agent.init()
        
        # Set up system prompt for task automation
        self.agent.set_system_prompt('''
        You are a task automation assistant. When given tasks:
        1. Break down complex tasks into steps
        2. Provide clear, actionable instructions
        3. Estimate time requirements when possible
        4. Identify potential issues or dependencies
        ''')
    
    async def execute_task(self, task_description, context=None):
        """Execute a single task"""
        if not self.agent:
            await self.initialize()
        
        # Add context if provided
        full_prompt = task_description
        if context:
            full_prompt = f"Context: {context}\n\nTask: {task_description}"
        
        # Execute task
        start_time = datetime.now()
        result = await self.agent.chat_with_context(full_prompt)
        end_time = datetime.now()
        
        # Record task execution
        task_record = {
            'task': task_description,
            'context': context,
            'result': result,
            'start_time': start_time,
            'end_time': end_time,
            'duration': (end_time - start_time).total_seconds()
        }
        
        self.task_history.append(task_record)
        
        # Store in agent memory
        await self.agent.remember(
            f'task_{len(self.task_history)}',
            task_record
        )
        
        return task_record
    
    async def execute_workflow(self, tasks):
        """Execute a series of related tasks"""
        if not self.agent:
            await self.initialize()
        
        workflow_results = []
        context = ""
        
        for i, task in enumerate(tasks):
            print(f"Executing step {i+1}: {task}")
            
            # Use previous results as context
            result = await self.execute_task(task, context)
            workflow_results.append(result)
            
            # Build context for next task
            context += f"Previous step result: {result['result'][:200]}...\n"
        
        return workflow_results
    
    def get_task_summary(self):
        """Get summary of executed tasks"""
        if not self.task_history:
            return "No tasks executed yet."
        
        total_tasks = len(self.task_history)
        total_time = sum(task['duration'] for task in self.task_history)
        avg_time = total_time / total_tasks
        
        return {
            'total_tasks': total_tasks,
            'total_time': total_time,
            'average_time': avg_time,
            'recent_tasks': [task['task'] for task in self.task_history[-5:]]
        }

async def main():
    config = {
        'name': 'Task Automator',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'task-automation',
        'private_key': 'your-private-key'
    }
    
    automator = TaskAutomator(config)
    
    # Execute single task
    task_result = await automator.execute_task(
        "Create a project plan for building a web application"
    )
    print("Task completed in", task_result['duration'], "seconds")
    
    # Execute workflow
    workflow_tasks = [
        "Define project requirements and scope",
        "Design system architecture",
        "Create development timeline",
        "Identify potential risks and mitigation strategies"
    ]
    
    workflow_results = await automator.execute_workflow(workflow_tasks)
    
    # Get summary
    summary = automator.get_task_summary()
    print("Workflow Summary:", summary)

asyncio.run(main())

Execution Contexts

Web Application Integration

from fastapi import FastAPI, BackgroundTasks
from zg_ai_sdk import create_agent

app = FastAPI()
agent = None

@app.on_event("startup")
async def startup_event():
    global agent
    agent = await create_agent({
        'name': 'Web Agent',
        'provider_address': '0xf07240Efa67755B5311bc75784a061eDB47165Dd',
        'memory_bucket': 'web-memory',
        'private_key': 'your-private-key'
    })
    await agent.init()

@app.post("/ask")
async def ask_agent(question: str):
    response = await agent.ask(question)
    return {"response": response}

@app.post("/ask-async")
async def ask_agent_async(question: str, background_tasks: BackgroundTasks):
    task_id = f"task_{datetime.now().timestamp()}"
    background_tasks.add_task(process_async_task, task_id, question)
    return {"task_id": task_id, "status": "processing"}

async def process_async_task(task_id: str, question: str):
    result = await agent.ask(question)
    # Store result somewhere (database, cache, etc.)
    await store_result(task_id, result)

CLI Application

import click
import asyncio
from zg_ai_sdk import create_agent

@click.command()
@click.option('--question', '-q', help='Question to ask the agent')
@click.option('--stream', '-s', is_flag=True, help='Stream the response')
@click.option('--config', '-c', help='Agent configuration file')
def cli_agent(question, stream, config):
    """CLI interface for 0G AI Agent"""
    asyncio.run(run_cli_agent(question, stream, config))

async def run_cli_agent(question, stream, config_file):
    # Load configuration
    config = load_config(config_file) if config_file else get_default_config()
    
    # Create and initialize agent
    agent = await create_agent(config)
    await agent.init()
    
    if stream:
        def print_chunk(chunk):
            print(chunk, end='', flush=True)
        
        await agent.stream_chat(question, print_chunk)
        print()  # New line after streaming
    else:
        response = await agent.ask(question)
        print(response)

if __name__ == '__main__':
    cli_agent()

Performance Optimization

Connection Pooling

class AgentPool:
    def __init__(self, config, pool_size=5):
        self.config = config
        self.pool_size = pool_size
        self.available_agents = asyncio.Queue()
        self.busy_agents = set()
    
    async def initialize_pool(self):
        """Initialize agent pool"""
        for i in range(self.pool_size):
            agent = await create_agent({
                **self.config,
                'name': f"{self.config['name']} Pool {i}",
                'memory_bucket': f"{self.config['memory_bucket']}-pool-{i}"
            })
            await agent.init()
            await self.available_agents.put(agent)
    
    async def get_agent(self):
        """Get an available agent from the pool"""
        agent = await self.available_agents.get()
        self.busy_agents.add(agent)
        return agent
    
    async def return_agent(self, agent):
        """Return agent to the pool"""
        self.busy_agents.discard(agent)
        await self.available_agents.put(agent)

Caching Results

from functools import wraps
import hashlib
import json

def cache_agent_results(cache_ttl=300):
    """Decorator to cache agent results"""
    cache = {}
    
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Create cache key
            cache_key = hashlib.md5(
                json.dumps(str(args) + str(kwargs)).encode()
            ).hexdigest()
            
            # Check cache
            if cache_key in cache:
                result, timestamp = cache[cache_key]
                if time.time() - timestamp < cache_ttl:
                    return result
            
            # Execute function
            result = await func(*args, **kwargs)
            
            # Cache result
            cache[cache_key] = (result, time.time())
            
            return result
        return wrapper
    return decorator

@cache_agent_results(cache_ttl=600)  # 10 minute cache
async def cached_agent_ask(agent, question):
    return await agent.ask(question)

Monitoring and Logging

import logging
from datetime import datetime

class AgentExecutionMonitor:
    def __init__(self):
        self.logger = logging.getLogger('agent_execution')
        self.execution_log = []
    
    def log_execution_start(self, agent_name, task):
        """Log execution start"""
        log_entry = {
            'agent': agent_name,
            'task': task[:100],  # Truncate long tasks
            'start_time': datetime.now(),
            'status': 'started'
        }
        
        self.execution_log.append(log_entry)
        self.logger.info(f"Agent {agent_name} started task: {task[:50]}...")
        
        return len(self.execution_log) - 1  # Return log index
    
    def log_execution_end(self, log_index, result=None, error=None):
        """Log execution end"""
        if log_index < len(self.execution_log):
            log_entry = self.execution_log[log_index]
            log_entry['end_time'] = datetime.now()
            log_entry['duration'] = (log_entry['end_time'] - log_entry['start_time']).total_seconds()
            
            if error:
                log_entry['status'] = 'failed'
                log_entry['error'] = str(error)
                self.logger.error(f"Agent {log_entry['agent']} failed: {error}")
            else:
                log_entry['status'] = 'completed'
                log_entry['result_length'] = len(result) if result else 0
                self.logger.info(f"Agent {log_entry['agent']} completed in {log_entry['duration']:.2f}s")

Best Practices

  1. Resource Management: Always properly initialize and clean up agents
  2. Error Handling: Implement comprehensive error handling and retry logic
  3. Performance: Use connection pooling and caching for high-throughput applications
  4. Monitoring: Log execution metrics and monitor agent performance
  5. Scalability: Design for horizontal scaling with multiple agent instances
  6. Security: Secure private keys and API credentials properly

Next Steps