Building Scalable Microservices with Python - Architecture Patterns and Best Practices
Microservices architecture has reshaped how we build and deploy enterprise applications. Implemented well in Python, it enables independent scaling, easier maintenance, and genuine team autonomy. In this guide, we'll explore proven patterns and practical implementations that power production systems like Funeral Manager, showing how to architect, containerize, and orchestrate Python microservices effectively.
Understanding Microservices Architecture Fundamentals
Microservices architecture breaks down monolithic applications into smaller, independently deployable services that communicate over well-defined APIs. Each service owns its data, can be developed by different teams, and scales independently based on demand.
Core Principles for Python Microservices
# Service boundary definition example
from typing import Protocol


class ServiceBoundary(Protocol):
    """Defines the contract for a microservice"""

    def process_request(self, request: dict) -> dict:
        """Process incoming requests"""
        ...

    def health_check(self) -> bool:
        """Report service health status"""
        ...

    def get_metrics(self) -> dict:
        """Expose service performance metrics"""
        ...


class UserService(ServiceBoundary):
    """User management microservice"""

    def __init__(self, database_url: str):
        self.db_connection = self._connect_database(database_url)

    def process_request(self, request: dict) -> dict:
        # Dispatch user-related operations
        operation = request.get('operation')
        if operation == 'create_user':
            return self._create_user(request['data'])
        elif operation == 'get_user':
            return self._get_user(request['user_id'])
        return {'error': 'Unknown operation'}

    def health_check(self) -> bool:
        try:
            # Check database connectivity
            self.db_connection.ping()
            return True
        except Exception:
            return False

    def get_metrics(self) -> dict:
        # Minimal metrics stub; a real service would track counters here
        return {'healthy': self.health_check()}
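Because ServiceBoundary is a typing.Protocol, any class with matching methods satisfies the contract structurally, with or without explicit subclassing. A hypothetical in-memory implementation, useful for local testing, makes this concrete:

# in_memory_user_service.py - illustrative stand-in; a real service
# would back this with a database as UserService does
class InMemoryUserService:
    """Satisfies ServiceBoundary structurally, without subclassing it."""

    def __init__(self):
        self.users: dict[str, dict] = {}

    def process_request(self, request: dict) -> dict:
        if request.get('operation') == 'create_user':
            data = request['data']
            self.users[data['email']] = data
            return {'status': 'created'}
        return {'error': 'Unknown operation'}

    def health_check(self) -> bool:
        return True

    def get_metrics(self) -> dict:
        return {'user_count': len(self.users)}


service: ServiceBoundary = InMemoryUserService()  # type-checks structurally
print(service.process_request({'operation': 'create_user',
                               'data': {'email': 'a@example.com'}}))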
Service Communication Patterns
Effective microservices communication is crucial for system reliability. Let's explore synchronous and asynchronous patterns used in production systems.
HTTP API Communication with FastAPI
# api_gateway.py - Central API gateway pattern
import asyncio
from itertools import cycle

from fastapi import FastAPI, HTTPException
from httpx import AsyncClient

app = FastAPI(title="Funeral Manager API Gateway")


class ServiceRegistry:
    """Service discovery and load balancing"""

    def __init__(self):
        self.services = {
            'user-service': ['http://user-service:8001'],
            'notification-service': ['http://notification-service:8002'],
            'billing-service': ['http://billing-service:8003'],
        }
        # One rotating iterator per service for round-robin selection
        self._rotations = {name: cycle(urls) for name, urls in self.services.items()}

    def get_service_url(self, service_name: str) -> str:
        if service_name not in self._rotations:
            raise HTTPException(404, f"Service {service_name} not found")
        # Simple round-robin (in production, use proper load balancing)
        return next(self._rotations[service_name])


registry = ServiceRegistry()


@app.post("/api/users")
async def create_user(user_data: dict):
    """Proxy request to user service"""
    async with AsyncClient() as client:
        service_url = registry.get_service_url('user-service')
        response = await client.post(f"{service_url}/users", json=user_data)
        if response.status_code == 201:
            # Trigger notification service asynchronously
            asyncio.create_task(
                notify_user_created(user_data['email'], user_data['name'])
            )
        return response.json()


async def notify_user_created(email: str, name: str):
    """Asynchronous service communication"""
    async with AsyncClient() as client:
        notification_url = registry.get_service_url('notification-service')
        await client.post(f"{notification_url}/send", json={
            'type': 'welcome_email',
            'recipient': email,
            'data': {'name': name},
        })
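One production concern the gateway above glosses over is timeouts: a slow downstream service can otherwise hold gateway connections open indefinitely. A sketch of a proxy route with an explicit timeout and upstream error mapping (the two-second budget and route shape are illustrative):

# Proxy route with an explicit timeout budget
from httpx import AsyncClient, TimeoutException


@app.get("/api/users/{user_id}")
async def get_user(user_id: str):
    """Proxy with timeout and upstream error mapping"""
    service_url = registry.get_service_url('user-service')
    try:
        async with AsyncClient(timeout=2.0) as client:
            response = await client.get(f"{service_url}/users/{user_id}")
    except TimeoutException:
        # Map a slow upstream to a gateway timeout rather than hanging
        raise HTTPException(504, "user-service timed out")
    if response.status_code == 404:
        raise HTTPException(404, "User not found")
    return response.json()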
Event-Driven Architecture with Message Queues
# event_bus.py - Asynchronous event handling
import json
from dataclasses import dataclass, asdict
from typing import Callable, Dict, List

import aio_pika


@dataclass
class DomainEvent:
    """Base class for domain events"""
    event_type: str
    aggregate_id: str
    timestamp: float
    data: dict


class EventBus:
    """Distributed event bus using RabbitMQ"""

    def __init__(self, rabbitmq_url: str):
        self.connection_url = rabbitmq_url
        self.handlers: Dict[str, List[Callable]] = {}

    async def connect(self):
        self.connection = await aio_pika.connect_robust(self.connection_url)
        self.channel = await self.connection.channel()
        # Declare a topic exchange for domain events
        self.exchange = await self.channel.declare_exchange(
            'domain_events', aio_pika.ExchangeType.TOPIC
        )

    def subscribe(self, event_type: str):
        """Decorator for registering event handlers"""
        def decorator(handler: Callable):
            self.handlers.setdefault(event_type, []).append(handler)
            return handler
        return decorator

    async def publish(self, event: DomainEvent):
        """Publish event to the message bus"""
        message = aio_pika.Message(
            json.dumps(asdict(event)).encode(),
            content_type='application/json'
        )
        await self.exchange.publish(
            message, routing_key=f"events.{event.event_type}"
        )

    async def start_consuming(self):
        """Bind a queue per event type and start consuming"""
        for event_type, handlers in self.handlers.items():
            queue = await self.channel.declare_queue(
                f"service_{event_type}_queue", durable=True
            )
            await queue.bind(self.exchange, f"events.{event_type}")

            # Bind handlers via a default argument so each consumer keeps
            # its own handler list (a plain closure over the loop variable
            # would see only the last iteration's value)
            async def process_message(message: aio_pika.IncomingMessage,
                                      handlers=handlers):
                async with message.process():
                    event = DomainEvent(**json.loads(message.body.decode()))
                    # Execute all handlers for this event type
                    for handler in handlers:
                        try:
                            await handler(event)
                        except Exception as e:
                            print(f"Handler error: {e}")

            await queue.consume(process_message)


# Usage in a service
event_bus = EventBus("amqp://rabbitmq:5672")


@event_bus.subscribe("user.created")
async def handle_user_created(event: DomainEvent):
    """Handle user creation events"""
    user_data = event.data
    print(f"New user created: {user_data['email']}")
    # Send welcome email, create user profile, etc.
    await send_welcome_email(user_data['email'])


@event_bus.subscribe("funeral.scheduled")
async def handle_funeral_scheduled(event: DomainEvent):
    """Handle funeral scheduling events - Funeral Manager specific"""
    funeral_data = event.data
    # Notify family members, update calendar, send reminders
    await notify_family_members(funeral_data['family_contacts'])
    await update_facility_calendar(funeral_data['facility_id'], funeral_data['date'])
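A service typically wires the bus up once at startup: connect, start consuming, and only then begin publishing. A minimal sketch, assuming the module above is importable as event_bus and RabbitMQ is reachable:

# service_main.py - hypothetical entry point wiring the event bus at startup
import asyncio
import time

from event_bus import event_bus, DomainEvent


async def main():
    await event_bus.connect()          # open connection, declare exchange
    await event_bus.start_consuming()  # bind one queue per subscribed event type
    # Publish a sample event so the local consumers have something to handle
    await event_bus.publish(DomainEvent(
        event_type="user.created",
        aggregate_id="demo-user-1",
        timestamp=time.time(),
        data={'email': 'demo@example.com', 'name': 'Demo User'},
    ))
    await asyncio.Event().wait()  # keep the process alive to consume events


if __name__ == "__main__":
    asyncio.run(main())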
Containerization with Docker
Containerization ensures consistent deployment across environments. Here's how to properly containerize Python microservices:
Multi-Stage Docker Build
# Dockerfile for Python microservice
FROM python:3.11-slim AS builder

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy requirements and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt

# Production stage
FROM python:3.11-slim

# Create non-root user for security
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Copy installed packages from builder stage
COPY --from=builder /root/.local /home/appuser/.local

# Set PATH to include user packages
ENV PATH=/home/appuser/.local/bin:$PATH

WORKDIR /app

# Copy application code
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Health check (raise_for_status makes non-2xx responses fail the check)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8000/health').raise_for_status()"

# Expose port
EXPOSE 8000

# Run application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose for Local Development
# docker-compose.yml - Local development environment
version: '3.8'

services:
  # API Gateway
  api-gateway:
    build: ./api-gateway
    ports:
      - "8000:8000"
    environment:
      - RABBITMQ_URL=amqp://rabbitmq:5672
      - REDIS_URL=redis://redis:6379
    depends_on:
      - rabbitmq
      - redis
    networks:
      - microservices-net

  # User Service
  user-service:
    build: ./user-service
    environment:
      - DATABASE_URL=postgresql://user:password@user-db:5432/users
      - RABBITMQ_URL=amqp://rabbitmq:5672
    depends_on:
      - user-db
      - rabbitmq
    networks:
      - microservices-net

  # Notification Service
  notification-service:
    build: ./notification-service
    environment:
      - SMTP_HOST=smtp.gmail.com
      - SMTP_PORT=587
      - RABBITMQ_URL=amqp://rabbitmq:5672
    depends_on:
      - rabbitmq
    networks:
      - microservices-net

  # Databases
  user-db:
    image: postgres:15
    environment:
      - POSTGRES_DB=users
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - user_data:/var/lib/postgresql/data
    networks:
      - microservices-net

  # Message Queue
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"  # Management UI
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=password
    networks:
      - microservices-net

  # Cache
  redis:
    image: redis:7-alpine
    networks:
      - microservices-net

volumes:
  user_data:

networks:
  microservices-net:
    driver: bridge
Kubernetes Orchestration
For production deployment, Kubernetes provides robust orchestration, scaling, and service discovery capabilities.
Service Deployment Configuration
# k8s/user-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
        - name: user-service
          image: funeral-manager/user-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: database-secret
                  key: url
            - name: RABBITMQ_URL
              valueFrom:
                configMapKeyRef:
                  name: rabbitmq-config
                  key: url
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: user-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: user-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
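Those probes assume the service actually exposes /health and /ready endpoints, which none of the snippets so far define. A minimal FastAPI sketch (the readiness flag is a stand-in for real dependency checks):

# main.py - probe endpoints assumed by the liveness/readiness checks above
from fastapi import FastAPI, Response

app = FastAPI(title="User Service")

# Flipped once startup work (DB connections, queue bindings) completes;
# a real service would check its actual dependencies instead
ready_flag = {'ready': False}


@app.on_event("startup")
async def startup():
    # ... connect to database, event bus, etc. ...
    ready_flag['ready'] = True


@app.get("/health")
async def health():
    # Liveness: the process is up and the event loop responds
    return {"status": "ok"}


@app.get("/ready")
async def ready(response: Response):
    # Readiness: only accept traffic once dependencies are available
    if not ready_flag['ready']:
        response.status_code = 503
        return {"status": "not ready"}
    return {"status": "ready"}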
Service Mesh with Istio
# k8s/istio-gateway.yaml
apiVersion: networking.istio.io/v1alpha3
kind: Gateway
metadata:
  name: funeral-manager-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
    - port:
        number: 80
        name: http
        protocol: HTTP
      hosts:
        - api.funeral-manager.org
---
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: funeral-manager-routes
spec:
  hosts:
    - api.funeral-manager.org
  gateways:
    - funeral-manager-gateway
  http:
    - match:
        - uri:
            prefix: /api/users
      route:
        - destination:
            host: user-service
            port:
              number: 80
      fault:
        delay:
          percentage:
            value: 0.1
          fixedDelay: 5s
    - match:
        - uri:
            prefix: /api/notifications
      route:
        - destination:
            host: notification-service
            port:
              number: 80
      retries:
        attempts: 3
        perTryTimeout: 2s
Data Management Patterns
Microservices require careful data management to maintain consistency while preserving service autonomy.
Database Per Service Pattern
# user_service/models.py
import uuid
from datetime import datetime

from sqlalchemy import create_engine, Column, String, DateTime, Boolean
from sqlalchemy.orm import declarative_base, sessionmaker

from event_bus import DomainEvent, event_bus

Base = declarative_base()


class User(Base):
    __tablename__ = 'users'

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    email = Column(String, unique=True, nullable=False)
    name = Column(String, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True)


class UserRepository:
    """Repository pattern for user data access"""

    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    async def create_user(self, email: str, name: str) -> User:
        user = User(email=email, name=name)
        self.session.add(user)
        self.session.commit()
        # Publish domain event after the local transaction commits
        await self._publish_user_created_event(user)
        return user

    async def _publish_user_created_event(self, user: User):
        """Publish user creation event for other services"""
        event = DomainEvent(
            event_type="user.created",
            aggregate_id=user.id,
            timestamp=datetime.utcnow().timestamp(),
            data={
                'user_id': user.id,
                'email': user.email,
                'name': user.name,
            }
        )
        await event_bus.publish(event)
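Exposing the repository through the service's HTTP layer is then a thin wrapper. A sketch, assuming the same database URL as the Compose file above:

# Hypothetical route wiring inside the user service
from fastapi import FastAPI

app = FastAPI(title="User Service")
user_repository = UserRepository("postgresql://user:password@user-db:5432/users")


@app.post("/users", status_code=201)
async def create_user(user_data: dict):
    user = await user_repository.create_user(
        email=user_data['email'],
        name=user_data['name'],
    )
    return {'user_id': user.id, 'email': user.email}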
Saga Pattern for Distributed Transactions
# saga_orchestrator.py - Manages distributed transactions
from enum import Enum
from typing import Callable, Dict, List


class SagaStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"


class SagaStep:
    def __init__(self, name: str, action: Callable, compensation: Callable):
        self.name = name
        self.action = action
        self.compensation = compensation
        self.completed = False


class SagaOrchestrator:
    """Orchestrates distributed transactions across microservices"""

    def __init__(self):
        self.sagas: Dict[str, List[SagaStep]] = {}

    def define_saga(self, saga_name: str, steps: List[SagaStep]):
        """Define a saga workflow"""
        self.sagas[saga_name] = steps

    async def execute_saga(self, saga_name: str, context: dict) -> bool:
        """Execute saga with automatic compensation on failure"""
        steps = self.sagas.get(saga_name, [])
        completed_steps = []
        try:
            for step in steps:
                print(f"Executing step: {step.name}")
                await step.action(context)
                step.completed = True
                completed_steps.append(step)
            return True
        except Exception as e:
            print(f"Saga failed at step {step.name}: {e}")
            # Execute compensations in reverse order
            for step in reversed(completed_steps):
                try:
                    print(f"Compensating step: {step.name}")
                    await step.compensation(context)
                except Exception as comp_error:
                    print(f"Compensation failed for {step.name}: {comp_error}")
            return False


# Example: Funeral booking saga
async def reserve_facility(context: dict):
    """Reserve funeral facility"""
    facility_id = context['facility_id']
    date = context['date']
    # Call facility service to reserve
    print(f"Reserved facility {facility_id} for {date}")


async def compensate_facility_reservation(context: dict):
    """Cancel facility reservation"""
    facility_id = context['facility_id']
    # Call facility service to cancel
    print(f"Cancelled facility reservation {facility_id}")


async def process_payment(context: dict):
    """Process payment for funeral services"""
    amount = context['amount']
    # Call payment service
    print(f"Processed payment of ${amount}")


async def refund_payment(context: dict):
    """Refund payment"""
    amount = context['amount']
    # Call payment service for refund
    print(f"Refunded payment of ${amount}")


async def send_confirmation(context: dict):
    """Send booking confirmation"""
    email = context['email']
    # Call notification service
    print(f"Sent confirmation to {email}")


async def send_cancellation(context: dict):
    """Send cancellation notice"""
    email = context['email']
    # Call notification service
    print(f"Sent cancellation notice to {email}")


# Define funeral booking saga
saga_orchestrator = SagaOrchestrator()
saga_orchestrator.define_saga("funeral_booking", [
    SagaStep("reserve_facility", reserve_facility, compensate_facility_reservation),
    SagaStep("process_payment", process_payment, refund_payment),
    SagaStep("send_confirmation", send_confirmation, send_cancellation),
])
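With the saga defined, executing the booking is a single call; the context carries every key the steps read (values here are illustrative):

# Illustrative execution of the funeral booking saga
import asyncio

booking_context = {
    'facility_id': 'chapel-12',
    'date': '2024-06-01',
    'amount': 2500,
    'email': 'family@example.com',
}

success = asyncio.run(
    saga_orchestrator.execute_saga("funeral_booking", booking_context)
)
print("Booking completed" if success else "Booking rolled back")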
Monitoring and Observability
Production microservices require comprehensive monitoring, logging, and tracing capabilities.
Structured Logging and Metrics
# observability.py - Monitoring and logging utilities
import json
import logging
import time
from functools import wraps

from prometheus_client import Counter, Histogram, generate_latest
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Configure tracing: export spans to a Jaeger agent
# (the "jaeger" host name assumes a jaeger service on the cluster network)
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(JaegerExporter(agent_host_name="jaeger", agent_port=6831))
)

# Prometheus metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests',
                        ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')


class StructuredLogger:
    """Structured logging for microservices"""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        # Configure structured logging
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_request(self, method: str, endpoint: str, status_code: int,
                    duration: float, user_id: str = None, **kwargs):
        """Log HTTP request with structured data"""
        log_data = {
            'service': self.service_name,
            'type': 'http_request',
            'method': method,
            'endpoint': endpoint,
            'status_code': status_code,
            'duration_ms': duration * 1000,
            'user_id': user_id,
            **kwargs
        }
        self.logger.info(json.dumps(log_data))
        # Update Prometheus metrics
        REQUEST_COUNT.labels(method=method, endpoint=endpoint,
                             status=str(status_code)).inc()
        REQUEST_DURATION.observe(duration)


def monitor_performance(logger: StructuredLogger):
    """Decorator for monitoring function performance"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            # Start distributed trace
            tracer = trace.get_tracer(__name__)
            with tracer.start_as_current_span(func.__name__) as span:
                try:
                    result = await func(*args, **kwargs)
                    span.set_attribute("success", True)
                    return result
                except Exception as e:
                    span.set_attribute("success", False)
                    span.set_attribute("error", str(e))
                    logger.logger.error(f"Function {func.__name__} failed: {e}")
                    raise
                finally:
                    duration = time.time() - start_time
                    span.set_attribute("duration", duration)
        return wrapper
    return decorator


# Usage in a service
logger = StructuredLogger("user-service")


@monitor_performance(logger)
async def create_user_endpoint(user_data: dict):
    """Monitored user creation endpoint"""
    # Business logic here
    user = await user_repository.create_user(
        email=user_data['email'],
        name=user_data['name']
    )
    logger.log_request(
        method="POST",
        endpoint="/users",
        status_code=201,
        duration=0.150,
        user_id=user.id,
        action="user_created"
    )
    return user
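The generate_latest import above only pays off if the service exposes a scrape endpoint. A minimal sketch (the route name follows the Prometheus convention; wire it onto whichever app instance the service already has):

# metrics_endpoint.py - expose Prometheus metrics for scraping
from fastapi import FastAPI, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

app = FastAPI()  # or attach to your existing app instance


@app.get("/metrics")
async def metrics():
    # Serialize every registered collector in the Prometheus text format
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)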
Best Practices and Production Considerations
Circuit Breaker Pattern
# circuit_breaker.py - Fault tolerance pattern
import time
from enum import Enum
from typing import Callable, Any


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Circuit breaker for service resilience"""

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                # Allow a single trial request through
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
            # Reset on success
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise


# Usage with external service calls
notification_circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30)


async def send_notification_with_breaker(email: str, message: str):
    """Send notification with circuit breaker protection"""
    return await notification_circuit_breaker.call(
        send_notification, email, message
    )
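The send_notification callable wrapped above isn't defined in the snippet; a plausible implementation is a plain httpx call to the notification service (the URL and payload shape are assumptions):

# Hypothetical notification-service call; any raised httpx error
# counts as a failure toward the breaker's threshold
from httpx import AsyncClient


async def send_notification(email: str, message: str) -> dict:
    async with AsyncClient(timeout=5.0) as client:
        response = await client.post(
            "http://notification-service:8002/send",
            json={'recipient': email, 'message': message},
        )
        response.raise_for_status()
        return response.json()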
Conclusion
Building scalable microservices with Python requires careful attention to architecture patterns, service communication, containerization, and operational concerns. The patterns demonstrated here, from event-driven communication to saga orchestration, provide the foundation for robust, maintainable systems.
At Custom Logic, we've successfully implemented these patterns in production systems like Funeral Manager, where microservices architecture enables independent scaling of user management, facility booking, and notification services. This approach allows our development teams to work autonomously while maintaining system coherence and reliability.
The key to successful microservices adoption lies in starting simple, implementing proper monitoring from day one, and gradually evolving your architecture as requirements become clearer. Whether you're building a new system or decomposing a monolith, these patterns provide a solid foundation for scalable, maintainable Python microservices.
Ready to architect your next microservices system? Contact Custom Logic to discuss how we can help you design and implement scalable, production-ready microservices that grow with your business needs.