Agent Templates & Examples
This document provides example agent implementations and templates to bootstrap your workflow in the Meta Agent Platform.
Overview
The Meta Agent Platform supports multiple agent implementation patterns to accommodate different use cases, runtime environments, and integration needs. This document provides ready-to-use templates for common agent types.

Note: This is a placeholder for an agent templates overview diagram. The actual diagram should be created and added to the project.
1. Python Script Agent
A simple Python class-based agent that can be used as a building block for more complex agents.
Basic Template
# agent_example.py
from typing import Dict, Any
import logging
class ExampleAgent:
"""A simple agent template for the Meta Agent Platform."""
def __init__(self, config: Dict[str, Any]):
"""Initialize the agent with configuration.
Args:
config: Dictionary containing agent configuration
"""
self.config = config
self.name = config.get('name', 'ExampleAgent')
self.version = config.get('version', '1.0.0')
# Configure logging
logging.basicConfig(
level=config.get('log_level', logging.INFO),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(self.name)
self.logger.info(f"Initializing {self.name} v{self.version}")
def validate_input(self, input_data: Dict[str, Any]) -> bool:
"""Validate the input data.
Args:
input_data: The input data to validate
Returns:
bool: True if valid, False otherwise
"""
# Implement input validation logic
required_fields = self.config.get('required_fields', [])
for field in required_fields:
if field not in input_data:
self.logger.error(f"Missing required field: {field}")
return False
return True
def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Process the input data and return results.
Args:
input_data: The input data to process
Returns:
Dict: The processing results
"""
# Implement your agent's core logic here
self.logger.info(f"Processing input: {input_data}")
# Example processing logic
result = {
"status": "success",
"message": "Processing completed",
"input": input_data,
"agent": f"{self.name} v{self.version}"
}
return result
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Run the agent on the provided input data.
Args:
input_data: The input data to process
Returns:
Dict: The processing results or error message
"""
try:
# Validate input
if not self.validate_input(input_data):
return {
"status": "error",
"message": "Invalid input data",
"agent": f"{self.name} v{self.version}"
}
# Process input
result = self.process(input_data)
return result
except Exception as e:
self.logger.exception("Error processing input")
return {
"status": "error",
"message": str(e),
"agent": f"{self.name} v{self.version}"
}
# Example usage
if __name__ == "__main__":
# Create agent configuration
config = {
"name": "TextProcessor",
"version": "1.0.0",
"required_fields": ["text"],
"log_level": logging.INFO
}
# Initialize agent
agent = ExampleAgent(config)
# Run agent with sample input
result = agent.run({"text": "Hello, world!"})
print(result)
Usage Example
# Using the example agent in your application
from agent_example import ExampleAgent
# Create agent configuration
config = {
"name": "TextAnalyzer",
"version": "1.0.0",
"required_fields": ["text"],
"log_level": "INFO",
"model": "sentiment-analysis"
}
# Initialize agent
agent = ExampleAgent(config)
# Process some data
result = agent.run({
"text": "I love this product! It's amazing.",
"options": {"detailed": True}
})
print(result)
2. Dockerized Agent
A containerized agent that can be deployed and scaled independently. This approach provides isolation, reproducibility, and portability.
Directory Structure
dockerized-agent/
├── Dockerfile
├── requirements.txt
├── src/
│ ├── __init__.py
│ ├── agent.py
│ └── utils.py
├── tests/
│ ├── __init__.py
│ └── test_agent.py
└── README.md
Dockerfile
# Dockerfile for agent
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Create non-root user
RUN useradd -m -r agent && \
chown -R agent:agent /app
# Copy application code
COPY src/ /app/src/
# Switch to non-root user
USER agent
# Run the agent
CMD ["python", "-m", "src.agent"]
requirements.txt
Agent Implementation
# src/agent.py
import os
import json
import sys
from typing import Dict, Any, Optional, List
from loguru import logger
from pydantic import BaseModel, Field
# Configure logging
logger.remove()
logger.add(sys.stderr, level=os.environ.get("LOG_LEVEL", "INFO"))
# Define input/output models
class AgentInput(BaseModel):
"""Input model for the agent."""
query: str = Field(..., description="The query to process")
parameters: Optional[Dict[str, Any]] = Field(default={}, description="Additional parameters")
class AgentOutput(BaseModel):
"""Output model for the agent."""
result: str = Field(..., description="The processing result")
confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score")
metadata: Optional[Dict[str, Any]] = Field(default={}, description="Additional metadata")
class DockerizedAgent:
"""A containerized agent for the Meta Agent Platform."""
def __init__(self, config_path: Optional[str] = None):
"""Initialize the agent with configuration.
Args:
config_path: Path to configuration file (optional)
"""
# Load configuration
self.config = self._load_config(config_path)
self.name = self.config.get("name", "DockerizedAgent")
self.version = self.config.get("version", "1.0.0")
logger.info(f"Initializing {self.name} v{self.version}")
def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
"""Load configuration from file or environment.
Args:
config_path: Path to configuration file
Returns:
Dict: Configuration dictionary
"""
config = {}
# Try to load from file if provided
if config_path and os.path.exists(config_path):
with open(config_path, "r") as f:
config = json.load(f)
# Override with environment variables
env_prefix = "AGENT_"
for key, value in os.environ.items():
if key.startswith(env_prefix):
config_key = key[len(env_prefix):].lower()
config[config_key] = value
return config
def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Process the input data.
Args:
input_data: Input data dictionary
Returns:
Dict: Processing results
"""
try:
# Parse and validate input
agent_input = AgentInput(**input_data)
# Process the query (implement your logic here)
logger.info(f"Processing query: {agent_input.query}")
# Example processing logic
result = f"Processed: {agent_input.query}"
confidence = 0.95
# Create and validate output
output = AgentOutput(
result=result,
confidence=confidence,
metadata={
"agent": f"{self.name} v{self.version}",
"parameters": agent_input.parameters
}
)
return output.model_dump()
except Exception as e:
logger.exception("Error processing input")
return {
"error": str(e),
"agent": f"{self.name} v{self.version}"
}
# Main entry point
def main():
"""Main entry point for the containerized agent."""
# Get configuration path from environment
config_path = os.environ.get("AGENT_CONFIG_PATH")
# Initialize agent
agent = DockerizedAgent(config_path)
# Read input from stdin or environment
input_data = {}
if os.environ.get("AGENT_INPUT"):
input_data = json.loads(os.environ.get("AGENT_INPUT", "{}"))
else:
try:
for line in sys.stdin:
input_data = json.loads(line)
break
except json.JSONDecodeError:
logger.error("Invalid JSON input")
# Process input
result = agent.process(input_data)
# Write output to stdout
print(json.dumps(result))
if __name__ == "__main__":
main()
Building and Running
# Build the Docker image
docker build -t meta-agent-platform/dockerized-agent:1.0.0 .
# Run the agent with input from file
docker run --rm -v $(pwd)/input.json:/input.json \
meta-agent-platform/dockerized-agent:1.0.0 \
python -m src.agent < /input.json
# Run the agent with environment variables
docker run --rm \
-e AGENT_NAME="CustomAgent" \
-e AGENT_VERSION="1.1.0" \
-e AGENT_INPUT='{"query": "What is the weather?", "parameters": {"location": "New York"}}' \
meta-agent-platform/dockerized-agent:1.0.0
3. API Agent (FastAPI)
A RESTful API agent that can be deployed as a service and integrated with other systems via HTTP requests.
Directory Structure
api-agent/
├── Dockerfile
├── requirements.txt
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── models.py
│ ├── agent.py
│ └── config.py
├── tests/
│ ├── __init__.py
│ └── test_api.py
└── README.md
API Implementation
# app/main.py
from fastapi import FastAPI, Request, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Optional, Dict, Any
import time
import uuid
import logging
from .models import AgentRequest, AgentResponse, ErrorResponse
from .agent import ApiAgent
from .config import Settings, get_settings
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("api-agent")
# Create FastAPI app
app = FastAPI(
title="Meta Agent Platform API Agent",
description="A RESTful API agent for the Meta Agent Platform",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Restrict in production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Create agent instance
agent = ApiAgent()
# Dependency for API key validation
async def verify_api_key(x_api_key: str = Header(None), settings: Settings = Depends(get_settings)):
if settings.api_key_required and x_api_key != settings.api_key:
raise HTTPException(status_code=401, detail="Invalid API key")
return x_api_key
# Request ID middleware
@app.middleware("http")
async def add_request_id(request: Request, call_next):
request_id = str(uuid.uuid4())
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
# Rate limiting middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
# Simple in-memory rate limiting (use Redis in production)
client_ip = request.client.host
settings = get_settings()
# Skip rate limiting if disabled
if not settings.rate_limit_enabled:
return await call_next(request)
# Check rate limit
current_time = time.time()
request_count = getattr(request.app.state, f"rate_limit_{client_ip}", 0)
last_request_time = getattr(request.app.state, f"last_request_{client_ip}", 0)
# Reset counter if window has passed
if current_time - last_request_time > settings.rate_limit_window:
request_count = 0
# Increment counter
request_count += 1
setattr(request.app.state, f"rate_limit_{client_ip}", request_count)
setattr(request.app.state, f"last_request_{client_ip}", current_time)
# Check if over limit
if request_count > settings.rate_limit_max_requests:
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded", "retry_after": settings.rate_limit_window}
)
return await call_next(request)
# Health check endpoint
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": app.version}
# Agent endpoint
@app.post("/run", response_model=AgentResponse, responses={400: {"model": ErrorResponse}, 401: {"model": ErrorResponse}, 429: {"model": ErrorResponse}, 500: {"model": ErrorResponse}})
async def run_agent(request: AgentRequest, request_id: str = Depends(verify_api_key)):
try:
logger.info(f"Processing request {request_id}")
# Process the request
result = agent.process(request.dict())
return AgentResponse(
request_id=request_id,
result=result["result"],
confidence=result.get("confidence", 1.0),
metadata=result.get("metadata", {})
)
except Exception as e:
logger.exception(f"Error processing request {request_id}")
raise HTTPException(status_code=500, detail=str(e))
# Async batch processing endpoint
@app.post("/batch")
async def batch_process(requests: list[AgentRequest], request_id: str = Depends(verify_api_key)):
results = []
for req in requests:
try:
result = agent.process(req.dict())
results.append({
"success": True,
"result": result
})
except Exception as e:
results.append({
"success": False,
"error": str(e)
})
return {"batch_results": results, "request_id": request_id}
Models
# app/models.py
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional
class AgentRequest(BaseModel):
"""Request model for the agent API."""
query: str = Field(..., description="The query to process")
parameters: Optional[Dict[str, Any]] = Field(default={}, description="Additional parameters")
class AgentResponse(BaseModel):
"""Response model for the agent API."""
request_id: str = Field(..., description="Unique request identifier")
result: str = Field(..., description="The processing result")
confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score")
metadata: Dict[str, Any] = Field(default={}, description="Additional metadata")
class ErrorResponse(BaseModel):
"""Error response model."""
error: str = Field(..., description="Error message")
detail: Optional[str] = Field(None, description="Detailed error information")
Agent Implementation
# app/agent.py
from typing import Dict, Any
import logging
logger = logging.getLogger("api-agent")
class ApiAgent:
"""API agent implementation."""
def __init__(self):
"""Initialize the agent."""
self.name = "API Agent"
self.version = "1.0.0"
logger.info(f"Initializing {self.name} v{self.version}")
def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Process the input data.
Args:
input_data: Input data dictionary
Returns:
Dict: Processing results
"""
query = input_data.get("query", "")
parameters = input_data.get("parameters", {})
logger.info(f"Processing query: {query}")
# Implement your agent logic here
# This is a simple example that echoes the input
result = f"Processed: {query}"
return {
"result": result,
"confidence": 0.95,
"metadata": {
"agent": f"{self.name} v{self.version}",
"parameters": parameters
}
}
Configuration
# app/config.py
from pydantic import BaseSettings
from functools import lru_cache
import os
class Settings(BaseSettings):
"""Application settings."""
# API settings
api_key_required: bool = True
api_key: str = "your-api-key-here"
# Rate limiting
rate_limit_enabled: bool = True
rate_limit_max_requests: int = 100
rate_limit_window: int = 60 # seconds
# Agent settings
agent_name: str = "API Agent"
agent_version: str = "1.0.0"
class Config:
env_file = ".env"
env_prefix = "AGENT_"
@lru_cache()
def get_settings() -> Settings:
"""Get application settings."""
return Settings()
Dockerfile
# Dockerfile for API agent
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONPATH=/app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Create non-root user
RUN useradd -m -r agent && \
chown -R agent:agent /app
# Copy application code
COPY app/ /app/app/
# Switch to non-root user
USER agent
# Expose port
EXPOSE 8000
# Run the API server
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt
Usage Example
# Build and run the API agent
docker build -t meta-agent-platform/api-agent:1.0.0 .
docker run -p 8000:8000 -e AGENT_API_KEY=my-secret-key meta-agent-platform/api-agent:1.0.0
# Make a request to the agent
curl -X POST http://localhost:8000/run \
-H "Content-Type: application/json" \
-H "X-API-Key: my-secret-key" \
-d '{"query": "What is the weather?", "parameters": {"location": "New York"}}'
4. Cloud-Based Agent (Template)
A client-side agent that delegates processing to a cloud service, ideal for resource-intensive tasks or when leveraging managed AI services.
Directory Structure
cloud-agent/
├── requirements.txt
├── cloud_agent.py
├── config.py
├── exceptions.py
├── tests/
│ ├── __init__.py
│ └── test_cloud_agent.py
└── README.md
Agent Implementation
# cloud_agent.py
import requests
import json
import time
import logging
from typing import Dict, Any, Optional, Union
from requests.exceptions import RequestException, Timeout
from exceptions import CloudAgentError, AuthenticationError, RateLimitError, ServiceUnavailableError
from config import CloudAgentConfig
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("cloud-agent")
class CloudAgent:
"""Agent that delegates processing to a cloud service."""
def __init__(self, config: Optional[Union[Dict[str, Any], CloudAgentConfig]] = None):
"""Initialize the cloud agent.
Args:
config: Agent configuration (dict or CloudAgentConfig)
"""
# Initialize configuration
if config is None:
self.config = CloudAgentConfig()
elif isinstance(config, dict):
self.config = CloudAgentConfig(**config)
else:
self.config = config
# Initialize agent properties
self.name = "Cloud Agent"
self.version = "1.0.0"
# Initialize session
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {self.config.api_key}",
"Content-Type": "application/json",
"User-Agent": f"{self.name}/{self.version}"
})
logger.info(f"Initialized {self.name} v{self.version} with endpoint {self.config.endpoint}")
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Run the agent on the provided input data.
Args:
input_data: The input data to process
Returns:
Dict: The processing results
Raises:
CloudAgentError: If an error occurs during processing
"""
try:
# Prepare request payload
payload = self._prepare_payload(input_data)
# Send request to cloud service
response = self._send_request(payload)
# Process response
result = self._process_response(response)
return result
except CloudAgentError:
# Re-raise specific cloud agent errors
raise
except Exception as e:
# Wrap other exceptions
logger.exception("Unexpected error during agent execution")
raise CloudAgentError(f"Unexpected error: {str(e)}") from e
def _prepare_payload(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Prepare the request payload.
Args:
input_data: The input data to process
Returns:
Dict: The prepared payload
"""
# Add any additional fields required by the cloud service
payload = {
"input": input_data,
"options": self.config.options,
"version": self.config.service_version
}
return payload
def _send_request(self, payload: Dict[str, Any]) -> requests.Response:
"""Send request to the cloud service.
Args:
payload: The request payload
Returns:
Response: The HTTP response
Raises:
AuthenticationError: If authentication fails
RateLimitError: If rate limit is exceeded
ServiceUnavailableError: If service is unavailable
CloudAgentError: For other request errors
"""
try:
# Send request with retry logic
retries = 0
while retries <= self.config.max_retries:
try:
response = self.session.post(
self.config.endpoint,
json=payload,
timeout=self.config.timeout
)
# Check for specific error status codes
if response.status_code == 401:
raise AuthenticationError("Authentication failed. Check your API key.")
elif response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", self.config.retry_delay))
if retries < self.config.max_retries:
logger.warning(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
time.sleep(retry_after)
retries += 1
continue
else:
raise RateLimitError("Rate limit exceeded. Try again later.")
elif response.status_code >= 500:
if retries < self.config.max_retries:
delay = self.config.retry_delay * (2 ** retries) # Exponential backoff
logger.warning(f"Service error. Retrying after {delay} seconds.")
time.sleep(delay)
retries += 1
continue
else:
raise ServiceUnavailableError("Service is currently unavailable. Try again later.")
elif response.status_code != 200:
# Handle other error status codes
error_message = response.json().get("error", f"Request failed with status code {response.status_code}")
raise CloudAgentError(error_message)
# Success - return response
return response
except Timeout:
if retries < self.config.max_retries:
delay = self.config.retry_delay * (2 ** retries) # Exponential backoff
logger.warning(f"Request timeout. Retrying after {delay} seconds.")
time.sleep(delay)
retries += 1
else:
raise CloudAgentError("Request timed out after multiple retries.")
# Should not reach here, but just in case
raise CloudAgentError("Failed to send request after maximum retries.")
except RequestException as e:
# Handle request exceptions
logger.exception("Error sending request to cloud service")
raise CloudAgentError(f"Request error: {str(e)}") from e
def _process_response(self, response: requests.Response) -> Dict[str, Any]:
"""Process the response from the cloud service.
Args:
response: The HTTP response
Returns:
Dict: The processed result
Raises:
CloudAgentError: If response processing fails
"""
try:
# Parse JSON response
result = response.json()
# Add agent metadata
result["metadata"] = result.get("metadata", {})
result["metadata"]["agent"] = f"{self.name} v{self.version}"
return result
except json.JSONDecodeError as e:
logger.exception("Error decoding JSON response")
raise CloudAgentError(f"Invalid JSON response: {str(e)}") from e
Configuration
# config.py
from typing import Dict, Any, Optional
from pydantic import BaseModel, Field
import os
class CloudAgentConfig(BaseModel):
"""Configuration for the cloud agent."""
# API settings
api_key: str = Field(
default_factory=lambda: os.environ.get("CLOUD_AGENT_API_KEY", ""),
description="API key for the cloud service"
)
endpoint: str = Field(
default_factory=lambda: os.environ.get(
"CLOUD_AGENT_ENDPOINT",
"https://api.example.com/agent/run"
),
description="Endpoint URL for the cloud service"
)
service_version: str = Field(
default="v1",
description="Version of the cloud service API"
)
# Request settings
timeout: int = Field(
default=30,
description="Request timeout in seconds"
)
max_retries: int = Field(
default=3,
description="Maximum number of retry attempts"
)
retry_delay: int = Field(
default=1,
description="Initial delay between retries in seconds"
)
# Agent options
options: Dict[str, Any] = Field(
default_factory=dict,
description="Additional options for the cloud service"
)
Exceptions
# exceptions.py
class CloudAgentError(Exception):
"""Base exception for cloud agent errors."""
pass
class AuthenticationError(CloudAgentError):
"""Raised when authentication fails."""
pass
class RateLimitError(CloudAgentError):
"""Raised when rate limit is exceeded."""
pass
class ServiceUnavailableError(CloudAgentError):
"""Raised when the service is unavailable."""
pass
Usage Example
# Example usage of the cloud agent
from cloud_agent import CloudAgent
from config import CloudAgentConfig
# Configure the agent
config = CloudAgentConfig(
api_key="your-api-key-here",
endpoint="https://api.example.com/agent/run",
service_version="v2",
timeout=60,
options={
"model": "gpt-4",
"temperature": 0.7,
"max_tokens": 1000
}
)
# Initialize the agent
agent = CloudAgent(config)
# Run the agent
try:
result = agent.run({
"query": "Explain quantum computing in simple terms",
"context": "The user is a high school student"
})
print(f"Result: {result['result']}")
print(f"Confidence: {result.get('confidence', 'N/A')}")
except Exception as e:
print(f"Error: {str(e)}")
5. Event-Driven Agent (Template)
An agent that responds to events from message queues, webhooks, or other event sources, ideal for asynchronous processing and integration with event-driven architectures.
Directory Structure
event-agent/
├── Dockerfile
├── requirements.txt
├── src/
│ ├── __init__.py
│ ├── event_agent.py
│ ├── event_handlers.py
│ ├── models.py
│ └── config.py
├── tests/
│ ├── __init__.py
│ └── test_event_agent.py
└── README.md
Agent Implementation
# src/event_agent.py
import asyncio
import json
import logging
from typing import Dict, Any, Optional, Callable, List
from .models import Event, EventResult
from .config import EventAgentConfig
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("event-agent")
class EventAgent:
"""Agent that processes events asynchronously."""
def __init__(self, config: Optional[EventAgentConfig] = None):
"""Initialize the event agent.
Args:
config: Agent configuration
"""
self.config = config or EventAgentConfig()
self.name = "Event Agent"
self.version = "1.0.0"
# Initialize event handlers registry
self.event_handlers: Dict[str, List[Callable]] = {}
logger.info(f"Initialized {self.name} v{self.version}")
def register_handler(self, event_type: str, handler: Callable):
"""Register a handler for a specific event type.
Args:
event_type: Type of event to handle
handler: Function to call when event is received
"""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
logger.info(f"Registered handler for event type: {event_type}")
async def on_event(self, event_data: Dict[str, Any]) -> EventResult:
"""Handle an incoming event asynchronously.
Args:
event_data: Event data dictionary
Returns:
EventResult: Result of event processing
"""
try:
# Parse event
event = Event(**event_data)
logger.info(f"Handling event {event.id} of type {event.type}")
# Find handlers for this event type
handlers = self.event_handlers.get(event.type, [])
# Also check for wildcard handlers
wildcard_handlers = self.event_handlers.get("*", [])
handlers.extend(wildcard_handlers)
if not handlers:
logger.warning(f"No handlers registered for event type: {event.type}")
return EventResult(
event_id=event.id,
success=False,
message=f"No handlers for event type: {event.type}",
result=None
)
# Execute all handlers asynchronously
tasks = []
for handler in handlers:
if asyncio.iscoroutinefunction(handler):
# Async handler
tasks.append(handler(event))
else:
# Sync handler - wrap in a coroutine
tasks.append(asyncio.to_thread(handler, event))
# Wait for all handlers to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
handler_results = []
for i, result in enumerate(results):
handler_name = handlers[i].__name__
if isinstance(result, Exception):
logger.error(f"Error in handler {handler_name}: {result}")
handler_results.append({
"handler": handler_name,
"success": False,
"error": str(result)
})
else:
handler_results.append({
"handler": handler_name,
"success": True,
"result": result
})
# Determine overall success
success = any(r["success"] for r in handler_results)
return EventResult(
event_id=event.id,
success=success,
message="Event processed" if success else "Event processing failed",
result={
"handler_results": handler_results
}
)
except Exception as e:
logger.exception("Error handling event")
return EventResult(
event_id=event_data.get("id", "unknown"),
success=False,
message=f"Error: {str(e)}",
result=None
)
async def start(self):
"""Start the event agent."""
logger.info(f"Starting {self.name} v{self.version}")
# Start event listeners based on configuration
tasks = []
if self.config.kafka_enabled:
tasks.append(self._start_kafka_consumer())
if self.config.rabbitmq_enabled:
tasks.append(self._start_rabbitmq_consumer())
if self.config.webhook_enabled:
tasks.append(self._start_webhook_server())
logger.info(f"{self.name} started successfully")
# Wait for all listeners
await asyncio.gather(*tasks)
async def _start_kafka_consumer(self):
"""Start Kafka consumer."""
try:
from aiokafka import AIOKafkaConsumer
consumer = AIOKafkaConsumer(
*self.config.kafka_topics,
bootstrap_servers=self.config.kafka_bootstrap_servers,
group_id=self.config.kafka_group_id,
auto_offset_reset='earliest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
await consumer.start()
logger.info(f"Kafka consumer started. Listening to topics: {self.config.kafka_topics}")
try:
async for message in consumer:
try:
event_data = message.value
# Add Kafka-specific metadata
event_data["metadata"] = event_data.get("metadata", {})
event_data["metadata"]["kafka"] = {
"topic": message.topic,
"partition": message.partition,
"offset": message.offset
}
# Handle the event
await self.on_event(event_data)
except Exception as e:
logger.exception(f"Error processing Kafka message: {e}")
finally:
await consumer.stop()
except ImportError:
logger.error("Kafka support requires aiokafka package. Install with: pip install aiokafka")
async def _start_rabbitmq_consumer(self):
"""Start RabbitMQ consumer."""
try:
import aio_pika
connection = await aio_pika.connect_robust(
f"amqp://guest:guest@{self.config.rabbitmq_host}/"
)
channel = await connection.channel()
# Declare exchange
exchange = await channel.declare_exchange(
self.config.rabbitmq_exchange,
aio_pika.ExchangeType.TOPIC
)
# Declare queue
queue = await channel.declare_queue(exclusive=True)
# Bind to routing keys
for binding_key in self.config.rabbitmq_routing_keys:
await queue.bind(exchange, binding_key)
logger.info(f"RabbitMQ consumer started. Listening to exchange: {self.config.rabbitmq_exchange}")
async def process_message(message):
async with message.process():
try:
event_data = json.loads(message.body.decode())
# Add RabbitMQ-specific metadata
event_data["metadata"] = event_data.get("metadata", {})
event_data["metadata"]["rabbitmq"] = {
"exchange": self.config.rabbitmq_exchange,
"routing_key": message.routing_key
}
# Handle the event
await self.on_event(event_data)
except Exception as e:
logger.exception(f"Error processing RabbitMQ message: {e}")
await queue.consume(process_message)
# Keep the consumer running
await asyncio.Future()
except ImportError:
logger.error("RabbitMQ support requires aio_pika package. Install with: pip install aio_pika")
async def _start_webhook_server(self):
"""Start webhook server."""
try:
from aiohttp import web
app = web.Application()
async def webhook_handler(request):
try:
event_data = await request.json()
# Add webhook-specific metadata
event_data["metadata"] = event_data.get("metadata", {})
event_data["metadata"]["webhook"] = {
"headers": dict(request.headers),
"remote": request.remote
}
# Handle the event
result = await self.on_event(event_data)
return web.json_response(result.dict())
except Exception as e:
logger.exception(f"Error processing webhook: {e}")
return web.json_response({"error": str(e)}, status=500)
app.router.add_post('/webhook', webhook_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(
runner,
self.config.webhook_host,
self.config.webhook_port
)
await site.start()
logger.info(f"Webhook server started at http://{self.config.webhook_host}:{self.config.webhook_port}/webhook")
# Keep the server running
await asyncio.Future()
except ImportError:
logger.error("Webhook support requires aiohttp package. Install with: pip install aiohttp")
Models
# src/models.py
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional
class Event(BaseModel):
"""Event model."""
id: str = Field(..., description="Unique event identifier")
type: str = Field(..., description="Event type")
source: str = Field(..., description="Event source")
time: str = Field(..., description="Event timestamp (ISO format)")
data: Dict[str, Any] = Field(default_factory=dict, description="Event data")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")
class EventResult(BaseModel):
"""Event processing result."""
event_id: str = Field(..., description="ID of the processed event")
success: bool = Field(..., description="Whether processing was successful")
message: str = Field(..., description="Result message")
result: Optional[Dict[str, Any]] = Field(None, description="Processing result")
Configuration
# src/config.py
from pydantic import BaseSettings, Field
from typing import List
class EventAgentConfig(BaseSettings):
"""Configuration for the event agent."""
# Kafka settings
kafka_enabled: bool = Field(default=False, description="Enable Kafka consumer")
kafka_bootstrap_servers: List[str] = Field(default=["localhost:9092"], description="Kafka bootstrap servers")
kafka_topics: List[str] = Field(default=["events"], description="Kafka topics to consume")
kafka_group_id: str = Field(default="event-agent", description="Kafka consumer group ID")
# RabbitMQ settings
rabbitmq_enabled: bool = Field(default=False, description="Enable RabbitMQ consumer")
rabbitmq_host: str = Field(default="localhost", description="RabbitMQ host")
rabbitmq_exchange: str = Field(default="events", description="RabbitMQ exchange name")
rabbitmq_routing_keys: List[str] = Field(default=["#"], description="RabbitMQ routing keys")
# Webhook settings
webhook_enabled: bool = Field(default=True, description="Enable webhook server")
webhook_host: str = Field(default="0.0.0.0", description="Webhook server host")
webhook_port: int = Field(default=5000, description="Webhook server port")
# Agent settings
log_level: str = Field(default="INFO", description="Logging level")
class Config:
env_file = ".env"
env_prefix = "EVENT_AGENT_"
Usage Example
# Example usage of the event agent
import asyncio
from src.event_agent import EventAgent
from src.config import EventAgentConfig
from src.models import Event
# Define event handlers
async def log_event_handler(event: Event):
"""Simple handler that logs the event."""
print(f"Received event: {event.id} of type {event.type}")
return {"message": "Event logged successfully"}
async def process_user_event(event: Event):
"""Handler for user events."""
print(f"Processing user event: {event.data}")
# Simulate some processing time
await asyncio.sleep(1)
return {"user_processed": True}
# Configure the agent
config = EventAgentConfig(
kafka_enabled=True,
kafka_topics=["user-events"],
webhook_enabled=True,
webhook_port=8080
)
# Main function
async def main():
# Initialize the agent
agent = EventAgent(config)
# Register event handlers
agent.register_handler("*", log_event_handler) # Log all events
agent.register_handler("user.created", process_user_event)
# Test with a sample event
test_event = {
"id": "evt-123",
"type": "user.created",
"source": "user-service",
"time": "2025-04-18T12:34:56Z",
"data": {
"user_id": "usr-456",
"username": "johndoe",
"email": "john@example.com"
}
}
# Process the test event
result = await agent.on_event(test_event)
print(f"Event processing result: {result}")
# Start the agent (this will run indefinitely)
await agent.start()
# Run the agent
if __name__ == "__main__":
asyncio.run(main())
6. Integration Guide: Adding New Frameworks
- Identify the agent framework's interface (input/output, lifecycle).
- Write an adapter class that wraps the framework's agent for use in your orchestrator.
- Add configuration and example usage to this document.
Example: LangChain Integration
# langchain_adapter.py
from typing import Dict, Any, Optional
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
class LangChainAdapter:
"""Adapter for LangChain framework."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""Initialize the LangChain adapter.
Args:
config: Configuration dictionary
"""
self.config = config or {}
# Initialize LangChain components
self.llm = OpenAI(
temperature=self.config.get("temperature", 0.7),
model_name=self.config.get("model_name", "gpt-3.5-turbo"),
max_tokens=self.config.get("max_tokens", 500)
)
# Create default prompt template
template = self.config.get("prompt_template", "{input}")
self.prompt = PromptTemplate(template=template, input_variables=["input"])
# Create chain
self.chain = LLMChain(llm=self.llm, prompt=self.prompt)
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Run the LangChain agent.
Args:
input_data: Input data dictionary
Returns:
Dict: Processing results
"""
# Extract input text
input_text = input_data.get("input", "")
# Run the chain
result = self.chain.run(input=input_text)
# Return formatted result
return {
"result": result,
"model": self.config.get("model_name", "gpt-3.5-turbo"),
"metadata": {
"framework": "langchain",
"temperature": self.config.get("temperature", 0.7),
"max_tokens": self.config.get("max_tokens", 500)
}
}
Example: Hugging Face Integration
# huggingface_adapter.py
from typing import Dict, Any, Optional, List
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
class HuggingFaceAdapter:
"""Adapter for Hugging Face Transformers."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""Initialize the Hugging Face adapter.
Args:
config: Configuration dictionary
"""
self.config = config or {}
# Get model name
self.model_name = self.config.get("model_name", "gpt2")
# Initialize tokenizer and model
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
self.model = AutoModelForCausalLM.from_pretrained(self.model_name)
# Move to GPU if available
if torch.cuda.is_available() and self.config.get("use_gpu", True):
self.model = self.model.to("cuda")
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""Run the Hugging Face model.
Args:
input_data: Input data dictionary
Returns:
Dict: Processing results
"""
# Extract input text
input_text = input_data.get("input", "")
max_length = self.config.get("max_length", 100)
# Tokenize input
inputs = self.tokenizer(input_text, return_tensors="pt")
if torch.cuda.is_available() and self.config.get("use_gpu", True):
inputs = inputs.to("cuda")
# Generate output
outputs = self.model.generate(
inputs["input_ids"],
max_length=max_length,
num_return_sequences=1,
temperature=self.config.get("temperature", 1.0),
top_p=self.config.get("top_p", 0.9),
do_sample=self.config.get("do_sample", True)
)
# Decode output
generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# Return formatted result
return {
"result": generated_text,
"model": self.model_name,
"metadata": {
"framework": "huggingface",
"max_length": max_length,
"temperature": self.config.get("temperature", 1.0),
"top_p": self.config.get("top_p", 0.9)
}
}
References
- Agent Development Guide
- Execution & Runtimes
- Registry & Metadata
- Marketplace Guide
- Security & Compliance
- Multi-Modal Agent Development
- Edge Agent Development
- Federated Agent Development
Last updated: 2025-04-18