
Agent Templates & Examples

This document provides example agent implementations and templates to bootstrap your workflow in the Meta Agent Platform.

Overview

The Meta Agent Platform supports multiple agent implementation patterns to accommodate different use cases, runtime environments, and integration needs. This document provides ready-to-use templates for common agent types.

Agent Templates Overview

Note: This is a placeholder for an agent templates overview diagram. The actual diagram should be created and added to the project.

1. Python Script Agent

A simple Python class-based agent that can be used as a building block for more complex agents.

Basic Template

# agent_example.py
from typing import Dict, Any
import logging

class ExampleAgent:
    """A simple agent template for the Meta Agent Platform."""

    def __init__(self, config: Dict[str, Any]):
        """Initialize the agent with configuration.

        Args:
            config: Dictionary containing agent configuration
        """
        self.config = config
        self.name = config.get('name', 'ExampleAgent')
        self.version = config.get('version', '1.0.0')

        # Configure logging
        logging.basicConfig(
            level=config.get('log_level', logging.INFO),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(self.name)

        self.logger.info(f"Initializing {self.name} v{self.version}")

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """Validate the input data.

        Args:
            input_data: The input data to validate

        Returns:
            bool: True if valid, False otherwise
        """
        # Implement input validation logic
        required_fields = self.config.get('required_fields', [])
        for field in required_fields:
            if field not in input_data:
                self.logger.error(f"Missing required field: {field}")
                return False
        return True

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process the input data and return results.

        Args:
            input_data: The input data to process

        Returns:
            Dict: The processing results
        """
        # Implement your agent's core logic here
        self.logger.info(f"Processing input: {input_data}")

        # Example processing logic
        result = {
            "status": "success",
            "message": "Processing completed",
            "input": input_data,
            "agent": f"{self.name} v{self.version}"
        }

        return result

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent on the provided input data.

        Args:
            input_data: The input data to process

        Returns:
            Dict: The processing results or error message
        """
        try:
            # Validate input
            if not self.validate_input(input_data):
                return {
                    "status": "error",
                    "message": "Invalid input data",
                    "agent": f"{self.name} v{self.version}"
                }

            # Process input
            result = self.process(input_data)
            return result

        except Exception as e:
            self.logger.exception("Error processing input")
            return {
                "status": "error",
                "message": str(e),
                "agent": f"{self.name} v{self.version}"
            }

# Example usage
if __name__ == "__main__":
    # Create agent configuration
    config = {
        "name": "TextProcessor",
        "version": "1.0.0",
        "required_fields": ["text"],
        "log_level": logging.INFO
    }

    # Initialize agent
    agent = ExampleAgent(config)

    # Run agent with sample input
    result = agent.run({"text": "Hello, world!"})
    print(result)

Usage Example

# Using the example agent in your application
from agent_example import ExampleAgent

# Create agent configuration
config = {
    "name": "TextAnalyzer",
    "version": "1.0.0",
    "required_fields": ["text"],
    "log_level": "INFO",
    "model": "sentiment-analysis"
}

# Initialize agent
agent = ExampleAgent(config)

# Process some data
result = agent.run({
    "text": "I love this product! It's amazing.",
    "options": {"detailed": True}
})

print(result)
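
Extending the Template

Because run() already handles validation and error handling, more specialised agents can subclass ExampleAgent and override only process(). A minimal sketch, where the keyword counting is purely illustrative:

# keyword_agent.py - hypothetical subclass of the template above
from agent_example import ExampleAgent

class KeywordCounterAgent(ExampleAgent):
    """Counts occurrences of a configured keyword in the input text."""

    def process(self, input_data):
        keyword = self.config.get("keyword", "hello")
        count = input_data["text"].lower().count(keyword)
        return {
            "status": "success",
            "keyword": keyword,
            "count": count,
            "agent": f"{self.name} v{self.version}"
        }

agent = KeywordCounterAgent({
    "name": "KeywordCounter",
    "required_fields": ["text"],
    "keyword": "hello"
})
print(agent.run({"text": "Hello, world! Hello again."}))  # count == 2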

2. Dockerized Agent

A containerized agent that can be deployed and scaled independently. This approach provides isolation, reproducibility, and portability.

Directory Structure

dockerized-agent/
├── Dockerfile
├── requirements.txt
├── src/
│   ├── __init__.py
│   ├── agent.py
│   └── utils.py
├── tests/
│   ├── __init__.py
│   └── test_agent.py
└── README.md

Dockerfile

# Dockerfile for agent
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Create non-root user
RUN useradd -m -r agent && \
    chown -R agent:agent /app

# Copy application code
COPY src/ /app/src/

# Switch to non-root user
USER agent

# Run the agent
CMD ["python", "-m", "src.agent"]

requirements.txt

pydantic==2.5.2
requests==2.31.0
python-dotenv==1.0.0
loguru==0.7.2

Agent Implementation

# src/agent.py
import os
import json
import sys
from typing import Dict, Any, Optional, List
from loguru import logger
from pydantic import BaseModel, Field

# Configure logging
logger.remove()
logger.add(sys.stderr, level=os.environ.get("LOG_LEVEL", "INFO"))

# Define input/output models
class AgentInput(BaseModel):
    """Input model for the agent."""
    query: str = Field(..., description="The query to process")
    parameters: Optional[Dict[str, Any]] = Field(default={}, description="Additional parameters")

class AgentOutput(BaseModel):
    """Output model for the agent."""
    result: str = Field(..., description="The processing result")
    confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score")
    metadata: Optional[Dict[str, Any]] = Field(default={}, description="Additional metadata")

class DockerizedAgent:
    """A containerized agent for the Meta Agent Platform."""

    def __init__(self, config_path: Optional[str] = None):
        """Initialize the agent with configuration.

        Args:
            config_path: Path to configuration file (optional)
        """
        # Load configuration
        self.config = self._load_config(config_path)
        self.name = self.config.get("name", "DockerizedAgent")
        self.version = self.config.get("version", "1.0.0")

        logger.info(f"Initializing {self.name} v{self.version}")

    def _load_config(self, config_path: Optional[str]) -> Dict[str, Any]:
        """Load configuration from file or environment.

        Args:
            config_path: Path to configuration file

        Returns:
            Dict: Configuration dictionary
        """
        config = {}

        # Try to load from file if provided
        if config_path and os.path.exists(config_path):
            with open(config_path, "r") as f:
                config = json.load(f)

        # Override with environment variables
        env_prefix = "AGENT_"
        for key, value in os.environ.items():
            if key.startswith(env_prefix):
                config_key = key[len(env_prefix):].lower()
                config[config_key] = value

        return config

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process the input data.

        Args:
            input_data: Input data dictionary

        Returns:
            Dict: Processing results
        """
        try:
            # Parse and validate input
            agent_input = AgentInput(**input_data)

            # Process the query (implement your logic here)
            logger.info(f"Processing query: {agent_input.query}")

            # Example processing logic
            result = f"Processed: {agent_input.query}"
            confidence = 0.95

            # Create and validate output
            output = AgentOutput(
                result=result,
                confidence=confidence,
                metadata={
                    "agent": f"{self.name} v{self.version}",
                    "parameters": agent_input.parameters
                }
            )

            return output.model_dump()

        except Exception as e:
            logger.exception("Error processing input")
            return {
                "error": str(e),
                "agent": f"{self.name} v{self.version}"
            }

# Main entry point
def main():
    """Main entry point for the containerized agent."""
    # Get configuration path from environment
    config_path = os.environ.get("AGENT_CONFIG_PATH")

    # Initialize agent
    agent = DockerizedAgent(config_path)

    # Read input from stdin or environment
    input_data = {}
    if os.environ.get("AGENT_INPUT"):
        input_data = json.loads(os.environ.get("AGENT_INPUT", "{}"))
    else:
        try:
            for line in sys.stdin:
                input_data = json.loads(line)
                break
        except json.JSONDecodeError:
            logger.error("Invalid JSON input")

    # Process input
    result = agent.process(input_data)

    # Write output to stdout
    print(json.dumps(result))

if __name__ == "__main__":
    main()

Building and Running

# Build the Docker image
docker build -t meta-agent-platform/dockerized-agent:1.0.0 .

# Run the agent with input from a file (piped to the container's stdin)
docker run --rm -i meta-agent-platform/dockerized-agent:1.0.0 < input.json

# Run the agent with environment variables
docker run --rm \
    -e AGENT_NAME="CustomAgent" \
    -e AGENT_VERSION="1.1.0" \
    -e AGENT_INPUT='{"query": "What is the weather?", "parameters": {"location": "New York"}}' \
    meta-agent-platform/dockerized-agent:1.0.0
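
For quick iteration you can also exercise the agent class directly in Python, without building the image. A minimal sketch, run from the dockerized-agent/ directory; the query value is illustrative:

# Quick local check without Docker
from src.agent import DockerizedAgent

agent = DockerizedAgent()  # no config file: defaults plus AGENT_* environment overrides
print(agent.process({"query": "ping", "parameters": {"echo": True}}))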

3. API Agent (FastAPI)

A RESTful API agent that can be deployed as a service and integrated with other systems via HTTP requests.

Directory Structure

api-agent/
├── Dockerfile
├── requirements.txt
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models.py
│   ├── agent.py
│   └── config.py
├── tests/
│   ├── __init__.py
│   └── test_api.py
└── README.md

API Implementation

# app/main.py
from fastapi import FastAPI, Request, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Optional, Dict, Any
import time
import uuid
import logging

from .models import AgentRequest, AgentResponse, ErrorResponse
from .agent import ApiAgent
from .config import Settings, get_settings

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("api-agent")

# Create FastAPI app
app = FastAPI(
    title="Meta Agent Platform API Agent",
    description="A RESTful API agent for the Meta Agent Platform",
    version="1.0.0"
)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Restrict in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Create agent instance
agent = ApiAgent()

# Dependency for API key validation
async def verify_api_key(x_api_key: str = Header(None), settings: Settings = Depends(get_settings)):
    if settings.api_key_required and x_api_key != settings.api_key:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return x_api_key

# Request ID middleware
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

# Rate limiting middleware
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    # Simple in-memory rate limiting (use Redis in production)
    client_ip = request.client.host
    settings = get_settings()

    # Skip rate limiting if disabled
    if not settings.rate_limit_enabled:
        return await call_next(request)

    # Check rate limit
    current_time = time.time()
    request_count = getattr(request.app.state, f"rate_limit_{client_ip}", 0)
    last_request_time = getattr(request.app.state, f"last_request_{client_ip}", 0)

    # Reset counter if window has passed
    if current_time - last_request_time > settings.rate_limit_window:
        request_count = 0

    # Increment counter
    request_count += 1
    setattr(request.app.state, f"rate_limit_{client_ip}", request_count)
    setattr(request.app.state, f"last_request_{client_ip}", current_time)

    # Check if over limit
    if request_count > settings.rate_limit_max_requests:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded", "retry_after": settings.rate_limit_window}
        )

    return await call_next(request)

# Health check endpoint
@app.get("/health")
async def health_check():
    return {"status": "healthy", "version": app.version}

# Agent endpoint
@app.post("/run", response_model=AgentResponse, responses={400: {"model": ErrorResponse}, 401: {"model": ErrorResponse}, 429: {"model": ErrorResponse}, 500: {"model": ErrorResponse}})
async def run_agent(request: AgentRequest, request_id: str = Depends(verify_api_key)):
    try:
        logger.info(f"Processing request {request_id}")

        # Process the request
        result = agent.process(request.dict())

        return AgentResponse(
            request_id=request_id,
            result=result["result"],
            confidence=result.get("confidence", 1.0),
            metadata=result.get("metadata", {})
        )
    except Exception as e:
        logger.exception(f"Error processing request {request_id}")
        raise HTTPException(status_code=500, detail=str(e))

# Async batch processing endpoint
@app.post("/batch")
async def batch_process(requests: list[AgentRequest], http_request: Request, api_key: str = Depends(verify_api_key)):
    request_id = http_request.state.request_id
    results = []
    for req in requests:
        try:
            result = agent.process(req.model_dump())
            results.append({
                "success": True,
                "result": result
            })
        except Exception as e:
            results.append({
                "success": False,
                "error": str(e)
            })

    return {"batch_results": results, "request_id": request_id}

Models

# app/models.py
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional

class AgentRequest(BaseModel):
    """Request model for the agent API."""
    query: str = Field(..., description="The query to process")
    parameters: Optional[Dict[str, Any]] = Field(default={}, description="Additional parameters")

class AgentResponse(BaseModel):
    """Response model for the agent API."""
    request_id: str = Field(..., description="Unique request identifier")
    result: str = Field(..., description="The processing result")
    confidence: float = Field(..., ge=0.0, le=1.0, description="Confidence score")
    metadata: Dict[str, Any] = Field(default={}, description="Additional metadata")

class ErrorResponse(BaseModel):
    """Error response model."""
    error: str = Field(..., description="Error message")
    detail: Optional[str] = Field(None, description="Detailed error information")

Agent Implementation

# app/agent.py
from typing import Dict, Any
import logging

logger = logging.getLogger("api-agent")

class ApiAgent:
    """API agent implementation."""

    def __init__(self):
        """Initialize the agent."""
        self.name = "API Agent"
        self.version = "1.0.0"
        logger.info(f"Initializing {self.name} v{self.version}")

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process the input data.

        Args:
            input_data: Input data dictionary

        Returns:
            Dict: Processing results
        """
        query = input_data.get("query", "")
        parameters = input_data.get("parameters", {})

        logger.info(f"Processing query: {query}")

        # Implement your agent logic here
        # This is a simple example that echoes the input
        result = f"Processed: {query}"

        return {
            "result": result,
            "confidence": 0.95,
            "metadata": {
                "agent": f"{self.name} v{self.version}",
                "parameters": parameters
            }
        }

Configuration

# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    """Application settings."""
    # API settings
    api_key_required: bool = True
    api_key: str = "your-api-key-here"

    # Rate limiting
    rate_limit_enabled: bool = True
    rate_limit_max_requests: int = 100
    rate_limit_window: int = 60  # seconds

    # Agent settings
    agent_name: str = "API Agent"
    agent_version: str = "1.0.0"

    model_config = SettingsConfigDict(env_file=".env", env_prefix="AGENT_")

@lru_cache()
def get_settings() -> Settings:
    """Get application settings."""
    return Settings()

Dockerfile

# Dockerfile for API agent
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=/app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Create non-root user
RUN useradd -m -r agent && \
    chown -R agent:agent /app

# Copy application code
COPY app/ /app/app/

# Switch to non-root user
USER agent

# Expose port
EXPOSE 8000

# Run the API server
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

requirements.txt

fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.2
pydantic-settings==2.1.0
python-dotenv==1.0.0

Usage Example

# Build and run the API agent
docker build -t meta-agent-platform/api-agent:1.0.0 .
docker run -p 8000:8000 -e AGENT_API_KEY=my-secret-key meta-agent-platform/api-agent:1.0.0

# Make a request to the agent
curl -X POST http://localhost:8000/run \
  -H "Content-Type: application/json" \
  -H "X-API-Key: my-secret-key" \
  -d '{"query": "What is the weather?", "parameters": {"location": "New York"}}'

4. Cloud-Based Agent (Template)

A client-side agent that delegates processing to a cloud service, ideal for resource-intensive tasks or when leveraging managed AI services.

Directory Structure

cloud-agent/
├── requirements.txt
├── cloud_agent.py
├── config.py
├── exceptions.py
├── tests/
│   ├── __init__.py
│   └── test_cloud_agent.py
└── README.md

Agent Implementation

# cloud_agent.py
import requests
import json
import time
import logging
from typing import Dict, Any, Optional, Union
from requests.exceptions import RequestException, Timeout

from exceptions import CloudAgentError, AuthenticationError, RateLimitError, ServiceUnavailableError
from config import CloudAgentConfig

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("cloud-agent")

class CloudAgent:
    """Agent that delegates processing to a cloud service."""

    def __init__(self, config: Optional[Union[Dict[str, Any], CloudAgentConfig]] = None):
        """Initialize the cloud agent.

        Args:
            config: Agent configuration (dict or CloudAgentConfig)
        """
        # Initialize configuration
        if config is None:
            self.config = CloudAgentConfig()
        elif isinstance(config, dict):
            self.config = CloudAgentConfig(**config)
        else:
            self.config = config

        # Initialize agent properties
        self.name = "Cloud Agent"
        self.version = "1.0.0"

        # Initialize session
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
            "User-Agent": f"{self.name}/{self.version}"
        })

        logger.info(f"Initialized {self.name} v{self.version} with endpoint {self.config.endpoint}")

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent on the provided input data.

        Args:
            input_data: The input data to process

        Returns:
            Dict: The processing results

        Raises:
            CloudAgentError: If an error occurs during processing
        """
        try:
            # Prepare request payload
            payload = self._prepare_payload(input_data)

            # Send request to cloud service
            response = self._send_request(payload)

            # Process response
            result = self._process_response(response)

            return result

        except CloudAgentError:
            # Re-raise specific cloud agent errors
            raise
        except Exception as e:
            # Wrap other exceptions
            logger.exception("Unexpected error during agent execution")
            raise CloudAgentError(f"Unexpected error: {str(e)}") from e

    def _prepare_payload(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare the request payload.

        Args:
            input_data: The input data to process

        Returns:
            Dict: The prepared payload
        """
        # Add any additional fields required by the cloud service
        payload = {
            "input": input_data,
            "options": self.config.options,
            "version": self.config.service_version
        }

        return payload

    def _send_request(self, payload: Dict[str, Any]) -> requests.Response:
        """Send request to the cloud service.

        Args:
            payload: The request payload

        Returns:
            Response: The HTTP response

        Raises:
            AuthenticationError: If authentication fails
            RateLimitError: If rate limit is exceeded
            ServiceUnavailableError: If service is unavailable
            CloudAgentError: For other request errors
        """
        try:
            # Send request with retry logic
            retries = 0
            while retries <= self.config.max_retries:
                try:
                    response = self.session.post(
                        self.config.endpoint,
                        json=payload,
                        timeout=self.config.timeout
                    )

                    # Check for specific error status codes
                    if response.status_code == 401:
                        raise AuthenticationError("Authentication failed. Check your API key.")
                    elif response.status_code == 429:
                        retry_after = int(response.headers.get("Retry-After", self.config.retry_delay))
                        if retries < self.config.max_retries:
                            logger.warning(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
                            time.sleep(retry_after)
                            retries += 1
                            continue
                        else:
                            raise RateLimitError("Rate limit exceeded. Try again later.")
                    elif response.status_code >= 500:
                        if retries < self.config.max_retries:
                            delay = self.config.retry_delay * (2 ** retries)  # Exponential backoff
                            logger.warning(f"Service error. Retrying after {delay} seconds.")
                            time.sleep(delay)
                            retries += 1
                            continue
                        else:
                            raise ServiceUnavailableError("Service is currently unavailable. Try again later.")
                    elif response.status_code != 200:
                        # Handle other error status codes
                        error_message = response.json().get("error", f"Request failed with status code {response.status_code}")
                        raise CloudAgentError(error_message)

                    # Success - return response
                    return response

                except Timeout:
                    if retries < self.config.max_retries:
                        delay = self.config.retry_delay * (2 ** retries)  # Exponential backoff
                        logger.warning(f"Request timeout. Retrying after {delay} seconds.")
                        time.sleep(delay)
                        retries += 1
                    else:
                        raise CloudAgentError("Request timed out after multiple retries.")

            # Should not reach here, but just in case
            raise CloudAgentError("Failed to send request after maximum retries.")

        except RequestException as e:
            # Handle request exceptions
            logger.exception("Error sending request to cloud service")
            raise CloudAgentError(f"Request error: {str(e)}") from e

    def _process_response(self, response: requests.Response) -> Dict[str, Any]:
        """Process the response from the cloud service.

        Args:
            response: The HTTP response

        Returns:
            Dict: The processed result

        Raises:
            CloudAgentError: If response processing fails
        """
        try:
            # Parse JSON response
            result = response.json()

            # Add agent metadata
            result["metadata"] = result.get("metadata", {})
            result["metadata"]["agent"] = f"{self.name} v{self.version}"

            return result
        except json.JSONDecodeError as e:
            logger.exception("Error decoding JSON response")
            raise CloudAgentError(f"Invalid JSON response: {str(e)}") from e

Configuration

# config.py
from typing import Dict, Any, Optional
from pydantic import BaseModel, Field
import os

class CloudAgentConfig(BaseModel):
    """Configuration for the cloud agent."""
    # API settings
    api_key: str = Field(
        default_factory=lambda: os.environ.get("CLOUD_AGENT_API_KEY", ""),
        description="API key for the cloud service"
    )
    endpoint: str = Field(
        default_factory=lambda: os.environ.get(
            "CLOUD_AGENT_ENDPOINT",
            "https://api.example.com/agent/run"
        ),
        description="Endpoint URL for the cloud service"
    )
    service_version: str = Field(
        default="v1",
        description="Version of the cloud service API"
    )

    # Request settings
    timeout: int = Field(
        default=30,
        description="Request timeout in seconds"
    )
    max_retries: int = Field(
        default=3,
        description="Maximum number of retry attempts"
    )
    retry_delay: int = Field(
        default=1,
        description="Initial delay between retries in seconds"
    )

    # Agent options
    options: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional options for the cloud service"
    )

Exceptions

# exceptions.py
class CloudAgentError(Exception):
    """Base exception for cloud agent errors."""
    pass

class AuthenticationError(CloudAgentError):
    """Raised when authentication fails."""
    pass

class RateLimitError(CloudAgentError):
    """Raised when rate limit is exceeded."""
    pass

class ServiceUnavailableError(CloudAgentError):
    """Raised when the service is unavailable."""
    pass

Usage Example

# Example usage of the cloud agent
from cloud_agent import CloudAgent
from config import CloudAgentConfig

# Configure the agent
config = CloudAgentConfig(
    api_key="your-api-key-here",
    endpoint="https://api.example.com/agent/run",
    service_version="v2",
    timeout=60,
    options={
        "model": "gpt-4",
        "temperature": 0.7,
        "max_tokens": 1000
    }
)

# Initialize the agent
agent = CloudAgent(config)

# Run the agent
try:
    result = agent.run({
        "query": "Explain quantum computing in simple terms",
        "context": "The user is a high school student"
    })
    print(f"Result: {result['result']}")
    print(f"Confidence: {result.get('confidence', 'N/A')}")

except Exception as e:
    print(f"Error: {str(e)}")

5. Event-Driven Agent (Template)

An agent that responds to events from message queues, webhooks, or other event sources, ideal for asynchronous processing and integration with event-driven architectures.

Directory Structure

event-agent/
├── Dockerfile
├── requirements.txt
├── src/
│   ├── __init__.py
│   ├── event_agent.py
│   ├── event_handlers.py
│   ├── models.py
│   └── config.py
├── tests/
│   ├── __init__.py
│   └── test_event_agent.py
└── README.md

Agent Implementation

# src/event_agent.py
import asyncio
import json
import logging
from typing import Dict, Any, Optional, Callable, List

from .models import Event, EventResult
from .config import EventAgentConfig

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("event-agent")

class EventAgent:
    """Agent that processes events asynchronously."""

    def __init__(self, config: Optional[EventAgentConfig] = None):
        """Initialize the event agent.

        Args:
            config: Agent configuration
        """
        self.config = config or EventAgentConfig()
        self.name = "Event Agent"
        self.version = "1.0.0"

        # Initialize event handlers registry
        self.event_handlers: Dict[str, List[Callable]] = {}

        logger.info(f"Initialized {self.name} v{self.version}")

    def register_handler(self, event_type: str, handler: Callable):
        """Register a handler for a specific event type.

        Args:
            event_type: Type of event to handle
            handler: Function to call when event is received
        """
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []

        self.event_handlers[event_type].append(handler)
        logger.info(f"Registered handler for event type: {event_type}")

    async def on_event(self, event_data: Dict[str, Any]) -> EventResult:
        """Handle an incoming event asynchronously.

        Args:
            event_data: Event data dictionary

        Returns:
            EventResult: Result of event processing
        """
        try:
            # Parse event
            event = Event(**event_data)

            logger.info(f"Handling event {event.id} of type {event.type}")

            # Find handlers for this event type (copy so the registry itself is not mutated)
            handlers = list(self.event_handlers.get(event.type, []))

            # Also include wildcard handlers
            handlers.extend(self.event_handlers.get("*", []))

            if not handlers:
                logger.warning(f"No handlers registered for event type: {event.type}")
                return EventResult(
                    event_id=event.id,
                    success=False,
                    message=f"No handlers for event type: {event.type}",
                    result=None
                )

            # Execute all handlers asynchronously
            tasks = []
            for handler in handlers:
                if asyncio.iscoroutinefunction(handler):
                    # Async handler
                    tasks.append(handler(event))
                else:
                    # Sync handler - wrap in a coroutine
                    tasks.append(asyncio.to_thread(handler, event))

            # Wait for all handlers to complete
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Process results
            handler_results = []
            for i, result in enumerate(results):
                handler_name = handlers[i].__name__
                if isinstance(result, Exception):
                    logger.error(f"Error in handler {handler_name}: {result}")
                    handler_results.append({
                        "handler": handler_name,
                        "success": False,
                        "error": str(result)
                    })
                else:
                    handler_results.append({
                        "handler": handler_name,
                        "success": True,
                        "result": result
                    })

            # Determine overall success
            success = any(r["success"] for r in handler_results)

            return EventResult(
                event_id=event.id,
                success=success,
                message="Event processed" if success else "Event processing failed",
                result={
                    "handler_results": handler_results
                }
            )

        except Exception as e:
            logger.exception("Error handling event")
            return EventResult(
                event_id=event_data.get("id", "unknown"),
                success=False,
                message=f"Error: {str(e)}",
                result=None
            )

    async def start(self):
        """Start the event agent."""
        logger.info(f"Starting {self.name} v{self.version}")

        # Start event listeners based on configuration
        tasks = []

        if self.config.kafka_enabled:
            tasks.append(self._start_kafka_consumer())

        if self.config.rabbitmq_enabled:
            tasks.append(self._start_rabbitmq_consumer())

        if self.config.webhook_enabled:
            tasks.append(self._start_webhook_server())

        logger.info(f"{self.name} started successfully")

        # Wait for all listeners
        await asyncio.gather(*tasks)

    async def _start_kafka_consumer(self):
        """Start Kafka consumer."""
        try:
            from aiokafka import AIOKafkaConsumer

            consumer = AIOKafkaConsumer(
                *self.config.kafka_topics,
                bootstrap_servers=self.config.kafka_bootstrap_servers,
                group_id=self.config.kafka_group_id,
                auto_offset_reset='earliest',
                value_deserializer=lambda m: json.loads(m.decode('utf-8'))
            )

            await consumer.start()
            logger.info(f"Kafka consumer started. Listening to topics: {self.config.kafka_topics}")

            try:
                async for message in consumer:
                    try:
                        event_data = message.value
                        # Add Kafka-specific metadata
                        event_data["metadata"] = event_data.get("metadata", {})
                        event_data["metadata"]["kafka"] = {
                            "topic": message.topic,
                            "partition": message.partition,
                            "offset": message.offset
                        }

                        # Handle the event
                        await self.on_event(event_data)
                    except Exception as e:
                        logger.exception(f"Error processing Kafka message: {e}")
            finally:
                await consumer.stop()

        except ImportError:
            logger.error("Kafka support requires aiokafka package. Install with: pip install aiokafka")

    async def _start_rabbitmq_consumer(self):
        """Start RabbitMQ consumer."""
        try:
            import aio_pika

            connection = await aio_pika.connect_robust(
                f"amqp://guest:guest@{self.config.rabbitmq_host}/"
            )

            channel = await connection.channel()

            # Declare exchange
            exchange = await channel.declare_exchange(
                self.config.rabbitmq_exchange,
                aio_pika.ExchangeType.TOPIC
            )

            # Declare queue
            queue = await channel.declare_queue(exclusive=True)

            # Bind to routing keys
            for binding_key in self.config.rabbitmq_routing_keys:
                await queue.bind(exchange, binding_key)

            logger.info(f"RabbitMQ consumer started. Listening to exchange: {self.config.rabbitmq_exchange}")

            async def process_message(message):
                async with message.process():
                    try:
                        event_data = json.loads(message.body.decode())
                        # Add RabbitMQ-specific metadata
                        event_data["metadata"] = event_data.get("metadata", {})
                        event_data["metadata"]["rabbitmq"] = {
                            "exchange": self.config.rabbitmq_exchange,
                            "routing_key": message.routing_key
                        }

                        # Handle the event
                        await self.on_event(event_data)
                    except Exception as e:
                        logger.exception(f"Error processing RabbitMQ message: {e}")

            await queue.consume(process_message)

            # Keep the consumer running
            await asyncio.Future()

        except ImportError:
            logger.error("RabbitMQ support requires aio_pika package. Install with: pip install aio_pika")

    async def _start_webhook_server(self):
        """Start webhook server."""
        try:
            from aiohttp import web

            app = web.Application()

            async def webhook_handler(request):
                try:
                    event_data = await request.json()
                    # Add webhook-specific metadata
                    event_data["metadata"] = event_data.get("metadata", {})
                    event_data["metadata"]["webhook"] = {
                        "headers": dict(request.headers),
                        "remote": request.remote
                    }

                    # Handle the event
                    result = await self.on_event(event_data)

                    return web.json_response(result.model_dump())
                except Exception as e:
                    logger.exception(f"Error processing webhook: {e}")
                    return web.json_response({"error": str(e)}, status=500)

            app.router.add_post('/webhook', webhook_handler)

            runner = web.AppRunner(app)
            await runner.setup()

            site = web.TCPSite(
                runner,
                self.config.webhook_host,
                self.config.webhook_port
            )
            await site.start()

            logger.info(f"Webhook server started at http://{self.config.webhook_host}:{self.config.webhook_port}/webhook")

            # Keep the server running
            await asyncio.Future()

        except ImportError:
            logger.error("Webhook support requires aiohttp package. Install with: pip install aiohttp")

Models

# src/models.py
from pydantic import BaseModel, Field
from typing import Dict, Any, Optional

class Event(BaseModel):
    """Event model."""
    id: str = Field(..., description="Unique event identifier")
    type: str = Field(..., description="Event type")
    source: str = Field(..., description="Event source")
    time: str = Field(..., description="Event timestamp (ISO format)")
    data: Dict[str, Any] = Field(default_factory=dict, description="Event data")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Event metadata")

class EventResult(BaseModel):
    """Event processing result."""
    event_id: str = Field(..., description="ID of the processed event")
    success: bool = Field(..., description="Whether processing was successful")
    message: str = Field(..., description="Result message")
    result: Optional[Dict[str, Any]] = Field(None, description="Processing result")

Configuration

# src/config.py
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List

class EventAgentConfig(BaseSettings):
    """Configuration for the event agent."""
    # Kafka settings
    kafka_enabled: bool = Field(default=False, description="Enable Kafka consumer")
    kafka_bootstrap_servers: List[str] = Field(default=["localhost:9092"], description="Kafka bootstrap servers")
    kafka_topics: List[str] = Field(default=["events"], description="Kafka topics to consume")
    kafka_group_id: str = Field(default="event-agent", description="Kafka consumer group ID")

    # RabbitMQ settings
    rabbitmq_enabled: bool = Field(default=False, description="Enable RabbitMQ consumer")
    rabbitmq_host: str = Field(default="localhost", description="RabbitMQ host")
    rabbitmq_exchange: str = Field(default="events", description="RabbitMQ exchange name")
    rabbitmq_routing_keys: List[str] = Field(default=["#"], description="RabbitMQ routing keys")

    # Webhook settings
    webhook_enabled: bool = Field(default=True, description="Enable webhook server")
    webhook_host: str = Field(default="0.0.0.0", description="Webhook server host")
    webhook_port: int = Field(default=5000, description="Webhook server port")

    # Agent settings
    log_level: str = Field(default="INFO", description="Logging level")

    model_config = SettingsConfigDict(env_file=".env", env_prefix="EVENT_AGENT_")

Usage Example

# Example usage of the event agent
import asyncio
from src.event_agent import EventAgent
from src.config import EventAgentConfig
from src.models import Event

# Define event handlers
async def log_event_handler(event: Event):
    """Simple handler that logs the event."""
    print(f"Received event: {event.id} of type {event.type}")
    return {"message": "Event logged successfully"}

async def process_user_event(event: Event):
    """Handler for user events."""
    print(f"Processing user event: {event.data}")
    # Simulate some processing time
    await asyncio.sleep(1)
    return {"user_processed": True}

# Configure the agent
config = EventAgentConfig(
    kafka_enabled=True,
    kafka_topics=["user-events"],
    webhook_enabled=True,
    webhook_port=8080
)

# Main function
async def main():
    # Initialize the agent
    agent = EventAgent(config)

    # Register event handlers
    agent.register_handler("*", log_event_handler)  # Log all events
    agent.register_handler("user.created", process_user_event)

    # Test with a sample event
    test_event = {
        "id": "evt-123",
        "type": "user.created",
        "source": "user-service",
        "time": "2025-04-18T12:34:56Z",
        "data": {
            "user_id": "usr-456",
            "username": "johndoe",
            "email": "john@example.com"
        }
    }

    # Process the test event
    result = await agent.on_event(test_event)
    print(f"Event processing result: {result}")

    # Start the agent (this will run indefinitely)
    await agent.start()

# Run the agent
if __name__ == "__main__":
    asyncio.run(main())
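
Once the agent is running, events can also be delivered over HTTP by POSTing to the webhook endpoint. A sketch using the requests package; the port matches webhook_port=8080 from the example config and the event values are illustrative:

# send_test_event.py - post an event to the running webhook server
import requests

event = {
    "id": "evt-124",
    "type": "user.created",
    "source": "user-service",
    "time": "2025-04-18T12:35:00Z",
    "data": {"user_id": "usr-457", "username": "janedoe"}
}

response = requests.post("http://localhost:8080/webhook", json=event, timeout=10)
print(response.json())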

6. Integration Guide: Adding New Frameworks

  1. Identify the agent framework's interface (input/output, lifecycle).
  2. Write an adapter class that wraps the framework's agent for use in your orchestrator.
  3. Add configuration and example usage to this document.

Example: LangChain Integration

# langchain_adapter.py
from typing import Dict, Any, Optional
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI

class LangChainAdapter:
    """Adapter for LangChain framework."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the LangChain adapter.

        Args:
            config: Configuration dictionary
        """
        self.config = config or {}

        # Initialize LangChain components
        self.llm = OpenAI(
            temperature=self.config.get("temperature", 0.7),
            model_name=self.config.get("model_name", "gpt-3.5-turbo-instruct"),
            max_tokens=self.config.get("max_tokens", 500)
        )

        # Create default prompt template
        template = self.config.get("prompt_template", "{input}")
        self.prompt = PromptTemplate(template=template, input_variables=["input"])

        # Create chain
        self.chain = LLMChain(llm=self.llm, prompt=self.prompt)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the LangChain agent.

        Args:
            input_data: Input data dictionary

        Returns:
            Dict: Processing results
        """
        # Extract input text
        input_text = input_data.get("input", "")

        # Run the chain
        result = self.chain.run(input=input_text)

        # Return formatted result
        return {
            "result": result,
            "model": self.config.get("model_name", "gpt-3.5-turbo"),
            "metadata": {
                "framework": "langchain",
                "temperature": self.config.get("temperature", 0.7),
                "max_tokens": self.config.get("max_tokens", 500)
            }
        }

Example: Hugging Face Integration

# huggingface_adapter.py
from typing import Dict, Any, Optional, List
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

class HuggingFaceAdapter:
    """Adapter for Hugging Face Transformers."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the Hugging Face adapter.

        Args:
            config: Configuration dictionary
        """
        self.config = config or {}

        # Get model name
        self.model_name = self.config.get("model_name", "gpt2")

        # Initialize tokenizer and model
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(self.model_name)

        # Move to GPU if available
        if torch.cuda.is_available() and self.config.get("use_gpu", True):
            self.model = self.model.to("cuda")

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the Hugging Face model.

        Args:
            input_data: Input data dictionary

        Returns:
            Dict: Processing results
        """
        # Extract input text
        input_text = input_data.get("input", "")
        max_length = self.config.get("max_length", 100)

        # Tokenize input
        inputs = self.tokenizer(input_text, return_tensors="pt")
        if torch.cuda.is_available() and self.config.get("use_gpu", True):
            inputs = inputs.to("cuda")

        # Generate output
        outputs = self.model.generate(
            inputs["input_ids"],
            max_length=max_length,
            num_return_sequences=1,
            temperature=self.config.get("temperature", 1.0),
            top_p=self.config.get("top_p", 0.9),
            do_sample=self.config.get("do_sample", True)
        )

        # Decode output
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Return formatted result
        return {
            "result": generated_text,
            "model": self.model_name,
            "metadata": {
                "framework": "huggingface",
                "max_length": max_length,
                "temperature": self.config.get("temperature", 1.0),
                "top_p": self.config.get("top_p", 0.9)
            }
        }
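
Both adapters expose the same run(input_data) interface, so an orchestrator can treat them interchangeably. A minimal sketch, assuming the listed packages are installed and OPENAI_API_KEY is set for the LangChain path; model choices and the prompt are illustrative:

# Using the adapters through their common run() interface
from langchain_adapter import LangChainAdapter
from huggingface_adapter import HuggingFaceAdapter

adapters = {
    "langchain": LangChainAdapter({"temperature": 0.2}),
    "huggingface": HuggingFaceAdapter({"model_name": "gpt2", "max_length": 60}),
}

for name, adapter in adapters.items():
    output = adapter.run({"input": "Write a one-line greeting."})
    print(name, output["result"])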

Last updated: 2025-04-18