---
title: Modern Authentication and Authorization Patterns for Web Applications
author: Custom Logic Development Team
date: 2025-08-17
description: Comprehensive guide to implementing JWT, OAuth2, and session management patterns with practical examples and security best practices for modern web applications.
---

Modern Authentication and Authorization Patterns for Web Applications

Authentication and authorization are the cornerstones of secure web applications. As applications become more complex and distributed, choosing the right authentication pattern becomes critical for both security and user experience. This guide explores modern authentication patterns, from traditional session-based approaches to token-based JWT and OAuth2 implementations.

Understanding Authentication vs Authorization

Before diving into implementation patterns, it's essential to understand the distinction between authentication and authorization:

  • Authentication verifies who the user is (identity verification)
  • Authorization determines what the user can do (permission verification)

Modern applications often require both, implemented through various patterns that balance security, scalability, and user experience.
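
The distinction can be made concrete with a minimal sketch. The API keys, user names, and permission strings below are hypothetical illustration data, not part of any system described in this guide; the point is only that a failed identity check maps to 401 while a failed permission check maps to 403.

```python
import hmac

# Hypothetical in-memory data, for illustration only
API_KEYS = {"key-alice": "alice", "key-bob": "bob"}
PERMISSIONS = {
    "alice": {"read:reports", "write:reports"},
    "bob": {"read:reports"},
}


def authenticate(api_key):
    """Authentication: work out WHO is calling. Returns a user name or None."""
    for known_key, user in API_KEYS.items():
        # Constant-time comparison avoids leaking key contents through timing
        if hmac.compare_digest(known_key.encode(), api_key.encode()):
            return user
    return None


def authorize(user, permission):
    """Authorization: decide WHAT an already identified user may do."""
    return permission in PERMISSIONS.get(user, set())


def handle_request(api_key, permission):
    """Return the HTTP status a handler would send."""
    user = authenticate(api_key)
    if user is None:
        return 401  # Identity unknown: authentication failed
    if not authorize(user, permission):
        return 403  # Identity known, action not allowed: authorization failed
    return 200
```

Here `handle_request("key-bob", "write:reports")` yields 403 because Bob is identified but lacks the permission, whereas an unknown key yields 401.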

Session-Based Authentication Patterns

Traditional Server-Side Sessions

Session-based authentication remains relevant for many applications, particularly those with server-side rendering:

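import json
import os
import secrets
from datetime import datetime, timedelta

import redis
from flask import Flask, jsonify, redirect, request, session

app = Flask(__name__)
# Load a stable secret from the environment so signed cookies survive restarts
# and work across servers (FLASK_SECRET_KEY is an assumed variable name);
# the random fallback is suitable for local development only
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or secrets.token_hex(32)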

# Redis for session storage (scalable across multiple servers)
redis_client = redis.Redis(host='localhost', port=6379, db=0)

class SessionManager:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.session_timeout = 3600  # 1 hour
    
    def create_session(self, user_id, user_data):
        session_id = secrets.token_urlsafe(32)
        session_data = {
            'user_id': user_id,
            'created_at': datetime.utcnow().isoformat(),
            'last_accessed': datetime.utcnow().isoformat(),
            **user_data
        }
        
        # Store in Redis with expiration
        self.redis.setex(
            f"session:{session_id}",
            self.session_timeout,
            json.dumps(session_data)
        )
        
        return session_id
    
    def get_session(self, session_id):
        session_data = self.redis.get(f"session:{session_id}")
        if session_data:
            data = json.loads(session_data)
            # Update last accessed time
            data['last_accessed'] = datetime.utcnow().isoformat()
            self.redis.setex(
                f"session:{session_id}",
                self.session_timeout,
                json.dumps(data)
            )
            return data
        return None
    
    def invalidate_session(self, session_id):
        self.redis.delete(f"session:{session_id}")

session_manager = SessionManager(redis_client)

@app.route('/login', methods=['POST'])
def login():
    credentials = request.get_json(silent=True) or {}
    
    # Validate credentials (authenticate_user is a placeholder for your own logic)
    user = authenticate_user(credentials.get('username'), credentials.get('password'))
    
    if user:
        session_id = session_manager.create_session(user['id'], {
            'username': user['username'],
            'role': user['role']
        })
        
        response = jsonify({'message': 'Login successful'})
        response.set_cookie('session_id', session_id, 
                          httponly=True, secure=True, samesite='Strict')
        return response
    
    return jsonify({'error': 'Invalid credentials'}), 401
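
The login route leaves `authenticate_user` to the reader. As one possible building block, the sketch below shows salted password hashing and verification using only the standard library. The function names, the storage format, and the iteration count are assumptions made for illustration; a dedicated library such as argon2 or bcrypt is a common alternative.

```python
import hashlib
import hmac
import secrets

# Assumed work factor; tune it to your hardware and current guidance
DEFAULT_ITERATIONS = 600_000


def hash_password(password, iterations=DEFAULT_ITERATIONS):
    """Return a self-describing hash string: scheme$iterations$salt$digest."""
    salt = secrets.token_bytes(16)  # Fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


def verify_password(password, stored):
    """Recompute the hash with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        salt = bytes.fromhex(salt_hex)
        expected = bytes.fromhex(digest_hex)
        rounds = int(iterations)
    except ValueError:
        return False  # Malformed record: treat as a failed login
    if scheme != "pbkdf2_sha256":
        return False
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, rounds)
    return hmac.compare_digest(candidate, expected)
```

An `authenticate_user` implementation could look up the stored hash by username and return the user record only when `verify_password` succeeds.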

Secure Session Configuration

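// Express.js session configuration with security best practices
// Note: written against the connect-redis v6 / node-redis v3 APIs;
// later major versions changed how the store and client are created
const express = require('express');
const session = require('express-session');
const RedisStore = require('connect-redis')(session);
const redis = require('redis');

const app = express();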

const redisClient = redis.createClient({
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
    password: process.env.REDIS_PASSWORD
});

app.use(session({
    store: new RedisStore({ client: redisClient }),
    secret: process.env.SESSION_SECRET,
    resave: false,
    saveUninitialized: false,
    name: 'sessionId', // Don't use default 'connect.sid'
    cookie: {
        secure: process.env.NODE_ENV === 'production', // HTTPS only in production
        httpOnly: true, // Hide the cookie from client-side scripts (limits theft via XSS)
        maxAge: 1000 * 60 * 60 * 24, // 24 hours
        sameSite: 'strict' // Mitigates CSRF
    },
    rolling: true // Reset expiration on activity
}));

JWT (JSON Web Token) Authentication

JWT Implementation with Refresh Tokens

JWTs enable stateless authentication: the server validates a signature instead of looking up session state, which suits distributed systems and APIs:

import secrets
from datetime import datetime, timedelta
from functools import wraps

import jwt
from flask import jsonify, request

class JWTManager:
    def __init__(self, secret_key, algorithm='HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expiry = timedelta(minutes=15)
        self.refresh_token_expiry = timedelta(days=7)
    
    def generate_tokens(self, user_id, user_data):
        now = datetime.utcnow()
        
        # Access token (short-lived)
        access_payload = {
            'user_id': user_id,
            'iat': now,
            'exp': now + self.access_token_expiry,
            'type': 'access',
            **user_data
        }
        
        # Refresh token (long-lived)
        refresh_payload = {
            'user_id': user_id,
            'iat': now,
            'exp': now + self.refresh_token_expiry,
            'type': 'refresh',
            'jti': secrets.token_urlsafe(16)  # Unique token ID
        }
        
        access_token = jwt.encode(access_payload, self.secret_key, self.algorithm)
        refresh_token = jwt.encode(refresh_payload, self.secret_key, self.algorithm)
        
        return access_token, refresh_token
    
    def verify_token(self, token, token_type='access'):
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            
            if payload.get('type') != token_type:
                raise jwt.InvalidTokenError('Invalid token type')
            
            return payload
        except jwt.ExpiredSignatureError:
            raise jwt.ExpiredSignatureError('Token has expired')
        except jwt.InvalidTokenError:
            raise jwt.InvalidTokenError('Invalid token')
    
    def refresh_access_token(self, refresh_token, user_data=None):
        # The refresh token carries only user_id and jti, so role and other
        # claims must be supplied by the caller (for example, reloaded from
        # the user store); otherwise the new access token would have none
        try:
            payload = self.verify_token(refresh_token, 'refresh')
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass, so it is covered here too
            raise jwt.InvalidTokenError('Invalid refresh token')
        
        # Generate a new access token; the new refresh token is discarded here
        new_access_token, _ = self.generate_tokens(
            payload['user_id'], user_data or {}
        )
        
        return new_access_token

# JWT authentication decorator
# (assumes a module-level instance such as jwt_manager = JWTManager(secret_key))
def jwt_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token:
            return jsonify({'error': 'Token missing'}), 401
        
        try:
            # Remove 'Bearer ' prefix
            token = token.split(' ')[1] if token.startswith('Bearer ') else token
            payload = jwt_manager.verify_token(token)
            request.current_user = payload
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401
        
        return f(*args, **kwargs)
    
    return decorated_function
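
To see why a JWT can be verified without any server-side lookup, it helps to look at what the library does underneath. The following is a simplified standard-library sketch of HS256 signing and verification; the function names are illustrative, it checks only the signature, the `alg` header, and `exp`, and it is not a replacement for PyJWT in production.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url_encode(data):
    """Base64url without padding, as used by JWTs."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text):
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload, secret):
    """Build header.payload.signature, signing the first two parts with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    ]
    signing_input = ".".join(parts)
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def verify_hs256(token, secret, now=None):
    """Return the payload if signature and expiry check out, else raise ValueError."""
    header_b64, payload_b64, signature_b64 = token.split(".")  # ValueError if malformed
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    if json.loads(b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    payload = json.loads(b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and payload["exp"] <= current:
        raise ValueError("token expired")
    return payload
```

Everything needed for the check (claims, expiry, signature) travels inside the token, which is what makes the approach stateless. The trade-off is that a signed token remains valid until it expires unless the server keeps some revocation state.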

Role-Based Authorization with JWT

from enum import Enum
from functools import wraps

class UserRole(Enum):
    ADMIN = "admin"
    MANAGER = "manager"
    USER = "user"

class Permission(Enum):
    READ_USERS = "read:users"
    WRITE_USERS = "write:users"
    DELETE_USERS = "delete:users"
    READ_REPORTS = "read:reports"
    WRITE_REPORTS = "write:reports"

# Role-permission mapping
ROLE_PERMISSIONS = {
    UserRole.ADMIN: [
        Permission.READ_USERS, Permission.WRITE_USERS, Permission.DELETE_USERS,
        Permission.READ_REPORTS, Permission.WRITE_REPORTS
    ],
    UserRole.MANAGER: [
        Permission.READ_USERS, Permission.WRITE_USERS,
        Permission.READ_REPORTS, Permission.WRITE_REPORTS
    ],
    UserRole.USER: [Permission.READ_REPORTS]
}

def require_permission(permission):
    def decorator(f):
        @wraps(f)
        @jwt_required
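        def decorated_function(*args, **kwargs):
            try:
                user_role = UserRole(request.current_user.get('role'))
            except ValueError:
                # Missing or unknown role claim: deny instead of raising a 500
                return jsonify({'error': 'Insufficient permissions'}), 403
            user_permissions = ROLE_PERMISSIONS.get(user_role, [])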
            
            if permission not in user_permissions:
                return jsonify({'error': 'Insufficient permissions'}), 403
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

# Usage example
@app.route('/admin/users', methods=['GET'])
@require_permission(Permission.READ_USERS)
def get_users():
    return jsonify({'users': get_all_users()})

OAuth2 Implementation Patterns

OAuth2 Authorization Code Flow

OAuth2 is the standard framework for delegated authorization in third-party integrations and, combined with OpenID Connect, underpins single sign-on (SSO):

import os
import secrets
from urllib.parse import urlencode

import requests
from flask import jsonify, redirect, request, session

class OAuth2Provider:
    def __init__(self, client_id, client_secret, auth_url, token_url, redirect_uri):
        self.client_id = client_id
        self.client_secret = client_secret
        self.auth_url = auth_url
        self.token_url = token_url
        self.redirect_uri = redirect_uri
    
    def get_authorization_url(self, scopes=None, state=None):
        if state is None:
            state = secrets.token_urlsafe(32)
        
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'state': state
        }
        
        if scopes:
            params['scope'] = ' '.join(scopes)
        
        return f"{self.auth_url}?{urlencode(params)}", state
    
    def exchange_code_for_token(self, code, state):
        data = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': code,
            'redirect_uri': self.redirect_uri
        }
        
        response = requests.post(self.token_url, data=data, timeout=10)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Token exchange failed: {response.text}")
    
    def refresh_token(self, refresh_token):
        data = {
            'grant_type': 'refresh_token',
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'refresh_token': refresh_token
        }
        
        response = requests.post(self.token_url, data=data, timeout=10)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Token refresh failed: {response.text}")

# Google OAuth2 example
google_oauth = OAuth2Provider(
    client_id=os.getenv('GOOGLE_CLIENT_ID'),
    client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
    auth_url='https://accounts.google.com/o/oauth2/v2/auth',
    token_url='https://oauth2.googleapis.com/token',
    redirect_uri='https://yourapp.com/auth/google/callback'
)

@app.route('/auth/google')
def google_login():
    auth_url, state = google_oauth.get_authorization_url(
        scopes=['openid', 'email', 'profile']
    )
    
    # Store state in session for CSRF protection
    session['oauth_state'] = state
    
    return redirect(auth_url)

@app.route('/auth/google/callback')
def google_callback():
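    code = request.args.get('code')
    state = request.args.get('state')
    expected_state = session.pop('oauth_state', None)  # One-time use
    
    # Verify state to prevent CSRF attacks; a missing value on either side
    # must fail (a plain != comparison would accept None == None)
    if not state or not expected_state or not secrets.compare_digest(
        state.encode(), expected_state.encode()
    ):
        return jsonify({'error': 'Invalid state parameter'}), 400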
    
    try:
        token_data = google_oauth.exchange_code_for_token(code, state)
        
        # Get user info from Google
        user_info_response = requests.get(
            'https://www.googleapis.com/oauth2/v2/userinfo',
            headers={'Authorization': f"Bearer {token_data['access_token']}"},
            timeout=10
        )
        user_info_response.raise_for_status()
        
        user_info = user_info_response.json()
        
        # Create or update user in your system
        user = create_or_update_user_from_oauth(user_info)
        
        # Generate your application's JWT tokens
        access_token, refresh_token = jwt_manager.generate_tokens(
            user['id'], {'username': user['username'], 'role': user['role']}
        )
        
        return jsonify({
            'access_token': access_token,
            'refresh_token': refresh_token,
            'user': user
        })
        
    except Exception as e:
        return jsonify({'error': str(e)}), 400
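
The flow above protects the redirect with `state` only. Many providers also support PKCE (RFC 7636), which binds the authorization code to the client that started the flow. The sketch below shows how the verifier and the S256 challenge are derived; the helper names are illustrative and PKCE is not part of the `OAuth2Provider` class shown earlier.

```python
import base64
import hashlib
import secrets


def make_code_verifier(n_bytes=32):
    """Random URL-safe verifier; 32 random bytes encode to 43 characters."""
    raw = secrets.token_bytes(n_bytes)
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def code_challenge_s256(code_verifier):
    """S256 challenge: base64url(SHA-256(verifier)) without padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
```

Under this scheme the authorization request would carry `code_challenge` and `code_challenge_method=S256`, the verifier would be kept server-side next to `state`, and the token exchange would send it back as `code_verifier` so the provider can recompute and compare the challenge.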

Multi-Factor Authentication (MFA)

TOTP (Time-based One-Time Password) Implementation

import pyotp
import qrcode
from io import BytesIO
import base64

class MFAManager:
    def __init__(self):
        self.issuer_name = "Custom Logic"
    
    def generate_secret(self, user_email):
        secret = pyotp.random_base32()
        
        # Create TOTP URI for QR code
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=user_email,
            issuer_name=self.issuer_name
        )
        
        return secret, totp_uri
    
    def generate_qr_code(self, totp_uri):
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(totp_uri)
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        
        # Convert to base64 for web display
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        img_str = base64.b64encode(buffer.getvalue()).decode()
        
        return f"data:image/png;base64,{img_str}"
    
    def verify_token(self, secret, token):
        totp = pyotp.TOTP(secret)
        # valid_window=1 also accepts the previous and next 30-second step (clock drift)
        return totp.verify(token, valid_window=1)

@app.route('/mfa/setup', methods=['POST'])
@jwt_required
def setup_mfa():
    user_id = request.current_user['user_id']
    user_email = request.current_user['email']
    
    secret, totp_uri = mfa_manager.generate_secret(user_email)
    qr_code = mfa_manager.generate_qr_code(totp_uri)
    
    # Store secret temporarily (user must verify before enabling)
    redis_client.setex(f"mfa_setup:{user_id}", 300, secret)  # 5 minutes
    
    return jsonify({
        'secret': secret,
        'qr_code': qr_code,
        'manual_entry_key': secret
    })

@app.route('/mfa/verify', methods=['POST'])
@jwt_required
def verify_mfa_setup():
    user_id = request.current_user['user_id']
    token = request.json.get('token')
    
    # Get temporary secret
    secret = redis_client.get(f"mfa_setup:{user_id}")
    if not secret:
        return jsonify({'error': 'Setup session expired'}), 400
    
    secret = secret.decode('utf-8')
    
    if mfa_manager.verify_token(secret, token):
        # Save secret to user profile and enable MFA
        update_user_mfa_secret(user_id, secret)
        redis_client.delete(f"mfa_setup:{user_id}")
        
        return jsonify({'message': 'MFA enabled successfully'})
    
    return jsonify({'error': 'Invalid token'}), 400
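
`pyotp` hides the algorithm itself. For readers who want to see what a TOTP check computes, here is a standard-library sketch of RFC 4226 (HOTP) and RFC 6238 (TOTP) with SHA-1 and 30-second steps. The function names are illustrative, and `valid_window` simply mirrors the pyotp parameter of the same name.

```python
import hashlib
import hmac
import struct
import time


def hotp(key, counter, digits=6):
    """RFC 4226: HMAC-SHA1 over the 8-byte counter, then dynamic truncation."""
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # Low nibble of the last byte picks the window
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10 ** digits).zfill(digits)


def totp(key, for_time=None, step=30, digits=6):
    """RFC 6238: HOTP where the counter is the number of elapsed time steps."""
    if for_time is None:
        for_time = time.time()
    return hotp(key, int(for_time) // step, digits)


def verify_totp(key, candidate, for_time=None, step=30, valid_window=1):
    """Accept codes from the current step and valid_window steps either side."""
    if for_time is None:
        for_time = time.time()
    counter = int(for_time) // step
    for drift in range(-valid_window, valid_window + 1):
        expected = hotp(key, counter + drift)
        if hmac.compare_digest(expected.encode(), candidate.encode()):
            return True
    return False
```

Note that `key` is the raw secret in bytes; an authenticator secret shown to users in Base32 would first be decoded with `base64.b32decode`. A production check would normally also reject a code that was already used within its validity window.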

Real-World Implementation: JobFinders Authentication System

At JobFinders, we've implemented a hybrid authentication system that combines the best of multiple patterns to handle diverse user types and security requirements:

Hybrid Authentication Architecture

class JobFindersAuthSystem:
    def __init__(self):
        self.jwt_manager = JWTManager(os.getenv('JWT_SECRET'))
        self.session_manager = SessionManager(redis_client)
        self.mfa_manager = MFAManager()
        self.oauth_providers = {
            'google': OAuth2Provider(...),
            'linkedin': OAuth2Provider(...),
            'github': OAuth2Provider(...)
        }
    
    def authenticate_user(self, login_method, credentials):
        """
        Unified authentication method supporting multiple login types
        """
        if login_method == 'email_password':
            return self._authenticate_email_password(credentials)
        elif login_method == 'oauth':
            return self._authenticate_oauth(credentials)
        elif login_method == 'api_key':
            return self._authenticate_api_key(credentials)
        else:
            raise ValueError("Unsupported authentication method")
    
    def _authenticate_email_password(self, credentials):
        user = verify_user_credentials(
            credentials['email'], 
            credentials['password']
        )
        
        if not user:
            return None
        
        # Check if MFA is enabled
        if user.get('mfa_enabled'):
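            # Return partial token requiring MFA completion
            # (generate_partial_token / verify_partial_token are not defined in the
            # JWTManager shown earlier and would need to be added)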
            partial_token = self.jwt_manager.generate_partial_token(user['id'])
            return {
                'requires_mfa': True,
                'partial_token': partial_token,
                'user_id': user['id']
            }
        
        # Generate full access tokens
        access_token, refresh_token = self.jwt_manager.generate_tokens(
            user['id'], {
                'email': user['email'],
                'role': user['role'],
                'permissions': user['permissions']
            }
        )
        
        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'user': user
        }
    
    def complete_mfa_authentication(self, partial_token, mfa_token):
        """
        Complete authentication after MFA verification
        """
        try:
            payload = self.jwt_manager.verify_partial_token(partial_token)
            user = get_user_by_id(payload['user_id'])
            
            if self.mfa_manager.verify_token(user['mfa_secret'], mfa_token):
                # Generate full access tokens
                access_token, refresh_token = self.jwt_manager.generate_tokens(
                    user['id'], {
                        'email': user['email'],
                        'role': user['role'],
                        'permissions': user['permissions']
                    }
                )
                
                return {
                    'access_token': access_token,
                    'refresh_token': refresh_token,
                    'user': user
                }
            
        except jwt.InvalidTokenError:
            pass
        
        return None

Advanced Authorization Patterns

class PermissionSystem:
    """
    Attribute-based access control (ABAC) for fine-grained permissions
    """
    
    def __init__(self):
        self.policies = {}
    
    def register_policy(self, resource, action, policy_func):
        """
        Register a policy function for resource-action combinations
        """
        key = f"{resource}:{action}"
        self.policies[key] = policy_func
    
    def check_permission(self, user, resource, action, context=None):
        """
        Check if user has permission for resource-action with context
        """
        key = f"{resource}:{action}"
        policy = self.policies.get(key)
        
        if not policy:
            return False
        
        return policy(user, resource, context or {})

# Example policies for JobFinders
permission_system = PermissionSystem()

def can_view_job_posting(user, resource, context):
    """
    Users can view job postings if:
    - Job is public, OR
    - User is the job poster, OR
    - User is an admin, OR
    - User has applied to the job
    """
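    job = context.get('job')
    if job is None:
        return False  # Nothing to evaluate: deny by default
    
    # user is the decoded JWT payload, so the identifier lives under 'user_id'
    user_id = user.get('user_id')
    
    if job.get('is_public'):
        return True
    
    if user.get('role') == 'admin':
        return True
    
    if user_id is not None and job.get('posted_by') == user_id:
        return True
    
    if user_id in job.get('applicants', []):
        return True
    
    return False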

def can_edit_job_posting(user, resource, context):
    """
    Users can edit job postings if:
    - User is the job poster, OR
    - User is an admin
    """
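    job = context.get('job')
    if job is None:
        return False  # Nothing to evaluate: deny by default
    
    # user is the decoded JWT payload, so the identifier lives under 'user_id'
    user_id = user.get('user_id')
    return (user.get('role') == 'admin' or 
            (user_id is not None and job.get('posted_by') == user_id))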

# Register policies
permission_system.register_policy('job', 'view', can_view_job_posting)
permission_system.register_policy('job', 'edit', can_edit_job_posting)

# Usage in route handlers
@app.route('/jobs/<job_id>')
@jwt_required
def get_job(job_id):
    job = get_job_by_id(job_id)
    
    if not permission_system.check_permission(
        request.current_user, 'job', 'view', {'job': job}
    ):
        return jsonify({'error': 'Access denied'}), 403
    
    return jsonify({'job': job})
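
The policy approach can be exercised without a web framework, which makes the default-deny behaviour easy to test. The sketch below is a condensed variant that registers policies with a decorator; the `policy` and `check` names and the sample data are assumptions for illustration, not the JobFinders code.

```python
POLICIES = {}


def policy(resource, action):
    """Decorator that registers a policy function under 'resource:action'."""
    def register(func):
        POLICIES[f"{resource}:{action}"] = func
        return func
    return register


@policy("job", "edit")
def can_edit_job(user, context):
    job = context.get("job")
    if job is None:
        return False  # No resource supplied: deny
    is_owner = user.get("user_id") is not None and job.get("posted_by") == user.get("user_id")
    return user.get("role") == "admin" or is_owner


def check(user, resource, action, context=None):
    """Default deny: an unregistered resource/action pair is never allowed."""
    rule = POLICIES.get(f"{resource}:{action}")
    if rule is None:
        return False
    return bool(rule(user, context or {}))


# Sample data for illustration
job = {"id": 7, "posted_by": 42, "is_public": False}
owner = {"user_id": 42, "role": "user"}
stranger = {"user_id": 99, "role": "user"}
admin = {"user_id": 1, "role": "admin"}
```

With this data, `check(owner, "job", "edit", {"job": job})` is allowed, the same call for `stranger` is denied, and `check(admin, "job", "delete", {"job": job})` is denied because no policy was registered for that action.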

Security Best Practices

Token Security and Storage

// Frontend token management best practices
class TokenManager {
    constructor() {
        this.accessToken = null; // Kept in memory only
        this.tokenRefreshTimer = null;
    }
    
    setTokens(accessToken) {
        // Access token in memory only, so it is never persisted in storage
        // that injected scripts could read later
        this.accessToken = accessToken;
        
        // The refresh token is not handled here: JavaScript cannot set or read
        // HttpOnly cookies, so the server must send it in a Set-Cookie header
        // (HttpOnly; Secure; SameSite=Strict; Path=/auth/refresh)
        
        // Decode the base64url JWT payload to get the expiration time
        const base64 = accessToken.split('.')[1].replace(/-/g, '+').replace(/_/g, '/');
        const payload = JSON.parse(atob(base64));
        const expiresIn = Math.max((payload.exp * 1000) - Date.now() - 60000, 0); // Refresh 1 minute early
        
        // Set up automatic token refresh
        this.scheduleTokenRefresh(expiresIn);
    }
    
    scheduleTokenRefresh(delay) {
        if (this.tokenRefreshTimer) {
            clearTimeout(this.tokenRefreshTimer);
        }
        
        this.tokenRefreshTimer = setTimeout(() => {
            this.refreshAccessToken();
        }, delay);
    }
    
    async refreshAccessToken() {
        try {
            const response = await fetch('/auth/refresh', {
                method: 'POST',
                credentials: 'include' // Sends the HttpOnly refresh cookie
            });
            
            if (response.ok) {
                // The server rotates the refresh cookie itself via Set-Cookie
                const data = await response.json();
                this.setTokens(data.access_token);
            } else {
                // Refresh failed, redirect to login
                this.logout();
            }
        } catch (error) {
            console.error('Token refresh failed:', error);
            this.logout();
        }
    }
    
    async logout() {
        this.accessToken = null;
        if (this.tokenRefreshTimer) {
            clearTimeout(this.tokenRefreshTimer);
        }
        
        // Only the server can clear an HttpOnly cookie. The endpoint below is an
        // assumed route; it sits under the cookie's Path so the cookie is sent
        try {
            await fetch('/auth/refresh/logout', { method: 'POST', credentials: 'include' });
        } catch (error) {
            console.error('Logout request failed:', error);
        }
        
        // Redirect to login
        window.location.href = '/login';
    }
    
    getAuthHeader() {
        return this.accessToken ? `Bearer ${this.accessToken}` : null;
    }
}
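
Because scripts cannot create HttpOnly cookies, the refresh cookie has to be issued by the server. The following standard-library sketch builds the `Set-Cookie` value a refresh endpoint might send; the function name and the seven-day lifetime are assumptions, and in Flask the same attributes would normally be passed to `response.set_cookie`.

```python
from http.cookies import SimpleCookie


def refresh_cookie_header(refresh_token, max_age=7 * 24 * 3600):
    """Build the value of a Set-Cookie header for the refresh token."""
    cookie = SimpleCookie()
    cookie["refreshToken"] = refresh_token
    morsel = cookie["refreshToken"]
    morsel["httponly"] = True           # Not readable from JavaScript
    morsel["secure"] = True             # Sent over HTTPS only
    morsel["samesite"] = "Strict"       # Not sent on cross-site requests
    morsel["path"] = "/auth/refresh"    # Only sent to the refresh endpoints
    morsel["max-age"] = max_age         # Lifetime in seconds; 0 expires it
    return morsel.OutputString()
```

Calling the function with `max_age=0` produces a header that tells the browser to drop the cookie, which is how a server-side logout would clear it.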

Rate Limiting and Brute Force Protection

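import secrets
import time
from functools import wraps

from flask import jsonify, request

class RateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def is_rate_limited(self, key, limit, window):
        """
        Check if key has exceeded rate limit (sliding window over a sorted set)
        """
        current_time = time.time()
        window_start = current_time - window
        
        # Remove entries older than the window
        self.redis.zremrangebyscore(key, 0, window_start)
        
        # Count current requests
        current_count = self.redis.zcard(key)
        
        if current_count >= limit:
            return True
        
        # Add current request under a unique member so that several requests
        # arriving in the same second are counted separately
        member = f"{current_time}:{secrets.token_hex(4)}"
        self.redis.zadd(key, {member: current_time})
        self.redis.expire(key, window)
        
        return False

# Shared instance used by the decorator below
rate_limiter = RateLimiter(redis_client)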

def rate_limit(limit=5, window=300, key_func=None):
    """
    Rate limiting decorator
    limit: number of requests
    window: time window in seconds
    key_func: function to generate rate limit key
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            if key_func:
                key = key_func(request)
            else:
                key = f"rate_limit:{request.remote_addr}:{f.__name__}"
            
            if rate_limiter.is_rate_limited(key, limit, window):
                return jsonify({'error': 'Rate limit exceeded'}), 429
            
            return f(*args, **kwargs)
        return decorated_function
    return decorator

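# Brute force protection for login
# (this decorates the /login route shown earlier; register the route only once)
@app.route('/login', methods=['POST'])
@rate_limit(
    limit=5, window=900,
    # Key on the submitted email, falling back to the client IP when it is absent
    key_func=lambda req: "login_attempts:" + str(
        (req.get_json(silent=True) or {}).get('email') or req.remote_addr
    )
)
def login():
    # Login implementation
    pass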
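
The sliding-window logic is easier to reason about, and to unit test, without Redis. Below is an in-memory sketch of the same idea with an injectable clock; the class name and `clock` parameter are assumptions for illustration, and the state lives in a single process, so it does not replace the Redis-backed limiter in a multi-server deployment.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    def __init__(self, limit, window, clock=time.monotonic):
        self.limit = limit      # Maximum requests per window
        self.window = window    # Window length in seconds
        self.clock = clock      # Injectable for deterministic tests
        self._hits = defaultdict(deque)  # key -> timestamps, oldest first

    def allow(self, key):
        """Record the request and return True, or return False if over the limit."""
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and hits[0] <= now - self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True
```

With `limit=5` and `window=900`, a sixth login attempt for the same key inside fifteen minutes is rejected, and attempts are accepted again once the oldest entries age out of the window.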

Conclusion

Modern authentication and authorization patterns require careful consideration of security, scalability, and user experience. The choice between session-based authentication, JWTs, or OAuth2 depends on your specific requirements:

  • Session-based authentication works well for traditional web applications with server-side rendering
  • JWTs are ideal for APIs and single-page applications requiring stateless authentication
  • OAuth2 is essential for third-party integrations and social login features
  • Multi-factor authentication adds crucial security for sensitive applications

At Custom Logic, we help businesses implement robust authentication systems tailored to their specific needs. Whether you're building a simple web application or a complex distributed system like JobFinders, our team can design and implement authentication patterns that balance security with user experience.

The key to successful authentication implementation lies in understanding your threat model, user requirements, and system architecture. By combining multiple patterns and following security best practices, you can create authentication systems that protect your users while providing seamless access to your applications.

For businesses looking to implement or upgrade their authentication systems, Custom Logic offers comprehensive security consulting and development services. Our experience with platforms like JobFinders, Funeral Manager, and EOD Stock API ensures that your authentication system will be both secure and scalable.