The Memory system provides efficient retrieval of both ephemeral and persistent data. Data is automatically cached for better performance and supports various retrieval patterns for different use cases.
async def get_nested_data(agent, base_key): """Retrieve hierarchical data structure""" # Get the main object main_data = await agent.recall(base_key) if not main_data: return None # Get related objects if 'related_keys' in main_data: for related_key in main_data['related_keys']: related_data = await agent.recall(related_key) main_data[f'related_{related_key}'] = related_data return main_data# Usageproject_data = await get_nested_data(agent, 'project_001')
async def get_with_fallback(agent, primary_key, fallback_key, default_value=None): """Try primary key, then fallback, then default""" # Try primary key data = await agent.recall(primary_key) if data is not None: return data # Try fallback key data = await agent.recall(fallback_key) if data is not None: return data # Return default return default_value# Usageuser_theme = await get_with_fallback( agent, 'user_123_theme', 'default_theme', 'light')
async def get_latest_version(agent, base_key): """Get the latest version of versioned data""" # Get version index version_index = await agent.recall(f'{base_key}_versions') or [] if not version_index: return None # Get latest version latest_version = max(version_index, key=lambda v: v['timestamp']) return await agent.recall(f"{base_key}_v{latest_version['version']}")# Usagelatest_document = await get_latest_version(agent, 'document_001')
from zg_ai_sdk import SDKErrorasync def safe_recall(agent, key, expected_type=None, validator=None): """Safely retrieve and validate data""" try: data = await agent.recall(key) if data is None: return None # Type checking if expected_type and not isinstance(data, expected_type): print(f"Warning: Expected {expected_type}, got {type(data)}") return None # Custom validation if validator and not validator(data): print(f"Warning: Data validation failed for key {key}") return None return data except SDKError as e: print(f"Storage error retrieving {key}: {e.message}") return None except Exception as e: print(f"Unexpected error retrieving {key}: {e}") return None# Usage with validationdef validate_user_data(data): required_fields = ['name', 'email'] return all(field in data for field in required_fields)user_data = await safe_recall( agent, 'user_123', expected_type=dict, validator=validate_user_data)