Building Scalable Microservices with Python - Architecture Patterns and Best Practices

Microservices architecture has changed how teams build and deploy enterprise applications. Implemented well in Python, it lets services scale independently, keeps codebases maintainable, and gives teams autonomy over what they ship. This guide walks through patterns and practical implementations of the kind used in production systems like Funeral Manager, showing how to architect, containerize, and orchestrate Python microservices.

Understanding Microservices Architecture Fundamentals

Microservices architecture breaks down monolithic applications into smaller, independently deployable services that communicate over well-defined APIs. Each service owns its data, can be developed by different teams, and scales independently based on demand.

Core Principles for Python Microservices

# Service boundary definition example
from typing import Protocol

class ServiceBoundary(Protocol):
    """Defines the contract for a microservice"""

    def process_request(self, request: dict) -> dict:
        """Process incoming requests"""
        ...

    def health_check(self) -> bool:
        """Service health status"""
        ...

    def get_metrics(self) -> dict:
        """Service performance metrics"""
        ...

class UserService(ServiceBoundary):
    """User management microservice"""

    def __init__(self, database_url: str):
        self.db_connection = self._connect_database(database_url)

    def process_request(self, request: dict) -> dict:
        # Handle user-related operations
        operation = request.get('operation')

        if operation == 'create_user':
            return self._create_user(request['data'])
        elif operation == 'get_user':
            return self._get_user(request['user_id'])

        return {'error': 'Unknown operation'}

    def health_check(self) -> bool:
        try:
            # Check database connectivity
            self.db_connection.ping()
            return True
        except Exception:
            return False

    def get_metrics(self) -> dict:
        # Placeholder: a real service would report counters and latencies here
        return {}

    # The helpers below depend on the database driver in use and are left as stubs
    def _connect_database(self, database_url: str):
        raise NotImplementedError

    def _create_user(self, data: dict) -> dict:
        raise NotImplementedError

    def _get_user(self, user_id: str) -> dict:
        raise NotImplementedError
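
One way to exercise this contract without a database is an in-memory stand-in. The sketch below is illustrative only: `InMemoryUserService` and its metric names are hypothetical, and the protocol is redeclared with `runtime_checkable` so that conformance can be checked with `isinstance`.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ServiceBoundary(Protocol):
    """Same three-method contract as above, made checkable at runtime."""

    def process_request(self, request: dict) -> dict: ...
    def health_check(self) -> bool: ...
    def get_metrics(self) -> dict: ...


class InMemoryUserService:
    """Hypothetical stand-in that keeps users in a dict instead of a database."""

    def __init__(self) -> None:
        self._users = {}
        self._requests = 0

    def process_request(self, request: dict) -> dict:
        self._requests += 1
        operation = request.get("operation")
        if operation == "create_user":
            user_id = str(len(self._users) + 1)
            self._users[user_id] = {"user_id": user_id, **request["data"]}
            return self._users[user_id]
        if operation == "get_user":
            return self._users.get(request["user_id"], {"error": "User not found"})
        return {"error": "Unknown operation"}

    def health_check(self) -> bool:
        return True  # nothing external to ping

    def get_metrics(self) -> dict:
        return {"requests_total": self._requests, "users": len(self._users)}


service = InMemoryUserService()
created = service.process_request(
    {"operation": "create_user", "data": {"email": "ada@example.com", "name": "Ada"}}
)
fetched = service.process_request({"operation": "get_user", "user_id": created["user_id"]})
```

Because `Protocol` relies on structural typing, the stand-in does not need to inherit from `ServiceBoundary`; it only needs the same three methods.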

Service Communication Patterns

Effective microservices communication is crucial for system reliability. Let's explore synchronous and asynchronous patterns used in production systems.

HTTP API Communication with FastAPI

# api_gateway.py - Central API gateway pattern
from fastapi import FastAPI, HTTPException, Depends
from httpx import AsyncClient
import asyncio
from typing import Dict, Any

app = FastAPI(title="Funeral Manager API Gateway")

class ServiceRegistry:
    """Service discovery and load balancing"""
    
    def __init__(self):
        self.services = {
            'user-service': ['http://user-service:8001'],
            'notification-service': ['http://notification-service:8002'],
            'billing-service': ['http://billing-service:8003']
        }
    
    def get_service_url(self, service_name: str) -> str:
        urls = self.services.get(service_name, [])
        if not urls:
            raise HTTPException(404, f"Service {service_name} not found")
        # Always picks the first instance (in production, use proper load balancing)
        return urls[0]

registry = ServiceRegistry()

# Keep references to in-flight background tasks so they are not garbage-collected
background_tasks: set = set()

@app.post("/api/users")
async def create_user(user_data: dict):
    """Proxy request to user service"""
    async with AsyncClient() as client:
        service_url = registry.get_service_url('user-service')
        response = await client.post(f"{service_url}/users", json=user_data)

    if response.status_code == 201:
        # Trigger notification service without blocking the response
        task = asyncio.create_task(
            notify_user_created(user_data['email'], user_data['name'])
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    return response.json()

async def notify_user_created(email: str, name: str):
    """Asynchronous service communication"""
    async with AsyncClient() as client:
        notification_url = registry.get_service_url('notification-service')
        await client.post(f"{notification_url}/send", json={
            'type': 'welcome_email',
            'recipient': email,
            'data': {'name': name}
        })
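
The registry above always returns the first URL in the list. If rotation across instances is wanted inside the gateway itself, a minimal sketch could use `itertools.cycle`. The class name `RoundRobinRegistry` and the two instance hostnames are made up for illustration, and it raises a plain `LookupError` rather than FastAPI's `HTTPException` to stay framework-free.

```python
import itertools


class RoundRobinRegistry:
    """Hands out instance URLs for a service in rotation."""

    def __init__(self, services: dict) -> None:
        # One endless iterator per service; cycle() restarts after the last URL
        self._cycles = {
            name: itertools.cycle(urls) for name, urls in services.items() if urls
        }

    def get_service_url(self, service_name: str) -> str:
        try:
            return next(self._cycles[service_name])
        except KeyError:
            raise LookupError(f"Service {service_name} not found") from None


registry = RoundRobinRegistry({
    # Hypothetical instance hostnames
    "user-service": ["http://user-service-a:8001", "http://user-service-b:8001"],
})
picked = [registry.get_service_url("user-service") for _ in range(3)]
```

This only spreads requests evenly; it does not notice unhealthy instances, which is one reason to leave load balancing to the platform when it is available.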

Event-Driven Architecture with Message Queues

# event_bus.py - Asynchronous event handling
import asyncio
import json
from typing import Callable, Dict, List
import aio_pika
from dataclasses import dataclass, asdict

@dataclass
class DomainEvent:
    """Base class for domain events"""
    event_type: str
    aggregate_id: str
    timestamp: float
    data: dict

class EventBus:
    """Distributed event bus using RabbitMQ"""
    
    def __init__(self, rabbitmq_url: str):
        self.connection_url = rabbitmq_url
        self.handlers: Dict[str, List[Callable]] = {}
    
    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.connection_url)
        self.channel = await self.connection.channel()
        
        # Declare exchange for events
        self.exchange = await self.channel.declare_exchange(
            'domain_events', aio_pika.ExchangeType.TOPIC
        )
    
    def subscribe(self, event_type: str):
        """Decorator for event handlers"""
        def decorator(handler: Callable):
            if event_type not in self.handlers:
                self.handlers[event_type] = []
            self.handlers[event_type].append(handler)
            return handler
        return decorator
    
    async def publish(self, event: DomainEvent):
        """Publish event to message bus"""
        message = aio_pika.Message(
            json.dumps(asdict(event)).encode(),
            content_type='application/json'
        )
        
        await self.exchange.publish(
            message, routing_key=f"events.{event.event_type}"
        )
    
    async def start_consuming(self):
        """Start consuming events"""
        for event_type, handlers in self.handlers.items():
            queue = await self.channel.declare_queue(
                f"service_{event_type}_queue", durable=True
            )

            await queue.bind(self.exchange, f"events.{event_type}")
            await queue.consume(self._make_consumer(handlers))

    @staticmethod
    def _make_consumer(handlers: List[Callable]):
        """Build a callback bound to one event type's handlers.

        Defining the callback directly inside the loop would make every
        queue use the handlers of the last event type (late binding).
        """
        async def process_message(message: aio_pika.IncomingMessage):
            async with message.process():
                event_data = json.loads(message.body.decode())
                event = DomainEvent(**event_data)

                # Execute all handlers for this event type
                for handler in handlers:
                    try:
                        await handler(event)
                    except Exception as e:
                        print(f"Handler error: {e}")

        return process_message

# Usage in a service
event_bus = EventBus("amqp://rabbitmq:5672")

@event_bus.subscribe("user.created")
async def handle_user_created(event: DomainEvent):
    """Handle user creation events"""
    user_data = event.data
    print(f"New user created: {user_data['email']}")
    
    # Send welcome email, create user profile, etc.
    await send_welcome_email(user_data['email'])

@event_bus.subscribe("funeral.scheduled")
async def handle_funeral_scheduled(event: DomainEvent):
    """Handle funeral scheduling events - Funeral Manager specific"""
    funeral_data = event.data
    
    # Notify family members, update calendar, send reminders
    await notify_family_members(funeral_data['family_contacts'])
    await update_facility_calendar(funeral_data['facility_id'], funeral_data['date'])
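
Handlers written against this interface can be unit-tested without a running broker. The following is a sketch of a hypothetical in-process test double, `InMemoryEventBus`, that mirrors the `subscribe` and `publish` methods above. It matches event types exactly and does not reproduce RabbitMQ topic routing, durability, or acknowledgements.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass
class DomainEvent:
    event_type: str
    aggregate_id: str
    timestamp: float
    data: dict


class InMemoryEventBus:
    """Hypothetical test double: dispatches events in-process, no broker involved."""

    def __init__(self) -> None:
        self.handlers = {}

    def subscribe(self, event_type: str):
        def decorator(handler):
            self.handlers.setdefault(event_type, []).append(handler)
            return handler
        return decorator

    async def publish(self, event: DomainEvent) -> None:
        # Call every handler registered for this exact event type, in order
        for handler in self.handlers.get(event.event_type, []):
            await handler(event)


bus = InMemoryEventBus()
welcomed = []


@bus.subscribe("user.created")
async def record_welcome(event: DomainEvent) -> None:
    welcomed.append(event.data["email"])


async def main() -> None:
    await bus.publish(
        DomainEvent("user.created", "u-1", time.time(), {"email": "ada@example.com"})
    )
    # No handler is registered for this type, so nothing happens
    await bus.publish(DomainEvent("funeral.scheduled", "f-1", time.time(), {}))


asyncio.run(main())
```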

Containerization with Docker

Containerization ensures consistent deployment across environments. Here's how to properly containerize Python microservices:

Multi-Stage Docker Build

# Dockerfile for Python microservice
FROM python:3.11-slim AS builder

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Production stage
FROM python:3.11-slim

# Create non-root user for security
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Copy installed packages from builder stage
COPY --from=builder /root/.local /home/appuser/.local

# Set PATH to include user packages
ENV PATH=/home/appuser/.local/bin:$PATH

WORKDIR /app

# Copy application code
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"

# Expose port
EXPOSE 8000

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
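
The `HEALTHCHECK` instruction assumes the service answers on `/health`. What that endpoint computes is up to the service. One possible shape, sketched here with a hypothetical framework-agnostic helper `health_report`, is to run a set of dependency checks and translate the result into a status code and a JSON-ready body.

```python
from typing import Callable


def health_report(checks: dict) -> tuple:
    """Run each dependency check and return (HTTP status code, JSON-ready body)."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = bool(check())
        except Exception:
            # A check that raises counts as failed rather than crashing the endpoint
            results[name] = False
    healthy = all(results.values())
    body = {"status": "ok" if healthy else "unhealthy", "checks": results}
    return (200 if healthy else 503), body


def broken_queue() -> bool:
    # Simulates a dependency that cannot be reached
    raise ConnectionError("broker unreachable")


ok_status, ok_body = health_report({"database": lambda: True})
bad_status, bad_body = health_report({"database": lambda: True, "queue": broken_queue})
```

A web framework handler would call `health_report` with real checks (for example a database ping) and return the status code and body it produces.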

Docker Compose for Local Development

# docker-compose.yml - Local development environment
version: '3.8'

services:
  # API Gateway
  api-gateway:
    build: ./api-gateway
    ports:
      - "8000:8000"
    environment:
      - RABBITMQ_URL=amqp://admin:password@rabbitmq:5672
      - REDIS_URL=redis://redis:6379
    depends_on:
      - rabbitmq
      - redis
    networks:
      - microservices-net

  # User Service
  user-service:
    build: ./user-service
    environment:
      - DATABASE_URL=postgresql://user:password@user-db:5432/users
      - RABBITMQ_URL=amqp://admin:password@rabbitmq:5672
    depends_on:
      - user-db
      - rabbitmq
    networks:
      - microservices-net

  # Notification Service
  notification-service:
    build: ./notification-service
    environment:
      - SMTP_HOST=smtp.gmail.com
      - SMTP_PORT=587
      - RABBITMQ_URL=amqp://admin:password@rabbitmq:5672
    depends_on:
      - rabbitmq
    networks:
      - microservices-net

  # Databases
  user-db:
    image: postgres:15
    environment:
      - POSTGRES_DB=users
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - user_data:/var/lib/postgresql/data
    networks:
      - microservices-net

  # Message Queue
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"  # Management UI
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=password
    networks:
      - microservices-net

  # Cache
  redis:
    image: redis:7-alpine
    networks:
      - microservices-net

volumes:
  user_data:

networks:
  microservices-net:
    driver: bridge
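
Each service receives its configuration through the environment variables set in the Compose file. A small sketch of how a service might read and validate them at startup follows. The `Settings` class and its field names are assumptions for illustration; only the variable names come from the file above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Hypothetical typed view of the environment a service expects."""

    database_url: str
    rabbitmq_url: str
    smtp_port: int = 587

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "Settings":
        # Fail fast with one message listing everything that is missing
        missing = [key for key in ("DATABASE_URL", "RABBITMQ_URL") if key not in env]
        if missing:
            raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
        return cls(
            database_url=env["DATABASE_URL"],
            rabbitmq_url=env["RABBITMQ_URL"],
            smtp_port=int(env.get("SMTP_PORT", "587")),
        )


# Passing a plain dict instead of os.environ keeps the example reproducible
settings = Settings.from_env({
    "DATABASE_URL": "postgresql://user:password@user-db:5432/users",
    "RABBITMQ_URL": "amqp://admin:password@rabbitmq:5672",
})
```

Validating at startup means a misconfigured container exits immediately instead of failing on its first request.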

Kubernetes Orchestration

For production deployment, Kubernetes provides robust orchestration, scaling, and service discovery capabilities.

Service Deployment Configuration

# k8s/user-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: funeral-manager/user-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: database-secret
              key: url
        - name: RABBITMQ_URL
          valueFrom:
            configMapKeyRef:
              name: rabbitmq-config
              key: url
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: ClusterIP

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: user-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: user-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
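
To get a feel for how the autoscaler reacts to these targets, the core of the documented Kubernetes scaling rule, desired = ceil(current replicas × current metric / target metric), can be written out in a few lines. This is a simplified sketch: the function names are made up, and the real controller also applies a tolerance band, stabilization windows, and special handling for pods that are not ready.

```python
import math


def desired_replicas(current_replicas: int, current_utilization: float,
                     target_utilization: float, min_replicas: int = 2,
                     max_replicas: int = 10) -> int:
    """Core HPA rule, clamped to the minReplicas/maxReplicas bounds."""
    raw = math.ceil(current_replicas * current_utilization / target_utilization)
    return max(min_replicas, min(max_replicas, raw))


def scale_decision(current_replicas: int, cpu: float, memory: float) -> int:
    """With several metrics, the largest proposed replica count wins."""
    return max(
        desired_replicas(current_replicas, cpu, target_utilization=70),
        desired_replicas(current_replicas, memory, target_utilization=80),
    )


scale_up = desired_replicas(3, current_utilization=90, target_utilization=70)
capped = desired_replicas(8, current_utilization=140, target_utilization=70)
combined = scale_decision(3, cpu=90, memory=40)
```

With three replicas at 90% CPU against a 70% target, the rule proposes four replicas. The 140% case would ask for sixteen and is capped at the `maxReplicas` value of ten.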

Service Mesh with Istio

# k8s/istio-gateway.yaml
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: funeral-manager-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - api.funeral-manager.org

---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: funeral-manager-routes
spec:
  hosts:
  - api.funeral-manager.org
  gateways:
  - funeral-manager-gateway
  http:
  - match:
    - uri:
        prefix: /api/users
    route:
    - destination:
        host: user-service
        port:
          number: 80
    # Fault injection for resilience testing: delays 0.1% of requests by 5s
    fault:
      delay:
        percentage:
          value: 0.1
        fixedDelay: 5s
  - match:
    - uri:
        prefix: /api/notifications
    route:
    - destination:
        host: notification-service
        port:
          number: 80
    retries:
      attempts: 3
      perTryTimeout: 2s
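
The `retries` block is enforced by the mesh, not by application code. Where no mesh is in place, a rough client-side analogue can be sketched with `asyncio.wait_for`. The helper name `call_with_retries` is hypothetical, and the sketch retries immediately, without the backoff a production client would normally add.

```python
import asyncio


async def call_with_retries(func, retries: int = 3, per_try_timeout: float = 2.0):
    """Call func(), retrying on timeouts and connection errors."""
    last_error = None
    for _ in range(retries + 1):  # the first try plus `retries` retries
        try:
            return await asyncio.wait_for(func(), timeout=per_try_timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
    raise RuntimeError(f"Request failed after {retries} retries") from last_error


calls = []


async def flaky() -> str:
    """Stand-in for an upstream call that fails twice, then succeeds."""
    calls.append("try")
    if len(calls) < 3:
        raise ConnectionError("upstream reset")
    return "ok"


result = asyncio.run(call_with_retries(flaky, retries=3, per_try_timeout=0.5))
```

Retries are only safe for idempotent requests; retrying a non-idempotent `POST` can create duplicates.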

Data Management Patterns

Microservices require careful data management to maintain consistency while preserving service autonomy.

Database Per Service Pattern

# user_service/models.py
from sqlalchemy import create_engine, Column, String, DateTime, Boolean
from sqlalchemy.orm import declarative_base, sessionmaker
import uuid
from datetime import datetime

# DomainEvent and the shared event_bus instance are defined in event_bus.py
from event_bus import DomainEvent, event_bus

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    email = Column(String, unique=True, nullable=False)
    name = Column(String, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True)

class UserRepository:
    """Repository pattern for user data access"""
    
    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()
    
    async def create_user(self, email: str, name: str) -> User:
        user = User(email=email, name=name)
        self.session.add(user)
        self.session.commit()
        
        # Publish domain event
        await self._publish_user_created_event(user)
        return user
    
    async def _publish_user_created_event(self, user: User):
        """Publish user creation event for other services"""
        event = DomainEvent(
            event_type="user.created",
            aggregate_id=user.id,
            timestamp=datetime.utcnow().timestamp(),
            data={
                'user_id': user.id,
                'email': user.email,
                'name': user.name
            }
        )
        await event_bus.publish(event)

Saga Pattern for Distributed Transactions

# saga_orchestrator.py - Manages distributed transactions
from enum import Enum
from typing import Dict, List, Callable
import asyncio

class SagaStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"

class SagaStep:
    def __init__(self, name: str, action: Callable, compensation: Callable):
        self.name = name
        self.action = action
        self.compensation = compensation
        self.completed = False

class SagaOrchestrator:
    """Orchestrates distributed transactions across microservices"""
    
    def __init__(self):
        self.sagas: Dict[str, List[SagaStep]] = {}
    
    def define_saga(self, saga_name: str, steps: List[SagaStep]):
        """Define a saga workflow"""
        self.sagas[saga_name] = steps
    
    async def execute_saga(self, saga_name: str, context: dict) -> bool:
        """Execute saga with automatic compensation on failure"""
        steps = self.sagas.get(saga_name, [])
        completed_steps = []
        
        try:
            for step in steps:
                print(f"Executing step: {step.name}")
                await step.action(context)
                step.completed = True
                completed_steps.append(step)
            
            return True
            
        except Exception as e:
            print(f"Saga failed at step {step.name}: {e}")
            
            # Execute compensation in reverse order
            for step in reversed(completed_steps):
                try:
                    print(f"Compensating step: {step.name}")
                    await step.compensation(context)
                except Exception as comp_error:
                    print(f"Compensation failed for {step.name}: {comp_error}")
            
            return False

# Example: Funeral booking saga
async def reserve_facility(context: dict):
    """Reserve funeral facility"""
    facility_id = context['facility_id']
    date = context['date']
    # Call facility service to reserve
    print(f"Reserved facility {facility_id} for {date}")

async def compensate_facility_reservation(context: dict):
    """Cancel facility reservation"""
    facility_id = context['facility_id']
    # Call facility service to cancel
    print(f"Cancelled facility reservation {facility_id}")

async def process_payment(context: dict):
    """Process payment for funeral services"""
    amount = context['amount']
    # Call payment service
    print(f"Processed payment of ${amount}")

async def refund_payment(context: dict):
    """Refund payment"""
    amount = context['amount']
    # Call payment service for refund
    print(f"Refunded payment of ${amount}")

async def send_confirmation(context: dict):
    """Send booking confirmation"""
    email = context['email']
    # Call notification service
    print(f"Sent confirmation to {email}")

async def send_cancellation(context: dict):
    """Send cancellation notice"""
    email = context['email']
    # Call notification service
    print(f"Sent cancellation notice to {email}")

# Define funeral booking saga
saga_orchestrator = SagaOrchestrator()
saga_orchestrator.define_saga("funeral_booking", [
    SagaStep("reserve_facility", reserve_facility, compensate_facility_reservation),
    SagaStep("process_payment", process_payment, refund_payment),
    SagaStep("send_confirmation", send_confirmation, send_cancellation)
])
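
To see the compensation order in action, the sketch below condenses `execute_saga` into a single function and runs a booking in which the payment step fails. The `record` helper, the `failing_payment` step, and the context values are invented for the demonstration; the steps only record that they ran.

```python
import asyncio

log = []


def record(label: str):
    """Build a stand-in step that only records that it ran."""
    async def step(context: dict) -> None:
        log.append(label)
    return step


async def failing_payment(context: dict) -> None:
    log.append("process_payment")
    raise RuntimeError("card declined")


async def run_saga(steps: list, context: dict) -> bool:
    """Condensed execute_saga: run actions, undo completed ones in reverse on failure."""
    completed = []
    try:
        for action, compensation in steps:
            await action(context)
            completed.append(compensation)
        return True
    except Exception:
        for compensation in reversed(completed):
            await compensation(context)
        return False


steps = [
    (record("reserve_facility"), record("cancel_facility")),
    (failing_payment, record("refund_payment")),
    (record("send_confirmation"), record("send_cancellation")),
]
succeeded = asyncio.run(run_saga(steps, {"facility_id": "chapel-1", "amount": 1200}))
```

Only the facility reservation is compensated: the payment step never completed, so no refund is issued, and the confirmation step never ran.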

Monitoring and Observability

Production microservices require comprehensive monitoring, logging, and tracing capabilities.

Structured Logging and Metrics

# observability.py - Monitoring and logging utilities
import logging
import time
from functools import wraps
import json
from prometheus_client import Counter, Histogram
from opentelemetry import trace

# Tracer provider and exporter setup is omitted here; until an SDK provider
# is configured, the spans created below are no-ops

# Prometheus metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')

class StructuredLogger:
    """Structured logging for microservices"""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        
        # Configure structured logging
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    
    def log_request(self, method: str, endpoint: str, status_code: int, 
                   duration: float, user_id: str = None, **kwargs):
        """Log HTTP request with structured data"""
        log_data = {
            'service': self.service_name,
            'type': 'http_request',
            'method': method,
            'endpoint': endpoint,
            'status_code': status_code,
            'duration_ms': duration * 1000,
            'user_id': user_id,
            **kwargs
        }
        
        self.logger.info(json.dumps(log_data))
        
        # Update Prometheus metrics
        REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status_code).inc()
        REQUEST_DURATION.observe(duration)

def monitor_performance(logger: StructuredLogger):
    """Decorator for monitoring function performance"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            
            # Start distributed trace
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span(func.__name__) as span:
                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("success", True)
                    return result
                except Exception as e:
                    span.set_attribute("success", False)
                    span.set_attribute("error", str(e))
                    logger.logger.error(f"Function {func.__name__} failed: {e}")
                    raise
                finally:
                    duration = time.time() - start_time
                    span.set_attribute("duration", duration)
        
        return wrapper
    return decorator

# Usage in service
logger = StructuredLogger("user-service")

@monitor_performance(logger)
async def create_user_endpoint(user_data: dict):
    """Monitored user creation endpoint"""
    start_time = time.time()

    # Business logic here
    user = await user_repository.create_user(
        email=user_data['email'],
        name=user_data['name']
    )

    logger.log_request(
        method="POST",
        endpoint="/users",
        status_code=201,
        duration=time.time() - start_time,  # measured, in seconds
        user_id=user.id,
        action="user_created"
    )

    return user
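
The logger above prefixes each JSON payload with a plain-text timestamp and level, so a log line as a whole is not valid JSON. If the log pipeline expects one JSON object per line, one option is a custom `logging.Formatter`. This is a sketch using only the standard library: `JsonFormatter` and the `fields` key passed through `extra` are naming choices made for this example.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed via extra={"fields": {...}} are merged into the top level
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


stream = io.StringIO()  # captured in memory here; a service would log to stdout
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

demo_logger = logging.getLogger("user-service-demo")
demo_logger.addHandler(handler)
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False

demo_logger.info("http_request", extra={"fields": {"method": "POST", "status_code": 201}})
entry = json.loads(stream.getvalue())
```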

Best Practices and Production Considerations

Circuit Breaker Pattern

# circuit_breaker.py - Fault tolerance pattern
import asyncio
import time
from enum import Enum
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """Circuit breaker for service resilience"""
    
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

            raise

        # Any success closes the circuit and clears the failure count,
        # so only consecutive failures can trip the breaker
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        return result

# Usage with external service calls
notification_circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)

async def send_notification_with_breaker(email: str, message: str):
    """Send notification with circuit breaker protection"""
    return await notification_circuit_breaker.call(
        send_notification, email, message
    )
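
The effect of the breaker is easiest to see against a dependency that is down. The sketch below uses a condensed, hypothetical `MiniBreaker` with a dedicated `CircuitOpenError`, and a fake `send_notification` that always fails, to show that calls stop reaching the service once the threshold is hit.

```python
import asyncio
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected without reaching the downstream service."""


class MiniBreaker:
    """Condensed circuit breaker: opens after N consecutive failures."""

    def __init__(self, failure_threshold: int = 3, timeout: float = 30.0) -> None:
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.opened_at = None  # monotonic time at which the circuit opened

    async def call(self, func, *args):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.timeout:
                raise CircuitOpenError("circuit is open")
            self.opened_at = None  # half-open: let one trial call through
        try:
            result = await func(*args)
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failure_count = 0
        return result


attempts = []


async def send_notification(email: str) -> None:
    """Fake downstream call that always fails."""
    attempts.append(email)
    raise ConnectionError("notification service down")


async def main() -> list:
    breaker = MiniBreaker(failure_threshold=3, timeout=30)
    outcomes = []
    for _ in range(5):
        try:
            await breaker.call(send_notification, "ada@example.com")
        except CircuitOpenError:
            outcomes.append("rejected")
        except ConnectionError:
            outcomes.append("failed")
    return outcomes


outcomes = asyncio.run(main())
```

The first three calls reach the failing service; the last two are rejected immediately, which spares the struggling dependency and lets the caller fail fast.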

Conclusion

Building scalable microservices with Python requires careful attention to architecture patterns, service communication, containerization, and operational concerns. The patterns demonstrated here, from event-driven communication to saga orchestration, provide the foundation for robust, maintainable systems.

At Custom Logic, we've successfully implemented these patterns in production systems like Funeral Manager, where microservices architecture enables independent scaling of user management, facility booking, and notification services. This approach allows our development teams to work autonomously while maintaining system coherence and reliability.

The key to successful microservices adoption lies in starting simple, implementing proper monitoring from day one, and gradually evolving your architecture as requirements become clearer. Whether you're building a new system or decomposing a monolith, these patterns provide a solid foundation for scalable, maintainable Python microservices.
