Python Performance Optimization: Advanced Techniques for Production Systems
Performance optimization in Python is often misunderstood as simply "making code faster." In reality, it's about understanding your application's bottlenecks, measuring impact systematically, and applying the right techniques at the right scale. At Custom Logic, we've learned that sustainable performance improvements come from a methodical approach combining profiling, algorithmic improvements, and architectural decisions.
Understanding Performance Bottlenecks
Before optimizing anything, you need to identify where your application actually spends its time. The most common performance issues we encounter in enterprise Python applications fall into several categories:
CPU-Bound Operations
import time
import cProfile
import pstats
from functools import wraps
def profile_performance(func):
"""Decorator to profile function performance"""
@wraps(func)
def wrapper(*args, **kwargs):
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # Top 10 functions
return result
return wrapper
@profile_performance
def inefficient_calculation(n):
"""Example of CPU-bound operation that needs optimization"""
total = 0
for i in range(n):
for j in range(i):
total += i * j
return total
# Optimized version using mathematical approach
def optimized_calculation(n):
"""Mathematically optimized version"""
    # The inner loop sums j from 0 to i-1, which equals i*(i-1)//2,
    # so each outer term collapses to i * i * (i - 1) // 2
    return sum(i * i * (i - 1) // 2 for i in range(n))
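To see the payoff, a quick timeit comparison works well (a minimal sketch; the numbers are illustrative and depend on hardware). Because functools.wraps exposes the original function as __wrapped__, we can time the unoptimized version without triggering the profiler output from the decorator:

import timeit

n = 2_000
# __wrapped__ is set by functools.wraps, so this times the raw function
slow = timeit.timeit(lambda: inefficient_calculation.__wrapped__(n), number=3)
fast = timeit.timeit(lambda: optimized_calculation(n), number=3)
print(f"nested loops: {slow:.4f}s, closed form: {fast:.4f}s")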
Memory Management Issues
import tracemalloc
import sys
from typing import Generator, List
class MemoryProfiler:
"""Context manager for memory profiling"""
def __enter__(self):
tracemalloc.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Current memory usage: {current / 1024 / 1024:.2f} MB")
print(f"Peak memory usage: {peak / 1024 / 1024:.2f} MB")
# Memory-inefficient approach
def load_large_dataset_inefficient(filename: str) -> List[dict]:
"""Loads entire dataset into memory at once"""
data = []
with open(filename, 'r') as file:
for line in file:
# Simulate processing each line into a dictionary
data.append({'processed': line.strip().upper()})
return data
# Memory-efficient generator approach
def load_large_dataset_efficient(filename: str) -> Generator[dict, None, None]:
"""Processes data line by line using generators"""
with open(filename, 'r') as file:
for line in file:
yield {'processed': line.strip().upper()}
# Usage comparison
def demonstrate_memory_efficiency():
with MemoryProfiler():
# This would consume significant memory for large files
# data = load_large_dataset_inefficient('large_file.txt')
# This processes data with constant memory usage
for item in load_large_dataset_efficient('large_file.txt'):
# Process each item individually
pass
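The sys module imported above offers a quick sanity check on why generators help: sys.getsizeof reports the container's own footprint, and a generator object stays tiny no matter how many items it will eventually yield (an illustrative sketch):

def compare_container_sizes(n: int = 1_000_000) -> None:
    """Illustrative: getsizeof measures the container itself, not its elements."""
    as_list = [i * 2 for i in range(n)]   # materializes n references up front
    as_gen = (i * 2 for i in range(n))    # holds only iteration state
    print(f"list:      {sys.getsizeof(as_list) / 1024 / 1024:.2f} MB")
    print(f"generator: {sys.getsizeof(as_gen)} bytes")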
Advanced Profiling Techniques
Effective optimization starts with comprehensive profiling. Here's our systematic approach to identifying performance bottlenecks:
Line-by-Line Profiling
# Install: pip install line_profiler
# Usage: kernprof -l -v script.py
@profile # This decorator is added by line_profiler
def complex_data_processing(data):
"""Function that processes complex data structures"""
# Step 1: Data validation (potentially slow)
validated_data = []
for item in data:
if isinstance(item, dict) and 'value' in item:
validated_data.append(item)
# Step 2: Mathematical operations (CPU intensive)
processed_data = []
for item in validated_data:
result = {
'original': item['value'],
'squared': item['value'] ** 2,
'sqrt': item['value'] ** 0.5,
'log': __import__('math').log(item['value']) if item['value'] > 0 else 0
}
processed_data.append(result)
# Step 3: Aggregation (memory intensive)
total = sum(item['squared'] for item in processed_data)
return total, processed_data
# Optimized version using list comprehensions and built-ins
def optimized_data_processing(data):
"""Optimized version of the same function"""
import math
# Combined validation and processing in single pass
processed_data = [
{
'original': item['value'],
'squared': item['value'] ** 2,
'sqrt': math.sqrt(item['value']),
            'log': math.log(item['value'])
        }
        for item in data
        # note: unlike the original, items with non-positive values are skipped entirely
        if isinstance(item, dict) and 'value' in item and item['value'] > 0
]
total = sum(item['squared'] for item in processed_data)
return total, processed_data
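When adding the kernprof decorator to a script is awkward, line_profiler can also be driven programmatically. This is a minimal sketch (it assumes the package is installed; the sample data is made up):

from line_profiler import LineProfiler

def profile_lines(func, *args, **kwargs):
    """Run one call under line_profiler and print per-line timings."""
    lp = LineProfiler()
    instrumented = lp(func)  # LineProfiler instances act as decorators
    result = instrumented(*args, **kwargs)
    lp.print_stats()
    return result

sample = [{'value': v} for v in range(1, 1000)]
profile_lines(optimized_data_processing, sample)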
Async Performance Monitoring
import asyncio
import aiohttp
import time
from typing import List, Dict, Any
class AsyncPerformanceMonitor:
"""Monitor performance of async operations"""
def __init__(self):
self.metrics = {}
async def timed_request(self, session: aiohttp.ClientSession,
url: str, method: str = 'GET') -> Dict[str, Any]:
"""Make a timed HTTP request"""
start_time = time.time()
try:
async with session.request(method, url) as response:
data = await response.text()
duration = time.time() - start_time
self.metrics[url] = {
'duration': duration,
'status': response.status,
'size': len(data)
}
return {
'url': url,
'data': data,
'duration': duration,
'status': response.status
}
except Exception as e:
duration = time.time() - start_time
self.metrics[url] = {
'duration': duration,
'error': str(e)
}
raise
async def fetch_multiple_apis(urls: List[str]) -> List[Dict[str, Any]]:
"""Efficiently fetch multiple APIs concurrently"""
monitor = AsyncPerformanceMonitor()
async with aiohttp.ClientSession() as session:
tasks = [monitor.timed_request(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Print performance metrics
print("API Performance Metrics:")
for url, metrics in monitor.metrics.items():
if 'error' not in metrics:
print(f"{url}: {metrics['duration']:.3f}s, "
f"Status: {metrics['status']}, "
f"Size: {metrics['size']} bytes")
else:
print(f"{url}: Error - {metrics['error']}")
return [r for r in results if not isinstance(r, Exception)]
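Running the monitor takes a single asyncio.run call; the endpoints below are placeholders:

if __name__ == "__main__":
    sample_urls = [
        "https://example.com/api/one",  # placeholder URLs
        "https://example.com/api/two",
    ]
    successful = asyncio.run(fetch_multiple_apis(sample_urls))
    print(f"Fetched {len(successful)} responses successfully")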
Algorithmic Optimizations
The biggest performance gains often come from choosing better algorithms rather than micro-optimizations:
Data Structure Selection
import bisect
from collections import defaultdict, deque
from typing import Set, List, Dict
class OptimizedDataStructures:
"""Examples of choosing the right data structure for performance"""
def __init__(self):
self.sorted_list = [] # For binary search operations
self.lookup_dict = {} # For O(1) lookups
self.frequency_counter = defaultdict(int) # For counting
self.recent_items = deque(maxlen=1000) # For LRU-style operations
def add_item_optimized(self, item: str, value: float):
"""Add item using optimized data structures"""
# Maintain sorted list for range queries
bisect.insort(self.sorted_list, (value, item))
# Direct lookup dictionary
self.lookup_dict[item] = value
# Count frequencies
self.frequency_counter[item] += 1
# Track recent items
self.recent_items.append(item)
def find_items_in_range(self, min_val: float, max_val: float) -> List[str]:
"""Find items in value range using binary search - O(log n + k)"""
        # '' and '~' act as low/high tie-breakers in the tuple comparison
        # (ASCII '~' sorts after typical item names), so the slice covers
        # every entry whose value falls within [min_val, max_val]
        start_idx = bisect.bisect_left(self.sorted_list, (min_val, ''))
        end_idx = bisect.bisect_right(self.sorted_list, (max_val, '~'))
        return [item for value, item in self.sorted_list[start_idx:end_idx]]
def get_top_frequent_items(self, n: int = 10) -> List[tuple]:
"""Get most frequent items efficiently"""
return sorted(self.frequency_counter.items(),
key=lambda x: x[1], reverse=True)[:n]
# Comparison with naive approach
class NaiveDataStructures:
"""Inefficient implementation for comparison"""
def __init__(self):
self.items = [] # List of (item, value) tuples
def add_item_naive(self, item: str, value: float):
"""Naive implementation - O(n) for each operation"""
self.items.append((item, value))
def find_items_in_range_naive(self, min_val: float, max_val: float) -> List[str]:
"""Naive range search - O(n)"""
return [item for item, value in self.items
if min_val <= value <= max_val]
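A rough micro-benchmark makes the asymptotic difference visible (a sketch; absolute timings depend on data size and hardware):

import random
import timeit

def compare_range_queries(n_items: int = 20_000) -> None:
    """Populate both structures, then time a narrow range query on each."""
    optimized = OptimizedDataStructures()
    naive = NaiveDataStructures()
    for i in range(n_items):
        value = random.random()
        optimized.add_item_optimized(f"item_{i}", value)
        naive.add_item_naive(f"item_{i}", value)

    t_opt = timeit.timeit(lambda: optimized.find_items_in_range(0.50, 0.51), number=100)
    t_naive = timeit.timeit(lambda: naive.find_items_in_range_naive(0.50, 0.51), number=100)
    print(f"binary search: {t_opt:.3f}s, linear scan: {t_naive:.3f}s")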
Caching and Memoization
import functools
import time
from typing import Dict, Any, Callable
import hashlib
import pickle
class AdvancedCache:
"""Advanced caching with TTL and size limits"""
def __init__(self, max_size: int = 1000, ttl: int = 3600):
self.cache: Dict[str, Dict[str, Any]] = {}
self.max_size = max_size
self.ttl = ttl
def _is_expired(self, entry: Dict[str, Any]) -> bool:
"""Check if cache entry has expired"""
return time.time() - entry['timestamp'] > self.ttl
def _evict_expired(self):
"""Remove expired entries"""
current_time = time.time()
expired_keys = [
key for key, entry in self.cache.items()
if current_time - entry['timestamp'] > self.ttl
]
for key in expired_keys:
del self.cache[key]
def _make_key(self, func: Callable, args: tuple, kwargs: dict) -> str:
"""Create a cache key from function and arguments"""
key_data = {
'func': func.__name__,
'args': args,
'kwargs': sorted(kwargs.items())
}
return hashlib.md5(pickle.dumps(key_data)).hexdigest()
def cached_call(self, func: Callable, *args, **kwargs):
"""Execute function with caching"""
# Clean expired entries
self._evict_expired()
# Generate cache key
cache_key = self._make_key(func, args, kwargs)
# Check cache
if cache_key in self.cache and not self._is_expired(self.cache[cache_key]):
return self.cache[cache_key]['result']
# Execute function
result = func(*args, **kwargs)
# Store in cache (with size limit)
if len(self.cache) >= self.max_size:
# Remove oldest entry
oldest_key = min(self.cache.keys(),
key=lambda k: self.cache[k]['timestamp'])
del self.cache[oldest_key]
self.cache[cache_key] = {
'result': result,
'timestamp': time.time()
}
return result
# Usage example with expensive computation
cache = AdvancedCache(max_size=100, ttl=300) # 5-minute TTL
def expensive_computation(n: int, complexity: str = 'high') -> float:
"""Simulate expensive computation"""
time.sleep(0.1) # Simulate processing time
return sum(i ** 2 for i in range(n)) * (2 if complexity == 'high' else 1)
# Cached version
def cached_expensive_computation(n: int, complexity: str = 'high') -> float:
return cache.cached_call(expensive_computation, n, complexity=complexity)
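When a TTL and explicit eviction are not required, the standard library already covers the simplest cases: functools.lru_cache memoizes any function with hashable arguments. A brief comparison sketch:

@functools.lru_cache(maxsize=128)
def lru_cached_computation(n: int, complexity: str = 'high') -> float:
    """Same computation, memoized by the standard library.

    lru_cache has no TTL and requires hashable arguments, which is
    why AdvancedCache above is useful for richer production cases.
    """
    return expensive_computation(n, complexity)

lru_cached_computation(10_000)   # pays the 0.1s cost once
lru_cached_computation(10_000)   # served from the cache
print(lru_cached_computation.cache_info())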
Production Performance Patterns
Based on our experience building scalable systems at Custom Logic, here are the patterns that deliver the most significant performance improvements in production:
Connection Pooling and Resource Management
import asyncio
import pickle
import aioredis
import asyncpg
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Callable
class DatabaseConnectionPool:
"""Efficient database connection pooling"""
def __init__(self, database_url: str, min_connections: int = 5,
max_connections: int = 20):
self.database_url = database_url
self.min_connections = min_connections
self.max_connections = max_connections
self.pool = None
async def initialize(self):
"""Initialize the connection pool"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=self.min_connections,
max_size=self.max_connections,
command_timeout=60
)
@asynccontextmanager
async def get_connection(self) -> AsyncGenerator[asyncpg.Connection, None]:
"""Get a connection from the pool"""
async with self.pool.acquire() as connection:
yield connection
async def execute_query(self, query: str, *args) -> list:
"""Execute a query using the pool"""
async with self.get_connection() as conn:
return await conn.fetch(query, *args)
async def close(self):
"""Close the connection pool"""
if self.pool:
await self.pool.close()
class CacheManager:
"""Redis cache manager with connection pooling"""
def __init__(self, redis_url: str):
self.redis_url = redis_url
self.redis = None
async def initialize(self):
"""Initialize Redis connection pool"""
        self.redis = await aioredis.from_url(
            self.redis_url,
            max_connections=20  # leave responses as raw bytes so pickled values round-trip
        )
async def get_cached_result(self, key: str,
compute_func: Callable,
ttl: int = 3600) -> Any:
"""Get result from cache or compute and cache it"""
        # Try the cache first (values are stored as raw pickled bytes)
        cached_result = await self.redis.get(key)
        if cached_result is not None:
            return pickle.loads(cached_result)
        # Compute the result, then cache it for `ttl` seconds
        result = await compute_func()
        await self.redis.setex(key, ttl, pickle.dumps(result))
        return result
async def close(self):
"""Close Redis connections"""
if self.redis:
await self.redis.close()
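Wiring the two managers together typically happens once at application startup. The sketch below assumes a reachable PostgreSQL and Redis instance; the connection strings, table, and cache key are placeholders:

async def startup_example():
    """Sketch of application startup wiring (connection details are placeholders)."""
    db = DatabaseConnectionPool("postgresql://user:pass@localhost:5432/appdb")
    cache = CacheManager("redis://localhost:6379/0")
    await db.initialize()
    await cache.initialize()

    async def load_users():
        rows = await db.execute_query("SELECT id, name FROM users LIMIT 10")
        return [dict(row) for row in rows]  # plain dicts pickle cleanly

    # Serve from Redis when possible, fall back to the database otherwise
    users = await cache.get_cached_result("users:first_page", load_users, ttl=60)
    print(f"Loaded {len(users)} users")

    await cache.close()
    await db.close()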
Batch Processing Optimization
import asyncio
from typing import List, Callable, TypeVar, Generic
from dataclasses import dataclass
import time
T = TypeVar('T')
R = TypeVar('R')
@dataclass
class BatchConfig:
"""Configuration for batch processing"""
batch_size: int = 100
max_wait_time: float = 1.0 # seconds
max_concurrent_batches: int = 5
class BatchProcessor(Generic[T, R]):
"""Efficient batch processor for high-throughput operations"""
def __init__(self,
process_batch_func: Callable[[List[T]], List[R]],
config: BatchConfig = None):
self.process_batch_func = process_batch_func
self.config = config or BatchConfig()
self.pending_items: List[T] = []
self.pending_futures: List[asyncio.Future] = []
self.last_batch_time = time.time()
self.semaphore = asyncio.Semaphore(self.config.max_concurrent_batches)
async def process_item(self, item: T) -> R:
"""Process a single item through batching"""
future = asyncio.Future()
# Add item and future to pending lists
self.pending_items.append(item)
self.pending_futures.append(future)
# Check if we should process the batch
should_process = (
len(self.pending_items) >= self.config.batch_size or
time.time() - self.last_batch_time >= self.config.max_wait_time
)
if should_process:
await self._process_pending_batch()
return await future
async def _process_pending_batch(self):
"""Process the current batch of pending items"""
if not self.pending_items:
return
# Extract current batch
batch_items = self.pending_items.copy()
batch_futures = self.pending_futures.copy()
# Clear pending lists
self.pending_items.clear()
self.pending_futures.clear()
self.last_batch_time = time.time()
# Process batch with concurrency control
async with self.semaphore:
try:
                results = await asyncio.get_running_loop().run_in_executor(
                    None, self.process_batch_func, batch_items
                )
# Set results for all futures
for future, result in zip(batch_futures, results):
if not future.done():
future.set_result(result)
except Exception as e:
# Set exception for all futures
for future in batch_futures:
if not future.done():
future.set_exception(e)
# Example usage for database operations
async def batch_database_inserts():
"""Example of batch processing for database operations"""
def insert_batch(items: List[dict]) -> List[int]:
"""Simulate batch database insert"""
# In real implementation, this would be a single SQL INSERT
print(f"Inserting batch of {len(items)} items")
return list(range(len(items))) # Return generated IDs
processor = BatchProcessor(
insert_batch,
BatchConfig(batch_size=50, max_wait_time=0.5)
)
# Simulate concurrent item processing
tasks = []
for i in range(200):
task = processor.process_item({'data': f'item_{i}'})
tasks.append(task)
results = await asyncio.gather(*tasks)
print(f"Processed {len(results)} items in batches")
Custom Logic Performance Engineering Approach
At Custom Logic, we've developed a systematic approach to performance optimization that has proven effective across our enterprise applications. This methodology focuses on measurable improvements and sustainable practices:
Performance Monitoring Framework
import functools
import time
import psutil
import threading
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from contextlib import contextmanager
@dataclass
class PerformanceMetrics:
"""Container for performance metrics"""
cpu_percent: float = 0.0
memory_mb: float = 0.0
execution_time: float = 0.0
function_calls: int = 0
cache_hits: int = 0
cache_misses: int = 0
custom_metrics: Dict[str, float] = field(default_factory=dict)
class PerformanceTracker:
"""Production-ready performance tracking system"""
def __init__(self):
self.metrics_history: List[PerformanceMetrics] = []
self.active_sessions: Dict[str, dict] = {}
self.lock = threading.Lock()
@contextmanager
def track_performance(self, operation_name: str):
"""Context manager for tracking operation performance"""
session_id = f"{operation_name}_{time.time()}"
# Start tracking
start_time = time.time()
start_cpu = psutil.cpu_percent()
start_memory = psutil.virtual_memory().used / 1024 / 1024
with self.lock:
self.active_sessions[session_id] = {
'start_time': start_time,
'start_cpu': start_cpu,
'start_memory': start_memory,
'operation': operation_name
}
try:
yield session_id
finally:
            # End tracking. psutil readings are system-wide, so the CPU and memory
            # deltas recorded below are rough approximations, not per-operation attribution.
            end_time = time.time()
            end_cpu = psutil.cpu_percent()
            end_memory = psutil.virtual_memory().used / 1024 / 1024
with self.lock:
if session_id in self.active_sessions:
session = self.active_sessions.pop(session_id)
metrics = PerformanceMetrics(
cpu_percent=end_cpu - session['start_cpu'],
memory_mb=end_memory - session['start_memory'],
execution_time=end_time - session['start_time'],
function_calls=1
)
self.metrics_history.append(metrics)
def get_performance_summary(self, last_n: int = 100) -> Dict[str, float]:
"""Get performance summary for recent operations"""
recent_metrics = self.metrics_history[-last_n:]
if not recent_metrics:
return {}
return {
'avg_execution_time': sum(m.execution_time for m in recent_metrics) / len(recent_metrics),
'avg_cpu_usage': sum(m.cpu_percent for m in recent_metrics) / len(recent_metrics),
'avg_memory_usage': sum(m.memory_mb for m in recent_metrics) / len(recent_metrics),
'total_operations': len(recent_metrics)
}
# Global performance tracker instance
performance_tracker = PerformanceTracker()
# Decorator for automatic performance tracking
def track_performance(operation_name: Optional[str] = None):
"""Decorator to automatically track function performance"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
name = operation_name or f"{func.__module__}.{func.__name__}"
with performance_tracker.track_performance(name):
return func(*args, **kwargs)
return wrapper
return decorator
# Example usage
@track_performance("data_processing")
def process_large_dataset(data: List[dict]) -> List[dict]:
"""Example function with performance tracking"""
return [
{**item, 'processed': True, 'timestamp': time.time()}
for item in data
if item.get('valid', True)
]
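Summaries can then be pulled periodically, for example from a health-check endpoint or a scheduled job. A minimal usage sketch with made-up records:

sample_records = [{'value': i, 'valid': i % 3 != 0} for i in range(10_000)]
processed = process_large_dataset(sample_records)

summary = performance_tracker.get_performance_summary(last_n=50)
print(f"Processed {len(processed)} records")
for metric, value in summary.items():
    print(f"{metric}: {value:.4f}")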
Conclusion
Python performance optimization is a journey of continuous improvement, not a destination. The key insights from our experience at Custom Logic are:
1. Measure First: Always profile before optimizing. Intuition about bottlenecks is often wrong.
2. Algorithmic Wins: The biggest gains come from better algorithms and data structures, not micro-optimizations.
3. Production Patterns: Connection pooling, caching, and batch processing provide consistent performance improvements.
4. Monitoring Matters: Continuous performance monitoring helps catch regressions early.
Whether you're building high-performance APIs, processing large datasets, or scaling enterprise applications, these techniques provide a solid foundation for Python performance optimization. The methodical approach we've outlined has helped us deliver consistently fast and reliable solutions for our clients.
For organizations looking to optimize their Python applications systematically, Custom Logic offers performance engineering consulting that combines these proven techniques with deep understanding of your specific business requirements.