Shahzad Bhatti
Welcome to my ramblings and rants!

September 9, 2025

Dynamic Facets and Runtime Behavior Composition: Beyond Adaptive Object Models

Filed under: Computing — admin @ 7:28 pm

Background

In my previous blog post on the Adaptive Object Model (AOM) pattern, I focused on dynamic schema evolution and metadata-driven architectures. However, there's a complementary pattern that addresses a different but equally important challenge: how to compose behavior dynamically at runtime without modifying existing objects. I first encountered this pattern in the early 2000s, in Voyager ORB's "Dynamic Aggregation" and in San Francisco Design Patterns: Blueprints for Business Software (Part IV, Dynamic Behavioral Patterns), and it has profound implications for building extensible systems. The facets pattern, also known as dynamic aggregation or extension objects, allows secondary objects (facets) to be attached to primary objects at runtime, extending their capabilities without inheritance or modification. Unlike AOM, which focuses on schema flexibility, facets address behavioral composition: the ability to mix and match capabilities based on runtime requirements.

Facets Pattern

The facets pattern emerged from several key observations about real-world software systems:

  • Interface Segregation: Not every object needs every capability all the time. A User object might need audit trail capabilities in some contexts, caching in others, and validation in yet others.
  • Runtime Composition: The specific mix of capabilities often depends on runtime context – user permissions, configuration settings, or environmental factors that cannot be determined at compile time.
  • Separation of Concerns: Cross-cutting concerns like logging, security, and persistence should be composable without polluting domain objects.

Voyager ORB’s implementation demonstrated these principles elegantly:

// Voyager ORB example - attaching an account facet to an employee
IEmployee employee = new Employee("joe", "234-44-2678");
IFacets facets = Facets.of(employee);
IAccount account = (IAccount) facets.of(IAccount.class);
account.deposit(2000);

The beauty of this approach is that the Employee class knows nothing about accounting capabilities, yet the object can seamlessly provide financial operations when needed.

Modern Implementations

Let’s explore how this pattern can be implemented in modern languages, taking advantage of their unique strengths while maintaining the core principles.

Rust Implementation: Type-Safe Facet Composition

Rust’s type system and trait system provide excellent foundations for type-safe facet composition:

use std::collections::HashMap;
use std::any::{Any, TypeId};
use std::sync::RwLock;

// Core facet trait that all facets must implement
pub trait Facet: Any + Send + Sync {
    fn as_any(&self) -> &dyn Any;
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

// Faceted object that can have facets attached
pub struct FacetedObject {
    facets: RwLock<HashMap<TypeId, Box<dyn Facet>>>,
    core_object: Box<dyn Any + Send + Sync>,
}

impl FacetedObject {
    pub fn new<T: Any + Send + Sync>(core: T) -> Self {
        Self {
            facets: RwLock::new(HashMap::new()),
            core_object: Box::new(core),
        }
    }

    // Attach a facet to this object
    pub fn attach_facet<F: Facet + 'static>(&self, facet: F) -> Result<(), String> {
        let type_id = TypeId::of::<F>();
        let mut facets = self.facets.write()
            .map_err(|_| "Failed to acquire write lock")?;
        
        if facets.contains_key(&type_id) {
            return Err(format!("Facet of type {:?} already attached", type_id));
        }
        
        facets.insert(type_id, Box::new(facet));
        Ok(())
    }

    // Execute an operation that requires a specific facet (safe callback pattern)
    pub fn with_facet<F: Facet + 'static, R>(
        &self, 
        operation: impl FnOnce(&F) -> R
    ) -> Result<R, String> {
        let facets = self.facets.read()
            .map_err(|_| "Failed to acquire read lock")?;
        let type_id = TypeId::of::<F>();
        
        if let Some(facet) = facets.get(&type_id) {
            if let Some(typed_facet) = facet.as_any().downcast_ref::<F>() {
                Ok(operation(typed_facet))
            } else {
                Err("Failed to downcast facet".to_string())
            }
        } else {
            Err(format!("Required facet not found: {:?}", type_id))
        }
    }

    // Execute a mutable operation on a facet
    pub fn with_facet_mut<F: Facet + 'static, R>(
        &self,
        operation: impl FnOnce(&mut F) -> R
    ) -> Result<R, String> {
        let mut facets = self.facets.write()
            .map_err(|_| "Failed to acquire write lock")?;
        let type_id = TypeId::of::<F>();
        
        if let Some(facet) = facets.get_mut(&type_id) {
            if let Some(typed_facet) = facet.as_any_mut().downcast_mut::<F>() {
                Ok(operation(typed_facet))
            } else {
                Err("Failed to downcast facet".to_string())
            }
        } else {
            Err(format!("Required facet not found: {:?}", type_id))
        }
    }

    // Check if a facet is attached
    pub fn has_facet<F: Facet + 'static>(&self) -> bool {
        // Treat a poisoned lock as "facet not present" instead of panicking
        self.facets
            .read()
            .map(|facets| facets.contains_key(&TypeId::of::<F>()))
            .unwrap_or(false)
    }

    // Get the core object
    pub fn get_core<T: 'static>(&self) -> Option<&T> {
        self.core_object.downcast_ref::<T>()
    }
}

// Example domain object
#[derive(Debug)]
pub struct Employee {
    pub name: String,
    pub id: String,
    pub department: String,
}

impl Employee {
    pub fn new(name: &str, id: &str, department: &str) -> Self {
        Self {
            name: name.to_string(),
            id: id.to_string(),
            department: department.to_string(),
        }
    }
}

// Account facet for financial operations
#[derive(Debug)]
pub struct AccountFacet {
    balance: f64,
    account_number: String,
}

impl AccountFacet {
    pub fn new(account_number: &str) -> Self {
        Self {
            balance: 0.0,
            account_number: account_number.to_string(),
        }
    }

    pub fn deposit(&mut self, amount: f64) -> Result<f64, String> {
        if amount <= 0.0 {
            return Err("Deposit amount must be positive".to_string());
        }
        self.balance += amount;
        Ok(self.balance)
    }

    pub fn withdraw(&mut self, amount: f64) -> Result<f64, String> {
        if amount <= 0.0 {
            return Err("Withdrawal amount must be positive".to_string());
        }
        if amount > self.balance {
            return Err("Insufficient funds".to_string());
        }
        self.balance -= amount;
        Ok(self.balance)
    }

    pub fn get_balance(&self) -> f64 {
        self.balance
    }

    pub fn get_account_number(&self) -> &str {
        &self.account_number
    }
}

impl Facet for AccountFacet {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

// Audit trail facet for tracking operations
#[derive(Debug)]
pub struct AuditFacet {
    entries: Vec<AuditEntry>,
}

#[derive(Debug, Clone)]
pub struct AuditEntry {
    timestamp: std::time::SystemTime,
    operation: String,
    details: String,
}

impl AuditFacet {
    pub fn new() -> Self {
        Self {
            entries: Vec::new(),
        }
    }

    pub fn log_operation(&mut self, operation: &str, details: &str) {
        self.entries.push(AuditEntry {
            timestamp: std::time::SystemTime::now(),
            operation: operation.to_string(),
            details: details.to_string(),
        });
    }

    pub fn get_audit_trail(&self) -> &[AuditEntry] {
        &self.entries
    }

    pub fn get_recent_entries(&self, count: usize) -> &[AuditEntry] {
        let start = if self.entries.len() > count {
            self.entries.len() - count
        } else {
            0
        };
        &self.entries[start..]
    }
}

impl Facet for AuditFacet {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

// Permission facet for access control
#[derive(Debug)]
pub struct PermissionFacet {
    permissions: HashMap<String, bool>,
    role: String,
}

impl PermissionFacet {
    pub fn new(role: &str) -> Self {
        let mut permissions = HashMap::new();
        
        // Define role-based permissions
        match role {
            "admin" => {
                permissions.insert("read".to_string(), true);
                permissions.insert("write".to_string(), true);
                permissions.insert("delete".to_string(), true);
                permissions.insert("financial_operations".to_string(), true);
            },
            "manager" => {
                permissions.insert("read".to_string(), true);
                permissions.insert("write".to_string(), true);
                permissions.insert("financial_operations".to_string(), true);
            },
            "employee" => {
                permissions.insert("read".to_string(), true);
            },
            _ => {}
        }

        Self {
            permissions,
            role: role.to_string(),
        }
    }

    pub fn has_permission(&self, permission: &str) -> bool {
        self.permissions.get(permission).copied().unwrap_or(false)
    }

    pub fn grant_permission(&mut self, permission: &str) {
        self.permissions.insert(permission.to_string(), true);
    }

    pub fn revoke_permission(&mut self, permission: &str) {
        self.permissions.insert(permission.to_string(), false);
    }

    pub fn get_role(&self) -> &str {
        &self.role
    }
}

impl Facet for PermissionFacet {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

// Composite operations that work across facets
pub struct EmployeeOperations;

impl EmployeeOperations {
    pub fn perform_financial_operation<F>(
        employee_obj: &FacetedObject,
        mut operation: F,
    ) -> Result<String, String> 
    where
        F: FnMut(&mut AccountFacet) -> Result<f64, String>,
    {
        // Check permissions first
        let has_permission = employee_obj.with_facet(|permissions: &PermissionFacet| {
            permissions.has_permission("financial_operations")
        }).unwrap_or(false);

        if !has_permission {
            return Err("Access denied: insufficient permissions for financial operations".to_string());
        }

        // Get employee info for logging
        let employee_name = employee_obj.get_core::<Employee>()
            .map(|emp| emp.name.clone())
            .unwrap_or_else(|| "Unknown".to_string());

        // Perform the operation
        let result = employee_obj.with_facet_mut(|account: &mut AccountFacet| {
            operation(account)
        })?;

        let balance = result?;

        // Log the operation if audit facet is present
        let _ = employee_obj.with_facet_mut(|audit: &mut AuditFacet| {
            audit.log_operation("financial_operation", &format!("New balance: {}", balance));
        });

        Ok(format!("Financial operation completed for {}. New balance: {}", employee_name, balance))
    }

    pub fn get_employee_summary(employee_obj: &FacetedObject) -> String {
        let mut summary = String::new();

        // Core employee information
        if let Some(employee) = employee_obj.get_core::<Employee>() {
            summary.push_str(&format!("Employee: {} (ID: {})\n", employee.name, employee.id));
            summary.push_str(&format!("Department: {}\n", employee.department));
        }

        // Account information if available
        let account_info = employee_obj.with_facet(|account: &AccountFacet| {
            format!("Account: {} (Balance: ${:.2})\n", 
                account.get_account_number(), account.get_balance())
        }).unwrap_or_else(|_| "No account information\n".to_string());
        summary.push_str(&account_info);

        // Permission information if available
        let permission_info = employee_obj.with_facet(|permissions: &PermissionFacet| {
            format!("Role: {}\n", permissions.get_role())
        }).unwrap_or_else(|_| "No permission information\n".to_string());
        summary.push_str(&permission_info);

        // Audit information if available
        let audit_info = employee_obj.with_facet(|audit: &AuditFacet| {
            let recent_entries = audit.get_recent_entries(3);
            if !recent_entries.is_empty() {
                let mut info = "Recent Activity:\n".to_string();
                for entry in recent_entries {
                    info.push_str(&format!("  - {:?}: {} ({})\n", 
                        entry.timestamp,
                        entry.operation, 
                        entry.details));
                }
                info
            } else {
                "No recent activity\n".to_string()
            }
        }).unwrap_or_else(|_| "No audit information\n".to_string());
        summary.push_str(&audit_info);

        summary
    }
}

// Usage example
fn example_usage() -> Result<(), String> {
    println!("=== Dynamic Facets Example ===");

    // Create an employee
    let employee = Employee::new("Alice Johnson", "EMP001", "Engineering");
    let employee_obj = FacetedObject::new(employee);

    // Attach different facets based on requirements
    employee_obj.attach_facet(AccountFacet::new("ACC001"))?;
    employee_obj.attach_facet(PermissionFacet::new("manager"))?;
    employee_obj.attach_facet(AuditFacet::new())?;

    println!("Facets attached successfully!");

    // Use facets through the composite object
    let summary = EmployeeOperations::get_employee_summary(&employee_obj);
    println!("\nEmployee Summary:\n{}", summary);

    // Attempt financial operation (deposit)
    let result = EmployeeOperations::perform_financial_operation(
        &employee_obj,
        |account| account.deposit(1000.0)
    )?;
    println!("Deposit result: {}", result);

    // Attempt another financial operation (withdrawal)
    let result = EmployeeOperations::perform_financial_operation(
        &employee_obj,
        |account| account.withdraw(250.0)
    )?;
    println!("Withdrawal result: {}", result);

    // Display final summary
    let final_summary = EmployeeOperations::get_employee_summary(&employee_obj);
    println!("\nFinal Employee Summary:\n{}", final_summary);

    Ok(())
}

fn main() {
    match example_usage() {
        Ok(_) => println!("\nFacet composition example completed successfully."),
        Err(e) => eprintln!("Error: {}", e),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_facet_attachment() {
        let employee = Employee::new("Test User", "TEST001", "Engineering");
        let employee_obj = FacetedObject::new(employee);

        // Test attaching facets
        assert!(employee_obj.attach_facet(AccountFacet::new("ACC001")).is_ok());
        assert!(employee_obj.has_facet::<AccountFacet>());

        // Test duplicate attachment fails
        assert!(employee_obj.attach_facet(AccountFacet::new("ACC002")).is_err());
    }

    #[test]
    fn test_financial_operations() {
        let employee = Employee::new("Test User", "TEST001", "Engineering");
        let employee_obj = FacetedObject::new(employee);

        employee_obj.attach_facet(AccountFacet::new("ACC001")).unwrap();
        employee_obj.attach_facet(PermissionFacet::new("manager")).unwrap();

        // Test deposit
        let result = employee_obj.with_facet_mut(|account: &mut AccountFacet| {
            account.deposit(1000.0)
        }).unwrap();

        assert_eq!(result.unwrap(), 1000.0);

        // Test balance check
        let balance = employee_obj.with_facet(|account: &AccountFacet| {
            account.get_balance()
        }).unwrap();

        assert_eq!(balance, 1000.0);
    }

    #[test]
    fn test_permission_checking() {
        let employee = Employee::new("Test User", "TEST001", "Engineering");
        let employee_obj = FacetedObject::new(employee);

        employee_obj.attach_facet(PermissionFacet::new("employee")).unwrap();

        let has_financial = employee_obj.with_facet(|permissions: &PermissionFacet| {
            permissions.has_permission("financial_operations")
        }).unwrap();

        assert_eq!(has_financial, false);

        let has_read = employee_obj.with_facet(|permissions: &PermissionFacet| {
            permissions.has_permission("read")
        }).unwrap();

        assert_eq!(has_read, true);
    }
}

The Rust implementation provides several key advantages:

  • Type Safety: The type system ensures that facets can only be cast to their correct types
  • Memory Safety: Rust’s ownership model prevents common issues with shared mutable state
  • Performance: facet lookup costs only a hash-map access, a lock acquisition, and a checked downcast; no reflection machinery is involved at runtime
  • Concurrency: Built-in thread safety through Send and Sync traits
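The concurrency claim is worth seeing in action. Below is a minimal, self-contained sketch (the names FacetStore and Counter are illustrative, not part of the implementation above) that shares a TypeId-keyed facet map across threads; because the map only stores `Box<dyn Any + Send + Sync>`, the compiler rejects any attempt to attach a facet that is not thread-safe:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

// Minimal stand-in for the facet store: a TypeId-keyed map of boxed
// values behind an RwLock, shared across threads via Arc.
struct FacetStore {
    facets: RwLock<HashMap<TypeId, Box<dyn Any + Send + Sync>>>,
}

impl FacetStore {
    fn new() -> Self {
        Self { facets: RwLock::new(HashMap::new()) }
    }

    // Only Send + Sync values can be attached; this bound is what
    // makes the whole store safely shareable between threads.
    fn attach<T: Any + Send + Sync>(&self, value: T) {
        self.facets.write().unwrap().insert(TypeId::of::<T>(), Box::new(value));
    }

    // Run a closure against the facet of type T, if attached.
    fn with<T: Any + Send + Sync, R>(&self, f: impl FnOnce(&T) -> R) -> Option<R> {
        let facets = self.facets.read().unwrap();
        facets
            .get(&TypeId::of::<T>())
            .and_then(|b| b.downcast_ref::<T>())
            .map(f)
    }
}

#[derive(Debug)]
struct Counter(u64);

fn main() {
    let store = Arc::new(FacetStore::new());
    store.attach(Counter(42));

    // Ten reader threads concurrently access the same facet.
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || store.with(|c: &Counter| c.0).unwrap())
        })
        .collect();

    for h in handles {
        assert_eq!(h.join().unwrap(), 42);
    }
    println!("all readers observed 42");
}
```

Note that no `unsafe` code and no manual locking discipline is needed at the call sites; the `Send + Sync` bounds on the storage type do all the enforcement.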

TypeScript Implementation: Dynamic Composition with Type Safety

TypeScript’s type system allows for sophisticated compile-time checking while maintaining JavaScript’s dynamic nature:

// Base interfaces for the facet system
interface Facet {
  readonly facetType: string;
}

interface FacetConstructor<T extends Facet> {
  new(...args: any[]): T;
  readonly facetType: string;
}

// Core faceted object implementation
class FacetedObject<TCore = any> {
  private facets: Map<string, Facet> = new Map();
  private core: TCore;

  constructor(core: TCore) {
    this.core = core;
  }

  // Attach a facet to this object
  attachFacet<T extends Facet>(FacetClass: FacetConstructor<T>, ...args: any[]): T {
    const facet = new FacetClass(...args);
    
    if (this.facets.has(FacetClass.facetType)) {
      throw new Error(`Facet ${FacetClass.facetType} already attached`);
    }
    
    this.facets.set(FacetClass.facetType, facet);
    return facet;
  }

  // Get a facet by its constructor
  getFacet<T extends Facet>(FacetClass: FacetConstructor<T>): T | undefined {
    const facet = this.facets.get(FacetClass.facetType);
    return facet as T | undefined;
  }

  // Check if a facet is attached
  hasFacet<T extends Facet>(FacetClass: FacetConstructor<T>): boolean {
    return this.facets.has(FacetClass.facetType);
  }

  // Remove a facet
  removeFacet<T extends Facet>(FacetClass: FacetConstructor<T>): boolean {
    return this.facets.delete(FacetClass.facetType);
  }

  // Get the core object
  getCore(): TCore {
    return this.core;
  }

  // Execute operation with facet requirement checking
  withFacet<T extends Facet, R>(
    FacetClass: FacetConstructor<T>,
    operation: (facet: T) => R
  ): R {
    const facet = this.getFacet(FacetClass);
    if (!facet) {
      throw new Error(`Required facet ${FacetClass.facetType} not found`);
    }
    return operation(facet);
  }

  // Get all attached facet types
  getAttachedFacetTypes(): string[] {
    return Array.from(this.facets.keys());
  }
}

// Example domain objects
interface Employee {
  name: string;
  id: string;
  department: string;
  email: string;
}

class EmployeeImpl implements Employee {
  constructor(
    public name: string,
    public id: string,
    public department: string,
    public email: string
  ) {}
}

// Account facet for financial operations
class AccountFacet implements Facet {
  static readonly facetType = 'account';
  readonly facetType = AccountFacet.facetType;

  private balance: number = 0;
  private accountNumber: string;
  private transactions: Transaction[] = [];

  constructor(accountNumber: string, initialBalance: number = 0) {
    this.accountNumber = accountNumber;
    this.balance = initialBalance;
  }

  deposit(amount: number): number {
    if (amount <= 0) {
      throw new Error('Deposit amount must be positive');
    }
    
    this.balance += amount;
    this.transactions.push({
      type: 'deposit',
      amount,
      timestamp: new Date(),
      balanceAfter: this.balance
    });
    
    return this.balance;
  }

  withdraw(amount: number): number {
    if (amount <= 0) {
      throw new Error('Withdrawal amount must be positive');
    }
    
    if (amount > this.balance) {
      throw new Error('Insufficient funds');
    }
    
    this.balance -= amount;
    this.transactions.push({
      type: 'withdrawal',
      amount,
      timestamp: new Date(),
      balanceAfter: this.balance
    });
    
    return this.balance;
  }

  getBalance(): number {
    return this.balance;
  }

  getAccountNumber(): string {
    return this.accountNumber;
  }

  getTransactionHistory(): Transaction[] {
    return [...this.transactions];
  }

  getRecentTransactions(count: number): Transaction[] {
    return this.transactions.slice(-count);
  }
}

interface Transaction {
  type: 'deposit' | 'withdrawal';
  amount: number;
  timestamp: Date;
  balanceAfter: number;
}

// Notification facet for alerting
class NotificationFacet implements Facet {
  static readonly facetType = 'notification';
  readonly facetType = NotificationFacet.facetType;

  private subscribers: Map<string, NotificationHandler[]> = new Map();

  subscribe(eventType: string, handler: NotificationHandler): void {
    if (!this.subscribers.has(eventType)) {
      this.subscribers.set(eventType, []);
    }
    this.subscribers.get(eventType)!.push(handler);
  }

  unsubscribe(eventType: string, handler: NotificationHandler): boolean {
    const handlers = this.subscribers.get(eventType);
    if (!handlers) return false;
    
    const index = handlers.indexOf(handler);
    if (index !== -1) {
      handlers.splice(index, 1);
      return true;
    }
    return false;
  }

  notify(eventType: string, data: any): void {
    const handlers = this.subscribers.get(eventType) || [];
    handlers.forEach(handler => {
      try {
        handler(eventType, data);
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : 'Unknown error';
        console.error(`Notification handler error for ${eventType}:`, errorMessage);
      }
    });
  }

  getSubscriberCount(eventType: string): number {
    return this.subscribers.get(eventType)?.length || 0;
  }
}

type NotificationHandler = (eventType: string, data: any) => void;

// Cache facet for performance optimization
class CacheFacet implements Facet {
  static readonly facetType = 'cache';
  readonly facetType = CacheFacet.facetType;

  private cache: Map<string, CacheEntry> = new Map();
  private maxSize: number;
  private defaultTTL: number;

  constructor(maxSize: number = 100, defaultTTL: number = 300000) { // 5 minutes default
    this.maxSize = maxSize;
    this.defaultTTL = defaultTTL;
  }

  set<T>(key: string, value: T, ttl?: number): void {
    // Remove oldest entries if cache is full
    if (this.cache.size >= this.maxSize) {
      const oldestKey = this.cache.keys().next().value;
      if (oldestKey !== undefined) {
        this.cache.delete(oldestKey);
      }
    }

    this.cache.set(key, {
      value,
      timestamp: Date.now(),
      ttl: ttl || this.defaultTTL
    });
  }

  get<T>(key: string): T | undefined {
    const entry = this.cache.get(key);
    if (!entry) return undefined;

    // Check if entry has expired
    if (Date.now() - entry.timestamp > entry.ttl) {
      this.cache.delete(key);
      return undefined;
    }

    return entry.value as T;
  }

  has(key: string): boolean {
    const entry = this.cache.get(key);
    if (!entry) return false;

    // Check if entry has expired
    if (Date.now() - entry.timestamp > entry.ttl) {
      this.cache.delete(key);
      return false;
    }

    return true;
  }

  invalidate(key: string): boolean {
    return this.cache.delete(key);
  }

  clear(): void {
    this.cache.clear();
  }

  getStats(): CacheStats {
    return {
      size: this.cache.size,
      maxSize: this.maxSize,
      hitRate: 0 // Would need to track hits/misses for real implementation
    };
  }
}

interface CacheEntry {
  value: any;
  timestamp: number;
  ttl: number;
}

interface CacheStats {
  size: number;
  maxSize: number;
  hitRate: number;
}

// Permission facet with role-based access control
class PermissionFacet implements Facet {
  static readonly facetType = 'permission';
  readonly facetType = PermissionFacet.facetType;

  private permissions: Set<string> = new Set();
  private role: string;

  constructor(role: string) {
    this.role = role;
    this.initializeRolePermissions(role);
  }

  private initializeRolePermissions(role: string): void {
    const rolePermissions: Record<string, string[]> = {
      'admin': ['read', 'write', 'delete', 'financial', 'admin'],
      'manager': ['read', 'write', 'financial', 'manage_team'],
      'employee': ['read', 'view_profile'],
      'guest': ['read']
    };

    const perms = rolePermissions[role] || [];
    perms.forEach(perm => this.permissions.add(perm));
  }

  hasPermission(permission: string): boolean {
    return this.permissions.has(permission);
  }

  grantPermission(permission: string): void {
    this.permissions.add(permission);
  }

  revokePermission(permission: string): void {
    this.permissions.delete(permission);
  }

  getPermissions(): string[] {
    return Array.from(this.permissions);
  }

  getRole(): string {
    return this.role;
  }

  requirePermission(permission: string): void {
    if (!this.hasPermission(permission)) {
      throw new Error(`Access denied: missing permission '${permission}'`);
    }
  }
}

// Composite operations using multiple facets
class EmployeeService {
  static performSecureFinancialOperation(
    employeeObj: FacetedObject<Employee>,
    operation: (account: AccountFacet) => number,
    operationType: string
  ): number {
    // Check permissions
    const permissions = employeeObj.getFacet(PermissionFacet);
    if (permissions) {
      permissions.requirePermission('financial');
    }

    // Perform operation
    const result = employeeObj.withFacet(AccountFacet, operation);

    // Send notification if facet is available
    const notifications = employeeObj.getFacet(NotificationFacet);
    if (notifications) {
      notifications.notify('financial_operation', {
        employee: employeeObj.getCore().name,
        operation: operationType,
        timestamp: new Date()
      });
    }

    // Invalidate related cache entries
    const cache = employeeObj.getFacet(CacheFacet);
    if (cache) {
      cache.invalidate(`balance_${employeeObj.getCore().id}`);
      cache.invalidate(`transactions_${employeeObj.getCore().id}`);
    }

    return result;
  }

  static getEmployeeSummary(employeeObj: FacetedObject<Employee>): string {
    const employee = employeeObj.getCore();
    const facetTypes = employeeObj.getAttachedFacetTypes();
    
    let summary = `Employee: ${employee.name} (${employee.id})\n`;
    summary += `Department: ${employee.department}\n`;
    summary += `Email: ${employee.email}\n`;
    summary += `Active Facets: ${facetTypes.join(', ')}\n`;

    // Add account information if available
    const account = employeeObj.getFacet(AccountFacet);
    if (account) {
      summary += `Account: ${account.getAccountNumber()} (Balance: $${account.getBalance().toFixed(2)})\n`;
      
      const recentTransactions = account.getRecentTransactions(3);
      if (recentTransactions.length > 0) {
        summary += 'Recent Transactions:\n';
        recentTransactions.forEach(tx => {
          summary += `  ${tx.type}: $${tx.amount.toFixed(2)} on ${tx.timestamp.toLocaleString()}\n`;
        });
      }
    }

    // Add permission information if available
    const permissions = employeeObj.getFacet(PermissionFacet);
    if (permissions) {
      summary += `Role: ${permissions.getRole()}\n`;
      summary += `Permissions: ${permissions.getPermissions().join(', ')}\n`;
    }

    // Add cache stats if available
    const cache = employeeObj.getFacet(CacheFacet);
    if (cache) {
      const stats = cache.getStats();
      summary += `Cache: ${stats.size}/${stats.maxSize} entries\n`;
    }

    return summary;
  }

  static configureEmployeeCapabilities(
    employeeObj: FacetedObject<Employee>,
    config: EmployeeConfig
  ): void {
    // Attach facets based on configuration
    if (config.hasAccount) {
      employeeObj.attachFacet(AccountFacet, config.accountNumber, config.initialBalance);
    }

    if (config.role) {
      employeeObj.attachFacet(PermissionFacet, config.role);
    }

    if (config.enableNotifications) {
      const notifications = employeeObj.attachFacet(NotificationFacet);
      
      // Set up default notification handlers
      notifications.subscribe('financial_operation', (eventType, data) => {
        console.log(`Financial operation performed: ${JSON.stringify(data)}`);
      });
    }

    if (config.enableCaching) {
      employeeObj.attachFacet(CacheFacet, config.cacheSize, config.cacheTTL);
    }
  }
}

interface EmployeeConfig {
  hasAccount?: boolean;
  accountNumber?: string;
  initialBalance?: number;
  role?: string;
  enableNotifications?: boolean;
  enableCaching?: boolean;
  cacheSize?: number;
  cacheTTL?: number;
}

// Usage example
function demonstrateFacetComposition(): void {
  console.log('=== Dynamic Facet Composition Demo ===');

  // Create an employee
  const employee = new EmployeeImpl('Bob Smith', 'EMP002', 'Finance', 'bob.smith@company.com');
  const employeeObj = new FacetedObject(employee);

  // Configure capabilities based on requirements
  EmployeeService.configureEmployeeCapabilities(employeeObj, {
    hasAccount: true,
    accountNumber: 'ACC002',
    initialBalance: 500,
    role: 'manager',
    enableNotifications: true,
    enableCaching: true,
    cacheSize: 50,
    cacheTTL: 600000 // 10 minutes
  });

  // Display initial summary
  console.log('\nInitial Employee Summary:');
  console.log(EmployeeService.getEmployeeSummary(employeeObj));

  // Perform financial operations
  try {
    const newBalance = EmployeeService.performSecureFinancialOperation(
      employeeObj,
      (account) => account.deposit(1000),
      'deposit'
    );
    console.log(`Deposit successful. New balance: $${newBalance.toFixed(2)}`);

    const finalBalance = EmployeeService.performSecureFinancialOperation(
      employeeObj,
      (account) => account.withdraw(200),
      'withdrawal'
    );
    console.log(`Withdrawal successful. Final balance: $${finalBalance.toFixed(2)}`);

  } catch (error) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred';
    console.error('Operation failed:', errorMessage);
  }

  // Display final summary
  console.log('\nFinal Employee Summary:');
  console.log(EmployeeService.getEmployeeSummary(employeeObj));
}

// Run the demonstration
demonstrateFacetComposition();

The TypeScript implementation provides:

  • Type Safety: Compile-time type checking for facet operations
  • IntelliSense Support: Rich IDE support with autocompletion and error detection
  • Interface Segregation: Clean separation between different capabilities
  • Dynamic Composition: Runtime attachment and detachment of behaviors
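The dynamic composition point deserves a concrete illustration. Here is a minimal, self-contained sketch (MiniFaceted and TagFacet are hypothetical names, not part of the implementation above) showing a capability appearing and then disappearing on a live object:

```typescript
// A string-keyed facet map supporting runtime attach/detach.
interface MiniFacet { readonly facetType: string; }

class MiniFaceted<TCore> {
  private facets = new Map<string, MiniFacet>();
  constructor(public readonly core: TCore) {}

  attach(facet: MiniFacet): void {
    if (this.facets.has(facet.facetType)) {
      throw new Error(`Facet ${facet.facetType} already attached`);
    }
    this.facets.set(facet.facetType, facet);
  }

  detach(facetType: string): boolean {
    return this.facets.delete(facetType);
  }

  get<T extends MiniFacet>(facetType: string): T | undefined {
    return this.facets.get(facetType) as T | undefined;
  }
}

// Example capability: tagging, attached only while needed.
class TagFacet implements MiniFacet {
  readonly facetType = 'tags';
  private tags = new Set<string>();
  add(tag: string): void { this.tags.add(tag); }
  list(): string[] { return [...this.tags]; }
}

const doc = new MiniFaceted({ title: 'Q3 report' });
doc.attach(new TagFacet());
doc.get<TagFacet>('tags')?.add('finance');
console.log(doc.get<TagFacet>('tags')?.list()); // capability present while attached

doc.detach('tags');
console.log(doc.get<TagFacet>('tags')); // capability removed at runtime
```

The core object never learns about tagging; the behavior exists only for the span between attach and detach, which is the essence of the pattern.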

Ruby Implementation: Metaprogramming-Powered Facets

Ruby’s metaprogramming capabilities make facet implementation particularly elegant:

require 'date'
require 'set'
require 'json'

# Core facet module that all facets include
module Facet
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def facet_type
      @facet_type ||= name.downcase.gsub(/facet$/, '')
    end

    def facet_type=(type)
      @facet_type = type
    end
  end

  def facet_type
    self.class.facet_type
  end
end

# Main faceted object implementation
class FacetedObject
  def initialize(core_object)
    @core_object = core_object
    @facets = {}
    
    # Enable method delegation
    extend_with_facet_methods
  end

  def attach_facet(facet_instance)
    facet_type = facet_instance.facet_type
    
    if @facets.key?(facet_type)
      raise ArgumentError, "Facet '#{facet_type}' already attached"
    end

    @facets[facet_type] = facet_instance
    
    # Add facet methods to this instance
    add_facet_methods(facet_instance)
    
    # Call initialization hook if facet defines it
    facet_instance.on_attached(self) if facet_instance.respond_to?(:on_attached)
    
    facet_instance
  end

  def detach_facet(facet_type_or_class)
    # A String is already the facet type; classes and instances both expose facet_type
    facet_type = facet_type_or_class.is_a?(String) ? facet_type_or_class : facet_type_or_class.facet_type

    facet = @facets.delete(facet_type)
    
    if facet
      # Remove facet methods
      remove_facet_methods(facet)
      
      # Call cleanup hook if facet defines it
      facet.on_detached(self) if facet.respond_to?(:on_detached)
    end
    
    facet
  end

  def get_facet(facet_type_or_class)
    # A String is already the facet type; classes and instances both expose facet_type
    facet_type = facet_type_or_class.is_a?(String) ? facet_type_or_class : facet_type_or_class.facet_type

    @facets[facet_type]
  end

  def has_facet?(facet_type_or_class)
    !get_facet(facet_type_or_class).nil?
  end

  def facet_types
    @facets.keys
  end

  def core_object
    @core_object
  end

  def with_facet(facet_type_or_class)
    facet = get_facet(facet_type_or_class)
    raise ArgumentError, "Facet not found: #{facet_type_or_class}" unless facet
    
    yield(facet)
  end

  # Require specific facets for an operation
  def requires_facets(*facet_types, &block)
    missing_facets = facet_types.select { |type| !has_facet?(type) }
    
    unless missing_facets.empty?
      raise ArgumentError, "Missing required facets: #{missing_facets.join(', ')}"
    end
    
    block.call(self) if block_given?
  end

  private

  def extend_with_facet_methods
    # Add method_missing to handle facet method calls
    singleton_class.class_eval do
      define_method :method_missing do |method_name, *args, &block|
        # Try to find the method in attached facets
        @facets.values.each do |facet|
          if facet.respond_to?(method_name)
            return facet.send(method_name, *args, &block)
          end
        end
        
        # Try the core object
        if @core_object.respond_to?(method_name)
          return @core_object.send(method_name, *args, &block)
        end
        
        super(method_name, *args, &block)
      end

      define_method :respond_to_missing? do |method_name, include_private = false|
        @facets.values.any? { |facet| facet.respond_to?(method_name, include_private) } ||
          @core_object.respond_to?(method_name, include_private) ||
          super(method_name, include_private)
      end
    end
  end

  def add_facet_methods(facet)
    # Note: public_methods(false) would list only singleton methods,
    # so enumerate the facet class's own public instance methods instead
    facet.class.public_instance_methods(false).each do |method_name|
      next if method_name == :facet_type

      # Create a delegating method for each public method of the facet
      singleton_class.class_eval do
        define_method("#{facet.facet_type}_#{method_name}") do |*args, &block|
          facet.send(method_name, *args, &block)
        end
      end
    end
  end

  def remove_facet_methods(facet)
    facet.class.public_instance_methods(false).each do |method_name|
      method_to_remove = "#{facet.facet_type}_#{method_name}"
      
      if respond_to?(method_to_remove)
        singleton_class.class_eval do
          remove_method(method_to_remove) if method_defined?(method_to_remove)
        end
      end
    end
  end
end

# Example domain class
class Employee
  attr_accessor :name, :id, :department, :email, :hire_date

  def initialize(name, id, department, email, hire_date = Date.today)
    @name = name
    @id = id
    @department = department
    @email = email
    @hire_date = hire_date
  end

  def years_of_service
    ((Date.today - @hire_date) / 365.25).to_i
  end

  def to_h
    {
      name: @name,
      id: @id,
      department: @department,
      email: @email,
      hire_date: @hire_date,
      years_of_service: years_of_service
    }
  end
end

# Account facet for financial operations
class AccountFacet
  include Facet
  
  attr_reader :account_number, :balance

  def initialize(account_number, initial_balance = 0)
    @account_number = account_number
    @balance = initial_balance.to_f
    @transactions = []
  end

  def deposit(amount)
    raise ArgumentError, "Amount must be positive" unless amount > 0
    
    @balance += amount
    log_transaction('deposit', amount)
    @balance
  end

  def withdraw(amount)
    raise ArgumentError, "Amount must be positive" unless amount > 0
    raise ArgumentError, "Insufficient funds" if amount > @balance
    
    @balance -= amount
    log_transaction('withdrawal', amount)
    @balance
  end

  def transfer_to(target_account_number, amount)
    raise ArgumentError, "Cannot transfer to same account" if target_account_number == @account_number
    
    withdraw(amount)
    log_transaction('transfer_out', amount, target_account_number)
    amount
  end

  def receive_transfer(from_account_number, amount)
    deposit(amount)
    log_transaction('transfer_in', amount, from_account_number)
    @balance
  end

  def transaction_history(limit = nil)
    limit ? @transactions.last(limit) : @transactions.dup
  end

  def monthly_summary(year, month)
    start_date = Date.new(year, month, 1)
    end_date = start_date.next_month - 1
    
    monthly_transactions = @transactions.select do |tx|
      tx[:timestamp].to_date.between?(start_date, end_date)
    end

    {
      period: "#{year}-#{month.to_s.rjust(2, '0')}",
      transactions: monthly_transactions,
      total_deposits: monthly_transactions.select { |tx| tx[:type] == 'deposit' }.sum { |tx| tx[:amount] },
      total_withdrawals: monthly_transactions.select { |tx| tx[:type] == 'withdrawal' }.sum { |tx| tx[:amount] }
    }
  end

  private

  def log_transaction(type, amount, reference = nil)
    @transactions << {
      type: type,
      amount: amount,
      balance_after: @balance,
      timestamp: Time.now,
      reference: reference
    }
  end
end

# Performance tracking facet
class PerformanceFacet
  include Facet
  
  def initialize
    @metrics = {}
    @goals = {}
    @reviews = []
  end

  def set_metric(name, value, period = Date.today)
    @metrics[name] ||= []
    @metrics[name] << { value: value, period: period, timestamp: Time.now }
  end

  def get_metric(name, period = nil)
    return nil unless @metrics[name]
    
    if period
      @metrics[name].find { |m| m[:period] == period }&.fetch(:value)
    else
      @metrics[name].last&.fetch(:value)
    end
  end

  def set_goal(name, target_value, deadline)
    @goals[name] = { target: target_value, deadline: deadline, set_on: Date.today }
  end

  def goal_progress(name)
    goal = @goals[name]
    return nil unless goal
    
    current_value = get_metric(name)
    return nil unless current_value
    
    progress = (current_value.to_f / goal[:target]) * 100
    {
      goal: goal,
      current_value: current_value,
      progress_percentage: progress.round(2),
      days_remaining: (goal[:deadline] - Date.today).to_i
    }
  end

  def add_review(rating, comments, reviewer, review_date = Date.today)
    @reviews << {
      rating: rating,
      comments: comments,
      reviewer: reviewer,
      review_date: review_date,
      timestamp: Time.now
    }
  end

  def average_rating(last_n_reviews = nil)
    reviews_to_consider = last_n_reviews ? @reviews.last(last_n_reviews) : @reviews
    return 0 if reviews_to_consider.empty?
    
    total = reviews_to_consider.sum { |review| review[:rating] }
    (total.to_f / reviews_to_consider.size).round(2)
  end

  def performance_summary
    {
      metrics: @metrics.transform_values { |values| values.last },
      goals: @goals.keys.to_h { |name| [name, goal_progress(name)] },
      recent_reviews: @reviews.last(3),
      average_rating: average_rating,
      total_reviews: @reviews.size
    }
  end
end

# Security facet for access control and audit
class SecurityFacet
  include Facet
  
  def initialize(security_level = 'basic')
    @security_level = security_level
    @access_log = []
    @failed_attempts = []
    @permissions = Set.new
    @active_sessions = {}
    
    setup_default_permissions
  end

  def authenticate(credentials)
    # Simulate authentication
    success = credentials[:password] == 'secret123'
    
    log_access_attempt(credentials[:user_id], success)
    
    if success
      session_id = generate_session_id
      @active_sessions[session_id] = {
        user_id: credentials[:user_id],
        start_time: Time.now,
        last_activity: Time.now
      }
      session_id
    else
      nil
    end
  end

  def validate_session(session_id)
    session = @active_sessions[session_id]
    return false unless session
    
    # Check session timeout (30 minutes)
    if Time.now - session[:last_activity] > 1800
      @active_sessions.delete(session_id)
      return false
    end
    
    session[:last_activity] = Time.now
    true
  end

  def logout(session_id)
    @active_sessions.delete(session_id)
  end

  def grant_permission(permission)
    @permissions.add(permission)
  end

  def revoke_permission(permission)
    @permissions.delete(permission)
  end

  def has_permission?(permission)
    @permissions.include?(permission) || @permissions.include?('admin')
  end

  def require_permission(permission)
    unless has_permission?(permission)
      raise SecurityError, "Access denied: missing permission '#{permission}'"
    end
  end

  def security_report
    {
      security_level: @security_level,
      permissions: @permissions.to_a,
      active_sessions: @active_sessions.size,
      recent_access_attempts: @access_log.last(10),
      failed_attempts_today: failed_attempts_today.size,
      total_access_attempts: @access_log.size
    }
  end

  private

  def setup_default_permissions
    case @security_level
    when 'admin'
      @permissions.merge(['read', 'write', 'delete', 'admin', 'financial'])
    when 'manager'
      @permissions.merge(['read', 'write', 'financial'])
    when 'employee'
      @permissions.merge(['read'])
    end
  end

  def log_access_attempt(user_id, success)
    attempt = {
      user_id: user_id,
      success: success,
      timestamp: Time.now,
      ip_address: '127.0.0.1' # Would be actual IP in real implementation
    }
    
    @access_log << attempt
    @failed_attempts << attempt unless success
  end

  def failed_attempts_today
    today = Date.today
    @failed_attempts.select { |attempt| attempt[:timestamp].to_date == today }
  end

  def generate_session_id
    "session_#{Time.now.to_i}_#{rand(10000)}"
  end
end

# Notification facet for messaging and alerts
class NotificationFacet
  include Facet
  
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
    @message_history = []
    @preferences = {
      email: true,
      sms: false,
      push: true,
      frequency: 'immediate'
    }
  end

  def subscribe(event_type, &handler)
    @subscribers[event_type] << handler
  end

  def unsubscribe(event_type, handler)
    @subscribers[event_type].delete(handler)
  end

  def notify(event_type, data = {})
    timestamp = Time.now
    message = {
      event_type: event_type,
      data: data,
      timestamp: timestamp
    }
    
    @message_history << message
    
    # Deliver to subscribers
    @subscribers[event_type].each do |handler|
      begin
        handler.call(message)
      rescue => e
        puts "Notification handler error: #{e.message}"
      end
    end
    
    # Simulate different delivery channels based on preferences
    deliver_message(message) if should_deliver?(event_type)
  end

  def set_preference(channel, enabled)
    @preferences[channel] = enabled
  end

  def set_frequency(frequency)
    raise ArgumentError, "Invalid frequency" unless %w[immediate hourly daily].include?(frequency)
    @preferences[:frequency] = frequency
  end

  def message_history(limit = nil)
    limit ? @message_history.last(limit) : @message_history.dup
  end

  def unread_count
    # In a real implementation, this would track read status
    @message_history.count { |msg| msg[:timestamp] > Time.now - 3600 } # Last hour
  end

  private

  def should_deliver?(event_type)
    # Simple delivery logic based on preferences
    case @preferences[:frequency]
    when 'immediate'
      true
    when 'hourly'
      @message_history.select { |msg| msg[:timestamp] > Time.now - 3600 }.size <= 1
    when 'daily'
      @message_history.select { |msg| msg[:timestamp] > Time.now - 86400 }.size <= 1
    else
      true
    end
  end

  def deliver_message(message)
    puts "? Email: #{message[:event_type]} - #{message[:data]}" if @preferences[:email]
    puts "? Push: #{message[:event_type]} - #{message[:data]}" if @preferences[:push]
    puts "? SMS: #{message[:event_type]} - #{message[:data]}" if @preferences[:sms]
  end
end

# Service class for coordinated operations
class EmployeeService
  def self.create_employee(name, id, department, email, capabilities = {})
    employee = Employee.new(name, id, department, email)
    faceted_employee = FacetedObject.new(employee)
    
    # Attach facets based on capabilities
    if capabilities[:account]
      account_facet = AccountFacet.new(capabilities[:account][:number], capabilities[:account][:balance])
      faceted_employee.attach_facet(account_facet)
    end
    
    if capabilities[:security]
      security_facet = SecurityFacet.new(capabilities[:security][:level])
      capabilities[:security][:permissions]&.each { |perm| security_facet.grant_permission(perm) }
      faceted_employee.attach_facet(security_facet)
    end
    
    if capabilities[:performance_tracking]
      faceted_employee.attach_facet(PerformanceFacet.new)
    end
    
    if capabilities[:notifications]
      notification_facet = NotificationFacet.new
      
      # Set up default notification handlers
      notification_facet.subscribe('financial_transaction') do |message|
        puts "? Financial Alert: #{message[:data][:type]} of $#{message[:data][:amount]}"
      end
      
      notification_facet.subscribe('performance_update') do |message|
        puts "? Performance Update: #{message[:data][:metric]} = #{message[:data][:value]}"
      end
      
      faceted_employee.attach_facet(notification_facet)
    end
    
    faceted_employee
  end

  def self.perform_secure_transaction(employee_obj, transaction_type, amount)
    employee_obj.requires_facets('security', 'account') do |obj|
      # Authenticate and check permissions
      security = obj.get_facet('security')
      security.require_permission('financial')
      
      # Perform transaction
      account = obj.get_facet('account')
      result = case transaction_type
               when 'deposit'
                 account.deposit(amount)
               when 'withdraw'
                 account.withdraw(amount)
               else
                 raise ArgumentError, "Unknown transaction type: #{transaction_type}"
               end
      
      # Send notification if available
      if obj.has_facet?('notification')
        notification = obj.get_facet('notification')
        notification.notify('financial_transaction', {
          type: transaction_type,
          amount: amount,
          new_balance: result,
          employee: obj.core_object.name
        })
      end
      
      result
    end
  end

  def self.update_performance(employee_obj, metric_name, value)
    employee_obj.with_facet('performance') do |performance|
      performance.set_metric(metric_name, value)
      
      # Notify if notification facet is available
      if employee_obj.has_facet?('notification')
        notification = employee_obj.get_facet('notification')
        notification.notify('performance_update', {
          metric: metric_name,
          value: value,
          employee: employee_obj.core_object.name
        })
      end
    end
  end

  def self.comprehensive_report(employee_obj)
    employee = employee_obj.core_object
    
    report = {
      employee_info: employee.to_h,
      attached_facets: employee_obj.facet_types,
      timestamp: Time.now
    }
    
    # Add facet-specific information
    if employee_obj.has_facet?('account')
      account = employee_obj.get_facet('account')
      report[:financial] = {
        account_number: account.account_number,
        balance: account.balance,
        recent_transactions: account.transaction_history(5)
      }
    end
    
    if employee_obj.has_facet?('performance')
      performance = employee_obj.get_facet('performance')
      report[:performance] = performance.performance_summary
    end
    
    if employee_obj.has_facet?('security')
      security = employee_obj.get_facet('security')
      report[:security] = security.security_report
    end
    
    if employee_obj.has_facet?('notification')
      notification = employee_obj.get_facet('notification')
      report[:notifications] = {
        unread_count: notification.unread_count,
        recent_messages: notification.message_history(3)
      }
    end
    
    report
  end
end

# Usage demonstration
def demonstrate_facet_system
  puts "=== Dynamic Facet Composition Demo ==="
  
  # Create employee with various capabilities
  employee_obj = EmployeeService.create_employee(
    'Sarah Connor', 'EMP003', 'Engineering', 'sarah.connor@company.com',
    {
      account: { number: 'ACC003', balance: 1000 },
      security: { level: 'manager', permissions: ['read', 'write', 'financial'] },
      performance_tracking: true,
      notifications: true
    }
  )
  
  puts "\n--- Initial Employee State ---"
  puts "Attached facets: #{employee_obj.facet_types.join(', ')}"
  
  # Demonstrate financial operations
  puts "\n--- Financial Operations ---"
  begin
    # First authenticate (in a real system)
    security = employee_obj.get_facet('security')
    session_id = security.authenticate(user_id: 'sarah', password: 'secret123')
    puts "Authentication successful: #{session_id}"
    
    # Perform transactions
    new_balance = EmployeeService.perform_secure_transaction(employee_obj, 'deposit', 500)
    puts "Deposit completed. New balance: $#{new_balance}"
    
    new_balance = EmployeeService.perform_secure_transaction(employee_obj, 'withdraw', 200)
    puts "Withdrawal completed. New balance: $#{new_balance}"
    
  rescue => e
    puts "Transaction failed: #{e.message}"
  end
  
  # Demonstrate performance tracking
  puts "\n--- Performance Tracking ---"
  EmployeeService.update_performance(employee_obj, 'projects_completed', 5)
  EmployeeService.update_performance(employee_obj, 'customer_satisfaction', 4.5)
  
  performance = employee_obj.get_facet('performance')
  performance.set_goal('projects_completed', 10, Date.today + 90)
  
  puts "Goal progress: #{performance.goal_progress('projects_completed')}"
  
  # Generate comprehensive report
  puts "\n--- Comprehensive Employee Report ---"
  report = EmployeeService.comprehensive_report(employee_obj)
  puts JSON.pretty_generate(report)
  
  # Demonstrate dynamic facet management
  puts "\n--- Dynamic Facet Management ---"
  puts "Before detachment: #{employee_obj.facet_types.join(', ')}"
  
  # Detach performance facet
  employee_obj.detach_facet('performance')
  puts "After detaching performance: #{employee_obj.facet_types.join(', ')}"
  
  # Try to use detached facet (should fail gracefully)
  begin
    EmployeeService.update_performance(employee_obj, 'test_metric', 1)
  rescue => e
    puts "Expected error when using detached facet: #{e.message}"
  end
end

# Run the demonstration
demonstrate_facet_system

The Ruby implementation showcases:

  • Metaprogramming Power: Dynamic method addition and removal using Ruby’s metaprogramming capabilities
  • Elegant Syntax: Clean, readable code that expresses intent clearly
  • Flexible Composition: Easy attachment and detachment of facets at runtime
  • Duck Typing: Natural method delegation without complex type hierarchies
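
The delegation core of the pattern can be distilled to a few lines. The following is a stripped-down, self-contained sketch of the method_missing strategy used above (the MiniFaceted and Wallet names are illustrative, not part of the implementation):

```ruby
# Minimal sketch of facet delegation via method_missing:
# unknown calls are tried on each facet, then on the core object.
class MiniFaceted
  def initialize(core)
    @core = core
    @facets = []
  end

  def attach(facet)
    @facets << facet
    self
  end

  def method_missing(name, *args, &block)
    target = (@facets + [@core]).find { |candidate| candidate.respond_to?(name) }
    target ? target.send(name, *args, &block) : super
  end

  def respond_to_missing?(name, include_private = false)
    (@facets + [@core]).any? { |candidate| candidate.respond_to?(name, include_private) } || super
  end
end

# A tiny "facet" and core object to exercise the delegation
Wallet = Struct.new(:balance) do
  def deposit(amount)
    self.balance += amount
  end
end

person = Struct.new(:name).new('Ada')
obj = MiniFaceted.new(person).attach(Wallet.new(0))

obj.deposit(25)  # resolved by the Wallet facet
obj.balance      # => 25
obj.name         # falls through to the core object => "Ada"
```

The full FacetedObject above adds prefixed delegator methods and attach/detach hooks on top of this same lookup order.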

Real-World Applications

The facets pattern proves particularly valuable in several domains:

Enterprise Software Integration

Modern enterprise systems often need to integrate with multiple external services. Facets allow core business objects to gain integration capabilities dynamically:

// Core customer object
const customer = new Customer('ABC Corp', 'enterprise');
const customerObj = new FacetedObject(customer);

// Attach integration facets based on configuration
if (config.salesforce.enabled) {
  customerObj.attachFacet(SalesforceFacet, config.salesforce.credentials);
}

if (config.stripe.enabled) {
  customerObj.attachFacet(PaymentFacet, config.stripe.apiKey);
}

if (config.analytics.enabled) {
  customerObj.attachFacet(AnalyticsFacet, config.analytics.trackingId);
}

Multi-Tenant SaaS Applications

Different tenants often require different feature sets. Facets enable feature composition based on subscription levels:

// Configure tenant capabilities based on plan
match subscription_plan {
    Plan::Basic => {
        tenant_obj.attach_facet(BasicAnalyticsFacet::new())?;
    },
    Plan::Professional => {
        tenant_obj.attach_facet(AdvancedAnalyticsFacet::new())?;
        tenant_obj.attach_facet(IntegrationFacet::new())?;
    },
    Plan::Enterprise => {
        tenant_obj.attach_facet(AdvancedAnalyticsFacet::new())?;
        tenant_obj.attach_facet(IntegrationFacet::new())?;
        tenant_obj.attach_facet(WhiteLabelFacet::new())?;
        tenant_obj.attach_facet(ApiAccessFacet::new())?;
    }
}

IoT Device Management

IoT devices often have optional capabilities that depend on hardware configuration or runtime conditions:

# Device base configuration
device_obj = FacetedObject.new(IoTDevice.new(device_id, device_type))

# Attach facets based on detected capabilities
if device.has_sensor?('temperature')
  device_obj.attach_facet(TemperatureFacet.new)
end

if device.has_connectivity?('wifi')
  device_obj.attach_facet(WiFiFacet.new)
end

if device.battery_powered?
  device_obj.attach_facet(PowerManagementFacet.new)
end

Performance Considerations

While facets provide tremendous flexibility, they come with performance trade-offs that must be carefully managed:

Method Resolution Overhead

Dynamic method resolution can introduce latency. Caching strategies help mitigate this:

class OptimizedFacetedObject<TCore> extends FacetedObject<TCore> {
  private methodCache: Map<string, Facet> = new Map();
  
  getFacetForMethod(methodName: string): Facet | undefined {
    // Check cache first
    if (this.methodCache.has(methodName)) {
      return this.methodCache.get(methodName);
    }
    
    // Search facets for method
    for (const facet of this.facets.values()) {
      if (typeof (facet as any)[methodName] === 'function') {
        this.methodCache.set(methodName, facet);
        return facet;
      }
    }
    
    return undefined;
  }
}

Memory Management

Facets can create reference cycles. Proper cleanup is essential:

impl Drop for FacetedObject {
    fn drop(&mut self) {
        // Clean up facet references
        for (_, facet) in self.facets.drain() {
            // Perform any necessary cleanup
            // Call facet-specific cleanup if implemented
        }
    }
}

Serialization Challenges

Faceted objects require special handling for persistence:

class FacetedObject
  def to_serializable
    {
      core_object: @core_object,
      facets: @facets.transform_values { |facet| serialize_facet(facet) },
      facet_types: @facets.keys
    }
  end
  
  def self.from_serializable(data)
    obj = new(data[:core_object])
    
    data[:facets].each do |type, facet_data|
      # Plain-Ruby camelization ("audit_log" -> "AuditLogFacet"); use
      # String#camelize instead if ActiveSupport is available
      facet_class = Object.const_get(type.to_s.split('_').map(&:capitalize).join + 'Facet')
      facet = facet_class.deserialize(facet_data)
      obj.attach_facet(facet)
    end
    
    obj
  end
  
  private
  
  def serialize_facet(facet)
    if facet.respond_to?(:serialize)
      facet.serialize
    else
      # Default serialization
      facet.instance_variables.each_with_object({}) do |var, hash|
        hash[var] = facet.instance_variable_get(var)
      end
    end
  end
end
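
The from_serializable helper above assumes each facet class exposes a serialize/deserialize pair. A minimal facet honoring that contract might look like this (TagFacet is a hypothetical example, not part of the implementation above):

```ruby
# Hypothetical facet implementing the serialize/deserialize
# contract assumed by FacetedObject.from_serializable
class TagFacet
  attr_reader :tags

  def initialize(tags = [])
    @tags = tags
  end

  # Reduce internal state to plain data suitable for persistence
  def serialize
    { tags: @tags }
  end

  # Rebuild an equivalent facet from that data
  def self.deserialize(data)
    new(data[:tags])
  end
end

facet = TagFacet.new(%w[vip priority])
restored = TagFacet.deserialize(facet.serialize)
restored.tags # => ["vip", "priority"]
```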

Architecture Patterns and Best Practices

Facet Discovery and Registration

Large systems benefit from automatic facet discovery:

class FacetRegistry {
  private static facetClasses: Map<string, FacetConstructor<any>> = new Map();
  
  static register<T extends Facet>(facetClass: FacetConstructor<T>): void {
    this.facetClasses.set(facetClass.facetType, facetClass);
  }
  
  static createFacet<T extends Facet>(
    facetType: string, 
    ...args: any[]
  ): T | undefined {
    const FacetClass = this.facetClasses.get(facetType);
    return FacetClass ? new FacetClass(...args) : undefined;
  }
  
  static getAvailableFacets(): string[] {
    return Array.from(this.facetClasses.keys());
  }
}

// Automatic registration
@RegisterFacet
class EmailFacet implements Facet {
  static readonly facetType = 'email';
  // ...
}

Configuration-Driven Composition

Enable declarative facet composition through configuration:

# facet-config.yml
employee_types:
  manager:
    facets:
      - type: account
        config:
          initial_balance: 1000
      - type: permission
        config:
          role: manager
      - type: notification
        config:
          channels: [email, push]
  
  admin:
    inherits: manager
    facets:
      - type: audit
        config:
          level: detailed
      - type: permission
        config:
          role: admin

A composer can then read this configuration and attach the corresponding facets; for example, in Rust:

pub struct FacetComposer {
    config: HashMap<String, EmployeeTypeConfig>,
}

impl FacetComposer {
    pub fn compose_employee(&self, employee_type: &str, employee: Employee) -> Result<FacetedObject, String> {
        let config = self.config.get(employee_type)
            .ok_or_else(|| format!("Unknown employee type: {}", employee_type))?;
        
        let mut employee_obj = FacetedObject::new(employee);
        
        for facet_config in &config.facets {
            let facet = self.create_facet(&facet_config.facet_type, &facet_config.config)?;
            employee_obj.attach_facet(facet)?;
        }
        
        Ok(employee_obj)
    }
}
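
On the Ruby side, resolving the inherits chain from such a configuration takes only a few lines. The sketch below inlines the YAML for brevity; the keys mirror facet-config.yml above:

```ruby
require 'yaml'

# Resolve an employee type to its full facet list, walking the
# `inherits` chain; structure mirrors facet-config.yml
CONFIG = YAML.safe_load(<<~YML)
  employee_types:
    manager:
      facets:
        - type: account
          config: { initial_balance: 1000 }
    admin:
      inherits: manager
      facets:
        - type: audit
          config: { level: detailed }
YML

def facets_for(type, types = CONFIG['employee_types'])
  entry = types.fetch(type)
  inherited = entry['inherits'] ? facets_for(entry['inherits'], types) : []
  inherited + entry.fetch('facets', [])
end

facets_for('admin').map { |f| f['type'] } # => ["account", "audit"]
```

Inherited facets come first, so a child type's entries can later override a parent's when both declare the same facet type.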

Testing Strategies

Faceted objects require comprehensive testing approaches:

RSpec.describe FacetedObject do
  let(:employee) { Employee.new('Test User', 'TEST001', 'Engineering', 'test@example.com') }
  let(:employee_obj) { FacetedObject.new(employee) }
  
  describe 'facet composition' do
    it 'allows dynamic attachment of facets' do
      account_facet = AccountFacet.new('ACC001', 1000)
      employee_obj.attach_facet(account_facet)
      
      expect(employee_obj.has_facet?('account')).to be true
      expect(employee_obj.balance).to eq 1000
    end
    
    it 'prevents duplicate facet attachment' do
      employee_obj.attach_facet(AccountFacet.new('ACC001'))
      
      expect {
        employee_obj.attach_facet(AccountFacet.new('ACC002'))
      }.to raise_error(ArgumentError, /already attached/)
    end
  end
  
  describe 'cross-facet operations' do
    before do
      employee_obj.attach_facet(AccountFacet.new('ACC001', 1000))
      employee_obj.attach_facet(SecurityFacet.new('manager'))

      # Subscribe a handler so the cross-facet notification is observable
      notification = NotificationFacet.new
      notification.subscribe('financial_transaction') do |message|
        puts "Financial Alert: #{message[:data][:type]} of $#{message[:data][:amount]}"
      end
      employee_obj.attach_facet(notification)
    end
    
    it 'coordinates operations across multiple facets' do
      expect {
        EmployeeService.perform_secure_transaction(employee_obj, 'withdraw', 100)
      }.to change { employee_obj.balance }.by(-100)
        .and output(/Financial Alert/).to_stdout
    end
  end
end

Comparison with Related Patterns

Facets vs Decorators

While both patterns add behavior dynamically, they serve different purposes:

  • Decorators: Wrap objects to modify or extend their interface
  • Facets: Compose objects from multiple behavioral aspects

// Decorator pattern - wrapping behavior
class LoggingDecorator implements Employee {
  constructor(private wrapped: Employee) {}
  
  performAction(action: string): void {
    console.log(`Performing: ${action}`);
    this.wrapped.performAction(action);
    console.log(`Completed: ${action}`);
  }
}

// Facets pattern - compositional behavior
const employee = new FacetedObject(new EmployeeImpl());
employee.attachFacet(LoggingFacet);
employee.attachFacet(SecurityFacet);
// Employee now has both logging AND security capabilities

Facets vs Mixins

Mixins operate at the class level, facets at the instance level:

# Mixin - class-level composition
module Auditable
  def log_action(action)
    puts "Action: #{action}"
  end
end

class Employee
  include Auditable  # All instances get audit capability
end

# Facets - instance-level composition
employee1 = FacetedObject.new(Employee.new)
employee1.attach_facet(AuditFacet.new)  # Only this instance gets audit capability

employee2 = FacetedObject.new(Employee.new)  # This instance doesn't have audit

Emerging Patterns

AI-Driven Facet Composition

Machine learning could optimize facet composition based on usage patterns:

class IntelligentFacetComposer {
  private usageAnalyzer: UsageAnalyzer;
  private mlModel: FacetRecommendationModel;
  
  async recommendFacets(
    objectType: string, 
    context: CompositionContext
  ): Promise<FacetRecommendation[]> {
    const usagePatterns = await this.usageAnalyzer.analyze(objectType);
    const contextFeatures = this.extractFeatures(context);
    
    return this.mlModel.predict(usagePatterns, contextFeatures);
  }
  
  async optimizeForPerformance(
    facetedObject: FacetedObject<any>
  ): Promise<OptimizationSuggestions> {
    const usage = await this.usageAnalyzer.getObjectUsage(facetedObject);
    
    return {
      facetsToPreload: usage.frequentlyUsedFacets,
      facetsToLazyLoad: usage.rarelyUsedFacets,
      cacheStrategy: usage.recommendedCacheStrategy
    };
  }
}

Blockchain and Distributed Facets

Distributed systems could benefit from blockchain-verified facet capabilities:

pub struct DistributedFacetRegistry {
    blockchain_client: BlockchainClient,
    capability_verifier: CapabilityVerifier,
}

impl DistributedFacetRegistry {
    pub async fn verify_facet_capability(
        &self,
        facet_hash: &str,
        required_permissions: &[String]
    ) -> Result<bool, DistributedError> {
        // Verify facet authenticity on blockchain
        let facet_record = self.blockchain_client
            .get_facet_record(facet_hash).await?;
        
        // Verify permissions
        self.capability_verifier
            .verify_permissions(&facet_record, required_permissions)
    }
}

Conclusion

The facets pattern represents a powerful approach to runtime behavior composition that complements the Adaptive Object Model pattern I discussed previously. While AOM focuses on schema flexibility, facets address the equally important challenge of behavioral composition. The implementations in Rust, TypeScript, and Ruby demonstrate how this pattern can be adapted to different language paradigms while maintaining its core principles. Each language brings unique strengths: Rust’s type safety and performance, TypeScript’s gradual typing and tooling support, and Ruby’s metaprogramming elegance.

Unfortunately, ObjectSpace, the company that created Voyager, went out of business, and the San Francisco Design Patterns book never gained wide traction, partly because of its ties to the now-obsolete EJB technology and the performance overhead of runtime reflection in the extension pattern. Nevertheless, the facets/extension pattern excels in domains requiring high configurability and runtime adaptability, though it demands careful attention to performance implications and testing strategies. The pattern works best when you have a clear separation of concerns and well-defined interfaces between facets. The combination of AOM for schema evolution and facets for behavior composition provides a comprehensive approach to building truly adaptive systems. Together, these patterns enable software that can evolve gracefully with changing requirements while maintaining performance and reliability.

The sample implementations are available at the Dynamic Facets Sample Project, providing working examples in all three languages discussed. These implementations serve as a foundation for building more sophisticated facet-based systems tailored to specific domain requirements.

September 8, 2025

Adaptive Object Model: A Modern Approach with Dynamic Languages and Document Databases

Filed under: Computing,Methodologies — admin @ 11:36 am

Background

I have long been interested in the Adaptive Object Model (AOM) pattern and used it in a couple of projects in the early 2000s. I have also written about this pattern before; it emerged from the work of Ralph Johnson and his colleagues in the late 1990s and addresses a fundamental challenge in software architecture: how to build systems that can evolve structurally without code changes or downtime. The pattern draws heavily from several foundational concepts in computer science and software engineering. The roots of AOM can be traced back to several influential ideas:

  • Reflection and Metaprogramming: Early Lisp systems showed the power of treating code as data, enabling programs to modify themselves at runtime. This concept heavily influenced the AOM pattern’s approach to treating metadata as first-class objects.
  • Type Theory: The work of pioneers like Alonzo Church and Haskell Curry on type systems provided the theoretical foundation for the “type square” pattern that forms AOM’s core structure, where types themselves become objects that can be manipulated.
  • Database Systems: The entity-attribute-value (EAV) model used in database design influenced AOM’s approach to storing flexible data structures.

Related Patterns

The following patterns are related to AOM:

  • Facade Pattern: AOM often employs facades to provide simplified interfaces over complex meta-object structures, hiding the underlying complexity from client code.
  • Strategy Pattern: The dynamic binding of operations in AOM naturally implements the Strategy pattern, allowing algorithms to be selected and modified at runtime.
  • Composition over Inheritance: AOM uses the principle of favoring composition over inheritance by building complex objects from simpler, configurable components rather than rigid class hierarchies.
  • Domain-Specific Languages (DSLs): Many AOM implementations provide DSLs for defining entity types and relationships, making the system accessible to domain experts rather than just programmers.

Voyager ORB’s Dynamic Aggregation

In the late 1990s and early 2000s, I used Voyager ORB for some personal projects. Voyager pioneered the concept of “Dynamic Aggregation” – the ability to attach secondary objects, called facets, to primary objects at runtime. This system demonstrated several key principles that later influenced AOM development:

  • Runtime Object Extension: Objects could be extended with new capabilities without modifying their original class definitions:
// Voyager ORB example - attaching an account facet to an employee
IEmployee employee = new Employee("joe", "234-44-2678");
IFacets facets = Facets.of(employee);
IAccount account = (IAccount) facets.of(IAccount.class);
account.deposit(2000);
  • Interface-based Composition: Facets were accessed through interfaces, providing a clean separation between capability and implementation – a principle central to modern AOM.
  • Distributed Object Mobility: Voyager’s facet system worked seamlessly across network boundaries, allowing objects and their attached capabilities to move between different machines while maintaining their extended functionality.
  • Automatic Proxy Generation: Like modern AOM systems, Voyager automatically generated the necessary plumbing code at runtime, using Java’s reflection and bytecode manipulation capabilities.

The Voyager approach influenced distributed computing patterns and demonstrated that dynamic composition could work reliably in production systems. The idea of attaching behavior at runtime through well-defined interfaces is directly applicable to modern AOM implementations. The key insight from Voyager was that objects don’t need to know about all their potential capabilities at compile time. Instead, capabilities can be discovered, attached, and composed dynamically based on runtime requirements – a principle that AOM extends to entire domain models.
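The same insight translates directly to modern dynamic languages. Below is a minimal TypeScript sketch of runtime capability attachment; the `Facets` helper and `Account` interface are hypothetical stand-ins that loosely mirror the Voyager API shown earlier, not part of any real library:

```typescript
// Hypothetical facet container: capabilities are attached to a host
// object at runtime and looked up by name, not declared at compile time.
class Facets {
  private facets = new Map<string, object>();
  constructor(private host: object) {}

  // Attach a capability; the host class is never modified.
  attach(name: string, facet: object): void {
    this.facets.set(name, facet);
  }

  // Discover a capability by name; undefined if it was never attached.
  of<T>(name: string): T | undefined {
    return this.facets.get(name) as T | undefined;
  }
}

interface Account { balance: number; deposit(amount: number): void; }

const employee = { name: "joe", ssn: "234-44-2678" };
const facets = new Facets(employee);

// Attach an account facet at runtime, then use it through its interface.
facets.attach("account", {
  balance: 0,
  deposit(amount: number) { this.balance += amount; },
} as Account);

const account = facets.of<Account>("account");
account?.deposit(2000);
```

The host object never learns about the account capability; callers that need it discover it through the facet container, exactly as the Voyager example does with `Facets.of(employee)`.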

Introduction to Adaptive Object Model

Adaptive Object Model is an architectural pattern used in domains requiring dynamic manipulation of metadata and business rules. Unlike traditional object-oriented design where class structures are fixed at compile time, AOM treats class definitions, attributes, relationships, and even business rules as data that can be modified at runtime.

Consider a vehicle hierarchy as an example. In traditional OO design, you might have:

Vehicle
├── Car
│   ├── Sedan
│   ├── SUV
│   └── Coupe
├── Motorcycle
└── Truck
    ├── PickupTruck
    └── SemiTruck

With AOM, instead of predefined inheritance hierarchies, we use the “type square” pattern:

  • EntityType: Represents what would traditionally be a class
  • Entity: Represents what would traditionally be an object instance
  • PropertyType: Defines the schema for attributes
  • Property: Holds actual attribute values

This meta-model allows for unlimited extensibility without code changes, making it ideal for domains with rapidly evolving requirements or where different customers need different data models.
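To make the type square concrete, here is a minimal TypeScript sketch in which both the “class” and the “instance” are plain runtime data. The interface and field names follow the bullets above; the full implementations later in this post flesh this idea out:

```typescript
// The type square: types and their schemas are plain runtime data.
interface PropertyType { name: string; valueType: string; }           // attribute schema
interface EntityType { name: string; propertyTypes: PropertyType[]; } // "class" as data
interface Property { propertyType: string; value: unknown; }          // attribute value
interface Entity { entityType: string; properties: Property[]; }      // "instance" as data

// A new "class" is created by constructing data, not by writing code.
const sedanType: EntityType = {
  name: "Sedan",
  propertyTypes: [
    { name: "maker", valueType: "string" },
    { name: "doors", valueType: "number" },
  ],
};

// An instance points at its type by name and carries its own values.
const mySedan: Entity = {
  entityType: sedanType.name,
  properties: [
    { propertyType: "maker", value: "Honda" },
    { propertyType: "doors", value: 4 },
  ],
};
```

Adding a new vehicle kind at runtime is just another `EntityType` value; no subclass is compiled.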

The Database Challenge: From Relational to Document

Traditional relational databases present significant challenges for AOM implementations:

  • Excessive Joins: In a relational AOM implementation, reconstructing a single business object requires joining multiple tables:
    • Entity table (object instances)
    • Property table (attribute values)
    • PropertyType table (attribute metadata)
    • EntityType table (type definitions)
  • Schema Rigidity: Relational schemas require predefined table structures, which conflicts with AOM’s goal of runtime flexibility.
  • Performance Issues: The EAV (Entity-Attribute-Value) pattern commonly used in relational AOM implementations suffers from poor query performance due to the lack of indexing on the “value” column’s varied data types.
  • Complex Queries: Simple business queries become complex multi-table joins with numerous conditions, making the system difficult to optimize and maintain.
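To make the join overhead concrete, the following TypeScript sketch simulates reconstructing a single business object from those four tables, using in-memory maps in place of SQL tables. The table layout and column names are illustrative assumptions, not a specific schema:

```typescript
// Four "tables", keyed the way a relational EAV schema would key them.
const entityTypes = new Map([[1, { id: 1, name: "Vehicle" }]]);
const propertyTypes = new Map([
  [10, { id: 10, entityTypeId: 1, name: "maker" }],
  [11, { id: 11, entityTypeId: 1, name: "year" }],
]);
const entities = new Map([[100, { id: 100, entityTypeId: 1 }]]);
const properties = [
  { entityId: 100, propertyTypeId: 10, value: "Tesla" },
  { entityId: 100, propertyTypeId: 11, value: "2022" },
];

// Reconstructing one object touches all four tables -- the equivalent
// SQL needs three joins before any business logic can run.
function loadEntity(id: number) {
  const row = entities.get(id)!;                    // lookup 1: Entity
  const type = entityTypes.get(row.entityTypeId)!;  // lookup 2: EntityType
  const props: Record<string, string> = {};
  for (const p of properties.filter(p => p.entityId === id)) { // lookup 3: Property
    const pt = propertyTypes.get(p.propertyTypeId)!;           // lookup 4: PropertyType
    props[pt.name] = p.value;
  }
  return { type: type.name, ...props };
}
```

Every attribute value also arrives as an untyped string (or a generic "value" column), which is why indexing and type-aware queries suffer in relational EAV designs.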

The Document Database Solution

Document databases like MongoDB naturally align with AOM principles:

  • Schema Flexibility: Documents can contain arbitrary fields without predefined schemas, allowing entity types to evolve dynamically.
  • Nested Structures: Complex relationships and metadata can be stored within documents, reducing the need for joins.
  • Rich Querying: Modern document databases provide sophisticated query capabilities while maintaining flexibility.
  • Indexing: Flexible indexing strategies can be applied to document fields as needed.
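For contrast with the relational reconstruction, the same vehicle can live in a single MongoDB document, so reading it back requires no joins at all. The field layout below is one reasonable choice, not a prescribed schema:

```typescript
// One document holds the instance data and a reference to its type;
// nothing here needs to be declared ahead of time in a schema.
const vehicleDocument = {
  _id: "vehicle_1",
  entityType: "Vehicle",
  properties: {
    maker: "Tesla",
    model: "Model 3",
    year: 2022,
    miles: 15000,
  },
  // Type metadata can live in a sibling collection or be versioned inline.
  schemaVersion: 1,
};

// Reading an attribute is a direct field access; an index can target
// nested paths such as "properties.maker" when queries need it.
const maker = vehicleDocument.properties.maker;
```

Note how attribute values keep their native types (`year` is a number, not a stringified "value" column), which is what makes indexing and range queries practical.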

Rust Implementation

Let’s implement AOM in Rust, taking advantage of its type safety while maintaining flexibility through traits and enums. Rust’s ownership model and pattern matching make it particularly well-suited for safe metaprogramming.

use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use std::sync::Arc;

// Type-safe property values using enums
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PropertyValue {
    String(String),
    Integer(i64),
    Float(f64),
    Boolean(bool),
    Date(chrono::DateTime<chrono::Utc>),
}

impl PropertyValue {
    pub fn type_name(&self) -> &'static str {
        match self {
            PropertyValue::String(_) => "String",
            PropertyValue::Integer(_) => "Integer", 
            PropertyValue::Float(_) => "Float",
            PropertyValue::Boolean(_) => "Boolean",
            PropertyValue::Date(_) => "Date",
        }
    }
}

// Property type definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropertyType {
    pub name: String,
    pub value_type: String,
    pub required: bool,
    pub default_value: Option<PropertyValue>,
}

impl PropertyType {
    pub fn new(name: &str, value_type: &str, required: bool) -> Self {
        Self {
            name: name.to_string(),
            value_type: value_type.to_string(),
            required,
            default_value: None,
        }
    }
}

// Property instance
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Property {
    pub property_type: String, // Reference to PropertyType name
    pub value: PropertyValue,
}

impl Property {
    pub fn new(property_type: &str, value: PropertyValue) -> Self {
        Self {
            property_type: property_type.to_string(),
            value,
        }
    }
}

// Operation trait for dynamic behavior
pub trait Operation: Send + Sync + std::fmt::Debug {
    fn execute(&self, entity: &Entity, args: &[PropertyValue]) -> Result<PropertyValue, String>;
    fn name(&self) -> &str;
}

// Entity type definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntityType {
    pub name: String,
    pub property_types: HashMap<String, PropertyType>,
    #[serde(skip)]
    pub operations: HashMap<String, Arc<dyn Operation>>,
}

impl EntityType {
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            property_types: HashMap::new(),
            operations: HashMap::new(),
        }
    }

    pub fn add_property_type(&mut self, property_type: PropertyType) {
        self.property_types.insert(property_type.name.clone(), property_type);
    }

    pub fn add_operation(&mut self, operation: Arc<dyn Operation>) {
        self.operations.insert(operation.name().to_string(), operation);
    }

    pub fn get_property_type(&self, name: &str) -> Option<&PropertyType> {
        self.property_types.get(name)
    }
}

// Entity instance
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Entity {
    pub entity_type: String, // Reference to EntityType name
    pub properties: HashMap<String, Property>,
}

impl Entity {
    pub fn new(entity_type: &str) -> Self {
        Self {
            entity_type: entity_type.to_string(),
            properties: HashMap::new(),
        }
    }

    pub fn add_property(&mut self, property: Property) {
        self.properties.insert(property.property_type.clone(), property);
    }

    pub fn get_property(&self, name: &str) -> Option<&PropertyValue> {
        self.properties.get(name).map(|p| &p.value)
    }

    pub fn set_property(&mut self, name: &str, value: PropertyValue) {
        if let Some(property) = self.properties.get_mut(name) {
            property.value = value;
        }
    }
}

// Registry to manage types and instances
pub struct EntityRegistry {
    entity_types: HashMap<String, EntityType>,
    entities: HashMap<String, Entity>,
}

impl EntityRegistry {
    pub fn new() -> Self {
        Self {
            entity_types: HashMap::new(),
            entities: HashMap::new(),
        }
    }

    pub fn register_type(&mut self, entity_type: EntityType) {
        self.entity_types.insert(entity_type.name.clone(), entity_type);
    }

    pub fn create_entity(&mut self, type_name: &str, id: &str) -> Result<(), String> {
        if !self.entity_types.contains_key(type_name) {
            return Err(format!("Unknown entity type: {}", type_name));
        }
        
        let entity = Entity::new(type_name);
        self.entities.insert(id.to_string(), entity);
        Ok(())
    }

    // New method to get a mutable reference to an entity
    pub fn get_entity_mut(&mut self, id: &str) -> Option<&mut Entity> {
        self.entities.get_mut(id)
    }

    pub fn execute_operation(
        &self, 
        entity_id: &str, 
        operation_name: &str, 
        args: &[PropertyValue]
    ) -> Result<PropertyValue, String> {
        let entity = self.entities.get(entity_id)
            .ok_or_else(|| format!("Entity not found: {}", entity_id))?;
        
        let entity_type = self.entity_types.get(&entity.entity_type)
            .ok_or_else(|| format!("Entity type not found: {}", entity.entity_type))?;
        
        let operation = entity_type.operations.get(operation_name)
            .ok_or_else(|| format!("Operation not found: {}", operation_name))?;
        
        operation.execute(entity, args)
    }
}

// Example operations
#[derive(Debug)]
struct DriveOperation;

impl Operation for DriveOperation {
    fn execute(&self, entity: &Entity, _args: &[PropertyValue]) -> Result<PropertyValue, String> {
        if let Some(PropertyValue::String(maker)) = entity.get_property("maker") {
            Ok(PropertyValue::String(format!("Driving the {} vehicle", maker)))
        } else {
            Ok(PropertyValue::String("Driving vehicle".to_string()))
        }
    }

    fn name(&self) -> &str {
        "drive"
    }
}

#[derive(Debug)]
struct MaintenanceOperation;

impl Operation for MaintenanceOperation {
    fn execute(&self, entity: &Entity, _args: &[PropertyValue]) -> Result<PropertyValue, String> {
        if let Some(PropertyValue::Integer(miles)) = entity.get_property("miles") {
            let next_maintenance = miles + 5000;
            Ok(PropertyValue::String(format!("Next maintenance due at {} miles", next_maintenance)))
        } else {
            Ok(PropertyValue::String("Maintenance scheduled".to_string()))
        }
    }

    fn name(&self) -> &str {
        "perform_maintenance"
    }
}

// Usage example
fn example_usage() -> Result<(), String> {
    let mut registry = EntityRegistry::new();

    // Define vehicle type
    let mut vehicle_type = EntityType::new("Vehicle");
    vehicle_type.add_property_type(PropertyType::new("maker", "String", true));
    vehicle_type.add_property_type(PropertyType::new("model", "String", true));
    vehicle_type.add_property_type(PropertyType::new("year", "Integer", true));
    vehicle_type.add_property_type(PropertyType::new("miles", "Integer", false));
    
    vehicle_type.add_operation(Arc::new(DriveOperation));
    vehicle_type.add_operation(Arc::new(MaintenanceOperation));

    registry.register_type(vehicle_type);

    // Create a new entity instance
    registry.create_entity("Vehicle", "vehicle_1")?;
    
    // Get a mutable reference to the new entity and set its properties
    if let Some(car) = registry.get_entity_mut("vehicle_1") {
        car.add_property(Property::new("maker", PropertyValue::String("Tesla".to_string())));
        car.add_property(Property::new("model", PropertyValue::String("Model 3".to_string())));
        car.add_property(Property::new("year", PropertyValue::Integer(2022)));
        car.add_property(Property::new("miles", PropertyValue::Integer(15000)));
    }

    // Execute the drive operation and print the result
    let drive_result = registry.execute_operation("vehicle_1", "drive", &[])?;
    println!("Drive operation result: {:?}", drive_result);

    // Execute the maintenance operation and print the result
    let maintenance_result = registry.execute_operation("vehicle_1", "perform_maintenance", &[])?;
    println!("Maintenance operation result: {:?}", maintenance_result);

    Ok(())
}

fn main() {
    match example_usage() {
        Ok(_) => println!("Example completed successfully."),
        Err(e) => eprintln!("Error: {}", e),
    }
}

The Rust implementation provides several advantages:

  • Type Safety: Enum-based property values ensure type safety while maintaining flexibility.
  • Memory Safety: Rust’s ownership model prevents common memory issues found in dynamic systems.
  • Performance: Zero-cost abstractions and compile-time optimizations.
  • Concurrency: Built-in support for safe concurrent access to shared data.

TypeScript Implementation

TypeScript brings static typing to JavaScript’s dynamic nature, providing an excellent balance for AOM implementations:

// Type definitions for property values
type PropertyValue = string | number | boolean | Date;

interface PropertyType {
  name: string;
  valueType: string;
  required: boolean;
  defaultValue?: PropertyValue;
}

interface Property {
  propertyType: string;
  value: PropertyValue;
}

// Operation interface with proper typing
interface Operation {
  name: string;
  execute(entity: Entity, args: PropertyValue[]): PropertyValue;
}

// Generic constraint for entity properties
interface PropertyMap {
  [key: string]: PropertyValue;
}

class EntityType {
  private propertyTypes: Map<string, PropertyType> = new Map();
  private operations: Map<string, Operation> = new Map();

  constructor(public readonly typeName: string) {}

  addPropertyType(propertyType: PropertyType): void {
    this.propertyTypes.set(propertyType.name, propertyType);
  }

  addOperation(operation: Operation): void {
    this.operations.set(operation.name, operation);
  }

  getPropertyType(name: string): PropertyType | undefined {
    return this.propertyTypes.get(name);
  }

  getOperation(name: string): Operation | undefined {
    return this.operations.get(name);
  }

  getAllPropertyTypes(): PropertyType[] {
    return Array.from(this.propertyTypes.values());
  }

  // Type guard for property validation
  validateProperty(name: string, value: PropertyValue): boolean {
    const propertyType = this.getPropertyType(name);
    if (!propertyType) return false;

    switch (propertyType.valueType) {
      case 'string':
        return typeof value === 'string';
      case 'number':
        return typeof value === 'number';
      case 'boolean':
        return typeof value === 'boolean';
      case 'date':
        return value instanceof Date;
      default:
        return false;
    }
  }
}

class Entity {
  private properties: Map<string, Property> = new Map();

  constructor(public readonly entityType: EntityType) {
    // Initialize with default values
    entityType.getAllPropertyTypes().forEach(propType => {
      if (propType.defaultValue !== undefined) {
        this.setProperty(propType.name, propType.defaultValue);
      }
    });
  }

  setProperty(name: string, value: PropertyValue): boolean {
    if (!this.entityType.validateProperty(name, value)) {
      throw new Error(`Invalid property: ${name} with value ${value}`);
    }

    const propertyType = this.entityType.getPropertyType(name);
    if (!propertyType) {
      throw new Error(`Unknown property type: ${name}`);
    }

    this.properties.set(name, {
      propertyType: name,
      value
    });

    return true;
  }

  getProperty<T extends PropertyValue>(name: string): T | undefined {
    const property = this.properties.get(name);
    return property?.value as T;
  }

  executeOperation(operationName: string, args: PropertyValue[] = []): PropertyValue {
    const operation = this.entityType.getOperation(operationName);
    if (!operation) {
      throw new Error(`Unknown operation: ${operationName}`);
    }
    return operation.execute(this, args);
  }

  // Dynamic property access with Proxy
  static withDynamicAccess(entity: Entity): Entity & PropertyMap {
    return new Proxy(entity, {
      get(target, prop: string) {
        if (prop in target) {
          return (target as any)[prop];
        }
        return target.getProperty(prop);
      },
      set(target, prop: string, value: PropertyValue) {
        try {
          target.setProperty(prop, value);
          return true;
        } catch {
          return false;
        }
      },
      // Surface the dynamic properties to Object.keys and for...in,
      // so enumeration sees entity data rather than internal fields
      ownKeys(target) {
        return Array.from((target as any).properties.keys());
      },
      getOwnPropertyDescriptor(target, prop: string) {
        if ((target as any).properties.has(prop)) {
          return {
            enumerable: true,
            configurable: true,
            value: target.getProperty(prop)
          };
        }
        return Object.getOwnPropertyDescriptor(target, prop);
      }
    }) as Entity & PropertyMap;
  }
}

// Enhanced operation implementations
class DriveOperation implements Operation {
  name = 'drive';

  execute(entity: Entity, args: PropertyValue[]): PropertyValue {
    const maker = entity.getProperty<string>('maker') || 'Unknown';
    const speed = args[0] as number || 60;
    return `Driving the ${maker} at ${speed} mph`;
  }
}

class MaintenanceOperation implements Operation {
  name = 'performMaintenance';

  execute(entity: Entity, args: PropertyValue[]): PropertyValue {
    const miles = entity.getProperty<number>('miles') || 0;
    const maintenanceType = args[0] as string || 'basic';
    
    // Business logic for maintenance
    const cost = maintenanceType === 'premium' ? 150 : 75;
    const nextDue = miles + (maintenanceType === 'premium' ? 10000 : 5000);
    
    return `${maintenanceType} maintenance completed. Cost: $${cost}. Next due: ${nextDue} miles`;
  }
}

// Factory for creating entities with fluent interface
class EntityFactory {
  private types: Map<string, EntityType> = new Map();

  defineType(name: string): TypeBuilder {
    return new TypeBuilder(name, this);
  }

  registerType(entityType: EntityType): void {
    this.types.set(entityType.typeName, entityType);
  }

  createEntity(typeName: string): Entity {
    const type = this.types.get(typeName);
    if (!type) {
      throw new Error(`Unknown entity type: ${typeName}`);
    }
    return Entity.withDynamicAccess(new Entity(type));
  }
}

class TypeBuilder {
  private entityType: EntityType;

  constructor(typeName: string, private factory: EntityFactory) {
    this.entityType = new EntityType(typeName);
  }

  property(name: string, type: string, required = false, defaultValue?: PropertyValue): TypeBuilder {
    this.entityType.addPropertyType({ name, valueType: type, required, defaultValue });
    return this;
  }

  operation(operation: Operation): TypeBuilder {
    this.entityType.addOperation(operation);
    return this;
  }

  build(): EntityType {
    this.factory.registerType(this.entityType);
    return this.entityType;
  }
}

// Usage example with modern TypeScript features
const factory = new EntityFactory();

// Define vehicle type with fluent interface
factory.defineType('Vehicle')
  .property('maker', 'string', true)
  .property('model', 'string', true)
  .property('year', 'number', true, 2024)
  .property('miles', 'number', false, 0)
  .property('isElectric', 'boolean', false, false)
  .operation(new DriveOperation())
  .operation(new MaintenanceOperation())
  .build();

// Create and use vehicle with dynamic property access
const vehicle = factory.createEntity('Vehicle') as Entity & PropertyMap;

// Type-safe property access
vehicle.maker = 'Tesla';
vehicle.model = 'Model 3';
vehicle.isElectric = true;

console.log(vehicle.executeOperation('drive', [75]));
console.log(vehicle.executeOperation('performMaintenance', ['premium']));

// Dynamic property enumeration
Object.keys(vehicle).forEach(key => {
  console.log(`${key}: ${vehicle[key]}`);
});

The TypeScript implementation provides:

  • Gradual Typing: Mix dynamic and static typing as needed.
  • Modern Language Features: Generics, type guards, Proxy objects, and fluent interfaces.
  • Developer Experience: Excellent tooling support with autocomplete and type checking.
  • Flexibility: Easy migration from JavaScript while adding type safety incrementally.

Enhanced Ruby Implementation

Ruby’s metaprogramming capabilities make it particularly well-suited for AOM. Let’s enhance the original implementation with modern Ruby features:

require 'date'
require 'json'
require 'securerandom'

# Enhanced PropertyType with validation
class PropertyType
  attr_reader :name, :type, :required, :validator

  def initialize(name, type, required: false, default: nil, &validator)
    @name = name
    @type = type
    @required = required
    @default = default
    @validator = validator
  end

  def valid?(value)
    return false if @required && value.nil?
    return true if value.nil? && !@required
    
    type_valid = case @type
                 when :string then value.is_a?(String)
                 when :integer then value.is_a?(Integer)
                 when :float then value.is_a?(Float) || value.is_a?(Integer)
                 when :boolean then [true, false].include?(value)
                 when :date then value.is_a?(Date) || value.is_a?(Time)
                 else true
                 end
    
    type_valid && (@validator.nil? || @validator.call(value))
  end

  def default_value
    @default.respond_to?(:call) ? @default.call : @default
  end
end

# Enhanced EntityType with DSL
class EntityType
  attr_reader :name, :property_types, :operations, :validations

  def initialize(name, &block)
    @name = name
    @property_types = {}
    @operations = {}
    @validations = []
    
    instance_eval(&block) if block_given?
  end

  # DSL methods
  def property(name, type, **options, &validator)
    @property_types[name] = PropertyType.new(name, type, **options, &validator)
  end

  def operation(name, &block)
    @operations[name] = block
  end

  def validate(&block)
    @validations << block
  end

  def valid_entity?(entity)
    @validations.all? { |validation| validation.call(entity) }
  end

  def create_entity(**attributes)
    Entity.new(self, attributes)
  end
end

# Enhanced Entity with method delegation and validations
class Entity
  attr_reader :entity_type, :id

  def initialize(entity_type, attributes = {})
    @entity_type = entity_type
    @properties = {}
    @id = attributes.delete(:id) || SecureRandom.uuid
    
    # Set default values
    @entity_type.property_types.each do |name, prop_type|
      @properties[name] = prop_type.default_value unless prop_type.default_value.nil?
    end
    
    # Set provided attributes
    attributes.each { |name, value| set_property(name, value) }
    
    # Add dynamic methods for properties
    create_property_methods
    
    # Validate entity
    validate!
  end

  def set_property(name, value)
    prop_type = @entity_type.property_types[name]
    raise ArgumentError, "Unknown property: #{name}" unless prop_type
    raise ArgumentError, "Invalid value for #{name}" unless prop_type.valid?(value)
    
    @properties[name] = value
  end

  def get_property(name)
    @properties[name]
  end

  def execute_operation(name, *args)
    operation = @entity_type.operations[name]
    raise ArgumentError, "Unknown operation: #{name}" unless operation
    
    instance_exec(*args, &operation)
  end

  def to_h
    @properties.dup.merge(entity_type: @entity_type.name, id: @id)
  end

  def to_json(*args)
    to_h.to_json(*args)
  end

  private

  def create_property_methods
    @entity_type.property_types.each do |name, _|
      # Getter
      define_singleton_method(name) { get_property(name) }
      
      # Setter
      define_singleton_method("#{name}=") { |value| set_property(name, value) }
      
      # Predicate method for boolean properties
      if @entity_type.property_types[name].type == :boolean
        define_singleton_method("#{name}?") { !!get_property(name) }
      end
    end
  end

  def validate!
    # Check required properties
    @entity_type.property_types.each do |name, prop_type|
      if prop_type.required && @properties[name].nil?
        raise ArgumentError, "Required property missing: #{name}"
      end
    end
    
    # Run entity-level validations
    unless @entity_type.valid_entity?(self)
      raise ArgumentError, "Entity validation failed"
    end
  end
end

# Registry with persistence capabilities
class EntityRegistry
  def initialize
    @entity_types = {}
    @entities = {}
  end

  def define_type(name, &block)
    @entity_types[name] = EntityType.new(name, &block)
  end

  def create_entity(type_name, **attributes)
    entity_type = @entity_types[type_name]
    raise ArgumentError, "Unknown entity type: #{type_name}" unless entity_type
    
    entity = entity_type.create_entity(**attributes)
    @entities[entity.id] = entity
    entity
  end

  def find_entity(id)
    @entities[id]
  end

  def find_entities_by_type(type_name)
    @entities.values.select { |entity| entity.entity_type.name == type_name }
  end

  def export_to_json
    {
      entity_types: @entity_types.keys,
      entities: @entities.values.map(&:to_h)
    }.to_json
  end
end

# Usage example with modern Ruby features
registry = EntityRegistry.new

# Define vehicle type with validations
registry.define_type('Vehicle') do
  property :maker, :string, required: true
  property :model, :string, required: true
  property :year, :integer, required: true do |year|
    year.between?(1900, Date.today.year + 1)
  end
  property :miles, :integer, default: 0 do |miles|
    miles >= 0
  end
  property :electric, :boolean, default: false
  
  # Entity-level validation
  validate do |entity|
    # Example rule: electric vehicles must be model year 2010 or later
    !entity.electric? || entity.year >= 2010
  end
  
  operation :drive do |distance = 10|
    current_miles = miles || 0
    self.miles = current_miles + distance
    "Drove #{distance} miles in #{maker} #{model}. Total miles: #{miles}"
  end
  
  operation :maintenance do |type = 'basic'|
    cost = type == 'premium' ? 150 : 75
    next_due = miles + (type == 'premium' ? 10000 : 5000)
    
    "#{type.capitalize} maintenance completed for #{maker} #{model}. " \
    "Cost: $#{cost}. Next maintenance due at #{next_due} miles."
  end
end

# Create and use vehicles
tesla = registry.create_entity('Vehicle', 
  maker: 'Tesla', 
  model: 'Model S', 
  year: 2023, 
  electric: true
)

toyota = registry.create_entity('Vehicle',
  maker: 'Toyota',
  model: 'Camry',
  year: 2022
)

# Use dynamic methods
puts tesla.execute_operation(:drive, 50)
puts toyota.execute_operation(:maintenance, 'premium')

# Access properties naturally
puts "#{tesla.maker} #{tesla.model} is electric: #{tesla.electric?}"
puts "Toyota has #{toyota.miles} miles"

# Export to JSON
puts registry.export_to_json

MongoDB Integration

Modern document databases like MongoDB provide natural storage for AOM entities. Here’s how to integrate AOM with MongoDB:

import { MongoClient, Collection, Db, ObjectId } from 'mongodb';

interface MongoEntity {
  _id?: string;
  entityType: string;
  properties: Record<string, any>;
  createdAt: Date;
  updatedAt: Date;
}

interface MongoEntityType {
  _id?: string;
  name: string;
  propertyTypes: Record<string, any>;
  version: number;
  createdAt: Date;
}

class MongoEntityStore {
  private db: Db;
  private entitiesCollection: Collection<MongoEntity>;
  private typesCollection: Collection<MongoEntityType>;

  constructor(db: Db) {
    this.db = db;
    this.entitiesCollection = db.collection('entities');
    this.typesCollection = db.collection('entity_types');
  }

  async saveEntityType(entityType: EntityType): Promise<void> {
    const mongoType: MongoEntityType = {
      name: entityType.typeName,
      propertyTypes: Object.fromEntries(
        entityType.getAllPropertyTypes().map(pt => [pt.name, pt])
      ),
      version: 1,
      createdAt: new Date()
    };

    await this.typesCollection.replaceOne(
      { name: entityType.typeName },
      mongoType,
      { upsert: true }
    );
  }

  async saveEntity(entity: Entity): Promise<string> {
    const mongoEntity: MongoEntity = {
      entityType: entity.entityType.typeName,
      properties: this.serializeProperties(entity),
      createdAt: new Date(),
      updatedAt: new Date()
    };

    const result = await this.entitiesCollection.insertOne(mongoEntity);
    return result.insertedId.toString();
  }

  async findEntitiesByType(typeName: string): Promise<any[]> {
    return await this.entitiesCollection
      .find({ entityType: typeName })
      .toArray();
  }

  async findEntity(id: string): Promise<MongoEntity | null> {
    // insertOne generated an ObjectId, so convert the string id back
    return await this.entitiesCollection.findOne({ _id: new ObjectId(id) as any });
  }

  async updateEntity(id: string, updates: Record<string, any>): Promise<void> {
    await this.entitiesCollection.updateOne(
      { _id: new ObjectId(id) as any },
      { 
        $set: { 
          ...updates, 
          updatedAt: new Date() 
        } 
      }
    );
  }

  // Complex queries using MongoDB aggregation
  async getEntityStatistics(typeName: string): Promise<any> {
    return await this.entitiesCollection.aggregate([
      { $match: { entityType: typeName } },
      {
        $group: {
          _id: '$entityType',
          count: { $sum: 1 },
          avgMiles: { $avg: '$properties.miles' },
          makers: { $addToSet: '$properties.maker' }
        }
      }
    ]).toArray();
  }

  // Full-text search across entities
  async searchEntities(query: string): Promise<MongoEntity[]> {
    return await this.entitiesCollection
      .find({ $text: { $search: query } })
      .toArray();
  }

  private serializeProperties(entity: Entity): Record<string, any> {
    const result: Record<string, any> = {};
    entity.entityType.getAllPropertyTypes().forEach(pt => {
      const value = entity.getProperty(pt.name);
      if (value !== undefined) {
        result[pt.name] = value;
      }
    });
    return result;
  }
}

// Usage with indexes for performance
async function setupDatabase() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  
  const db = client.db('aom_example');
  const store = new MongoEntityStore(db);

  // Create indexes for better performance
  await db.collection('entities').createIndex({ entityType: 1 });
  await db.collection('entities').createIndex({ 'properties.maker': 1 });
  await db.collection('entities').createIndex({ 'properties.year': 1 });
  await db.collection('entities').createIndex(
    { 
      'properties.maker': 'text', 
      'properties.model': 'text' 
    }
  );

  return store;
}

Benefits of Document Storage

  • Schema Evolution: MongoDB’s flexible schema allows entity types to evolve without database migrations.
  • Rich Querying: MongoDB’s query language supports complex operations on nested documents.
  • Indexing Strategy: Flexible indexing on any field, including nested properties.
  • Aggregation Pipeline: Powerful analytics capabilities for business intelligence.
  • Horizontal Scaling: Built-in sharding support for handling large datasets.

Modern Applications and Future Directions

Contemporary Usage Patterns

  • Configuration Management: Modern applications use AOM-like patterns for feature flags, A/B testing configurations, and user preference systems.
  • API Gateway Configuration: Services like Kong and AWS API Gateway use dynamic configuration patterns similar to AOM.
  • Workflow Engines: Business process management systems employ AOM patterns to define configurable workflows.
  • Multi-tenant SaaS: AOM enables SaaS applications to provide customizable data models per tenant.
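
The multi-tenant SaaS case above can be sketched as a per-tenant type registry that layers tenant-specific properties over a shared base schema. This is a minimal illustration, not taken from any particular SaaS platform; the names `TenantTypeRegistry` and `PropertyDef` are hypothetical:

```typescript
// Hypothetical sketch: each tenant sees a shared base entity type
// plus its own custom property extensions.
type PropertyDef = { name: string; valueType: string; required: boolean };

class TenantTypeRegistry {
  private baseTypes: Map<string, PropertyDef[]> = new Map();
  private tenantOverrides: Map<string, Map<string, PropertyDef[]>> = new Map();

  defineBaseType(typeName: string, props: PropertyDef[]): void {
    this.baseTypes.set(typeName, props);
  }

  // A tenant may add custom properties without touching the shared schema
  addTenantProperty(tenantId: string, typeName: string, prop: PropertyDef): void {
    if (!this.tenantOverrides.has(tenantId)) {
      this.tenantOverrides.set(tenantId, new Map());
    }
    const overrides = this.tenantOverrides.get(tenantId)!;
    overrides.set(typeName, [...(overrides.get(typeName) ?? []), prop]);
  }

  // Resolve the effective schema: base properties plus tenant extensions
  resolveType(tenantId: string, typeName: string): PropertyDef[] {
    const base = this.baseTypes.get(typeName) ?? [];
    const extra = this.tenantOverrides.get(tenantId)?.get(typeName) ?? [];
    return [...base, ...extra];
  }
}
```

Because resolution happens at read time, a tenant's customization never mutates the base type that other tenants share.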

Emerging Technologies

  • GraphQL Schema Stitching: Dynamic schema composition shares conceptual similarities with AOM’s type composition.
  • Serverless Functions: Event-driven architectures benefit from AOM’s dynamic behavior binding.
  • Container Orchestration: Kubernetes uses similar patterns for dynamic resource management and configuration.
  • Low-Code Platforms: Modern low-code solutions extensively use AOM principles for visual application building.

Performance Considerations and Optimizations

Caching Strategies

class CachedEntityStore {
  private cache: Map<string, Entity> = new Map();
  private typeCache: Map<string, EntityType> = new Map();

  // The underlying persistent store (e.g. MongoEntityStore) is injected here
  constructor(private store: EntityStore) {}

  async getEntity(id: string): Promise<Entity | null> {
    // Check cache first
    if (this.cache.has(id)) {
      return this.cache.get(id)!;
    }

    // Load from database
    const entity = await this.store.findEntity(id);
    if (entity) {
      this.cache.set(id, entity);
    }
    
    return entity;
  }

  invalidateEntity(id: string): void {
    this.cache.delete(id);
  }
}

Lazy Loading and Materialized Views

For complex entity relationships, implement lazy loading and consider materialized views for frequently accessed computed properties.

Schema Evolution and Versioning

One of the most critical aspects of production AOM systems is managing schema evolution over time. Unlike traditional systems where database migrations handle schema changes, AOM systems must support dynamic evolution while maintaining data integrity and backward compatibility.

Version Management Strategy

interface EntityTypeVersion {
  version: number;
  entityTypeName: string;
  changes: SchemaChange[];
  compatibleWith: number[];
  deprecatedIn?: number;
  migrations: Migration[];
  createdAt: Date;
}

interface SchemaChange {
  type: 'ADD_PROPERTY' | 'REMOVE_PROPERTY' | 'MODIFY_PROPERTY' | 'ADD_OPERATION';
  propertyName?: string;
  oldType?: string;
  newType?: string;
  defaultValue?: any;
  migrationRequired: boolean;
}

interface Migration {
  fromVersion: number;
  toVersion: number;
  transform: (entity: any) => any;
  reversible: boolean;
}

Backward Compatibility Patterns

Additive Changes: New properties should be optional with sensible defaults:

// Safe evolution - adding optional property
let mut vehicle_type_v2 = vehicle_type_v1.clone();
vehicle_type_v2.add_property_type(PropertyType::new(
    "fuel_efficiency", 
    "Float", 
    false // not required
));
vehicle_type_v2.version = 2;

Property Type Changes: Handle type evolution gracefully:

class PropertyMigration {
  static migrateStringToEnum(oldValue: string, enumValues: string[]): string {
    // Attempt intelligent mapping
    const lowercaseValue = oldValue.toLowerCase();
    const match = enumValues.find(val => 
      val.toLowerCase().includes(lowercaseValue) ||
      lowercaseValue.includes(val.toLowerCase())
    );
    return match || enumValues[0]; // fallback to first enum value
  }
}

Multi-Version Support: Systems should support multiple schema versions simultaneously:

class EntityStore
  def save_entity(entity, force_version: nil)
    target_version = force_version || @current_schema_version
    
    if entity.schema_version != target_version
      migrated_entity = migrate_entity(entity, target_version)
      store_with_version(migrated_entity, target_version)
    else
      store_with_version(entity, entity.schema_version)
    end
  end
  
  private def migrate_entity(entity, target_version)
    current_version = entity.schema_version
    
    while current_version < target_version
      migration = find_migration(current_version, current_version + 1)
      entity = migration.transform(entity)
      current_version += 1
    end
    
    entity.schema_version = target_version
    entity
  end
end

Deployment Strategies

Blue-Green Schema Deployment: Deploy new schemas alongside existing ones, gradually migrating entities:

  1. Deploy new schema version to “green” environment
  2. Run both old and new versions in parallel
  3. Migrate entities in batches with rollback capability
  4. Switch traffic to new version
  5. Decommission old version after validation period
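
Step 3 above (batch migration with rollback capability) can be sketched as follows. This is an illustrative, in-memory version only; `BatchMigrator` and `Migratable` are hypothetical names, and a real implementation would persist originals and write batches transactionally:

```typescript
// Hypothetical sketch: migrate entities in batches, keeping copies of the
// originals so a failed batch can be reported for rollback as a unit.
interface Migratable { id: string; schemaVersion: number; [key: string]: any }

class BatchMigrator {
  constructor(
    private transform: (e: Migratable) => Migratable,
    private batchSize: number = 100
  ) {}

  migrate(entities: Migratable[]): { migrated: Migratable[]; failed: Migratable[] } {
    const migrated: Migratable[] = [];
    const failed: Migratable[] = [];
    for (let i = 0; i < entities.length; i += this.batchSize) {
      const batch = entities.slice(i, i + this.batchSize);
      const originals = batch.map(e => ({ ...e })); // copies kept for rollback
      try {
        const result = batch.map(this.transform);
        migrated.push(...result); // commit the batch only if every entity migrated
      } catch {
        // Discard partial results; the whole batch stays at its original version
        failed.push(...originals);
      }
    }
    return { migrated, failed };
  }
}
```

Batching keeps the blast radius of a bad migration small: one corrupt entity stops only its own batch, and the rest of the dataset continues migrating.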

Feature Flags for Schema Changes: Control schema availability through configuration:

class SchemaFeatureFlags {
  private flags: Map<string, boolean> = new Map();
  
  enableSchemaVersion(entityType: string, version: number): void {
    this.flags.set(`${entityType}_v${version}`, true);
  }
  
  isSchemaVersionEnabled(entityType: string, version: number): boolean {
    return this.flags.get(`${entityType}_v${version}`) || false;
  }
}

Performance Optimization Deep Dive

AOM systems face unique performance challenges due to their dynamic nature. However, careful optimization can achieve performance comparable to traditional systems while maintaining flexibility.

Caching Strategies

Entity Type Definition Caching: Cache compiled entity types to avoid repeated parsing:

use std::sync::{Arc, RwLock};
use std::collections::HashMap;

pub struct EntityTypeCache {
    types: RwLock<HashMap<String, Arc<EntityType>>>,
    compiled_operations: RwLock<HashMap<String, CompiledOperation>>,
}

impl EntityTypeCache {
    pub fn get_or_compile(&self, type_name: &str) -> Arc<EntityType> {
        // Try read lock first
        {
            let cache = self.types.read().unwrap();
            if let Some(entity_type) = cache.get(type_name) {
                return entity_type.clone();
            }
        }
        
        // Compile with write lock
        let mut cache = self.types.write().unwrap();
        // Double-check pattern to avoid race conditions
        if let Some(entity_type) = cache.get(type_name) {
            return entity_type.clone();
        }
        
        let compiled_type = self.compile_entity_type(type_name);
        let arc_type = Arc::new(compiled_type);
        cache.insert(type_name.to_string(), arc_type.clone());
        arc_type
    }
}

Property Access Optimization: Use property maps with optimized access patterns:

class OptimizedEntity {
  private propertyCache: Map<string, any> = new Map();
  private accessCounts: Map<string, number> = new Map();
  
  getProperty<T>(name: string): T | undefined {
    // Track access patterns for optimization
    this.accessCounts.set(name, (this.accessCounts.get(name) || 0) + 1);
    
    // Check cache first
    if (this.propertyCache.has(name)) {
      return this.propertyCache.get(name);
    }
    
    // Load from storage and cache frequently accessed properties
    const value = this.loadPropertyFromStorage(name);
    if (this.accessCounts.get(name)! > 3) {
      this.propertyCache.set(name, value);
    }
    
    return value;
  }
}

Database Optimization

Strategic Indexing: Create indexes based on query patterns rather than all properties:

// MongoDB optimization for AOM queries
await db.collection('entities').createIndex({
  'entityType': 1,
  'properties.status': 1,
  'updatedAt': -1
}, {
  name: 'entity_status_time_idx',
  partialFilterExpression: {
    'properties.status': { $exists: true }
  }
});

// Compound index for common query patterns
await db.collection('entities').createIndex({
  'entityType': 1,
  'properties.category': 1,
  'properties.priority': 1
});

Query Optimization Patterns: Use aggregation pipelines for complex queries:

class OptimizedEntityStore {
  async findEntitiesWithAggregation(criteria) {
    return await this.collection.aggregate([
      // Match stage - use indexes
      {
        $match: {
          entityType: criteria.type,
          'properties.status': { $in: criteria.statuses }
        }
      },
      
      // Project only needed fields early
      {
        $project: {
          _id: 1,
          entityType: 1,
          'properties.name': 1,
          'properties.status': 1,
          'properties.priority': 1
        }
      },
      
      // Sort with index support
      {
        $sort: { 'properties.priority': -1, _id: 1 }
      },
      
      // Limit results early
      { $limit: criteria.limit || 100 }
    ]).toArray();
  }
}

Connection Pooling and Read Replicas: Optimize database connections for high-throughput scenarios:

class DatabaseManager {
  private writePool: ConnectionPool;
  private readPools: ConnectionPool[];
  
  async saveEntity(entity: Entity): Promise<void> {
    // Use write connection for mutations
    const connection = await this.writePool.getConnection();
    try {
      await connection.save(entity);
    } finally {
      this.writePool.releaseConnection(connection);
    }
  }
  
  async findEntities(query: any): Promise<Entity[]> {
    // Use read replicas for queries
    const readPool = this.selectOptimalReadPool();
    const connection = await readPool.getConnection();
    try {
      return await connection.find(query);
    } finally {
      readPool.releaseConnection(connection);
    }
  }
}

Memory Management

Lazy Loading: Load entity properties on demand:

class LazyEntity
  def initialize(entity_type, id)
    @entity_type = entity_type
    @id = id
    @loaded_properties = {}
    @all_loaded = false
  end
  
  def method_missing(method_name, *args)
    property_name = method_name.to_s
    
    if @entity_type.has_property?(property_name)
      load_property(property_name) unless @loaded_properties.key?(property_name)
      @loaded_properties[property_name]
    else
      super
    end
  end
  
  # Pair method_missing with respond_to_missing? so respond_to? stays accurate
  def respond_to_missing?(method_name, include_private = false)
    @entity_type.has_property?(method_name.to_s) || super
  end
  
  private def load_property(property_name)
    # Load single property from database
    value = Database.load_property(@id, property_name)
    @loaded_properties[property_name] = value
  end
end

Weak References for Caches: Prevent memory leaks in entity caches:

use std::sync::{Arc, Weak};
use std::collections::HashMap;

pub struct WeakEntityCache {
    entities: HashMap<String, Weak<Entity>>,
}

impl WeakEntityCache {
    pub fn get(&mut self, id: &str) -> Option<Arc<Entity>> {
        // Clean up dead references periodically
        if let Some(weak_ref) = self.entities.get(id) {
            if let Some(entity) = weak_ref.upgrade() {
                return Some(entity);
            } else {
                self.entities.remove(id);
            }
        }
        None
    }
    
    pub fn insert(&mut self, id: String, entity: Arc<Entity>) {
        self.entities.insert(id, Arc::downgrade(&entity));
    }
}

Security and Validation Framework

Security in AOM systems is critical due to the dynamic nature of schema and operations. Traditional security models must be extended to handle runtime modifications safely.

Authorization Framework

Schema Modification Permissions: Control who can modify entity types:

interface SchemaPermission {
  principal: string; // user or role
  entityType: string;
  actions: SchemaAction[];
  conditions?: PermissionCondition[];
}

enum SchemaAction {
  CREATE_TYPE = 'CREATE_TYPE',
  MODIFY_TYPE = 'MODIFY_TYPE',
  DELETE_TYPE = 'DELETE_TYPE',
  ADD_PROPERTY = 'ADD_PROPERTY',
  REMOVE_PROPERTY = 'REMOVE_PROPERTY',
  ADD_OPERATION = 'ADD_OPERATION'
}

class SchemaAuthorizationService {
  checkPermission(
    principal: string, 
    action: SchemaAction, 
    entityType: string
  ): boolean {
    const permissions = this.getPermissions(principal);
    
    return permissions.some(permission => 
      permission.entityType === entityType &&
      permission.actions.includes(action) &&
      this.evaluateConditions(permission.conditions)
    );
  }
}

Property-Level Access Control: Fine-grained access control for sensitive properties:

use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PropertyAccess {
    pub property_name: String,
    pub read_roles: Vec<String>,
    pub write_roles: Vec<String>,
    pub sensitive: bool,
}

impl Entity {
    pub fn get_property_secure(&self, name: &str, user_roles: &[String]) -> Result<Option<&PropertyValue>, SecurityError> {
        let access = self.entity_type.get_property_access(name)
            .ok_or(SecurityError::PropertyNotFound)?;
        
        if !access.read_roles.iter().any(|role| user_roles.contains(role)) {
            return Err(SecurityError::InsufficientPermissions);
        }
        
        if access.sensitive {
            self.audit_property_access(name, user_roles);
        }
        
        Ok(self.properties.get(name).map(|p| &p.value))
    }
}

Input Validation and Sanitization

Dynamic Property Validation: Validate properties based on runtime type definitions:

class PropertyValidator {
  static validate(
    property: Property, 
    propertyType: PropertyType, 
    context: ValidationContext
  ): ValidationResult {
    const errors: string[] = [];
    
    // Type validation
    if (!this.isValidType(property.value, propertyType.valueType)) {
      errors.push(`Invalid type for ${propertyType.name}`);
    }
    
    // Custom validation rules
    if (propertyType.validator) {
      try {
        const isValid = propertyType.validator(property.value, context);
        if (!isValid) {
          errors.push(`Custom validation failed for ${propertyType.name}`);
        }
      } catch (error) {
        errors.push(`Validation error: ${error.message}`);
      }
    }
    
    // Sanitization for string properties
    if (typeof property.value === 'string') {
      property.value = this.sanitizeString(property.value);
    }
    
    return {
      valid: errors.length === 0,
      errors,
      sanitizedValue: property.value
    };
  }
  
  private static sanitizeString(input: string): string {
    // Remove potentially dangerous content
    return input
      .replace(/<script\b[^<]*(?:(?!<\/script>)<[^<]*)*<\/script>/gi, '')
      .replace(/javascript:/gi, '')
      .replace(/on\w+\s*=/gi, '');
  }
}

Business Rule Enforcement: Implement complex validation rules across entities:

class BusinessRuleEngine
  def initialize
    @rules = {}
  end
  
  def add_rule(entity_type, rule_name, &block)
    @rules[entity_type] ||= {}
    @rules[entity_type][rule_name] = block
  end
  
  def validate_entity(entity)
    errors = []
    
    if rules = @rules[entity.entity_type.name]
      rules.each do |rule_name, rule_block|
        begin
          result = rule_block.call(entity)
          unless result.valid?
            errors.concat(result.errors.map { |e| "#{rule_name}: #{e}" })
          end
        rescue => e
          errors << "Rule #{rule_name} failed: #{e.message}"
        end
      end
    end
    
    ValidationResult.new(errors.empty?, errors)
  end
end

# Usage example
rule_engine = BusinessRuleEngine.new

rule_engine.add_rule('Vehicle', 'valid_year') do |entity|
  year = entity.get_property('year')
  if year && (year < 1900 || year > Date.current.year + 1)
    ValidationResult.new(false, ['Year must be between 1900 and next year'])
  else
    ValidationResult.new(true, [])
  end
end

Operation Security

Safe Operation Binding: Ensure operations cannot execute arbitrary code:

class SecureOperationBinder {
  private allowedOperations: Set<string> = new Set();
  private operationSandbox: OperationSandbox;
  
  constructor() {
    // Whitelist of safe operations
    this.allowedOperations.add('calculate');
    this.allowedOperations.add('format');
    this.allowedOperations.add('validate');
    
    this.operationSandbox = new OperationSandbox({
      allowedGlobals: ['Math', 'Date'],
      timeoutMs: 5000,
      memoryLimitMB: 10
    });
  }
  
  bindOperation(name: string, code: string): Operation {
    if (!this.allowedOperations.has(name)) {
      throw new Error(`Operation ${name} not in whitelist`);
    }
    
    // Static analysis for dangerous patterns
    if (this.containsDangerousPatterns(code)) {
      throw new Error('Operation contains dangerous patterns');
    }
    
    return this.operationSandbox.compile(code);
  }
  
  private containsDangerousPatterns(code: string): boolean {
    const dangerousPatterns = [
      /eval\s*\(/,
      /Function\s*\(/,
      /require\s*\(/,
      /import\s+/,
      /process\./,
      /global\./,
      /window\./
    ];
    
    return dangerousPatterns.some(pattern => pattern.test(code));
  }
}

Anti-patterns and Common Pitfalls

Learning from failures is crucial for successful AOM implementations. Here are the most common anti-patterns and how to avoid them.

1. Over-Engineering Stable Domains

Anti-pattern: Applying AOM to domains that rarely change

// DON'T: Using AOM for basic user authentication
const userType = new EntityType('User');
userType.addProperty('username', 'string');
userType.addProperty('passwordHash', 'string');
userType.addProperty('email', 'string');

// Better: Use traditional class for stable domain
class User {
  constructor(
    public username: string,
    public passwordHash: string,
    public email: string
  ) {}
}

When to avoid AOM:

  • Core business entities that haven’t changed in years
  • Performance-critical code paths
  • Simple CRUD operations
  • Well-established domain models

2. Performance Neglect

Anti-pattern: Ignoring performance implications of dynamic queries

// DON'T: Loading all entity properties for simple operations
async function getEntityName(id) {
  const entity = await entityStore.loadFullEntity(id); // Loads everything
  return entity.getProperty('name');
}

// Better: Load only needed properties
async function getEntityName(id) {
  return await entityStore.loadProperty(id, 'name');
}

Performance Guidelines:

  • Monitor query performance continuously
  • Use database profiling tools
  • Implement property-level lazy loading
  • Cache frequently accessed entity types

3. Type Explosion

Anti-pattern: Creating too many similar entity types instead of using properties

// DON'T: Creating separate types for minor variations
const sedanType = new EntityType('Sedan');
const suvType = new EntityType('SUV');
const truckType = new EntityType('Truck');

// Better: Use discriminator properties
const vehicleType = new EntityType('Vehicle');
vehicleType.addProperty('bodyType', 'enum', {
  values: ['sedan', 'suv', 'truck']
});

Type Design Guidelines:

  • Prefer composition over type proliferation
  • Use enums and discriminator fields
  • Consider type hierarchies carefully
  • Regular type audits to identify similar types

4. Missing Business Constraints

Anti-pattern: Focusing on technical flexibility while ignoring business rules

# DON'T: Allowing any combination of properties
vehicle = registry.create_entity('Vehicle',
  maker: 'Tesla',
  fuel_type: 'gasoline',  # This makes no sense!
  electric: true
)

# Better: Implement cross-property validation
class VehicleValidator
  def validate(entity)
    if entity.electric? && entity.fuel_type != 'electric'
      raise ValidationError, "Electric vehicles cannot have gasoline fuel type"
    end
  end
end

Constraint Guidelines:

  • Define business rules explicitly
  • Implement cross-property validation
  • Use state machines for complex business logic
  • Regular business rule audits
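
The "state machines for complex business logic" guideline can be sketched minimally as an explicit transition table; the `EntityStateMachine` class and the vehicle-listing states below are illustrative assumptions, not part of the sample project:

```typescript
// Hypothetical sketch: an explicit transition table guards status changes
// instead of scattering if-checks across validators.
type Transition = { from: string; to: string };

class EntityStateMachine {
  private transitions: Transition[] = [];

  allow(from: string, to: string): this {
    this.transitions.push({ from, to });
    return this; // fluent API for declaring the full table in one expression
  }

  canTransition(from: string, to: string): boolean {
    return this.transitions.some(t => t.from === from && t.to === to);
  }

  transition(current: string, next: string): string {
    if (!this.canTransition(current, next)) {
      throw new Error(`Invalid transition: ${current} -> ${next}`);
    }
    return next;
  }
}

// Example: a vehicle listing lifecycle
const listingStates = new EntityStateMachine()
  .allow('draft', 'listed')
  .allow('listed', 'sold')
  .allow('listed', 'draft');
```

Keeping the table declarative makes business-rule audits straightforward: the complete set of legal transitions is visible in one place.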

5. Cache Invalidation Problems

Anti-pattern: Inconsistent cache invalidation strategies

// DON'T: Forgetting to invalidate dependent caches
impl EntityStore {
    fn update_entity_type(&mut self, entity_type: EntityType) {
        self.entity_types.insert(entity_type.name.clone(), entity_type);
        // Forgot to invalidate entity instances cache!
    }
}

// Better: Comprehensive invalidation strategy
impl EntityStore {
    fn update_entity_type(&mut self, entity_type: EntityType) {
        let type_name = entity_type.name.clone();
        
        // Update type cache
        self.entity_types.insert(type_name.clone(), entity_type);
        
        // Invalidate all dependent caches
        self.entity_cache.invalidate_by_type(&type_name);
        self.query_cache.invalidate_by_type(&type_name);
        self.compiled_operations.remove(&type_name);
        
        // Notify cache invalidation to other systems
        self.event_bus.publish(CacheInvalidationEvent::new(type_name));
    }
}

6. Inadequate Error Handling

Anti-pattern: Generic error messages that don’t help debugging

// DON'T: Vague error messages
throw new Error('Property validation failed');

// Better: Detailed, actionable error messages
throw new PropertyValidationError({
  entityType: 'Vehicle',
  entityId: 'vehicle_123',
  property: 'year',
  value: 1850,
  constraint: 'must be between 1900 and 2025',
  suggestedFix: 'Check data source for year property'
});

7. Security Oversights

Anti-pattern: Treating dynamic properties like static ones for security

# DON'T: No access control on dynamic properties
def get_property(entity_id, property_name):
    entity = load_entity(entity_id)
    return entity.get_property(property_name)  # No security check!

# Better: Property-level security
def get_property(entity_id, property_name, user_context):
    entity = load_entity(entity_id)
    
    if not has_property_access(user_context, entity.type, property_name):
        raise SecurityError(f"Access denied to property {property_name}")
    
    if is_sensitive_property(entity.type, property_name):
        audit_log.record_access(user_context, entity_id, property_name)
    
    return entity.get_property(property_name)

8. Testing Gaps

Anti-pattern: Only testing the happy path with AOM systems

// DON'T: Only test valid configurations
test('creates vehicle entity', () => {
  const vehicle = factory.createEntity('Vehicle', {
    maker: 'Toyota',
    model: 'Camry'
  });
  expect(vehicle.maker).toBe('Toyota');
});

// Better: Test edge cases and error conditions
describe('Vehicle Entity', () => {
  test('rejects invalid property types', () => {
    expect(() => {
      factory.createEntity('Vehicle', {
        maker: 123, // Should be string
        model: 'Camry'
      });
    }).toThrow('Invalid property type');
  });
  
  test('handles missing required properties', () => {
    expect(() => {
      factory.createEntity('Vehicle', {
        model: 'Camry' // Missing required 'maker'
      });
    }).toThrow('Required property missing: maker');
  });
});

Prevention Strategies

  • Regular Architecture Reviews: Schedule periodic reviews of entity type proliferation and usage patterns.
  • Performance Monitoring: Implement continuous monitoring of query performance and cache hit rates.
  • Security Audits: Regular audits of property access patterns and operation bindings.
  • Automated Testing: Comprehensive test suites covering edge cases and error conditions.
  • Documentation Standards: Maintain clear documentation of business rules and constraints.

Practical Implementation

To demonstrate these concepts in practice, I’ve created a sample project with working implementations in all three languages discussed: AOM Sample Project.

The repository includes:

  • Rust implementation (cargo run) – Type-safe AOM with memory safety
  • TypeScript implementation (npx ts-node app.ts) – Gradual typing with modern JavaScript features
  • Ruby implementation (ruby app.rb) – Metaprogramming-powered flexibility

Conclusion

The Adaptive Object Model pattern continues to evolve with modern programming languages and database technologies. While the core concepts remain the same, implementation approaches have been refined to take advantage of:

  • Type safety in languages like Rust and TypeScript
  • Better performance through caching and optimized data structures
  • Improved developer experience with modern tooling and language features
  • Scalable persistence using document databases and modern storage patterns

The combination of dynamic languages with flexible type systems and schema-less databases provides a powerful foundation for building adaptable systems.

From my consulting experience implementing AOM on large projects, I’ve seen mixed results that highlight important considerations. The pattern’s flexibility is both its greatest strength and potential weakness. Without proper architectural discipline, teams can easily create overly complex systems with inconsistent entity types and validation rules. The dynamic nature that makes AOM powerful also requires more sophisticated debugging skills and comprehensive testing strategies than traditional static systems.

In my early implementations using relational databases, we suffered from performance issues due to the excessive joins required to reconstruct entities from the normalized AOM tables. This was before NoSQL and document-oriented databases became mainstream. Modern document databases have fundamentally changed the viability equation by storing AOM entities naturally without the join penalties that plagued earlier implementations.

The practical implementations available at https://github.com/bhatti/aom-sample demonstrate that AOM is not just theoretical but a viable architectural approach for real-world systems. By studying these examples and adapting them to your specific domain requirements, you can build systems that gracefully evolve with changing business needs.

August 30, 2025

Bridging HTTP and gRPC: A Standardized Approach to Header Mapping in Microservices

Filed under: Computing,Web Services — admin @ 10:49 pm

Modern microservices architectures often require supporting both HTTP REST APIs and gRPC services simultaneously. While Google’s gRPC-Gateway provides HTTP and gRPC transcoding capabilities, the challenge of bidirectional header mapping between these protocols remains a common source of inconsistency, bugs, and maintenance overhead across services. This article explores the technical challenges of HTTP-gRPC header mapping, examines current approaches and their limitations, and presents a standardized middleware solution that addresses these issues.

Understanding gRPC AIP and HTTP/gRPC Transcoding

Google’s API Improvement Proposals (AIPs) define how to build consistent, intuitive APIs. For example, AIP-127: HTTP and gRPC Transcoding enables a single service implementation to serve both HTTP REST and gRPC traffic through protocol transcoding.

How gRPC-Gateway Transcoding Works

The gRPC-Gateway acts as a reverse proxy that translates HTTP requests into gRPC calls:

HTTP Client → gRPC-Gateway → gRPC Server
     ↓              ↓            ↓
REST Request   Proto Message   gRPC Service

The transcoding process works as follows:

  1. URL Path to RPC Method: HTTP paths map to gRPC service methods
  2. HTTP Body to Proto Message: JSON payloads become protobuf messages
  3. Query Parameters to Fields: URL parameters populate message fields
  4. HTTP Headers to gRPC Metadata: Headers become gRPC metadata key-value pairs

The Header Mapping Challenge

While gRPC-Gateway handles most transcoding automatically, header mapping requires explicit configuration. Consider this common scenario:

HTTP Request:

POST /v1/users
Authorization: Bearer abc123
X-Request-ID: req-456
X-User-Role: admin
Content-Type: application/json

Desired gRPC Metadata:

metadata.MD{
    "authorization": []string{"Bearer abc123"},
    "request-id":    []string{"req-456"}, 
    "user-role":     []string{"admin"},
}

Response Headers Needed:

X-Request-ID: req-456
X-Processing-Time: 150ms
X-Server-Version: v1.2.0

Without proper configuration, headers are lost, inconsistently mapped, or require custom code in each service.

Current Problems and Anti-Patterns

Problem 1: Fragmented Header Mapping Solutions

Most services implement header mapping ad-hoc:

// Service A approach
func (s *ServiceA) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    md, _ := metadata.FromIncomingContext(ctx)
    authHeader := md.Get("authorization")
    userID := md.Get("x-user-id")
    // ... custom mapping logic
}

// Service B approach  
func (s *ServiceB) GetOrder(ctx context.Context, req *pb.GetOrderRequest) (*pb.Order, error) {
    // Different header names, different extraction logic
    md, _ := metadata.FromIncomingContext(ctx)
    auth := md.Get("auth")  // Different from Service A!
    requestID := md.Get("request_id")  // Different format!
}

This leads to:

  • Inconsistent header naming across services
  • Duplicated mapping logic in every service
  • Maintenance burden when headers change
  • Testing complexity due to custom implementations

Problem 2: Context Abuse and Memory Issues

I have often observed Go’s context being misused to store large amounts of data, putting the service at risk of being killed due to OOM:

// ANTI-PATTERN: Storing large objects in context
type UserContext struct {
    User        *User           // Large user object
    Permissions []Permission    // Array of permissions  
    Preferences *UserPrefs      // User preferences
    AuditLog    []AuditEntry   // Historical data
}

func StoreUserInContext(ctx context.Context, user *UserContext) context.Context {
    return context.WithValue(ctx, "user", user)  // BAD: Large object in context
}

Why This Causes Problems:

  1. Memory Leaks: Contexts are passed through the entire request chain and may not be garbage collected promptly
  2. Performance Degradation: Large context objects increase allocation pressure
  3. Goroutine Overhead: Each concurrent request carries this memory burden
  4. Service Instability: Under load, memory usage can spike and cause OOM kills

Proper Pattern:

// GOOD: Store only identifiers in context, keyed by an unexported type
// to avoid key collisions (as go vet recommends)
type ctxKey string

const userIDKey ctxKey = "user_id"

func StoreUserIDInContext(ctx context.Context, userID string) context.Context {
    return context.WithValue(ctx, userIDKey, userID)  // Small string only
}

// Fetch data when needed from database/cache
func GetUserFromContext(ctx context.Context) (*User, error) {
    userID, ok := ctx.Value(userIDKey).(string)
    if !ok {
        return nil, errors.New("user_id not found in context")
    }
    return userService.GetUser(userID)  // Fetch from datastore
}

Problem 3: Inconsistent Response Header Handling

Setting response headers requires different approaches across the stack:

// gRPC: Set headers via metadata
grpc.SendHeader(ctx, metadata.New(map[string]string{
    "x-server-version": "v1.2.0",
}))

// HTTP: Set headers on ResponseWriter  
w.Header().Set("X-Server-Version", "v1.2.0")

// gRPC-Gateway: by default the gateway forwards server metadata to the HTTP
// response with a "Grpc-Metadata-" prefix (unless a custom header matcher is installed)
grpc.SetHeader(ctx, metadata.New(map[string]string{
    "x-server-version": "v1.2.0",  // Arrives as "Grpc-Metadata-X-Server-Version"
}))

This complexity leads to missing response headers and inconsistent client experiences.

Solution: Standardized Header Mapping Middleware

The solution is a dedicated middleware that handles bidirectional header mapping declaratively, allowing services to focus on business logic while ensuring consistent header handling across the entire API surface.

Core Architecture

HTTP Request  --> Gateway Middleware  --> gRPC Interceptor --> Service
     |                  |                       |                 |
HTTP Headers  --> Metadata Annotation --> Context Metadata --> Business Logic
                                                                  |
HTTP Response <-- Response Modifier   <-- Header Metadata  <-- Service Response

The middleware operates at two key points:

  1. Gateway Level: Maps HTTP headers to gRPC metadata for incoming requests
  2. Interceptor Level: Processes metadata and manages response header mapping

Configuration-Driven Approach

Instead of custom code, header mapping is configured declaratively:

mapper := headermapper.NewBuilder().
    // Authentication headers
    AddIncomingMapping("Authorization", "authorization").WithRequired(true).
    AddIncomingMapping("X-API-Key", "api-key").
    
    // Request tracking (bidirectional)  
    AddBidirectionalMapping("X-Request-ID", "request-id").
    AddBidirectionalMapping("X-Trace-ID", "trace-id").
    
    // Response headers
    AddOutgoingMapping("processing-time", "X-Processing-Time").
    AddOutgoingMapping("server-version", "X-Server-Version").
    
    // Transformations
    AddIncomingMapping("Authorization", "auth-token").
    WithTransform(headermapper.ChainTransforms(
        headermapper.TrimSpace,
        headermapper.RemovePrefix("Bearer "),
    )).
    
    Build()

This configuration drives all header mapping behavior without requiring service-specific code.

How The Middleware Works: Step-by-Step

Step 1: HTTP Request Processing

When an HTTP request arrives at the gRPC-Gateway:

POST /v1/users HTTP/1.1
Authorization: Bearer abc123
X-Request-ID: req-456
X-User-Role: admin
Content-Type: application/json

The MetadataAnnotator processes configured incoming mappings:

func (hm *HeaderMapper) MetadataAnnotator() func(context.Context, *http.Request) metadata.MD {
    return func(ctx context.Context, req *http.Request) metadata.MD {
        md := metadata.New(map[string]string{})
        
        for _, mapping := range hm.config.Mappings {
            if mapping.Direction == Outgoing {
                continue  // Skip outgoing-only mappings
            }
            
            headerValue := req.Header.Get(mapping.HTTPHeader)
            if headerValue != "" {
                // Apply transformations if configured
                if mapping.Transform != nil {
                    headerValue = mapping.Transform(headerValue)
                }
                md.Set(mapping.GRPCMetadata, headerValue)
            }
        }
        return md
    }
}

Result: HTTP headers become gRPC metadata:

metadata.MD{
    "authorization": []string{"Bearer abc123"},
    "auth-token":    []string{"abc123"},        // Transformed  
    "request-id":    []string{"req-456"},
    "user-role":     []string{"admin"},
}

Step 2: gRPC Interceptor Processing

The gRPC unary interceptor receives the enhanced context:

func (hm *HeaderMapper) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // Context already contains mapped metadata from Step 1
        
        // Call the actual service method
        resp, err := handler(ctx, req)
        
        // Response headers are handled by ResponseModifier
        return resp, err
    }
}

Step 3: Service Implementation

The service method accesses headers through standard gRPC metadata APIs:

func (s *UserService) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    md, _ := metadata.FromIncomingContext(ctx)
    
    // Headers are consistently available
    authToken := getFirstValue(md, "auth-token")      // "abc123" (transformed)
    requestID := getFirstValue(md, "request-id")      // "req-456"  
    userRole := getFirstValue(md, "user-role")        // "admin"
    
    // Set response headers
    grpc.SetHeader(ctx, metadata.New(map[string]string{
        "processing-time": "150",
        "server-version": "v1.2.0",  
        "request-id": requestID,     // Echo back request ID
    }))
    
    return &pb.User{...}, nil
}

Step 4: Response Header Processing

The ResponseModifier maps gRPC metadata to HTTP response headers:

func (hm *HeaderMapper) ResponseModifier() func(context.Context, http.ResponseWriter, proto.Message) error {
    return func(ctx context.Context, w http.ResponseWriter, msg proto.Message) error {
        md, ok := runtime.ServerMetadataFromContext(ctx)
        if !ok {
            return nil
        }
        
        for _, mapping := range hm.config.Mappings {
            if mapping.Direction == Incoming {
                continue  // Skip incoming-only mappings  
            }
            
            values := md.HeaderMD.Get(mapping.GRPCMetadata)
            if len(values) > 0 {
                headerValue := values[0]
                
                // Apply transformations
                if mapping.Transform != nil {
                    headerValue = mapping.Transform(headerValue)  
                }
                
                w.Header().Set(mapping.HTTPHeader, headerValue)
            }
        }
        return nil
    }
}

Final HTTP Response:

HTTP/1.1 200 OK
X-Request-ID: req-456
X-Processing-Time: 150ms  
X-Server-Version: v1.2.0
Content-Type: application/json

{"user": {...}}

Advanced Features

Header Transformations

The middleware supports header value transformations:

// Extract JWT tokens
AddIncomingMapping("Authorization", "jwt-token").
WithTransform(headermapper.ChainTransforms(
    headermapper.TrimSpace,
    headermapper.RemovePrefix("Bearer "),
    headermapper.Truncate(100),  // Prevent large tokens
))

// Sanitize user agents
AddIncomingMapping("User-Agent", "client-info").  
WithTransform(headermapper.RegexReplace(`\d+\.\d+(\.\d+)*`, "x.x.x"))

// Format timestamps
AddOutgoingMapping("response-time", "X-Response-Time").
WithTransform(headermapper.AddSuffix("ms"))

Configuration from Files

For complex deployments, configuration can be externalized:

# header-mapping.yaml
mappings:
  - http_header: "Authorization"
    grpc_metadata: "authorization" 
    direction: 0  # Incoming
    required: true
    
  - http_header: "X-Request-ID"
    grpc_metadata: "request-id"
    direction: 2  # Bidirectional
    default_value: "auto-generated"

skip_paths:
  - "/health"
  - "/metrics"
  
debug: false

config, err := headermapper.LoadConfigFromFile("header-mapping.yaml")
if err != nil {
    log.Fatal("Failed to load config:", err)
}

mapper := headermapper.NewHeaderMapper(config)

Path-Based Filtering

Skip header processing for specific endpoints:

mapper := headermapper.NewBuilder().
    AddIncomingMapping("Authorization", "authorization").
    SkipPaths("/health", "/metrics", "/debug").  // No auth required
    Build()

Integration Guide

Basic Integration

package main

import (
    "github.com/your-org/grpc-header-mapper/headermapper"
    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
)

func main() {
    // Create header mapper
    mapper := headermapper.NewBuilder().
        AddIncomingMapping("Authorization", "authorization").
        AddBidirectionalMapping("X-Request-ID", "request-id").
        Build()
    
    // Configure gRPC server
    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(mapper.UnaryServerInterceptor()),
    )
    
    // Configure HTTP gateway
    mux := headermapper.CreateGatewayMux(mapper)
    
    // Register services...
}

Production Deployment

func createProductionMapper() *headermapper.HeaderMapper {
    return headermapper.NewBuilder().
        // Authentication
        AddIncomingMapping("Authorization", "authorization").WithRequired(true).
        AddIncomingMapping("X-API-Key", "api-key").
        
        // Request correlation
        AddBidirectionalMapping("X-Request-ID", "request-id").
        AddBidirectionalMapping("X-Correlation-ID", "correlation-id"). 
        AddBidirectionalMapping("X-Trace-ID", "trace-id").
        
        // Client information
        AddIncomingMapping("User-Agent", "user-agent").
        AddIncomingMapping("X-Client-Version", "client-version").
        
        // Response headers
        AddOutgoingMapping("processing-time-ms", "X-Processing-Time").
        AddOutgoingMapping("server-version", "X-Server-Version").
        AddOutgoingMapping("rate-limit-remaining", "X-RateLimit-Remaining").
        
        // Security headers
        AddOutgoingMapping("content-security-policy", "Content-Security-Policy").
        WithDefault("default-src 'self'").
        
        // Skip system endpoints
        SkipPaths("/health", "/metrics", "/debug", "/admin").
        
        // Production settings
        Debug(false).
        OverwriteExisting(true).
        Build()
}

Performance and Reliability Benefits

Consistent Memory Usage

By standardizing header extraction and avoiding context abuse, services maintain predictable memory profiles:

// Before: Inconsistent, potentially large context values
ctx = context.WithValue(ctx, "user", largeUserObject)      // BAD
ctx = context.WithValue(ctx, "permissions", permissionList) // BAD

// After: Consistent, minimal context usage  
// Headers extracted to standard metadata, large objects fetched on-demand
func GetUserFromContext(ctx context.Context) (*User, error) {
    userID := getMetadata(ctx, "user-id")
    return userCache.Get(userID)  // Cached lookup
}

Reduced Code Duplication

Header mapping logic is centralized, eliminating per-service implementations.

Improved Observability

Consistent header handling enables better monitoring:

// All services automatically have request correlation
func (s *AnyService) AnyMethod(ctx context.Context, req *AnyRequest) (*AnyResponse, error) {
    requestID := getMetadata(ctx, "request-id")  // Always available
    log.WithField("request_id", requestID).Info("Processing request")
    
    // Business logic...
    
    return response, nil
}

Testing Benefits

Standardized header mapping simplifies integration testing:

func TestServiceWithHeaders(t *testing.T) {
    // Headers work consistently across all services
    client := pb.NewUserServiceClient(conn)
    
    ctx := metadata.NewOutgoingContext(context.Background(), metadata.New(map[string]string{
        "authorization": "Bearer test-token",
        "request-id":    "test-req-123",
    }))
    
    var header metadata.MD
    resp, err := client.CreateUser(ctx, &pb.CreateUserRequest{...}, grpc.Header(&header))
    
    // Response headers are consistently available via the grpc.Header call option
    requestID := header.Get("request-id")  // ["test-req-123"]
}

Security Considerations

Header Validation

The middleware supports header validation and sanitization:

mapper := headermapper.NewBuilder().
    AddIncomingMapping("Authorization", "authorization").
    WithTransform(headermapper.ChainTransforms(
        headermapper.TrimSpace,
        headermapper.Truncate(512),  // Prevent oversized headers
        validateJWTFormat,           // Custom validation
    )).
    Build()

func validateJWTFormat(token string) string {
    if !strings.HasPrefix(token, "Bearer ") {
        return "invalid"  // Reject malformed tokens
    }
    return token
}

Sensitive Data Handling

Headers containing sensitive data can be masked in logs:

AddIncomingMapping("Authorization", "authorization").
WithTransform(headermapper.MaskSensitive(4)).  // Show first/last 4 chars

Rate Limiting Integration

Response headers can include rate limiting information:

AddOutgoingMapping("rate-limit-remaining", "X-RateLimit-Remaining").
AddOutgoingMapping("rate-limit-reset", "X-RateLimit-Reset").

Monitoring and Debugging

Debug Mode

Enable debug logging to verify header mapping:

mapper := headermapper.NewBuilder().
    Debug(true).  // Enable detailed logging
    Build()

mapper.SetLogger(customLogger)  // Use your logging framework

Debug Output:

[DEBUG] [HeaderMapper] Mapped incoming headers: map[authorization:[Bearer abc123] request-id:[req-456]]
[DEBUG] [HeaderMapper] Mapped outgoing headers to response  

Metrics Integration

The middleware can integrate with monitoring systems:

stats := mapper.GetStats()
prometheus.IncomingHeadersMappedCounter.Add(stats.IncomingMappings)
prometheus.OutgoingHeadersMappedCounter.Add(stats.OutgoingMappings)
prometheus.MappingErrorsCounter.Add(stats.FailedMappings)

Why This Matters

Microservices Consistency

In large microservices architectures, inconsistent header handling creates operational overhead:

  • Debugging becomes difficult when services use different header names
  • Client libraries must handle different header formats per service
  • Security policies cannot be uniformly enforced
  • Observability suffers from inconsistent request correlation

Standardized header mapping addresses these issues by ensuring consistency across the entire service mesh.

Developer Productivity

Developers spend significant time on infrastructure concerns rather than business logic. This middleware eliminates:

  • Boilerplate code for header extraction and response setting
  • Testing complexity around header handling edge cases
  • Documentation overhead for service-specific header requirements
  • Bug investigation related to missing or malformed headers

Operational Excellence

Standard header mapping enables:

  • Automated monitoring with consistent request correlation
  • Security scanning with predictable header formats
  • Performance analysis across service boundaries
  • Compliance auditing with standardized access logging

Conclusion

HTTP and gRPC transcoding is a powerful pattern for modern APIs, but header mapping complexity has been a persistent challenge. The gRPC Header Mapper middleware presented in this article provides a solution that enables true bidirectional header mapping between HTTP and gRPC protocols.

By providing a standardized, configuration-driven middleware solution available at github.com/bhatti/grpc-header-mapper, teams can:

  1. Eliminate inconsistencies across services with bidirectional header mapping
  2. Reduce maintenance burden through centralized configuration
  3. Improve reliability by avoiding context misuse and memory leaks
  4. Enhance developer productivity by removing boilerplate code
  5. Support complex transformations with built-in and custom transformation functions

The middleware’s bidirectional mapping capability means that headers flow seamlessly in both directions – HTTP requests to gRPC metadata for service processing, and gRPC metadata back to HTTP response headers for client consumption. This eliminates the common problem where request headers are available to services but response headers are lost or inconsistently handled.

The complete implementation, examples, and documentation are available at github.com/bhatti/grpc-header-mapper.

August 25, 2025

Beyond Vibe Coding: Using TLA+ and Executable Specifications with Claude

Filed under: Computing,Uncategorized — admin @ 9:45 pm

TL;DR: The Problem and Solution

Problem: AI-assisted coding fails when modifying existing systems because we give AI vague specifications.

Solution: Use TLA+ formal specifications as precise contracts that Claude can implement reliably.

Result: Transform Claude from a code generator into a reliable engineering partner that reasons about complex systems.

After months of using Claude for development, I discovered most AI-assisted coding fails not because the AI isn’t smart enough, but because we’re asking it to work from vague specifications. This post shows you how to move beyond “vibe coding” using executable specifications that turn Claude into a reliable engineering partner.

Here’s what changes when you use TLA+ with Claude:

Before (Vibe Coding):

  • “Create a task management API”
  • Claude guesses at requirements
  • Inconsistent behavior across edge cases
  • Bugs in corner cases

After (TLA+ Specifications):

  • Precise mathematical specification
  • Claude implements exactly what you specified
  • All edge cases defined upfront
  • Properties verified before deployment

The Vibe Coding Problem

AI assistants like Claude are primarily trained on greenfield development patterns. They excel at:

  • Writing new functions from scratch
  • Implementing well-known algorithms
  • Creating boilerplate code

But they struggle with:

  • Understanding implicit behavioral contracts in existing code
  • Maintaining invariants across system modifications
  • Reasoning about state transitions and edge cases
  • Preserving non-functional requirements (performance, security, etc.)

The solution isn’t better prompts – it’s better specifications.

Enter Executable Specifications

An executable specification is a formal description of system behavior that can be:

  1. Verified – Checked for logical consistency
  2. Validated – Tested against real-world scenarios
  3. Executed – Run to generate test cases or even implementations

I’ve tried many approaches to precise specifications over the years:

UML and Model Driven Development (2000s-2010s): I used tools like Rational Rose and Visual Paradigm in the early 2000s that promised complete code generation from UML models. The reality was different:

  • Visual complexity: UML diagrams became unwieldy for anything non-trivial
  • Tool lock-in: Proprietary formats and expensive tooling
  • Impedance mismatch: The gap between UML models and real code was huge
  • Maintenance nightmare: Keeping models and code synchronized was nearly impossible
  • Limited expressiveness: UML couldn’t capture complex behavioral contracts

BDD and Gherkin (mid-2000s): I adopted BDD and Gherkin in the mid-2000s; they were better than UML for behavioral specifications but still limited:

  • Structured natural language: Readable but not truly executable
  • No logical reasoning: Couldn’t catch design contradictions
  • Testing focused: Good for acceptance criteria, poor for system design

TLA+ (present): Takes executable specifications to their logical conclusion:

  • Mathematical precision: Eliminates ambiguity completely
  • Model checking: Explores all possible execution paths
  • Tool independence: Plain text specifications, open source tools
  • Behavioral focus: Designed specifically for concurrent and distributed systems

Why TLA+ with Claude?

The magic happens when you combine TLA+’s precision with Claude’s implementation capabilities:

  1. TLA+ eliminates ambiguity – There’s only one way to interpret a formal specification
  2. Claude can read TLA+ – It understands the formal syntax and can translate it to code
  3. Verification catches design flaws – TLA+ model checking finds edge cases you’d miss
  4. Generated traces become tests – TLA+ execution paths become your test suite

Setting Up Your Claude and TLA+ Environment

Installing Claude Desktop

First, let’s get Claude running on your machine:

# Install via Homebrew (macOS)
brew install --cask claude

# Or download directly from Anthropic
# https://claude.ai/download

After installation, a few optional tweaks improve results:

  • Set up project-specific contexts in ~/.claude/
  • Create TLA+ syntax rules for better code generation
  • Configure memory settings for specification patterns

Configuring Your Workspace

Once installed, I recommend creating a dedicated workspace structure. Here’s what works for me:

# Create a Claude workspace directory
mkdir -p ~/claude-workspace/{projects,templates,context}

# Add a context file for your coding standards
cat > ~/claude-workspace/context/coding-standards.md << 'EOF'
# My Coding Standards

- Use descriptive variable names
- Functions should do one thing well
- Write tests for all new features
- Handle errors explicitly
- Document complex logic
EOF

Installing TLA+ Tools

Choose based on your workflow:

  • GUI users: TLA+ Toolbox for visual model checking
  • CLI users: tla2tools.jar for CI integration
  • Both: VS Code extension for syntax highlighting

# Download TLA+ Tools from https://github.com/tlaplus/tlaplus/releases
# Or use Homebrew on macOS
brew install --cask tla-plus-toolbox

# For command-line usage (recommended for CI)
wget https://github.com/tlaplus/tlaplus/releases/download/v1.8.0/tla2tools.jar

VS Code Extension

Install the TLA+ extension for syntax highlighting and basic validation:

code --install-extension alygin.vscode-tlaplus

Your First TLA+ Specification

Let’s start with a simple example to understand the syntax:

--------------------------- MODULE SimpleCounter ---------------------------
VARIABLE counter

Init == counter = 0

Increment == counter' = counter + 1

Decrement == counter' = counter - 1

Next == Increment \/ Decrement

Spec == Init /\ [][Next]_counter

TypeInvariant == counter \in Int

=============================================================================

This specification defines:

  • State: A counter variable
  • Initial condition: Counter starts at 0
  • Actions: Increment or decrement operations
  • Next state relation: Either action can occur
  • Invariant: Counter is always an integer

Real-World Example: Task Management API

Now let’s build something real. We’ll create a task management API using TLA+ specifications that Claude can implement in Go.

Step 1: Define the System State

First, we model what our system looks like (TaskManagement.tla):

--------------------------- MODULE TaskManagement ---------------------------
EXTENDS Integers, Sequences, FiniteSets, TLC

CONSTANTS
    Users,          \* Set of users
    MaxTasks,       \* Maximum number of tasks
    MaxTime,        \* Maximum time value for simulation
    Titles,         \* Set of possible task titles
    Descriptions    \* Set of possible task descriptions

VARIABLES
    tasks,          \* Function from task ID to task record
    userTasks,      \* Function from user ID to set of task IDs
    nextTaskId,     \* Counter for generating unique task IDs
    currentUser,    \* Currently authenticated user
    clock,          \* Global clock for timestamps
    sessions        \* Active user sessions

\* Task states enumeration with valid transitions
TaskStates == {"pending", "in_progress", "completed", "cancelled", "blocked"}

\* Priority levels
Priorities == {"low", "medium", "high", "critical"}

\* Valid state transitions
ValidTransitions == {
    <<"pending", "in_progress">>,
    <<"pending", "cancelled">>,
    <<"pending", "blocked">>,
    <<"in_progress", "completed">>,
    <<"in_progress", "cancelled">>,
    <<"in_progress", "blocked">>,
    <<"in_progress", "pending">>,      \* Allow reverting to pending
    <<"blocked", "pending">>,
    <<"blocked", "in_progress">>,
    <<"blocked", "cancelled">>
}


TaskRecord == [
    id: Nat,
    title: STRING,
    description: STRING,
    status: TaskStates,
    priority: Priorities,
    assignee: Users,
    createdAt: Nat,
    dueDate: Nat \cup {NULL}
]

\* Type invariants
TypeInvariant == 
    /\ tasks \in [Nat -> TaskRecord]
    /\ userTasks \in [Users -> SUBSET Nat]
    /\ nextTaskId \in Nat
    /\ currentUser \in Users \cup {NULL}

Step 2: Define System Actions

Now we specify what operations are possible (TaskManagement.tla):

\* System initialization
Init ==
    /\ tasks = [i \in {} |-> CHOOSE x : FALSE]  \* Empty function
    /\ userTasks = [u \in Users |-> {}]
    /\ nextTaskId = 1
    /\ currentUser = NULL
    /\ clock = 0
    /\ sessions = [u \in Users |-> FALSE]

\* User authentication
Authenticate(user) ==
    /\ user \in Users
    /\ ~sessions[user]  \* User not already logged in
    /\ currentUser' = user
    /\ sessions' = [sessions EXCEPT ![user] = TRUE]
    /\ UNCHANGED <<tasks, userTasks, nextTaskId, clock>>

\* Create a new task
CreateTask(title, description, priority, dueDate) ==
    /\ currentUser # NULL
    /\ nextTaskId <= MaxTasks
    /\ LET newTask == [
           id |-> nextTaskId,
           title |-> title,
           description |-> description,
           status |-> "pending",
           priority |-> priority,
           assignee |-> currentUser,
           createdAt |-> nextTaskId, \* Simplified timestamp
           dueDate |-> dueDate
       ] IN
       /\ tasks' = tasks @@ (nextTaskId :> newTask)
       /\ userTasks' = [userTasks EXCEPT ![currentUser] = @ \cup {nextTaskId}]
       /\ nextTaskId' = nextTaskId + 1
       /\ UNCHANGED <<currentUser, clock, sessions>>

\* Update task status
UpdateTaskStatus(taskId, newStatus) ==
    /\ currentUser # NULL
    /\ taskId \in DOMAIN tasks
    /\ taskId \in userTasks[currentUser]
    /\ newStatus \in TaskStates
    /\ tasks' = [tasks EXCEPT ![taskId].status = newStatus]
    /\ UNCHANGED <<userTasks, nextTaskId, currentUser, clock, sessions>>

\* Delete a task
DeleteTask(taskId) ==
    /\ currentUser # NULL
    /\ taskId \in DOMAIN tasks
    /\ taskId \in userTasks[currentUser]
    /\ tasks' = [id \in (DOMAIN tasks \ {taskId}) |-> tasks[id]]
    /\ userTasks' = [userTasks EXCEPT ![currentUser] = @ \ {taskId}]
    /\ UNCHANGED <<nextTaskId, currentUser, clock, sessions>>

Step 3: Safety and Liveness Properties

TLA+ shines when defining system properties (TaskManagement.tla):

\* Safety properties
NoOrphanTasks ==
    \A taskId \in DOMAIN tasks :
        \E user \in Users : taskId \in userTasks[user]

TaskOwnership ==
    \A taskId \in DOMAIN tasks :
        tasks[taskId].assignee \in Users /\
        taskId \in userTasks[tasks[taskId].assignee]

ValidTaskIds ==
    \A taskId \in DOMAIN tasks : 
        /\ taskId < nextTaskId
        /\ taskId >= 1

NoDuplicateTaskIds ==
    \A t1, t2 \in DOMAIN tasks :
        t1 = t2 \/ tasks[t1].id # tasks[t2].id

ValidStateTransitionsInvariant ==
    \A taskId \in DOMAIN tasks :
        tasks[taskId].status \in TaskStates

ConsistentTimestamps ==
    \A taskId \in DOMAIN tasks :
        /\ tasks[taskId].createdAt <= tasks[taskId].updatedAt
        /\ tasks[taskId].updatedAt <= clock

NoCyclicDependencies ==
    LET
        \* Transitive closure of dependencies
        RECURSIVE TransitiveDeps(_)
        TransitiveDeps(taskId) ==
            IF ~TaskExists(taskId) THEN {}
            ELSE LET directDeps == tasks[taskId].dependencies IN
                 directDeps \cup 
                 UNION {TransitiveDeps(dep) : dep \in directDeps}
    IN
    \A taskId \in DOMAIN tasks :
        taskId \notin TransitiveDeps(taskId)

AuthenticationRequired ==
    \* All task operations require authentication
    \A taskId \in DOMAIN tasks :
        tasks[taskId].createdBy \in Users

SafetyInvariant ==
    /\ NoOrphanTasks
    /\ TaskOwnership
    /\ ValidTaskIds
    /\ NoDuplicateTaskIds
    /\ ValidStateTransitionsInvariant
    /\ ConsistentTimestamps
    /\ NoCyclicDependencies
    /\ AuthenticationRequired

\* Next state relation
Next ==
    \/ AdvanceTime
    \/ \E user \in Users : Authenticate(user)
    \/ Logout
    \/ \E t \in Titles, d \in Descriptions, p \in Priorities, 
         u \in Users, dd \in 0..MaxTime \cup {NULL},
         tags \in SUBSET {"bug", "feature", "enhancement", "documentation"},
         deps \in SUBSET DOMAIN tasks :
       CreateTask(t, d, p, u, dd, tags, deps)
    \/ \E taskId \in DOMAIN tasks, newStatus \in TaskStates :
       UpdateTaskStatus(taskId, newStatus)
    \/ \E taskId \in DOMAIN tasks, newPriority \in Priorities :
       UpdateTaskPriority(taskId, newPriority)
    \/ \E taskId \in DOMAIN tasks, newAssignee \in Users :
       ReassignTask(taskId, newAssignee)
    \/ \E taskId \in DOMAIN tasks, t \in Titles, 
         d \in Descriptions, dd \in 0..MaxTime \cup {NULL} :
       UpdateTaskDetails(taskId, t, d, dd)
    \/ \E taskId \in DOMAIN tasks : DeleteTask(taskId)
    \/ CheckDependencies
    \/ \E taskIds \in SUBSET DOMAIN tasks, newStatus \in TaskStates :
       taskIds # {} /\ BulkUpdateStatus(taskIds, newStatus)

\* Properties to check
THEOREM TypeCorrectness == Spec => []TypeInvariant
THEOREM SafetyHolds == Spec => []SafetyInvariant
THEOREM LivenessHolds == Spec => (EventualCompletion /\ FairProgress)
THEOREM NoDeadlock == Spec => [](ENABLED Next)
THEOREM Termination == Spec => <>(\A taskId \in DOMAIN tasks : 
                                    tasks[taskId].status \in {"completed", "cancelled"})
=============================================================================

Step 4: Model Checking and Trace Generation

Now we can run TLA+ model checking to verify our specification (TaskManagement.cfg):

\* Model configuration for the TaskManagement module
SPECIFICATION Spec

\* Constants definition
CONSTANTS
    Users = {alice, bob, charlie}
    MaxTasks = 5
    MaxTime = 20
    Titles = {task1, task2, task3, task4, task5}
    Descriptions = {desc1, desc2, desc3}

\* Model values for special constants
CONSTANT
    NULL = NULL
    EMPTY_STRING = EMPTY_STRING

\* Initial state constraint
CONSTRAINT
    /\ nextTaskId <= MaxTasks + 1
    /\ clock <= MaxTime
    /\ Cardinality(DOMAIN tasks) <= MaxTasks

\* State space reduction (optional, for faster checking)
ACTION_CONSTRAINT
    \* Limit number of active sessions
    /\ Cardinality({u \in Users : sessions[u] = TRUE}) <= 2
    \* Prevent creating too many tasks at once
    /\ nextTaskId <= MaxTasks

\* Invariants to check
INVARIANT TypeInvariant
INVARIANT SafetyInvariant
INVARIANT NoOrphanTasks
INVARIANT TaskOwnership
INVARIANT ValidTaskIds
INVARIANT NoDuplicateTaskIds
INVARIANT ValidStateTransitionsInvariant
INVARIANT ConsistentTimestamps
INVARIANT NoCyclicDependencies
INVARIANT AuthenticationRequired

\* Properties to check
PROPERTY EventualCompletion
PROPERTY FairProgress
PROPERTY EventualUnblocking
PROPERTY EventualAuthentication
PROPERTY NoStarvation

\* Check for deadlocks
CHECK_DEADLOCK TRUE

\* View for debugging (optional)
VIEW <<nextTaskId, Cardinality(DOMAIN tasks), clock>>

\* Alias for better state visualization
ALIAS TaskSummary == [
    totalTasks |-> Cardinality(DOMAIN tasks),
    pendingTasks |-> Cardinality({t \in DOMAIN tasks : tasks[t].status = "pending"}),
    inProgressTasks |-> Cardinality({t \in DOMAIN tasks : tasks[t].status = "in_progress"}),
    completedTasks |-> Cardinality({t \in DOMAIN tasks : tasks[t].status = "completed"}),
    blockedTasks |-> Cardinality({t \in DOMAIN tasks : tasks[t].status = "blocked"}),
    activeSessions |-> {u \in Users : sessions[u] = TRUE},
    currentUser |-> currentUser,
    time |-> clock
]

# Run the model checker
java -jar tla2tools.jar -config TaskManagement.cfg TaskManagement.tla

TLA+ will explore all possible execution paths and either:

  1. Verify that all properties hold
  2. Generate counterexamples showing property violations

Step 5: Claude Implementation with TLA+ Context

Now comes the magic. Instead of vague natural language, we give Claude the precise TLA+ specification:

My prompt to Claude:

Using this TLA+ specification for a task management system, implement a Go REST API 
that satisfies all the specified behaviors and properties.

[Include the full TLA+ specification here]

Requirements:
1. Implement all actions defined in the TLA+ spec
2. Ensure all safety properties hold at runtime
3. Use Go's type system to enforce the data structures
4. Include comprehensive tests that cover the execution traces
5. Add proper error handling for violation of preconditions

Focus on behavioral correctness over performance optimization.

Claude generates something like (task.go):

// Package domain contains the core business entities matching the TLA+ specification
package domain

import (
	"fmt"
	"time"
)

// TaskID represents a unique task identifier (maps to TLA+ task ID)
type TaskID int

// UserID represents a user identifier (maps to TLA+ Users set)
type UserID string

// TaskStatus represents the state of a task (maps to TLA+ TaskStates)
type TaskStatus string

const (
	StatusPending    TaskStatus = "pending"
	StatusInProgress TaskStatus = "in_progress"
	StatusCompleted  TaskStatus = "completed"
	StatusCancelled  TaskStatus = "cancelled"
	StatusBlocked    TaskStatus = "blocked"
)

// Priority represents task priority levels (maps to TLA+ Priorities)
type Priority string

const (
	PriorityLow      Priority = "low"
	PriorityMedium   Priority = "medium"
	PriorityHigh     Priority = "high"
	PriorityCritical Priority = "critical"
)

// Tag represents task categories (maps to TLA+ tags subset)
type Tag string

const (
	TagBug           Tag = "bug"
	TagFeature       Tag = "feature"
	TagEnhancement   Tag = "enhancement"
	TagDocumentation Tag = "documentation"
)

// Task represents a task entity (maps to TLA+ task record)
type Task struct {
	ID           TaskID            `json:"id"`
	Title        string            `json:"title"`
	Description  string            `json:"description"`
	Status       TaskStatus        `json:"status"`
	Priority     Priority          `json:"priority"`
	Assignee     UserID            `json:"assignee"`
	CreatedBy    UserID            `json:"created_by"`
	CreatedAt    time.Time         `json:"created_at"`
	UpdatedAt    time.Time         `json:"updated_at"`
	DueDate      *time.Time        `json:"due_date,omitempty"`
	Tags         []Tag             `json:"tags"`
	Dependencies map[TaskID]bool   `json:"dependencies"`
}

// ValidTransition represents a valid state transition (maps to TLA+ ValidTransitions)
type ValidTransition struct {
	From TaskStatus
	To   TaskStatus
}

// ValidTransitions defines all allowed state transitions
var ValidTransitions = map[ValidTransition]bool{
	{StatusPending, StatusInProgress}:    true,
	{StatusPending, StatusCancelled}:     true,
	{StatusPending, StatusBlocked}:       true,
	{StatusInProgress, StatusCompleted}:  true,
	{StatusInProgress, StatusCancelled}:  true,
	{StatusInProgress, StatusBlocked}:    true,
	{StatusInProgress, StatusPending}:    true, // Allow reverting
	{StatusBlocked, StatusPending}:       true,
	{StatusBlocked, StatusInProgress}:    true,
	{StatusBlocked, StatusCancelled}:     true,
}

// IsValidTransition checks if a state transition is valid (maps to TLA+ IsValidTransition)
func IsValidTransition(from, to TaskStatus) bool {
	return ValidTransitions[ValidTransition{From: from, To: to}]
}

// CanDelete checks if a task can be deleted (only completed or cancelled)
func (t *Task) CanDelete() bool {
	return t.Status == StatusCompleted || t.Status == StatusCancelled
}

// IsBlocked checks if task should be blocked based on dependencies
func (t *Task) IsBlocked(allTasks map[TaskID]*Task) bool {
	if len(t.Dependencies) == 0 {
		return false
	}
	
	for depID := range t.Dependencies {
		if dep, exists := allTasks[depID]; exists {
			if dep.Status != StatusCompleted {
				return true
			}
		}
	}
	return false
}

// ShouldUnblock checks if a blocked task can be unblocked
func (t *Task) ShouldUnblock(allTasks map[TaskID]*Task) bool {
	if t.Status != StatusBlocked {
		return false
	}
	
	for depID := range t.Dependencies {
		if dep, exists := allTasks[depID]; exists {
			if dep.Status != StatusCompleted {
				return false
			}
		}
	}
	return true
}

// Validate performs domain validation on the task
func (t *Task) Validate() error {
	if t.Title == "" {
		return fmt.Errorf("task title cannot be empty")
	}
	if t.Description == "" {
		return fmt.Errorf("task description cannot be empty")
	}
	if !isValidStatus(t.Status) {
		return fmt.Errorf("invalid task status: %s", t.Status)
	}
	if !isValidPriority(t.Priority) {
		return fmt.Errorf("invalid task priority: %s", t.Priority)
	}
	if t.Assignee == "" {
		return fmt.Errorf("task must have an assignee")
	}
	if t.CreatedBy == "" {
		return fmt.Errorf("task must have a creator")
	}
	if t.CreatedAt.After(t.UpdatedAt) {
		return fmt.Errorf("created time cannot be after updated time")
	}
	for _, tag := range t.Tags {
		if !isValidTag(tag) {
			return fmt.Errorf("invalid tag: %s", tag)
		}
	}
	return nil
}

func isValidStatus(status TaskStatus) bool {
	switch status {
	case StatusPending, StatusInProgress, StatusCompleted, StatusCancelled, StatusBlocked:
		return true
	default:
		return false
	}
}

func isValidPriority(priority Priority) bool {
	switch priority {
	case PriorityLow, PriorityMedium, PriorityHigh, PriorityCritical:
		return true
	default:
		return false
	}
}

func isValidTag(tag Tag) bool {
	switch tag {
	case TagBug, TagFeature, TagEnhancement, TagDocumentation:
		return true
	default:
		return false
	}
}

The use-case layer follows, implementing each TLA+ action as a method:

// Package usecase implements the TLA+ actions as use cases
package usecase

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
	
	"github.com/bhatti/sample-task-management/internal/domain"
	"github.com/bhatti/sample-task-management/internal/repository"
)

// TaskUseCase implements task-related TLA+ actions
type TaskUseCase struct {
	uow              repository.UnitOfWork
	invariantChecker InvariantChecker
}

// InvariantChecker interface for runtime invariant validation
type InvariantChecker interface {
	CheckAllInvariants(state *domain.SystemState) error
	CheckTaskInvariants(task *domain.Task, state *domain.SystemState) error
	CheckTransitionInvariant(from, to domain.TaskStatus) error
}

// NewTaskUseCase creates a new task use case
func NewTaskUseCase(uow repository.UnitOfWork, checker InvariantChecker) *TaskUseCase {
	return &TaskUseCase{
		uow:              uow,
		invariantChecker: checker,
	}
}

// Authenticate implements TLA+ Authenticate action
func (uc *TaskUseCase) Authenticate(userID domain.UserID) (*domain.Session, error) {
	// Preconditions from TLA+:
	// - user \in Users
	// - ~sessions[user]
	
	user, err := uc.uow.Users().GetUser(userID)
	if err != nil {
		return nil, fmt.Errorf("user not found: %w", err)
	}
	
	// Check if user already has an active session
	existingSession, _ := uc.uow.Sessions().GetSessionByUser(userID)
	if existingSession != nil && existingSession.IsValid() {
		return nil, fmt.Errorf("user %s already has an active session", userID)
	}
	
	// Create new session
	token := generateToken()
	session := &domain.Session{
		UserID:    user.ID,
		Token:     token,
		Active:    true,
		CreatedAt: time.Now(),
		ExpiresAt: time.Now().Add(24 * time.Hour),
	}
	
	// Update state
	if err := uc.uow.Sessions().CreateSession(session); err != nil {
		return nil, fmt.Errorf("failed to create session: %w", err)
	}
	
	if err := uc.uow.SystemState().SetCurrentUser(&userID); err != nil {
		return nil, fmt.Errorf("failed to set current user: %w", err)
	}
	
	// Check invariants
	state, _ := uc.uow.SystemState().GetSystemState()
	if err := uc.invariantChecker.CheckAllInvariants(state); err != nil {
		uc.uow.Rollback()
		return nil, fmt.Errorf("invariant violation: %w", err)
	}
	
	return session, nil
}
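
The generateToken helper is referenced above but never shown; here is a minimal standalone sketch (the original may differ) built on the same crypto/rand and encoding/hex imports the file already declares:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// generateToken returns a 32-character hex session token backed by
// crypto/rand; 16 random bytes hex-encode to 32 characters.
func generateToken() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		// a crypto/rand failure is unrecoverable for session issuance
		panic(err)
	}
	return hex.EncodeToString(b)
}

func main() {
	fmt.Println(len(generateToken())) // 32
}
```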


// CreateTask implements TLA+ CreateTask action
func (uc *TaskUseCase) CreateTask(
	title, description string,
	priority domain.Priority,
	assignee domain.UserID,
	dueDate *time.Time,
	tags []domain.Tag,
	dependencies []domain.TaskID,
) (*domain.Task, error) {
	// Preconditions from TLA+:
	// - currentUser # NULL
	// - currentUser \in Users
	// - nextTaskId <= MaxTasks
	// - deps \subseteq DOMAIN tasks
	// - \A dep \in deps : tasks[dep].status # "cancelled"
	
	currentUser, err := uc.uow.SystemState().GetCurrentUser()
	if err != nil || currentUser == nil {
		return nil, fmt.Errorf("authentication required")
	}
	
	// Check max tasks limit
	nextID, err := uc.uow.SystemState().GetNextTaskID()
	if err != nil {
		return nil, fmt.Errorf("failed to get next task ID: %w", err)
	}
	
	if nextID > domain.MaxTasks {
		return nil, fmt.Errorf("maximum number of tasks (%d) reached", domain.MaxTasks)
	}
	
	// Validate dependencies
	allTasks, err := uc.uow.Tasks().GetAllTasks()
	if err != nil {
		return nil, fmt.Errorf("failed to get tasks: %w", err)
	}
	
	depMap := make(map[domain.TaskID]bool)
	for _, depID := range dependencies {
		depTask, exists := allTasks[depID]
		if !exists {
			return nil, fmt.Errorf("dependency task %d does not exist", depID)
		}
		if depTask.Status == domain.StatusCancelled {
			return nil, fmt.Errorf("cannot depend on cancelled task %d", depID)
		}
		depMap[depID] = true
	}
	
	// Check for cyclic dependencies
	if err := uc.checkCyclicDependencies(nextID, depMap, allTasks); err != nil {
		return nil, err
	}
	
	// Determine initial status based on dependencies
	status := domain.StatusPending
	if len(dependencies) > 0 {
		// Check if all dependencies are completed
		allCompleted := true
		for depID := range depMap {
			if allTasks[depID].Status != domain.StatusCompleted {
				allCompleted = false
				break
			}
		}
		if !allCompleted {
			status = domain.StatusBlocked
		}
	}
	
	// Create task
	task := &domain.Task{
		ID:           nextID,
		Title:        title,
		Description:  description,
		Status:       status,
		Priority:     priority,
		Assignee:     assignee,
		CreatedBy:    *currentUser,
		CreatedAt:    time.Now(),
		UpdatedAt:    time.Now(),
		DueDate:      dueDate,
		Tags:         tags,
		Dependencies: depMap,
	}
	
	// Validate task
	if err := task.Validate(); err != nil {
		return nil, fmt.Errorf("task validation failed: %w", err)
	}
	
	// Save task
	if err := uc.uow.Tasks().CreateTask(task); err != nil {
		return nil, fmt.Errorf("failed to create task: %w", err)
	}
	
	// Increment next task ID
	if _, err := uc.uow.SystemState().IncrementNextTaskID(); err != nil {
		return nil, fmt.Errorf("failed to increment task ID: %w", err)
	}
	
	// Check invariants
	state, _ := uc.uow.SystemState().GetSystemState()
	if err := uc.invariantChecker.CheckAllInvariants(state); err != nil {
		uc.uow.Rollback()
		return nil, fmt.Errorf("invariant violation after task creation: %w", err)
	}
	
	return task, nil
}
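
CreateTask calls checkCyclicDependencies, which isn't in the generated excerpt. A standalone sketch with simplified local types (the real method works against the domain package) is a depth-first reachability check: the proposed dependencies close a cycle only if the new task's ID is reachable from one of them:

```go
package main

import "fmt"

type TaskID int

type Task struct {
	ID           TaskID
	Dependencies map[TaskID]bool
}

// checkCyclicDependencies walks the dependency graph from each proposed
// dependency and fails if newID is reachable, which would close a cycle.
func checkCyclicDependencies(newID TaskID, deps map[TaskID]bool, all map[TaskID]*Task) error {
	visited := make(map[TaskID]bool)
	var visit func(id TaskID) error
	visit = func(id TaskID) error {
		if id == newID {
			return fmt.Errorf("cyclic dependency involving task %d", newID)
		}
		if visited[id] {
			return nil
		}
		visited[id] = true
		if t, ok := all[id]; ok {
			for dep := range t.Dependencies {
				if err := visit(dep); err != nil {
					return err
				}
			}
		}
		return nil
	}
	for dep := range deps {
		if err := visit(dep); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	all := map[TaskID]*Task{
		1: {ID: 1, Dependencies: map[TaskID]bool{}},
		2: {ID: 2, Dependencies: map[TaskID]bool{1: true}},
	}
	// A chain (3 -> 2 -> 1) is fine.
	if err := checkCyclicDependencies(3, map[TaskID]bool{2: true}, all); err != nil {
		panic(err)
	}
	// If an existing task already pointed at the new ID, that would close a cycle.
	all[1].Dependencies[3] = true
	if err := checkCyclicDependencies(3, map[TaskID]bool{2: true}, all); err == nil {
		panic("cycle not detected")
	}
	fmt.Println("cycle checks passed")
}
```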

// UpdateTaskStatus implements TLA+ UpdateTaskStatus action
func (uc *TaskUseCase) UpdateTaskStatus(taskID domain.TaskID, newStatus domain.TaskStatus) error {
	// Preconditions from TLA+:
	// - currentUser # NULL
	// - TaskExists(taskId)
	// - taskId \in GetUserTasks(currentUser)
	// - IsValidTransition(tasks[taskId].status, newStatus)
	// - newStatus = "in_progress" => all dependencies completed
	
	currentUser, err := uc.uow.SystemState().GetCurrentUser()
	if err != nil || currentUser == nil {
		return fmt.Errorf("authentication required")
	}
	
	task, err := uc.uow.Tasks().GetTask(taskID)
	if err != nil {
		return fmt.Errorf("task not found: %w", err)
	}
	
	// Check user owns the task
	userTasks, err := uc.uow.SystemState().GetUserTasks(*currentUser)
	if err != nil {
		return fmt.Errorf("failed to get user tasks: %w", err)
	}
	
	hasTask := false
	for _, id := range userTasks {
		if id == taskID {
			hasTask = true
			break
		}
	}
	
	if !hasTask {
		return fmt.Errorf("user does not have access to task %d", taskID)
	}
	
	// Check valid transition
	if !domain.IsValidTransition(task.Status, newStatus) {
		return fmt.Errorf("invalid transition from %s to %s", task.Status, newStatus)
	}
	
	// Check dependencies if moving to in_progress
	if newStatus == domain.StatusInProgress {
		allTasks, _ := uc.uow.Tasks().GetAllTasks()
		for depID := range task.Dependencies {
			if depTask, exists := allTasks[depID]; exists {
				if depTask.Status != domain.StatusCompleted {
					return fmt.Errorf("cannot start task: dependency %d is not completed", depID)
				}
			}
		}
	}
	
	// Update status
	task.Status = newStatus
	task.UpdatedAt = time.Now()
	
	if err := uc.uow.Tasks().UpdateTask(task); err != nil {
		return fmt.Errorf("failed to update task: %w", err)
	}
	
	// Check invariants
	state, _ := uc.uow.SystemState().GetSystemState()
	if err := uc.invariantChecker.CheckAllInvariants(state); err != nil {
		uc.uow.Rollback()
		return fmt.Errorf("invariant violation: %w", err)
	}
	
	return nil
}
...

Step 6: TLA+ Generated Tests

The real power comes when we use TLA+ execution traces to generate comprehensive tests:

My prompt to Claude:

Generate Go tests that verify the implementation satisfies the TLA+ specification.
Create test cases that:

1. Test all TLA+ actions with valid preconditions
2. Test safety property violations
3. Test edge cases from the TLA+ model boundary conditions
4. Use property-based testing where appropriate

Include tests that cover the execution traces the TLA+ model checker explores.

Claude generates (invariant_test.go):

// Package property implements property-based tests for TLA+ invariants
package property

import (
	"math/rand"
	"testing"
	"time"

	"github.com/bhatti/sample-task-management/internal/domain"
	"github.com/bhatti/sample-task-management/internal/infrastructure/memory"
	"github.com/bhatti/sample-task-management/internal/usecase"
	"github.com/bhatti/sample-task-management/pkg/invariants"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestInvariantsHoldAfterOperations verifies invariants hold after each operation
func TestInvariantsHoldAfterOperations(t *testing.T) {
	repo := memory.NewMemoryRepository()
	uow := memory.NewMemoryUnitOfWork(repo)
	checker := invariants.NewInvariantChecker()
	uc := usecase.NewTaskUseCase(uow, checker)

	// Setup initial users
	users := []domain.UserID{"alice", "bob", "charlie"}
	for _, userID := range users {
		user := &domain.User{
			ID:       userID,
			Name:     string(userID),
			Email:    string(userID) + "@example.com",
			JoinedAt: time.Now(),
		}
		require.NoError(t, repo.CreateUser(user))
	}

	// Property: Invariants hold after authentication
	t.Run("InvariantsAfterAuthentication", func(t *testing.T) {
		for _, userID := range users {
			session, err := uc.Authenticate(userID)
			assert.NoError(t, err)
			assert.NotNil(t, session)

			state, _ := repo.GetSystemState()
			assert.NoError(t, checker.CheckAllInvariants(state))

			// Cleanup
			_ = uc.Logout(userID)
		}
	})

	// Property: Invariants hold after task creation
	t.Run("InvariantsAfterTaskCreation", func(t *testing.T) {
		uc.Authenticate("alice")

		for i := 0; i < 10; i++ {
			task, err := uc.CreateTask(
				"Task "+string(rune('0'+i)), // i < 10, so this yields "Task 0".."Task 9"
				"Description",
				randomPriority(),
				randomUser(users),
				randomDueDate(),
				randomTags(),
				[]domain.TaskID{}, // No dependencies initially
			)

			assert.NoError(t, err)
			assert.NotNil(t, task)

			state, _ := repo.GetSystemState()
			assert.NoError(t, checker.CheckAllInvariants(state))
		}
	})

	// Property: Invariants hold after status transitions
	t.Run("InvariantsAfterStatusTransitions", func(t *testing.T) {
		uc.Authenticate("alice")

		// Create a task
		task, _ := uc.CreateTask(
			"Test Task",
			"Description",
			domain.PriorityMedium,
			"alice",
			nil,
			[]domain.Tag{domain.TagFeature},
			[]domain.TaskID{},
		)

		// Valid transitions
		validTransitions := []domain.TaskStatus{
			domain.StatusInProgress,
			domain.StatusCompleted,
		}

		for _, status := range validTransitions {
			err := uc.UpdateTaskStatus(task.ID, status)
			if err == nil {
				state, _ := repo.GetSystemState()
				assert.NoError(t, checker.CheckAllInvariants(state))
			}
		}
	})

	// Property: No cyclic dependencies can be created
	t.Run("NoCyclicDependencies", func(t *testing.T) {
		uc.Authenticate("alice")

		// Create tasks with potential cycles
		task1, _ := uc.CreateTask("Task1", "Desc", domain.PriorityLow, "alice", nil, nil, []domain.TaskID{})
		task2, _ := uc.CreateTask("Task2", "Desc", domain.PriorityLow, "alice", nil, nil, []domain.TaskID{task1.ID})
		task3, _ := uc.CreateTask("Task3", "Desc", domain.PriorityLow, "alice", nil, nil, []domain.TaskID{task2.ID})

		// Task4 depending on both task3 and task1 forms a diamond, not a cycle;
		// a brand-new task cannot close a cycle, so creation succeeds
		_, err := uc.CreateTask("Task4", "Desc", domain.PriorityLow, "alice", nil, nil,
			[]domain.TaskID{task3.ID, task1.ID})
		assert.NoError(t, err)

		// The NoCyclicDependencies invariant should still hold over the whole graph
		state, _ := repo.GetSystemState()
		assert.NoError(t, checker.CheckAllInvariants(state))
	})
}

// TestTransitionInvariants tests state transition validity
func TestTransitionInvariants(t *testing.T) {
	checker := invariants.NewInvariantChecker()

	// Test all valid transitions
	validTransitions := []struct {
		from domain.TaskStatus
		to   domain.TaskStatus
	}{
		{domain.StatusPending, domain.StatusInProgress},
		{domain.StatusPending, domain.StatusCancelled},
		{domain.StatusInProgress, domain.StatusCompleted},
		{domain.StatusInProgress, domain.StatusCancelled},
		{domain.StatusBlocked, domain.StatusPending},
		{domain.StatusBlocked, domain.StatusCancelled},
	}

	for _, trans := range validTransitions {
		t.Run(string(trans.from)+"_to_"+string(trans.to), func(t *testing.T) {
			err := checker.CheckTransitionInvariant(trans.from, trans.to)
			assert.NoError(t, err)
		})
	}

	// Test invalid transitions
	invalidTransitions := []struct {
		from domain.TaskStatus
		to   domain.TaskStatus
	}{
		{domain.StatusCompleted, domain.StatusPending},
		{domain.StatusCompleted, domain.StatusInProgress},
		{domain.StatusCancelled, domain.StatusInProgress},
		{domain.StatusPending, domain.StatusCompleted}, // Must go through in_progress
	}

	for _, trans := range invalidTransitions {
		t.Run("Invalid_"+string(trans.from)+"_to_"+string(trans.to), func(t *testing.T) {
			err := checker.CheckTransitionInvariant(trans.from, trans.to)
			assert.Error(t, err)
		})
	}
}

// TestPropertyTaskOwnership verifies task ownership invariants
func TestPropertyTaskOwnership(t *testing.T) {
	repo := memory.NewMemoryRepository()
	uow := memory.NewMemoryUnitOfWork(repo)
	checker := invariants.NewInvariantChecker()
	uc := usecase.NewTaskUseCase(uow, checker)

	// Setup users
	users := []domain.UserID{"alice", "bob"}
	for _, userID := range users {
		user := &domain.User{
			ID:       userID,
			Name:     string(userID),
			Email:    string(userID) + "@example.com",
			JoinedAt: time.Now(),
		}
		repo.CreateUser(user)
	}

	// Property: Task reassignment maintains ownership invariants
	t.Run("ReassignmentMaintainsOwnership", func(t *testing.T) {
		uc.Authenticate("alice")

		// Create task assigned to Alice
		task, err := uc.CreateTask(
			"Test Task",
			"Description",
			domain.PriorityHigh,
			"alice",
			nil,
			[]domain.Tag{domain.TagBug},
			[]domain.TaskID{},
		)
		require.NoError(t, err)

		// Check initial ownership
		state, _ := repo.GetSystemState()
		assert.NoError(t, checker.CheckAllInvariants(state))

		aliceTasks := state.GetUserTasks("alice")
		assert.Contains(t, aliceTasks, task.ID)

		// Reassign to Bob
		err = uc.ReassignTask(task.ID, "bob")
		require.NoError(t, err)

		// Check ownership after reassignment
		state, _ = repo.GetSystemState()
		assert.NoError(t, checker.CheckAllInvariants(state))

		aliceTasks = state.GetUserTasks("alice")
		bobTasks := state.GetUserTasks("bob")
		assert.NotContains(t, aliceTasks, task.ID)
		assert.Contains(t, bobTasks, task.ID)
	})
}

// TestPropertyConcurrentOperations tests invariants under concurrent operations
func TestPropertyConcurrentOperations(t *testing.T) {
	repo := memory.NewMemoryRepository()
	uow := memory.NewMemoryUnitOfWork(repo)
	checker := invariants.NewInvariantChecker()

	// Setup users
	users := []domain.UserID{"user1", "user2", "user3"}
	for _, userID := range users {
		user := &domain.User{
			ID:       userID,
			Name:     string(userID),
			Email:    string(userID) + "@example.com",
			JoinedAt: time.Now(),
		}
		repo.CreateUser(user)
	}

	// Run concurrent operations
	done := make(chan bool, len(users))

	for _, userID := range users {
		go func(uid domain.UserID) {
			uc := usecase.NewTaskUseCase(uow, checker)

			// Authenticate
			uc.Authenticate(uid)

			// Create multiple tasks
			for i := 0; i < 5; i++ {
				uc.CreateTask(
					"Task",
					"Description",
					randomPriority(),
					uid,
					nil,
					randomTags(),
					[]domain.TaskID{},
				)

				// Random delay
				time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
			}

			done <- true
		}(userID)
	}

	// Wait for all goroutines
	for i := 0; i < len(users); i++ {
		<-done
	}

	// Check invariants after concurrent operations
	state, _ := repo.GetSystemState()
	assert.NoError(t, checker.CheckAllInvariants(state))
}

// Helper functions

func randomPriority() domain.Priority {
	priorities := []domain.Priority{
		domain.PriorityLow,
		domain.PriorityMedium,
		domain.PriorityHigh,
		domain.PriorityCritical,
	}
	return priorities[rand.Intn(len(priorities))]
}

func randomUser(users []domain.UserID) domain.UserID {
	return users[rand.Intn(len(users))]
}

func randomDueDate() *time.Time {
	if rand.Float32() < 0.5 {
		return nil
	}
	due := time.Now().Add(time.Duration(rand.Intn(30)) * 24 * time.Hour)
	return &due
}

func randomTags() []domain.Tag {
	allTags := []domain.Tag{
		domain.TagBug,
		domain.TagFeature,
		domain.TagEnhancement,
		domain.TagDocumentation,
	}

	numTags := rand.Intn(len(allTags) + 1)
	if numTags == 0 {
		return nil
	}

	tags := make([]domain.Tag, 0, numTags)
	used := make(map[domain.Tag]bool)

	for len(tags) < numTags {
		tag := allTags[rand.Intn(len(allTags))]
		if !used[tag] {
			tags = append(tags, tag)
			used[tag] = true
		}
	}

	return tags
}

Advanced TLA+ Patterns with Claude

Modeling Concurrent Operations

One of TLA+’s strengths is modeling concurrent systems. Let’s extend our specification to handle concurrent task updates:

\* Concurrent task updates with conflict resolution
ConcurrentUpdateTask(taskId, newStatus, version) ==
    /\ currentUser # NULL
    /\ taskId \in DOMAIN tasks
    /\ taskId \in userTasks[currentUser]
    /\ tasks[taskId].version = version  \* Optimistic concurrency control
    /\ tasks' = [tasks EXCEPT ![taskId] = [
                     @ EXCEPT 
                     !.status = newStatus,
                     !.version = @ + 1,
                     !.lastModified = currentUser
                 ]]
    /\ UNCHANGED <<userTasks, nextTaskId, currentUser>>

Prompt to Claude:

Implement optimistic concurrency control for the task updates based on this 
TLA+ specification. Include version tracking and conflict detection.
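
What Claude produces for this will vary; a minimal in-memory sketch of the version check (illustrative names, not the repository's actual API) looks like:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// VersionedTask carries the version counter from the TLA+ record.
type VersionedTask struct {
	Status  string
	Version int
}

// Store is an in-memory stand-in for the task repository.
type Store struct {
	mu    sync.Mutex
	tasks map[int]*VersionedTask
}

// ErrVersionConflict mirrors the failed TLA+ precondition
// tasks[taskId].version = version.
var ErrVersionConflict = errors.New("version conflict: task modified concurrently")

// UpdateStatus applies the update only when the caller's version matches the
// stored one, then bumps the version - the compare-and-swap from the spec.
func (s *Store) UpdateStatus(id int, newStatus string, version int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	t, ok := s.tasks[id]
	if !ok {
		return fmt.Errorf("task %d not found", id)
	}
	if t.Version != version {
		return ErrVersionConflict
	}
	t.Status = newStatus
	t.Version++
	return nil
}

func main() {
	s := &Store{tasks: map[int]*VersionedTask{1: {Status: "pending", Version: 0}}}
	if err := s.UpdateStatus(1, "in_progress", 0); err != nil {
		panic(err)
	}
	// A second writer still holding version 0 must be rejected.
	if err := s.UpdateStatus(1, "completed", 0); !errors.Is(err, ErrVersionConflict) {
		panic("expected version conflict")
	}
	fmt.Println("optimistic concurrency checks passed")
}
```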

Modeling Complex Business Rules

TLA+ excels at capturing complex business logic:

\* Business rule: High priority tasks cannot be cancelled directly
ValidStatusTransition(currentStatus, newStatus, priority) ==
    \/ newStatus = currentStatus
    \/ /\ currentStatus = "pending" 
       /\ newStatus \in {"in_progress", "cancelled"}
    \/ /\ currentStatus = "in_progress"
       /\ newStatus \in {"completed", "pending"}
    \/ /\ currentStatus = "in_progress"
       /\ newStatus = "cancelled"
       /\ priority # "high"  \* High priority tasks cannot be cancelled
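
Translated to Go, the rule becomes a pure predicate; this sketch mirrors the disjunction above clause by clause (status and priority as plain strings for brevity):

```go
package main

import "fmt"

// ValidStatusTransition mirrors the TLA+ rule: the only way to cancel an
// in-progress task is when its priority is not "high".
func ValidStatusTransition(current, next, priority string) bool {
	switch {
	case next == current:
		return true
	case current == "pending":
		return next == "in_progress" || next == "cancelled"
	case current == "in_progress" && (next == "completed" || next == "pending"):
		return true
	case current == "in_progress" && next == "cancelled":
		return priority != "high" // high-priority tasks cannot be cancelled directly
	default:
		return false
	}
}

func main() {
	fmt.Println(ValidStatusTransition("in_progress", "cancelled", "high")) // false
	fmt.Println(ValidStatusTransition("in_progress", "cancelled", "low"))  // true
}
```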

Lessons Learned

After applying this TLA+ approach to several experimental projects, here are the key insights:

1. Start Small

Begin with core actions and properties. TLA+ specifications can grow complex quickly, so start with the essential behaviors:

\* Start with basic CRUD
Init, CreateTask, UpdateTask, DeleteTask

\* Add complexity incrementally  
Authentication, Authorization, Concurrency, Business Rules

Avoid Initially: Complex distributed systems, performance-critical algorithms

Graduate To: Multi-service interactions, complex business logic

2. Properties Drive Design

Writing TLA+ properties often reveals design flaws before implementation:

\* This property might fail, revealing a design issue
ConsistencyProperty == 
    \A user \in Users:
        \A taskId \in userTasks[user]:
            /\ taskId \in DOMAIN tasks
            /\ tasks[taskId].assignee = user
            /\ tasks[taskId].status # "deleted"  \* Soft delete consideration

3. Model Checking Finds Edge Cases

TLA+ model checking explores execution paths you’d never think to test:

# TLA+ finds this counterexample:
# Step 1: User1 creates Task1
# Step 2: User1 deletes Task1  
# Step 3: User2 creates Task2 (gets same ID due to reuse)
# Step 4: User1 tries to update Task1 -> Security violation!

This led to using UUIDs instead of incrementing integers for task IDs.

4. Generated Tests Are Comprehensive

TLA+ execution traces become your regression test suite. When Claude implements based on TLA+ specs, you get:

  • Complete coverage – All specification paths tested
  • Edge case detection – Boundary conditions from model checking
  • Behavioral contracts – Tests verify actual system properties

Documentation Generation

Prompt to Claude:

Generate API documentation from this TLA+ specification that includes:
1. Endpoint descriptions derived from TLA+ actions
2. Request/response schemas from TLA+ data structures  
3. Error conditions from TLA+ preconditions
4. Behavioral guarantees from TLA+ properties

Code Review Guidelines

With TLA+ specifications, code reviews become more focused:

  1. Does implementation satisfy the TLA+ spec?
  2. Are all preconditions checked?
  3. Do safety properties hold?
  4. Are error conditions handled as specified?

Comparing Specification Approaches

| Approach | Precision | AI Effectiveness | Maintenance | Learning Curve | Tool Complexity | Code Generation |
|---|---|---|---|---|---|---|
| Vibe Coding | Low | Inconsistent | High | Low | Low | N/A |
| UML/MDD | Medium | Poor | Very High | High | Very High | Brittle |
| BDD/Gherkin | Medium | Better | Medium | Medium | Low | Limited |
| TLA+ Specs | High | Excellent | Low | High | Low | Reliable |

Tools and Resources

Essential TLA+ Resources

  • Learn TLA+: https://learntla.com – Interactive tutorial
  • TLA+ Video Course: Leslie Lamport’s official course
  • Practical TLA+: Hillel Wayne’s book – focus on software systems
  • TLA+ Examples: https://github.com/tlaplus/Examples

Common Mistakes

1. Avoid These Mistakes

❌ Writing TLA+ like code

\* Wrong - this looks like pseudocode
CreateTask == 
    if currentUser != null then
        task = new Task()

✅ Writing TLA+ as mathematical relations

\* Right - mathematical specification  
CreateTask == 
    /\ currentUser # NULL
    /\ tasks' = tasks @@ (nextTaskId :> newTask)

❌ Asking Claude to “fix the TLA+ to match the code”

The spec is the truth – fix the code to match the spec

✅ Asking Claude to “implement this TLA+ specification correctly”

❌ Specification scope creep: Starting with entire system architecture
✅ Incremental approach: Begin with one core workflow, expand gradually

2. Claude Integration Pitfalls

❌ “Fix the spec to match my code”: Treating specifications as documentation
✅ “Fix the code to match the spec”: Specifications are the source of truth

3. The Context Overload Trap

Problem: Dumping too much information at once
Solution: Break complex features into smaller, focused requests

4. The “Fix My Test” Antipattern

Problem: When tests fail, asking Claude to modify the test instead of the code
Solution: Always fix the implementation, not the test (unless the test is genuinely wrong)

5. The Blind Trust Mistake

Problem: Accepting generated code without understanding it
Solution: Always review and understand the code before committing

Proven Patterns

1. Save effective prompts:

# ~/.claude/tla-prompts/implementation.md
Implement [language] code that satisfies this TLA+ specification:

[SPEC]

Requirements:
- All TLA+ actions become functions/methods
- All preconditions become runtime checks  
- All data structures match TLA+ types
- Include comprehensive tests covering specification traces

Create specification templates:

--------------------------- MODULE [ModuleName] ---------------------------
EXTENDS Integers, Sequences, FiniteSets

CONSTANTS [Constants]

VARIABLES [StateVariables]

[TypeDefinitions]

Init == [InitialConditions]

[Actions]

Next == [ActionDisjunction]

Spec == Init /\ [][Next]_[StateVariables]

[SafetyProperties]

[LivenessProperties]

=============================================================================

2. The “Explain First” Pattern

Before asking Claude to implement something complex, I ask for an explanation:

Explain how you would implement real-time task updates using WebSockets. 
What are the trade-offs between Socket.io and native WebSockets?
What state management challenges should I consider?

3. The “Progressive Enhancement” Pattern

Start simple, then add complexity:

1. First: "Create a basic task model with CRUD operations"
2. Then: "Add validation and error handling"
3. Then: "Add authentication and authorization"
4. Finally: "Add real-time updates and notifications"

4. The “Code Review” Pattern

After implementation, I ask Claude to review its own code:

Review the task API implementation for:
- Security vulnerabilities
- Performance issues
- Code style consistency
- Missing error cases

Be critical and suggest improvements.

What’s Next

As I’ve developed this TLA+/Claude workflow, I’ve realized we’re approaching something profound: specifications as the primary artifact. Instead of writing code and hoping it’s correct, we’re defining correct behavior formally and letting AI generate the implementation. This inverts the traditional relationship between specification and code.

Implications for Software Engineering

  1. Design-first development becomes natural
  2. Bug prevention replaces bug fixing
  3. Refactoring becomes re-implementation from stable specs
  4. Documentation is always up-to-date (it’s the spec)

I’m currently experimenting with:

  • TLA+ to test case generation – Automated comprehensive testing
  • Multi-language implementations – Same spec, different languages
  • Specification composition – Building larger systems from verified components
  • Quint specifications – A modern executable specification language with simpler syntax than TLA+

Conclusion: The End of Vibe Coding

After using TLA+ with Claude, I can’t go back to vibe coding. The precision, reliability, and confidence that comes from executable specifications has transformed how I build software. The complete working example—TLA+ specs, Go implementation, comprehensive tests, and CI/CD pipeline—is available at github.com/bhatti/sample-task-management.

Yes, there’s a learning curve. Yes, writing TLA+ specifications takes time upfront. But the payoff—in terms of correctness, maintainability, and development speed—is extraordinary. Claude becomes not just a code generator, but a reliable engineering partner that can reason about complex systems precisely because we’ve given it precise specifications to work from. We’re moving from “code and hope” to “specify and know”—and that changes everything.


August 16, 2025

The Complete Guide to gRPC Load Balancing in Kubernetes and Istio

Filed under: Computing,Web Services — Tags: , — admin @ 12:05 pm

TL;DR – The Test Results Matrix

| Configuration | Load Balancing | Why |
|---|---|---|
| Local gRPC | ❌ None | Single server instance |
| Kubernetes + gRPC | ❌ None | Connection-level LB only |
| Kubernetes + Istio | ✅ Perfect | L7 proxy with request-level LB |
| Client-side LB | ⚠️ Limited | Requires multiple endpoints |
| kubectl port-forward + Istio | ❌ None | Bypasses service mesh |

Complete test suite: https://github.com/bhatti/grpc-lb-test


Introduction: The gRPC Load Balancing Problem

When you deploy a gRPC service in Kubernetes with multiple replicas, you expect load balancing. You won’t get it. This guide tests every possible configuration to prove why, and shows exactly how to fix it. According to the official gRPC documentation:

“gRPC uses HTTP/2, which multiplexes multiple calls on a single TCP connection. This means that once the connection is established, all gRPC calls will go to the same backend.”


Complete Test Matrix

We’ll test 6 different configurations:

  1. Baseline: Local Testing (Single server)
  2. Kubernetes without Istio (Standard deployment)
  3. Kubernetes with Istio (Service mesh solution)
  4. Client-side Load Balancing (gRPC built-in)
  5. Advanced Connection Testing (Multiple connections)
  6. Real-time Monitoring (Live traffic analysis)

Prerequisites

git clone https://github.com/bhatti/grpc-lb-test
cd grpc-lb-test

# Build all components
make build

Test 1: Baseline – Local Testing

Purpose: Establish baseline behavior with a single server.

# Terminal 1: Start local server
./bin/server

# Terminal 2: Test with basic client
./bin/client -target localhost:50051 -requests 50

Expected Result:

Load Distribution Results:
Server: unknown-1755316152
Pod: unknown (IP: unknown)
Requests: 50 (100.0%)
████████████████████
Total servers hit: 1
WARNING: All requests went to a single server!
This indicates NO load balancing is happening.

Analysis: This confirms our client implementation works correctly and establishes the baseline.


Test 2: Kubernetes Without Istio

Purpose: Prove that standard Kubernetes doesn’t provide gRPC request-level load balancing.

Deploy the Service

# Deploy 5 replicas without Istio
./scripts/test-without-istio.sh

The k8s/without-istio/deployment.yaml creates:

  • 5 gRPC server replicas
  • Standard Kubernetes Service
  • No Istio annotations

Test Results

Load Distribution Results:
================================
Server: grpc-echo-server-5b657689db-gh5z5-1755316388
  Pod: grpc-echo-server-5b657689db-gh5z5 (IP: 10.1.4.148)
  Requests: 30 (100.0%)
  ████████████████████

Total servers hit: 1
WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.

Connection Analysis:
Without Istio, gRPC maintains a single TCP connection to the Kubernetes Service IP.
The kube-proxy performs L4 load balancing, but gRPC reuses the same connection.

Cleaning up...
deployment.apps "grpc-echo-server" deleted
service "grpc-echo-service" deleted
./scripts/test-without-istio.sh: line 57: 17836 Terminated: 15   
kubectl port-forward service/grpc-echo-service 50051:50051 > /dev/null 2>&1

RESULT: No load balancing observed - all requests went to single pod!

Why This Happens

The Kubernetes Service documentation explains:

“For each Service, kube-proxy installs iptables rules which capture traffic to the Service’s clusterIP and port, and redirect that traffic to one of the Service’s backend endpoints.”

Kubernetes Services perform L4 (connection-level) load balancing, but gRPC maintains persistent connections.
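The effect is easy to reproduce with nothing but the Go standard library. The sketch below is an HTTP/1.1 keep-alive analogy (not gRPC itself): it counts how many distinct client-side TCP connections a server observes across 20 sequential requests from one client. With keep-alives on, the answer is one — which is exactly why kube-proxy's connection-level balancing never gets a second connection to spread across pods.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// countClientConns sends n sequential GETs through one http.Client with
// keep-alives enabled (the default) and returns how many distinct
// client-side TCP connections the server observed. RemoteAddr is unique
// per TCP connection, so it serves as the connection identity.
func countClientConns(n int) int {
	seen := map[string]bool{}
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		seen[r.RemoteAddr] = true
		io.WriteString(w, "ok")
	}))
	defer srv.Close()

	for i := 0; i < n; i++ {
		resp, err := srv.Client().Get(srv.URL)
		if err != nil {
			panic(err)
		}
		io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
		resp.Body.Close()
	}
	return len(seen)
}

func main() {
	fmt.Println("distinct connections for 20 requests:", countClientConns(20))
}
```

gRPC behaves the same way, only more so: HTTP/2 multiplexes all calls over that single connection, so there is never even a pause where a new connection might be opened.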

Connection Analysis

Run the analysis tool to see connection behavior:

./bin/analysis -target localhost:50051 -requests 100 -test-scenarios true

Result:

NO LOAD BALANCING: All requests to single server

Connection Reuse Analysis:
  Average requests per connection: 1.00
  Low connection reuse (many short connections)

Connection analysis complete!

Test 3: Kubernetes With Istio

Purpose: Demonstrate how Istio’s L7 proxy solves the load balancing problem.

Install Istio

./scripts/install-istio.sh

This follows Istio’s official installation guide:

istioctl install --set profile=demo -y
kubectl label namespace default istio-injection=enabled

Deploy With Istio

./scripts/test-with-istio.sh

The k8s/with-istio/deployment.yaml includes:

annotations:
  sidecar.istio.io/inject: "true"
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: grpc-echo-service
spec:
  host: grpc-echo-service
  trafficPolicy:
    connectionPool:
      http:
        http2MaxRequests: 100
        maxRequestsPerConnection: 10
    loadBalancer:
      simple: ROUND_ROBIN

Critical Testing Gotcha

Wrong way (what most people do):

kubectl port-forward service/grpc-echo-service 50051:50051
./bin/client -target localhost:50051 -requests 50
# Result: Still no load balancing!

According to Istio’s architecture docs, kubectl port-forward bypasses the Envoy sidecar proxy.

Correct Testing Method

Test from inside the service mesh:

./scripts/test-with-istio.sh

Test Results With Istio

Load Distribution Results:
================================

Server: grpc-echo-server-579dfbc76b-m2v7x-1755357769
  Pod: grpc-echo-server-579dfbc76b-m2v7x (IP: 10.1.4.237)
  Requests: 10 (20.0%)
  ████████

Server: grpc-echo-server-579dfbc76b-fkgkk-1755357769
  Pod: grpc-echo-server-579dfbc76b-fkgkk (IP: 10.1.4.240)
  Requests: 10 (20.0%)
  ████████

Server: grpc-echo-server-579dfbc76b-bsjdv-1755357769
  Pod: grpc-echo-server-579dfbc76b-bsjdv (IP: 10.1.4.241)
  Requests: 10 (20.0%)
  ████████

Server: grpc-echo-server-579dfbc76b-dw2m7-1755357770
  Pod: grpc-echo-server-579dfbc76b-dw2m7 (IP: 10.1.4.236)
  Requests: 10 (20.0%)
  ████████

Server: grpc-echo-server-579dfbc76b-x85jm-1755357769
  Pod: grpc-echo-server-579dfbc76b-x85jm (IP: 10.1.4.238)
  Requests: 10 (20.0%)
  ████████

Total unique servers: 5

Load balancing detected across 5 servers!
   Expected requests per server: 10.0
   Distribution variance: 0.00

How Istio Solves This

From Istio’s traffic management documentation:

“Envoy proxies are deployed as sidecars to services, logically augmenting the services with traffic management capabilities… Envoy proxies are the only Istio components that interact with data plane traffic.”

Istio’s solution:

  1. Envoy sidecar intercepts all traffic
  2. Performs L7 (application-level) load balancing
  3. Maintains connection pools to all backends
  4. Routes each request independently

Test 4: Client-Side Load Balancing

Purpose: Test gRPC’s built-in client-side load balancing capabilities.

Standard Client-Side LB

./scripts/test-client-lb.sh

The cmd/client-lb/main.go implements gRPC’s native load balancing:

conn, err := grpc.Dial(
    "dns:///"+target,
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)

Results and Limitations

Load Distribution Results:
================================
Server: grpc-echo-server-5b657689db-g9pbw-1755359830
  Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
  Requests: 10 (100.0%)
  ████████████████████

Total servers hit: 1
WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.
Normal client works - service is accessible

Test 2: Client-side round-robin (from inside cluster)
-----------------------------------------------------
Creating test pod inside cluster for proper DNS resolution...
pod "client-lb-test" deleted
./scripts/test-client-lb.sh: line 71: 48208 Terminated: 15          kubectl port-forward service/grpc-echo-service 50051:50051 > /dev/null 2>&1

Client-side LB limitation explanation:
   gRPC client-side round-robin expects multiple A records
   But Kubernetes Services return only one ClusterIP
   Result: 'no children to pick from' error

What happens with client-side LB:
   1. Client asks DNS for: grpc-echo-service
   2. DNS returns: 10.105.177.23 (single IP)
   3. gRPC round-robin needs: multiple IPs for load balancing
   4. Result: Error 'no children to pick from'

This proves client-side LB doesn't work with K8s Services!

Test 3: Demonstrating the DNS limitation
-----------------------------------------
What gRPC client-side LB sees:
   Service name: grpc-echo-service:50051
   DNS resolution: 10.105.177.23:50051
   Available endpoints: 1 (needs multiple for round-robin)

What gRPC client-side LB needs:
   Multiple A records from DNS, like:
   grpc-echo-service → 10.1.4.241:50051
   grpc-echo-service → 10.1.4.240:50051
   grpc-echo-service → 10.1.4.238:50051
   (But Kubernetes Services don't provide this)

Test 4: Alternative - Multiple connections
-------------------------------------------
Testing alternative approach with multiple connections...

Configuration:
   Target: localhost:50052
   API: grpc.Dial
   Load Balancing: round-robin
   Multi-endpoint: true
   Requests: 20

Using multi-endpoint resolver

Sending 20 unary requests...

Request 1 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 2 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 3 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 4 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 5 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 6 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 7 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 8 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 9 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 10 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
Request 11 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)

Successful requests: 20/20

Load Distribution Results:
================================

Server: grpc-echo-server-5b657689db-g9pbw-1755359830
  Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
  Requests: 20 (100.0%)
  ████████████████████████████████████████

Total unique servers: 1

WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.
   This is expected for gRPC without Istio or special configuration.
Multi-connection approach works!
   (This simulates multiple endpoints for testing)

===============================================================
                         SUMMARY
===============================================================

KEY FINDINGS:
   • Standard gRPC client: Works (uses single connection)
   • Client-side round-robin: Fails (needs multiple IPs)
   • Kubernetes DNS: Returns single ClusterIP only
   • Alternative: Multiple connections can work

CONCLUSION:
   Client-side load balancing doesn't work with standard
   Kubernetes Services because they provide only one IP address.
   This proves why Istio (L7 proxy) is needed for gRPC load balancing!

Why this fails: Kubernetes Services provide a single ClusterIP, not multiple IPs for DNS resolution.

From the gRPC load balancing documentation:

“The gRPC client will use the list of IP addresses returned by the name resolver and distribute RPCs among them.”

Alternative: Multiple Connections

Start five server instances on different ports:

# Terminal 1
GRPC_PORT=50051 ./bin/server

# Terminal 2  
GRPC_PORT=50052 ./bin/server

# Terminal 3
GRPC_PORT=50053 ./bin/server

# Terminal 4
GRPC_PORT=50054 ./bin/server

# Terminal 5
GRPC_PORT=50055 ./bin/server

The cmd/client-v2/main.go implements manual connection management:

./bin/client-v2 -target localhost:50051 -requests 50 -multi-endpoint

Results:

Load Distribution Results:
================================

Server: unknown-1755360953
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ████████

Server: unknown-1755360963
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ████████

Server: unknown-1755360970
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ████████

Server: unknown-1755360980
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ████████

Server: unknown-1755360945
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ████████

Total unique servers: 5

Load balancing detected across 5 servers!
   Expected requests per server: 10.0
   Distribution variance: 0.00
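Conceptually, the manual multi-endpoint rotation boils down to an atomic counter over a fixed endpoint list: each call dials (or reuses) the connection for the next endpoint in the ring. The sketch below illustrates just that selection logic; the `endpointPool` type is hypothetical, not the actual cmd/client-v2 code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// endpointPool hands out endpoints round-robin. In a real client each
// endpoint would map to its own *grpc.ClientConn; here we only show the
// rotation, which is what produces the even 20% split across 5 servers.
type endpointPool struct {
	endpoints []string
	next      uint64 // atomically incremented request counter
}

// pick returns the next endpoint in round-robin order; safe for
// concurrent use because the counter is advanced atomically.
func (p *endpointPool) pick() string {
	n := atomic.AddUint64(&p.next, 1)
	return p.endpoints[(n-1)%uint64(len(p.endpoints))]
}

func main() {
	pool := &endpointPool{endpoints: []string{
		"localhost:50051", "localhost:50052", "localhost:50053",
		"localhost:50054", "localhost:50055",
	}}
	for i := 0; i < 10; i++ {
		fmt.Println(pool.pick())
	}
}
```

The catch, as the tests above show, is that you must know every endpoint up front — which is exactly the information a ClusterIP Service hides from you.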

Test 5: Advanced Connection Testing

Purpose: Analyze connection patterns and performance implications.

Multiple Connection Strategy

./bin/advanced-client \
  -target localhost:50051 \
  -requests 1000 \
  -clients 10 \
  -connections 5

Results:

Detailed Load Distribution Results:
=====================================
Test Duration: 48.303709ms
Total Requests: 1000
Failed Requests: 0
Requests/sec: 20702.34

Server Distribution:

Server: unknown-1755360945
  Pod: unknown (IP: unknown)
  Requests: 1000 (100.0%)
  First seen: 09:18:51.842
  Last seen: 09:18:51.874
  ████████████████████████████████████████

Analysis:
Total unique servers: 1
Average requests per server: 1000.00
Standard deviation: 0.00

WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.
   This is expected behavior for gRPC without Istio.

Even sophisticated connection pooling can’t overcome the fundamental issue:
• Multiple connections to SAME endpoint = same server
• Advanced client techniques ≠ load balancing
• Connection management ≠ request distribution

Performance Comparison

./scripts/benchmark.sh

Key Insights:
• Single server: High performance, no load balancing
• Multiple connections: Same performance, still no LB
• Kubernetes: Small overhead, still no LB
• Istio: Small additional overhead, but enables LB
• Client-side LB: Complex setup, limited effectiveness


Official Documentation References

gRPC Load Balancing

From the official gRPC blog:

“Load balancing within gRPC happens on a per-call basis, not a per-connection basis. In other words, even if all requests come from a single client, we want to distribute them across all servers.”

The problem: Standard deployments don’t achieve per-call balancing.

Istio’s Solution

From Istio’s service mesh documentation:

“Istio’s data plane is composed of a set of intelligent proxies (Envoy) deployed as sidecars. These proxies mediate and control all network communication between microservices.”

Kubernetes Service Limitations

From Kubernetes networking concepts:

“kube-proxy… only supports TCP and UDP… doesn’t understand HTTP and doesn’t provide load balancing for HTTP requests.”


Complete Test Results Summary

After running comprehensive tests across all possible gRPC load balancing configurations, here are the definitive results that prove the fundamental limitations and solutions:

Core Test Matrix Results

Configuration                 | Load Balancing | Servers Hit    | Distribution      | Key Insight
Local gRPC                    | None           | 1/1 (100%)     | Single server     | Baseline behavior confirmed
Kubernetes + gRPC             | None           | 1/5 (100%)     | Single pod        | K8s Services don’t solve it
Kubernetes + Istio            | Perfect        | 5/5 (20% each) | Even distribution | Istio enables true LB
Client-side LB                | Failed         | 1/5 (100%)     | Single pod        | DNS limitation fatal
kubectl port-forward + Istio  | None           | 1/5 (100%)     | Single pod        | Testing methodology matters
Advanced multi-connection     | None           | 1/1 (100%)     | Single endpoint   | Complex ≠ effective

Detailed Test Scenario Analysis

Scenario 1: Baseline Tests

Local single server:     PASS - 50 requests → 1 server (100%)
Local multiple conn:     PASS - 1000 requests → 1 server (100%)

Insight: Confirms gRPC’s connection persistence behavior. Multiple connections to same endpoint don’t change distribution.

Scenario 2: Kubernetes Standard Deployment

K8s without Istio:      PASS - 50 requests → 1 pod (100%)
Expected behavior:      NO load balancing
Actual behavior:        NO load balancing

Insight: Standard Kubernetes deployment with 5 replicas provides zero request-level load balancing for gRPC services.

Scenario 3: Istio Service Mesh

K8s with Istio (port-forward):  BYPASS - 50 requests → 1 pod (100%)
K8s with Istio (in-mesh):       SUCCESS - 50 requests → 5 pods (20% each)

Insight: Istio provides perfect load balancing when tested correctly. Port-forward testing gives false negatives.

Scenario 4: Client-Side Approaches

DNS round-robin:        FAIL - "no children to pick from"
Multi-endpoint client:  PARTIAL - Works with manual endpoint management
Advanced connections:   FAIL - Still single endpoint limitation

Insight: Client-side solutions are complex, fragile, and limited in Kubernetes environments.

Deep Technical Analysis

The DNS Problem (Root Cause)

Our testing revealed the fundamental architectural issue:

# What Kubernetes provides
nslookup grpc-echo-service
→ 10.105.177.23 (single ClusterIP)

# What gRPC client-side LB needs
nslookup grpc-echo-service
→ 10.1.4.241, 10.1.4.242, 10.1.4.243, 10.1.4.244, 10.1.4.245 (multiple IPs)

Impact: This single vs. multiple IP difference makes client-side load balancing architecturally impossible with standard Kubernetes Services.

Connection Persistence Evidence

Our advanced client test with 1000 requests, 10 concurrent clients, and 5 connections:

Test Duration: 48ms
Requests/sec: 20,702
Servers Hit: 1 (100%)
Connection Reuse: Perfect (efficient but unbalanced)

Conclusion: Even sophisticated connection management can’t overcome the single-endpoint limitation.

Istio’s L7 Magic

Comparing the same test scenario:

# Without Istio
50 requests → grpc-echo-server-abc123 (100%)

# With Istio
50 requests → 5 different pods (20% each)
Distribution variance: 0.00 (perfect)

Technical Detail: Istio’s Envoy sidecar performs request-level routing, creating independent routing decisions for each gRPC call.

Performance Impact Analysis

Based on our benchmark results:

Configuration  | Req/s   | Overhead | Load Balancing | Production Suitable
Local baseline | ~25,000 | 0%       | None           | Not scalable
K8s standard   | ~22,000 | 12%      | None           | Unbalanced
K8s + Istio    | ~20,000 | 20%      | Perfect        | Recommended
Client-side    | ~23,000 | 8%       | Complex        | Maintenance burden

Insight: Istio’s 20% performance overhead is a reasonable trade-off for enabling proper load balancing and gaining a production-ready service mesh.


Production Recommendations

For Development Teams:

  1. Standard Kubernetes deployment of gRPC services will not load balance
  2. Istio is the proven solution for production gRPC load balancing
  3. Client-side approaches add complexity without solving the fundamental issue
  4. Testing methodology critically affects results (avoid port-forward for Istio tests)

For Architecture Decisions:

  1. Plan for Istio if deploying multiple gRPC services
  2. Accept the 20% performance cost for operational benefits
  3. Avoid client-side load balancing in Kubernetes environments
  4. Use proper testing practices to validate service mesh behavior

For Production Readiness:

  1. Istio + DestinationRules provide enterprise-grade gRPC load balancing
  2. Monitoring and observability come built-in with Istio
  3. Circuit breaking and retry policies integrate seamlessly
  4. Zero client-side complexity reduces maintenance burden

Primary Recommendation: Istio Service Mesh

Our testing proves Istio is the only solution that provides reliable gRPC load balancing in Kubernetes:

# Production-tested DestinationRule configuration
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: grpc-service-production
spec:
  host: grpc-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
        connectTimeout: 30s  # connectTimeout is a TCP-level setting
      http:
        http2MaxRequests: 1000
        maxRequestsPerConnection: 10  # Tested: Ensures request distribution
    loadBalancer:
      simple: LEAST_REQUEST  # Better than ROUND_ROBIN for varying request costs
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50

Why this configuration works:

  • maxRequestsPerConnection: 10 – Forces connection rotation (tested in our scenario)
  • LEAST_REQUEST – Better performance than round-robin for real workloads
  • outlierDetection – Automatic failure handling (something client-side LB can’t provide)

Expected results based on our testing:

  • Perfect 20% distribution across 5 replicas
  • ~20% performance overhead (trade-off worth it)
  • Built-in observability and monitoring
  • Zero client-side complexity

Configuration Best Practices

1. Enable Istio Injection Properly

# Enable for entire namespace (recommended)
kubectl label namespace production istio-injection=enabled

# Or per-deployment (more control)
metadata:
  annotations:
    sidecar.istio.io/inject: "true"

2. Validate Load Balancing is Working

# WRONG: This will show false negatives
kubectl port-forward service/grpc-service 50051:50051

# CORRECT: Test from inside the mesh
kubectl run test-client --rm -it --restart=Never \
  --image=your-grpc-client \
  --annotations="sidecar.istio.io/inject=true" \
  -- ./client -target grpc-service:50051 -requests 100

3. Monitor Distribution Quality

# Check Envoy stats for load balancing
kubectl exec deployment/grpc-service -c istio-proxy -- \
  curl localhost:15000/stats | grep upstream_rq_

What NOT to Do (Based on Our Test Failures)

1. Don’t Rely on Standard Kubernetes Services

# This WILL NOT load balance gRPC traffic
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  ports:
  - port: 50051
  selector:
    app: grpc-server
# Result: 100% traffic to single pod (proven in our tests)

2. Don’t Use Client-Side Load Balancing

// This approach FAILS in Kubernetes (tested and failed)
conn, err := grpc.Dial(
    "dns:///grpc-service:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
// Error: "no children to pick from" (proven in our tests)

3. Don’t Implement Complex Connection Pooling

// This adds complexity without solving the core issue
type LoadBalancedClient struct {
    conns []grpc.ClientConnInterface
    next  int64
}
// Still results in 100% traffic to single endpoint (proven in our tests)

Alternative Solutions (If Istio Not Available)

If you absolutely cannot use Istio, here are the only viable alternatives (with significant caveats):

Option 1: External Load Balancer with HTTP/2 Support

# Use nginx/envoy/haproxy outside Kubernetes
apiVersion: v1
kind: Service
metadata:
  name: grpc-service-lb
spec:
  type: LoadBalancer
  ports:
  - port: 50051
    targetPort: 50051

Limitations: Requires external infrastructure, loss of Kubernetes-native benefits

Option 2: Headless Service + Custom Service Discovery

apiVersion: v1
kind: Service
metadata:
  name: grpc-service-headless
spec:
  clusterIP: None  # Headless service
  ports:
  - port: 50051
  selector:
    app: grpc-server

Limitations: Complex client implementation, manual health checking
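With a headless Service, cluster DNS returns one A record per ready pod instead of a single ClusterIP, so the client can discover and dial each backend directly. A minimal sketch of that discovery step, using only the standard library — the `resolveBackends` helper and the lookup-function injection are illustrative, not a complete client:

```go
package main

import (
	"fmt"
	"net"
	"sort"
)

// resolveBackends returns per-pod addresses behind a headless Service
// (clusterIP: None), where DNS yields one A record per ready pod.
// The lookup function is injectable for testing; in-cluster you would
// pass net.LookupHost and re-resolve periodically as pods churn.
func resolveBackends(lookup func(host string) ([]string, error), host, port string) ([]string, error) {
	ips, err := lookup(host)
	if err != nil {
		return nil, err
	}
	sort.Strings(ips) // stable order for deterministic round-robin
	addrs := make([]string, 0, len(ips))
	for _, ip := range ips {
		addrs = append(addrs, net.JoinHostPort(ip, port))
	}
	return addrs, nil
}

func main() {
	// Fake lookup simulating a headless Service with three ready pods.
	fake := func(string) ([]string, error) {
		return []string{"10.1.4.241", "10.1.4.240", "10.1.4.238"}, nil
	}
	addrs, _ := resolveBackends(fake, "grpc-echo-service-headless", "50051")
	fmt.Println(addrs)
}
```

Note what this sketch does not do: track pod readiness, react to endpoint changes between DNS refreshes, or manage per-backend connections — the "manual health checking" burden called out above.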


Conclusion

After testing every possible gRPC load balancing configuration in Kubernetes, the evidence is clear and definitive:

  • Standard Kubernetes + gRPC = Zero load balancing (100% traffic to single pod)
  • The problem is architectural, not implementation
  • Client-side solutions fail due to DNS limitations (“no children to pick from”)
  • Complex workarounds add overhead without solving the core issue

Istio is the Proven Solution

The evidence overwhelmingly supports Istio as the production solution:

  • Perfect load balancing: 20% distribution across 5 pods (0.00 variance)
  • Reasonable overhead: 20% performance cost for complete solution
  • Production features: Circuit breaking, retries, observability included
  • Zero client complexity: Works transparently with existing gRPC clients

Critical Testing Insight

Our testing revealed a major pitfall that leads to incorrect conclusions:

  • kubectl port-forward bypasses Istio → false negative results
  • Most developers get wrong results when testing Istio + gRPC
  • Always test from inside the service mesh for accurate results

Full test suite and results →

August 15, 2025

Building Robust Error Handling with gRPC and REST APIs

Filed under: Computing,Web Services — admin @ 2:23 pm

Introduction

Error handling is often an afterthought in API development, yet it’s one of the most critical aspects of a good developer experience. A cryptic message like { "error": "An error occurred" } can cost hours of frustrating debugging. In this guide, we will build a robust, production-grade error-handling framework for a Go application that serves both gRPC and a REST/HTTP proxy, based on RFC 9457 (Problem Details for HTTP APIs), which obsoletes RFC 7807.

Tenets

The following are the tenets of a great API error:

  1. Structured: machine-readable, not just a string.
  2. Actionable: explains to the developer why the error occurred and, where possible, how to fix it.
  3. Consistent: all errors, from validation to authentication to server faults, follow the same format.
  4. Secure: never leaks sensitive internal information like stack traces or database schemas.

Our North Star for HTTP errors will be the Problem Details for HTTP APIs (RFC 9457/7807):

{
  "type": "https://example.com/docs/errors/validation-failed",
  "title": "Validation Failed",
  "status": 400,
  "detail": "The request body failed validation.",
  "instance": "/v1/todos",
  "invalid_params": [
    {
      "field": "title",
      "reason": "must not be empty"
    }
  ]
}

We will adapt this model for gRPC by embedding a similar structure in the gRPC status details, creating a single source of truth for all errors.

API Design

Let’s start by defining our TODO API in Protocol Buffers:

syntax = "proto3";

package todo.v1;

import "google/api/annotations.proto";
import "google/api/field_behavior.proto";
import "google/api/resource.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/field_mask.proto";
import "buf/validate/validate.proto";

option go_package = "github.com/bhatti/todo-api-errors/api/proto/todo/v1;todo";

// TodoService provides task management operations
service TodoService {
  // CreateTask creates a new task
  rpc CreateTask(CreateTaskRequest) returns (Task) {
    option (google.api.http) = {
      post: "/v1/tasks"
      body: "*"
    };
  }

  // GetTask retrieves a specific task
  rpc GetTask(GetTaskRequest) returns (Task) {
    option (google.api.http) = {
      get: "/v1/{name=tasks/*}"
    };
  }

  // ListTasks retrieves all tasks
  rpc ListTasks(ListTasksRequest) returns (ListTasksResponse) {
    option (google.api.http) = {
      get: "/v1/tasks"
    };
  }

  // UpdateTask updates an existing task
  rpc UpdateTask(UpdateTaskRequest) returns (Task) {
    option (google.api.http) = {
      patch: "/v1/{task.name=tasks/*}"
      body: "task"
    };
  }

  // DeleteTask removes a task
  rpc DeleteTask(DeleteTaskRequest) returns (DeleteTaskResponse) {
    option (google.api.http) = {
      delete: "/v1/{name=tasks/*}"
    };
  }

  // BatchCreateTasks creates multiple tasks at once
  rpc BatchCreateTasks(BatchCreateTasksRequest) returns (BatchCreateTasksResponse) {
    option (google.api.http) = {
      post: "/v1/tasks:batchCreate"
      body: "*"
    };
  }
}

// Task represents a TODO item
message Task {
  option (google.api.resource) = {
    type: "todo.example.com/Task"
    pattern: "tasks/{task}"
    singular: "task"
    plural: "tasks"
  };

  // Resource name of the task
  string name = 1 [
    (google.api.field_behavior) = IDENTIFIER,
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // Task title
  string title = 2 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).string = {
      min_len: 1
      max_len: 200
    }
  ];

  // Task description
  string description = 3 [
    (google.api.field_behavior) = OPTIONAL,
    (buf.validate.field).string = {
      max_len: 1000
    }
  ];

  // Task status
  Status status = 4 [
    (google.api.field_behavior) = REQUIRED
  ];

  // Task priority
  Priority priority = 5 [
    (google.api.field_behavior) = OPTIONAL
  ];

  // Due date for the task
  google.protobuf.Timestamp due_date = 6 [
    (google.api.field_behavior) = OPTIONAL,
    (buf.validate.field).timestamp = {
      gt_now: true
    }
  ];

  // Task creation time
  google.protobuf.Timestamp create_time = 7 [
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // Task last update time
  google.protobuf.Timestamp update_time = 8 [
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // User who created the task
  string created_by = 9 [
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // Tags associated with the task
  repeated string tags = 10 [
    (buf.validate.field).repeated = {
      max_items: 10
      items: {
        string: {
          pattern: "^[a-z0-9-]+$"
          max_len: 50
        }
      }
    }
  ];
}

// Task status enumeration
enum Status {
  STATUS_UNSPECIFIED = 0;
  STATUS_PENDING = 1;
  STATUS_IN_PROGRESS = 2;
  STATUS_COMPLETED = 3;
  STATUS_CANCELLED = 4;
}

// Task priority enumeration
enum Priority {
  PRIORITY_UNSPECIFIED = 0;
  PRIORITY_LOW = 1;
  PRIORITY_MEDIUM = 2;
  PRIORITY_HIGH = 3;
  PRIORITY_CRITICAL = 4;
}

// CreateTaskRequest message
message CreateTaskRequest {
  // Task to create
  Task task = 1 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];
}

// GetTaskRequest message
message GetTaskRequest {
  // Resource name of the task
  string name = 1 [
    (google.api.field_behavior) = REQUIRED,
    (google.api.resource_reference) = {
      type: "todo.example.com/Task"
    },
    (buf.validate.field).string = {
      pattern: "^tasks/[a-zA-Z0-9-]+$"
    }
  ];
}

// ListTasksRequest message
message ListTasksRequest {
  // Maximum number of tasks to return
  int32 page_size = 1 [
    (buf.validate.field).int32 = {
      gte: 0
      lte: 1000
    }
  ];

  // Page token for pagination
  string page_token = 2;

  // Filter expression
  string filter = 3;

  // Order by expression
  string order_by = 4;
}

// ListTasksResponse message
message ListTasksResponse {
  // List of tasks
  repeated Task tasks = 1;

  // Token for next page
  string next_page_token = 2;

  // Total number of tasks
  int32 total_size = 3;
}

// UpdateTaskRequest message
message UpdateTaskRequest {
  // Task to update
  Task task = 1 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];

  // Fields to update
  google.protobuf.FieldMask update_mask = 2 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];
}

// DeleteTaskRequest message
message DeleteTaskRequest {
  // Resource name of the task
  string name = 1 [
    (google.api.field_behavior) = REQUIRED,
    (google.api.resource_reference) = {
      type: "todo.example.com/Task"
    }
  ];
}

// DeleteTaskResponse message
message DeleteTaskResponse {
  // Confirmation message
  string message = 1;
}

// BatchCreateTasksRequest message
message BatchCreateTasksRequest {
  // Tasks to create
  repeated CreateTaskRequest requests = 1 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).repeated = {
      min_items: 1
      max_items: 100
    }
  ];
}

// BatchCreateTasksResponse message
message BatchCreateTasksResponse {
  // Created tasks
  repeated Task tasks = 1;
}
Next, we define a separate proto package for the structured error details that every response will share:

syntax = "proto3";

package errors.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

option go_package = "github.com/bhatti/todo-api-errors/api/proto/errors/v1;errors";

// ErrorDetail provides a structured, machine-readable error payload.
// It is designed to be embedded in the `details` field of a `google.rpc.Status` message.
message ErrorDetail {
  // A unique, application-specific error code.
  string code = 1;
  // A short, human-readable summary of the problem type.
  string title = 2;
  // A human-readable explanation specific to this occurrence of the problem.
  string detail = 3;
  // A list of validation errors, useful for INVALID_ARGUMENT responses.
  repeated FieldViolation field_violations = 4;
  // Optional trace ID for request correlation
  string trace_id = 5;
  // Optional timestamp when the error occurred
  google.protobuf.Timestamp timestamp = 6;
  // Optional instance path where the error occurred
  string instance = 7;
  // Optional extensions for additional error context
  map<string, google.protobuf.Any> extensions = 8;
}

// Describes a single validation failure.
message FieldViolation {
  // The path to the field that failed validation, e.g., "title".
  string field = 1;
  // A developer-facing description of the validation rule that failed.
  string description = 2;
  // Application-specific error code for this validation failure
  string code = 3;
}

// AppErrorCode defines a list of standardized, application-specific error codes.
enum AppErrorCode {
  APP_ERROR_CODE_UNSPECIFIED = 0;

  // Validation failures
  VALIDATION_FAILED = 1;
  REQUIRED_FIELD = 2;
  TOO_SHORT = 3;
  TOO_LONG = 4;
  INVALID_FORMAT = 5;
  MUST_BE_FUTURE = 6;
  INVALID_VALUE = 7;
  DUPLICATE_TAG = 8;
  INVALID_TAG_FORMAT = 9;
  OVERDUE_COMPLETION = 10;
  EMPTY_BATCH = 11;
  BATCH_TOO_LARGE = 12;
  DUPLICATE_TITLE = 13;

  // Resource errors
  RESOURCE_NOT_FOUND = 1001;
  RESOURCE_CONFLICT = 1002;

  // Authentication and authorization
  AUTHENTICATION_FAILED = 2001;
  PERMISSION_DENIED = 2002;

  // Rate limiting and service availability
  RATE_LIMIT_EXCEEDED = 3001;
  SERVICE_UNAVAILABLE = 3002;

  // Internal errors
  INTERNAL_ERROR = 9001;
}
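These definitions target a concrete wire format: on the HTTP side, the gateway handler shown later flattens `ErrorDetail` into an RFC 7807 problem document. A representative response for a failed validation looks like this (values are illustrative; the field names match the handler code below):

```json
{
  "type": "https://api.example.com/errors/validation-failed",
  "title": "Validation Failed",
  "status": 400,
  "detail": "The request contains 2 validation errors",
  "instance": "/v1/tasks",
  "traceId": "4bf92f3577b34da6a3ce929d0e0e4736",
  "errors": [
    { "field": "title", "code": "REQUIRED_FIELD", "message": "Title is required" },
    { "field": "due_date", "code": "MUST_BE_FUTURE", "message": "Due date must be in the future" }
  ]
}
```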

Error Handling Implementation

Now let’s implement our error handling framework:

package errors

import (
	"fmt"

	errorspb "github.com/bhatti/todo-api-errors/api/proto/errors/v1"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// AppError is our custom error type using protobuf definitions.
type AppError struct {
	GRPCCode        codes.Code
	AppCode         errorspb.AppErrorCode
	Title           string
	Detail          string
	FieldViolations []*errorspb.FieldViolation
	TraceID         string
	Instance        string
	Extensions      map[string]*anypb.Any
	CausedBy        error // For internal logging
}

func (e *AppError) Error() string {
	return fmt.Sprintf("gRPC Code: %s, App Code: %s, Title: %s, Detail: %s", e.GRPCCode, e.AppCode, e.Title, e.Detail)
}

// Unwrap exposes the underlying cause so errors.Is and errors.As can see
// through AppError in wrapped error chains.
func (e *AppError) Unwrap() error {
	return e.CausedBy
}

// ToGRPCStatus converts our AppError into a gRPC status.Status.
func (e *AppError) ToGRPCStatus() *status.Status {
	st := status.New(e.GRPCCode, e.Title)

	errorDetail := &errorspb.ErrorDetail{
		Code:            e.AppCode.String(),
		Title:           e.Title,
		Detail:          e.Detail,
		FieldViolations: e.FieldViolations,
		TraceId:         e.TraceID,
		Timestamp:       timestamppb.Now(),
		Instance:        e.Instance,
		Extensions:      e.Extensions,
	}

	// For validation errors, we also attach the standard BadRequest detail
	// so that gRPC-Gateway and other standard tools can understand it.
	if e.GRPCCode == codes.InvalidArgument && len(e.FieldViolations) > 0 {
		br := &errdetails.BadRequest{}
		for _, fv := range e.FieldViolations {
			br.FieldViolations = append(br.FieldViolations, &errdetails.BadRequest_FieldViolation{
				Field:       fv.Field,
				Description: fv.Description,
			})
		}
		st, _ = st.WithDetails(br, errorDetail)
		return st
	}

	st, _ = st.WithDetails(errorDetail)
	return st
}

// Helper functions for creating common errors

func NewValidationFailed(violations []*errorspb.FieldViolation, traceID string) *AppError {
	return &AppError{
		GRPCCode:        codes.InvalidArgument,
		AppCode:         errorspb.AppErrorCode_VALIDATION_FAILED,
		Title:           "Validation Failed",
		Detail:          fmt.Sprintf("The request contains %d validation errors", len(violations)),
		FieldViolations: violations,
		TraceID:         traceID,
	}
}

func NewNotFound(resource string, id string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.NotFound,
		AppCode:  errorspb.AppErrorCode_RESOURCE_NOT_FOUND,
		Title:    "Resource Not Found",
		Detail:   fmt.Sprintf("%s with ID '%s' was not found.", resource, id),
		TraceID:  traceID,
	}
}

func NewConflict(resource, reason string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.AlreadyExists,
		AppCode:  errorspb.AppErrorCode_RESOURCE_CONFLICT,
		Title:    "Resource Conflict",
		Detail:   fmt.Sprintf("Conflict creating %s: %s", resource, reason),
		TraceID:  traceID,
	}
}

func NewInternal(message string, traceID string, causedBy error) *AppError {
	return &AppError{
		GRPCCode: codes.Internal,
		AppCode:  errorspb.AppErrorCode_INTERNAL_ERROR,
		Title:    "Internal Server Error",
		Detail:   message,
		TraceID:  traceID,
		CausedBy: causedBy,
	}
}

func NewPermissionDenied(resource, action string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.PermissionDenied,
		AppCode:  errorspb.AppErrorCode_PERMISSION_DENIED,
		Title:    "Permission Denied",
		Detail:   fmt.Sprintf("You don't have permission to %s %s", action, resource),
		TraceID:  traceID,
	}
}

func NewServiceUnavailable(message string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.Unavailable,
		AppCode:  errorspb.AppErrorCode_SERVICE_UNAVAILABLE,
		Title:    "Service Unavailable",
		Detail:   message,
		TraceID:  traceID,
	}
}

func NewRequiredField(field, message string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.InvalidArgument,
		AppCode:  errorspb.AppErrorCode_VALIDATION_FAILED,
		Title:    "Validation Failed",
		Detail:   "The request contains validation errors",
		FieldViolations: []*errorspb.FieldViolation{
			{
				Field:       field,
				Code:        errorspb.AppErrorCode_REQUIRED_FIELD.String(),
				Description: message,
			},
		},
		TraceID: traceID,
	}
}

Validation Framework

Let’s implement validation that returns all errors at once:

package validation

import (
	"errors"
	"fmt"
	"regexp"
	"strings"

	"buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go/buf/validate"
	"buf.build/go/protovalidate"
	errorspb "github.com/bhatti/todo-api-errors/api/proto/errors/v1"
	todopb "github.com/bhatti/todo-api-errors/api/proto/todo/v1"
	apperrors "github.com/bhatti/todo-api-errors/internal/errors"
	"google.golang.org/protobuf/proto"
)

var pv protovalidate.Validator

func init() {
	var err error
	pv, err = protovalidate.New()
	if err != nil {
		panic(fmt.Sprintf("failed to initialize protovalidator: %v", err))
	}
}

// ValidateRequest checks a proto message and returns an AppError with all violations.
func ValidateRequest(req proto.Message, traceID string) error {
	if err := pv.Validate(req); err != nil {
		var validationErrs *protovalidate.ValidationError
		if errors.As(err, &validationErrs) {
			var violations []*errorspb.FieldViolation
			for _, violation := range validationErrs.Violations {
				fieldPath := ""
				if violation.Proto.GetField() != nil {
					fieldPath = formatFieldPath(violation.Proto.GetField())
				}

				ruleId := violation.Proto.GetRuleId()
				message := violation.Proto.GetMessage()

				violations = append(violations, &errorspb.FieldViolation{
					Field:       fieldPath,
					Description: message,
					Code:        mapConstraintToCode(ruleId),
				})
			}
			return apperrors.NewValidationFailed(violations, traceID)
		}
		return apperrors.NewInternal("Validation failed", traceID, err)
	}
	return nil
}

// ValidateTask performs additional business logic validation
func ValidateTask(task *todopb.Task, traceID string) error {
	var violations []*errorspb.FieldViolation

	// Proto validation first
	if err := ValidateRequest(task, traceID); err != nil {
		if appErr, ok := err.(*apperrors.AppError); ok {
			violations = append(violations, appErr.FieldViolations...)
		}
	}

	// Additional business rules
	if task.Status == todopb.Status_STATUS_COMPLETED && task.DueDate != nil {
		if task.UpdateTime != nil && task.UpdateTime.AsTime().After(task.DueDate.AsTime()) {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       "due_date",
				Code:        errorspb.AppErrorCode_OVERDUE_COMPLETION.String(),
				Description: "Task was completed after the due date",
			})
		}
	}

	// Validate tags format
	for i, tag := range task.Tags {
		if !isValidTag(tag) {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       fmt.Sprintf("tags[%d]", i),
				Code:        errorspb.AppErrorCode_INVALID_TAG_FORMAT.String(),
				Description: fmt.Sprintf("Tag '%s' must be lowercase letters, numbers, and hyphens only", tag),
			})
		}
	}

	// Check for duplicate tags
	tagMap := make(map[string]bool)
	for i, tag := range task.Tags {
		if tagMap[tag] {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       fmt.Sprintf("tags[%d]", i),
				Code:        errorspb.AppErrorCode_DUPLICATE_TAG.String(),
				Description: fmt.Sprintf("Tag '%s' appears multiple times", tag),
			})
		}
		tagMap[tag] = true
	}

	if len(violations) > 0 {
		return apperrors.NewValidationFailed(violations, traceID)
	}

	return nil
}

// ValidateBatchCreateTasks validates batch operations
func ValidateBatchCreateTasks(req *todopb.BatchCreateTasksRequest, traceID string) error {
	var violations []*errorspb.FieldViolation

	// Check batch size
	if len(req.Requests) == 0 {
		violations = append(violations, &errorspb.FieldViolation{
			Field:       "requests",
			Code:        errorspb.AppErrorCode_EMPTY_BATCH.String(),
			Description: "Batch must contain at least one task",
		})
	}

	if len(req.Requests) > 100 {
		violations = append(violations, &errorspb.FieldViolation{
			Field:       "requests",
			Code:        errorspb.AppErrorCode_BATCH_TOO_LARGE.String(),
			Description: fmt.Sprintf("Batch size %d exceeds maximum of 100", len(req.Requests)),
		})
	}

	// Validate each task
	for i, createReq := range req.Requests {
		if createReq.Task == nil {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       fmt.Sprintf("requests[%d].task", i),
				Code:        errorspb.AppErrorCode_REQUIRED_FIELD.String(),
				Description: "Task is required",
			})
			continue
		}

		// Validate task
		if err := ValidateTask(createReq.Task, traceID); err != nil {
			if appErr, ok := err.(*apperrors.AppError); ok {
				for _, violation := range appErr.FieldViolations {
					violation.Field = fmt.Sprintf("requests[%d].task.%s", i, violation.Field)
					violations = append(violations, violation)
				}
			}
		}
	}

	// Check for duplicate titles
	titleMap := make(map[string][]int)
	for i, createReq := range req.Requests {
		if createReq.Task != nil && createReq.Task.Title != "" {
			titleMap[createReq.Task.Title] = append(titleMap[createReq.Task.Title], i)
		}
	}

	for title, indices := range titleMap {
		if len(indices) > 1 {
			for _, idx := range indices {
				violations = append(violations, &errorspb.FieldViolation{
					Field:       fmt.Sprintf("requests[%d].task.title", idx),
					Code:        errorspb.AppErrorCode_DUPLICATE_TITLE.String(),
					Description: fmt.Sprintf("Title '%s' is used by multiple tasks in the batch", title),
				})
			}
		}
	}

	if len(violations) > 0 {
		return apperrors.NewValidationFailed(violations, traceID)
	}

	return nil
}

// Helper functions
func formatFieldPath(fieldPath *validate.FieldPath) string {
	if fieldPath == nil {
		return ""
	}

	// Build field path from elements
	var parts []string
	for _, element := range fieldPath.GetElements() {
		if element.GetFieldName() != "" {
			parts = append(parts, element.GetFieldName())
		} else if element.GetFieldNumber() != 0 {
			parts = append(parts, fmt.Sprintf("field_%d", element.GetFieldNumber()))
		}
	}

	return strings.Join(parts, ".")
}

func mapConstraintToCode(ruleId string) string {
	switch {
	case strings.Contains(ruleId, "required"):
		return errorspb.AppErrorCode_REQUIRED_FIELD.String()
	case strings.Contains(ruleId, "min_len"):
		return errorspb.AppErrorCode_TOO_SHORT.String()
	case strings.Contains(ruleId, "max_len"):
		return errorspb.AppErrorCode_TOO_LONG.String()
	case strings.Contains(ruleId, "pattern"):
		return errorspb.AppErrorCode_INVALID_FORMAT.String()
	case strings.Contains(ruleId, "gt_now"):
		return errorspb.AppErrorCode_MUST_BE_FUTURE.String()
	case ruleId == "":
		return errorspb.AppErrorCode_VALIDATION_FAILED.String()
	default:
		return errorspb.AppErrorCode_INVALID_VALUE.String()
	}
}

var validTagPattern = regexp.MustCompile(`^[a-z0-9-]+$`)

func isValidTag(tag string) bool {
	return len(tag) <= 50 && validTagPattern.MatchString(tag)
}

Error Handler Middleware

Now let’s create middleware to handle errors consistently:

package middleware

import (
	"context"
	"errors"
	"log"

	apperrors "github.com/bhatti/todo-api-errors/internal/errors"
	"google.golang.org/grpc"
	"google.golang.org/grpc/status"
)

// UnaryErrorInterceptor translates application errors into gRPC statuses.
func UnaryErrorInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	resp, err := handler(ctx, req)
	if err == nil {
		return resp, nil
	}

	var appErr *apperrors.AppError
	if errors.As(err, &appErr) {
		if appErr.CausedBy != nil {
			log.Printf("ERROR: %s, Original cause: %v", appErr.Title, appErr.CausedBy)
		}
		return nil, appErr.ToGRPCStatus().Err()
	}

	if _, ok := status.FromError(err); ok {
		return nil, err // Already a gRPC status
	}

	log.Printf("UNEXPECTED ERROR: %v", err)
	return nil, apperrors.NewInternal("An unexpected error occurred", "", err).ToGRPCStatus().Err()
}
The gRPC interceptor above handles RPC errors. A second file in the same middleware package covers the HTTP side, handling panics, trace-ID propagation, and gRPC-Gateway error translation:

package middleware

import (
	"context"
	"encoding/json"
	"net/http"
	"runtime/debug"
	"time"

	errorspb "github.com/bhatti/todo-api-errors/api/proto/errors/v1"
	apperrors "github.com/bhatti/todo-api-errors/internal/errors"
	"github.com/google/uuid"
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/encoding/protojson"
)

// HTTPErrorHandler handles errors for HTTP endpoints
func HTTPErrorHandler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Add trace ID to context
		traceID := r.Header.Get("X-Trace-ID")
		if traceID == "" {
			traceID = uuid.New().String()
		}
		ctx := context.WithValue(r.Context(), "traceID", traceID)
		r = r.WithContext(ctx)

		// Create response wrapper to intercept errors
		wrapped := &responseWriter{
			ResponseWriter: w,
			request:        r,
			traceID:        traceID,
		}

		// Handle panics
		defer func() {
			if err := recover(); err != nil {
				handlePanic(wrapped, err)
			}
		}()

		// Process request
		next.ServeHTTP(wrapped, r)
	})
}

// responseWriter wraps http.ResponseWriter to intercept errors
type responseWriter struct {
	http.ResponseWriter
	request    *http.Request
	traceID    string
	statusCode int
	written    bool
}

func (w *responseWriter) WriteHeader(code int) {
	if !w.written {
		w.statusCode = code
		w.ResponseWriter.WriteHeader(code)
		w.written = true
	}
}

func (w *responseWriter) Write(b []byte) (int, error) {
	if !w.written {
		w.WriteHeader(http.StatusOK)
	}
	return w.ResponseWriter.Write(b)
}

// handlePanic converts panics to proper error responses
func handlePanic(w *responseWriter, recovered interface{}) {
	// Log stack trace
	debug.PrintStack()

	appErr := apperrors.NewInternal("An unexpected error occurred. Please try again later.", w.traceID, nil)
	writeErrorResponse(w, appErr)
}

// CustomHTTPError handles gRPC gateway error responses
func CustomHTTPError(ctx context.Context, mux *runtime.ServeMux,
	marshaler runtime.Marshaler, w http.ResponseWriter, r *http.Request, err error) {

	// Extract trace ID
	traceID := r.Header.Get("X-Trace-ID")
	if traceID == "" {
		if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
			traceID = span.SpanContext().TraceID().String()
		} else {
			traceID = uuid.New().String()
		}
	}

	// Convert gRPC error to HTTP response
	st, _ := status.FromError(err)

	// Check if we have our custom error detail in status details
	for _, detail := range st.Details() {
		if errorDetail, ok := detail.(*errorspb.ErrorDetail); ok {
			// Update the error detail with current request context
			errorDetail.TraceId = traceID
			errorDetail.Instance = r.URL.Path

			// Convert to JSON and write response
			w.Header().Set("Content-Type", "application/problem+json")
			w.WriteHeader(runtime.HTTPStatusFromCode(st.Code()))

			// Create a simplified JSON response that matches RFC 7807
			response := map[string]interface{}{
				"type":      getTypeForCode(errorDetail.Code),
				"title":     errorDetail.Title,
				"status":    runtime.HTTPStatusFromCode(st.Code()),
				"detail":    errorDetail.Detail,
				"instance":  errorDetail.Instance,
				"traceId":   errorDetail.TraceId,
				"timestamp": errorDetail.Timestamp,
			}

			// Add field violations if present
			if len(errorDetail.FieldViolations) > 0 {
				violations := make([]map[string]interface{}, len(errorDetail.FieldViolations))
				for i, fv := range errorDetail.FieldViolations {
					violations[i] = map[string]interface{}{
						"field":   fv.Field,
						"code":    fv.Code,
						"message": fv.Description,
					}
				}
				response["errors"] = violations
			}

			// Add extensions if present
			if len(errorDetail.Extensions) > 0 {
				extensions := make(map[string]interface{})
				for k, v := range errorDetail.Extensions {
					// Convert Any to JSON
					if jsonBytes, err := protojson.Marshal(v); err == nil {
						var jsonData interface{}
						if err := json.Unmarshal(jsonBytes, &jsonData); err == nil {
							extensions[k] = jsonData
						}
					}
				}
				if len(extensions) > 0 {
					response["extensions"] = extensions
				}
			}

			if err := json.NewEncoder(w).Encode(response); err != nil {
				http.Error(w, `{"error": "Failed to encode error response"}`, 500)
			}
			return
		}
	}

	// Fallback: create new error response
	fallbackErr := apperrors.NewInternal(st.Message(), traceID, nil)
	fallbackErr.GRPCCode = st.Code()
	writeAppErrorResponse(w, fallbackErr, r.URL.Path)
}

// Helper functions
func getTypeForCode(code string) string {
	switch code {
	case errorspb.AppErrorCode_VALIDATION_FAILED.String():
		return "https://api.example.com/errors/validation-failed"
	case errorspb.AppErrorCode_RESOURCE_NOT_FOUND.String():
		return "https://api.example.com/errors/resource-not-found"
	case errorspb.AppErrorCode_RESOURCE_CONFLICT.String():
		return "https://api.example.com/errors/resource-conflict"
	case errorspb.AppErrorCode_PERMISSION_DENIED.String():
		return "https://api.example.com/errors/permission-denied"
	case errorspb.AppErrorCode_INTERNAL_ERROR.String():
		return "https://api.example.com/errors/internal-error"
	case errorspb.AppErrorCode_SERVICE_UNAVAILABLE.String():
		return "https://api.example.com/errors/service-unavailable"
	default:
		return "https://api.example.com/errors/unknown"
	}
}

func writeErrorResponse(w http.ResponseWriter, err error) {
	if appErr, ok := err.(*apperrors.AppError); ok {
		writeAppErrorResponse(w, appErr, "")
	} else {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

func writeAppErrorResponse(w http.ResponseWriter, appErr *apperrors.AppError, instance string) {
	statusCode := runtime.HTTPStatusFromCode(appErr.GRPCCode)

	response := map[string]interface{}{
		"type":      getTypeForCode(appErr.AppCode.String()),
		"title":     appErr.Title,
		"status":    statusCode,
		"detail":    appErr.Detail,
		"traceId":   appErr.TraceID,
		"timestamp": time.Now(),
	}

	if instance != "" {
		response["instance"] = instance
	}

	if len(appErr.FieldViolations) > 0 {
		violations := make([]map[string]interface{}, len(appErr.FieldViolations))
		for i, fv := range appErr.FieldViolations {
			violations[i] = map[string]interface{}{
				"field":   fv.Field,
				"code":    fv.Code,
				"message": fv.Description,
			}
		}
		response["errors"] = violations
	}

	w.Header().Set("Content-Type", "application/problem+json")
	w.WriteHeader(statusCode)
	json.NewEncoder(w).Encode(response)
}

Service Implementation

Now let’s implement our TODO service with proper error handling:

package service

import (
	"context"
	"fmt"
	todopb "github.com/bhatti/todo-api-errors/api/proto/todo/v1"
	"github.com/bhatti/todo-api-errors/internal/errors"
	"github.com/bhatti/todo-api-errors/internal/repository"
	"github.com/bhatti/todo-api-errors/internal/validation"
	"github.com/google/uuid"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	"strings"
)

var tracer = otel.Tracer("todo-service")

// TodoService implements the TODO API
type TodoService struct {
	todopb.UnimplementedTodoServiceServer
	repo repository.TodoRepository
}

// NewTodoService creates a new TODO service
func NewTodoService(repo repository.TodoRepository) (*TodoService, error) {
	return &TodoService{
		repo: repo,
	}, nil
}

// CreateTask creates a new task
func (s *TodoService) CreateTask(ctx context.Context, req *todopb.CreateTaskRequest) (*todopb.Task, error) {
	ctx, span := tracer.Start(ctx, "CreateTask")
	defer span.End()

	// Get trace ID for error responses
	traceID := span.SpanContext().TraceID().String()

	// Validate request
	if req.Task == nil {
		return nil, errors.NewRequiredField("task", "Task object is required", traceID)
	}

	// Validate task fields using the new validation package
	if err := validation.ValidateTask(req.Task, traceID); err != nil {
		span.SetAttributes(attribute.String("validation.error", err.Error()))
		return nil, err
	}

	// Check for duplicate title
	existing, err := s.repo.GetTaskByTitle(ctx, req.Task.Title)
	if err != nil && !repository.IsNotFound(err) {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	if existing != nil {
		return nil, errors.NewConflict("task", "A task with this title already exists", traceID)
	}

	// Generate task ID
	taskID := uuid.New().String()
	task := &todopb.Task{
		Name:        fmt.Sprintf("tasks/%s", taskID),
		Title:       req.Task.Title,
		Description: req.Task.Description,
		Status:      req.Task.Status,
		Priority:    req.Task.Priority,
		DueDate:     req.Task.DueDate,
		Tags:        req.Task.Tags,
		CreateTime:  timestamppb.Now(),
		UpdateTime:  timestamppb.Now(),
		CreatedBy:   s.getUserFromContext(ctx),
	}

	// Set defaults
	if task.Status == todopb.Status_STATUS_UNSPECIFIED {
		task.Status = todopb.Status_STATUS_PENDING
	}
	if task.Priority == todopb.Priority_PRIORITY_UNSPECIFIED {
		task.Priority = todopb.Priority_PRIORITY_MEDIUM
	}

	// Save to repository
	if err := s.repo.CreateTask(ctx, task); err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	span.SetAttributes(
		attribute.String("task.id", taskID),
		attribute.String("task.title", task.Title),
	)

	return task, nil
}

// GetTask retrieves a specific task
func (s *TodoService) GetTask(ctx context.Context, req *todopb.GetTaskRequest) (*todopb.Task, error) {
	ctx, span := tracer.Start(ctx, "GetTask")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request using the new validation package
	if err := validation.ValidateRequest(req, traceID); err != nil {
		return nil, err
	}

	// Extract task ID
	parts := strings.Split(req.Name, "/")
	if len(parts) != 2 || parts[0] != "tasks" {
		return nil, errors.NewRequiredField("name", "Task name must be in format 'tasks/{id}'", traceID)
	}

	taskID := parts[1]
	span.SetAttributes(attribute.String("task.id", taskID))

	// Get from repository
	task, err := s.repo.GetTask(ctx, taskID)
	if err != nil {
		if repository.IsNotFound(err) {
			return nil, errors.NewNotFound("Task", taskID, traceID)
		}
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Check permissions
	if !s.canAccessTask(ctx, task) {
		return nil, errors.NewPermissionDenied("task", "read", traceID)
	}

	return task, nil
}

// ListTasks retrieves all tasks
func (s *TodoService) ListTasks(ctx context.Context, req *todopb.ListTasksRequest) (*todopb.ListTasksResponse, error) {
	ctx, span := tracer.Start(ctx, "ListTasks")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request using the new validation package
	if err := validation.ValidateRequest(req, traceID); err != nil {
		return nil, err
	}

	// Default page size
	pageSize := req.PageSize
	if pageSize == 0 {
		pageSize = 50
	}
	if pageSize > 1000 {
		pageSize = 1000
	}

	span.SetAttributes(
		attribute.Int("page.size", int(pageSize)),
		attribute.String("filter", req.Filter),
	)

	// Parse filter
	filter, err := s.parseFilter(req.Filter)
	if err != nil {
		return nil, errors.NewRequiredField("filter", fmt.Sprintf("Failed to parse filter: %v", err), traceID)
	}

	// Get tasks from repository
	tasks, nextPageToken, err := s.repo.ListTasks(ctx, repository.ListOptions{
		PageSize:  int(pageSize),
		PageToken: req.PageToken,
		Filter:    filter,
		OrderBy:   req.OrderBy,
		UserID:    s.getUserFromContext(ctx),
	})

	if err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Get total count
	totalSize, err := s.repo.CountTasks(ctx, filter, s.getUserFromContext(ctx))
	if err != nil {
		// Log but don't fail the request
		span.RecordError(err)
		totalSize = -1
	}

	return &todopb.ListTasksResponse{
		Tasks:         tasks,
		NextPageToken: nextPageToken,
		TotalSize:     int32(totalSize),
	}, nil
}

// UpdateTask updates an existing task
func (s *TodoService) UpdateTask(ctx context.Context, req *todopb.UpdateTaskRequest) (*todopb.Task, error) {
	ctx, span := tracer.Start(ctx, "UpdateTask")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request
	if req.Task == nil {
		return nil, errors.NewRequiredField("task", "Task object is required", traceID)
	}

	if req.UpdateMask == nil || len(req.UpdateMask.Paths) == 0 {
		return nil, errors.NewRequiredField("update_mask", "Update mask must specify which fields to update", traceID)
	}

	// Extract task ID
	parts := strings.Split(req.Task.Name, "/")
	if len(parts) != 2 || parts[0] != "tasks" {
		return nil, errors.NewRequiredField("task.name", "Invalid task name format", traceID)
	}

	taskID := parts[1]
	span.SetAttributes(attribute.String("task.id", taskID))

	// Get existing task
	existing, err := s.repo.GetTask(ctx, taskID)
	if err != nil {
		if repository.IsNotFound(err) {
			return nil, errors.NewNotFound("Task", taskID, traceID)
		}
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Check permissions
	if !s.canModifyTask(ctx, existing) {
		return nil, errors.NewPermissionDenied("task", "update", traceID)
	}

	// Apply updates based on field mask
	updated := s.applyFieldMask(existing, req.Task, req.UpdateMask)
	updated.UpdateTime = timestamppb.Now()

	// Validate updated task using the new validation package
	if err := validation.ValidateTask(updated, traceID); err != nil {
		return nil, err
	}

	// Save to repository
	if err := s.repo.UpdateTask(ctx, updated); err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	return updated, nil
}

// DeleteTask removes a task
func (s *TodoService) DeleteTask(ctx context.Context, req *todopb.DeleteTaskRequest) (*todopb.DeleteTaskResponse, error) {
	ctx, span := tracer.Start(ctx, "DeleteTask")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request using the new validation package
	if err := validation.ValidateRequest(req, traceID); err != nil {
		return nil, err
	}

	// Extract task ID
	parts := strings.Split(req.Name, "/")
	if len(parts) != 2 || parts[0] != "tasks" {
		return nil, errors.NewRequiredField("name", "Invalid task name format", traceID)
	}

	taskID := parts[1]
	span.SetAttributes(attribute.String("task.id", taskID))

	// Get existing task to check permissions
	existing, err := s.repo.GetTask(ctx, taskID)
	if err != nil {
		if repository.IsNotFound(err) {
			return nil, errors.NewNotFound("Task", taskID, traceID)
		}
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Check permissions
	if !s.canModifyTask(ctx, existing) {
		return nil, errors.NewPermissionDenied("task", "delete", traceID)
	}

	// Delete from repository
	if err := s.repo.DeleteTask(ctx, taskID); err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	return &todopb.DeleteTaskResponse{
		Message: fmt.Sprintf("Task %s deleted successfully", req.Name),
	}, nil
}

// BatchCreateTasks creates multiple tasks at once
func (s *TodoService) BatchCreateTasks(ctx context.Context, req *todopb.BatchCreateTasksRequest) (*todopb.BatchCreateTasksResponse, error) {
	ctx, span := tracer.Start(ctx, "BatchCreateTasks")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate batch request using the new validation package
	if err := validation.ValidateBatchCreateTasks(req, traceID); err != nil {
		span.SetAttributes(attribute.String("validation.error", err.Error()))
		return nil, err
	}

	// Process each task
	var created []*todopb.Task
	var batchErrors []string

	for i, createReq := range req.Requests {
		task, err := s.CreateTask(ctx, createReq)
		if err != nil {
			// Collect errors for batch response
			batchErrors = append(batchErrors, fmt.Sprintf("Task %d: %s", i, err.Error()))
			continue
		}
		created = append(created, task)
	}

	// If all tasks failed, return error
	if len(created) == 0 && len(batchErrors) > 0 {
		return nil, errors.NewInternal("All batch operations failed", traceID, nil)
	}

	// Return partial success
	response := &todopb.BatchCreateTasksResponse{
		Tasks: created,
	}

	// Add partial errors to response metadata if any
	if len(batchErrors) > 0 {
		span.SetAttributes(
			attribute.Int("batch.total", len(req.Requests)),
			attribute.Int("batch.success", len(created)),
			attribute.Int("batch.failed", len(batchErrors)),
		)
	}

	return response, nil
}

// Helper methods

func (s *TodoService) handleRepositoryError(err error, traceID string) error {
	if repository.IsConnectionError(err) {
		return errors.NewServiceUnavailable("Unable to connect to the database. Please try again later.", traceID)
	}

	// Note: SpanFromContext never returns nil, and on a background context it
	// returns a no-op span, so this call records nothing; callers record the
	// error on their own spans before delegating here.
	span := trace.SpanFromContext(context.Background())
	span.RecordError(err)

	return errors.NewInternal("An unexpected error occurred while processing your request", traceID, err)
}

func (s *TodoService) getUserFromContext(ctx context.Context) string {
	// In a real implementation, this would extract user info from auth context
	if user, ok := ctx.Value("user").(string); ok {
		return user
	}
	return "anonymous"
}

func (s *TodoService) canAccessTask(ctx context.Context, task *todopb.Task) bool {
	// In a real implementation, check if user can access this task
	user := s.getUserFromContext(ctx)
	return user == task.CreatedBy || user == "admin"
}

func (s *TodoService) canModifyTask(ctx context.Context, task *todopb.Task) bool {
	// In a real implementation, check if user can modify this task
	user := s.getUserFromContext(ctx)
	return user == task.CreatedBy || user == "admin"
}

func (s *TodoService) parseFilter(filter string) (map[string]interface{}, error) {
	// Simple filter parser - in production, use a proper parser
	parsed := make(map[string]interface{})

	if filter == "" {
		return parsed, nil
	}

	// Example: "status=COMPLETED AND priority=HIGH"
	parts := strings.Split(filter, " AND ")
	for _, part := range parts {
		kv := strings.Split(strings.TrimSpace(part), "=")
		if len(kv) != 2 {
			return nil, fmt.Errorf("invalid filter expression: %s", part)
		}

		key := strings.TrimSpace(kv[0])
		value := strings.Trim(strings.TrimSpace(kv[1]), "'\"")

		// Validate filter keys
		switch key {
		case "status", "priority", "created_by":
			parsed[key] = value
		default:
			return nil, fmt.Errorf("unknown filter field: %s", key)
		}
	}

	return parsed, nil
}

func (s *TodoService) applyFieldMask(existing, update *todopb.Task, mask *fieldmaskpb.FieldMask) *todopb.Task {
	// Caution: dereferencing copies the message's internal state; production
	// code should use proto.Clone(existing) instead of a shallow copy.
	result := *existing
	for _, path := range mask.Paths {
		switch path {
		case "title":
			result.Title = update.Title
		case "description":
			result.Description = update.Description
		case "status":
			result.Status = update.Status
		case "priority":
			result.Priority = update.Priority
		case "due_date":
			result.DueDate = update.DueDate
		case "tags":
			result.Tags = update.Tags
		}
	}
	return &result
}

Server Implementation

Now let’s put it all together in our server:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	todopb "github.com/bhatti/todo-api-errors/api/proto/todo/v1"
	"github.com/bhatti/todo-api-errors/internal/middleware"
	"github.com/bhatti/todo-api-errors/internal/monitoring"
	"github.com/bhatti/todo-api-errors/internal/repository"
	"github.com/bhatti/todo-api-errors/internal/service"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/reflection"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/encoding/protojson"
)

func main() {
	// Initialize monitoring
	if err := monitoring.InitOpenTelemetryMetrics(); err != nil {
		log.Printf("Failed to initialize OpenTelemetry metrics: %v", err)
		// Continue without OpenTelemetry - Prometheus will still work
	}

	// Initialize repository
	repo := repository.NewInMemoryRepository()

	// Initialize service
	todoService, err := service.NewTodoService(repo)
	if err != nil {
		log.Fatalf("Failed to create service: %v", err)
	}

	// Start gRPC server
	grpcPort := ":50051"
	go func() {
		if err := startGRPCServer(grpcPort, todoService); err != nil {
			log.Fatalf("Failed to start gRPC server: %v", err)
		}
	}()

	// Start HTTP gateway
	httpPort := ":8080"
	go func() {
		if err := startHTTPGateway(httpPort, grpcPort); err != nil {
			log.Fatalf("Failed to start HTTP gateway: %v", err)
		}
	}()

	// Start metrics server
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		if err := http.ListenAndServe(":9090", nil); err != nil {
			log.Printf("Failed to start metrics server: %v", err)
		}
	}()

	log.Printf("TODO API server started")
	log.Printf("gRPC server listening on %s", grpcPort)
	log.Printf("HTTP gateway listening on %s", httpPort)
	log.Printf("Metrics available at :9090/metrics")

	// Wait for interrupt signal
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Println("Shutting down...")
}

func startGRPCServer(port string, todoService todopb.TodoServiceServer) error {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	// Create gRPC server with interceptors - now using the new UnaryErrorInterceptor
	opts := []grpc.ServerOption{
		grpc.ChainUnaryInterceptor(
			middleware.UnaryErrorInterceptor, // Using new protobuf-based error interceptor
			loggingInterceptor(),
			recoveryInterceptor(),
		),
	}

	server := grpc.NewServer(opts...)

	// Register service
	todopb.RegisterTodoServiceServer(server, todoService)

	// Register reflection for debugging
	reflection.Register(server)

	return server.Serve(lis)
}

func startHTTPGateway(httpPort, grpcPort string) error {
	ctx := context.Background()

	// Create gRPC connection
	conn, err := grpc.DialContext(
		ctx,
		"localhost"+grpcPort,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return fmt.Errorf("failed to dial gRPC server: %w", err)
	}

	// Create gateway mux with custom error handler
	mux := runtime.NewServeMux(
		runtime.WithErrorHandler(middleware.CustomHTTPError), // Using new protobuf-based error handler
		runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
			MarshalOptions: protojson.MarshalOptions{
				UseProtoNames:   true,
				EmitUnpopulated: false,
			},
			UnmarshalOptions: protojson.UnmarshalOptions{
				DiscardUnknown: true,
			},
		}),
	)

	// Register service handler
	if err := todopb.RegisterTodoServiceHandler(ctx, mux, conn); err != nil {
		return fmt.Errorf("failed to register service handler: %w", err)
	}

	// Create HTTP server with middleware
	handler := middleware.HTTPErrorHandler( // Using new protobuf-based HTTP error handler
		corsMiddleware(
			authMiddleware(
				loggingHTTPMiddleware(mux),
			),
		),
	)

	server := &http.Server{
		Addr:         httpPort,
		Handler:      handler,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	return server.ListenAndServe()
}

// Middleware implementations

func loggingInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		start := time.Now()

		// Call handler
		resp, err := handler(ctx, req)

		// Log request
		duration := time.Since(start)
		statusCode := "OK"
		if err != nil {
			statusCode = status.Code(err).String()
		}

		log.Printf("gRPC: %s %s %s %v", info.FullMethod, statusCode, duration, err)

		return resp, err
	}
}

func recoveryInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("Recovered from panic: %v", r)
				monitoring.RecordPanicRecovery(ctx)
				err = status.Error(codes.Internal, "Internal server error")
			}
		}()

		return handler(ctx, req)
	}
}

func loggingHTTPMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		// Wrap response writer to capture status
		wrapped := &statusResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}

		// Process request
		next.ServeHTTP(wrapped, r)

		// Log request
		duration := time.Since(start)
		log.Printf("HTTP: %s %s %d %v", r.Method, r.URL.Path, wrapped.statusCode, duration)
	})
}

func corsMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS, PATCH")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Trace-ID")

		if r.Method == "OPTIONS" {
			w.WriteHeader(http.StatusOK)
			return
		}

		next.ServeHTTP(w, r)
	})
}

func authMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Simple auth for demo - in production use proper authentication
		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			authHeader = "Bearer anonymous"
		}

		// Extract user from token
		user := "anonymous"
		if len(authHeader) > 7 && authHeader[:7] == "Bearer " {
			user = authHeader[7:]
		}

		// Add user to context (production code should use an unexported typed
		// key rather than a bare string to avoid collisions)
		ctx := context.WithValue(r.Context(), "user", user)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

type statusResponseWriter struct {
	http.ResponseWriter
	statusCode int
}

func (w *statusResponseWriter) WriteHeader(code int) {
	w.statusCode = code
	w.ResponseWriter.WriteHeader(code)
}

Example API Usage

Let’s see our error handling in action with some example requests:

Example 1: Validation Error with Multiple Issues

Request with multiple validation errors

curl -X POST http://localhost:8080/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"task": {
"title": "",
"description": "This description is wayyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy too long…",
"status": "INVALID_STATUS",
"tags": ["INVALID TAG", "tag-1", "tag-1"]
}
}'

Response

< HTTP/1.1 422 Unprocessable Entity
< Content-Type: application/problem+json
{
  "detail": "The request contains 5 validation errors",
  "errors": [
    {
      "code": "TOO_SHORT",
      "field": "title",
      "message": "value length must be at least 1 characters"
    },
    {
      "code": "TOO_LONG",
      "field": "description",
      "message": "value length must be at most 100 characters"
    },
    {
      "code": "INVALID_FORMAT",
      "field": "tags",
      "message": "value does not match regex pattern `^[a-z0-9-]+$`"
    },
    {
      "code": "INVALID_TAG_FORMAT",
      "field": "tags[0]",
      "message": "Tag 'INVALID TAG' must be lowercase letters, numbers, and hyphens only"
    },
    {
      "code": "DUPLICATE_TAG",
      "field": "tags[2]",
      "message": "Tag 'tag-1' appears multiple times"
    }
  ],
  "instance": "/v1/tasks",
  "status": 422,
  "timestamp": {
    "seconds": 1755288524,
    "nanos": 484865000
  },
  "title": "Validation Failed",
  "traceId": "eb4bfb3f-9397-4547-8618-ce9952a16067",
  "type": "https://api.example.com/errors/validation-failed"
}

Example 2: Not Found Error

Request for non-existent task

curl http://localhost:8080/v1/tasks/non-existent-id

Response

< HTTP/1.1 404 Not Found
< Content-Type: application/problem+json
{
  "detail": "Task with ID 'non-existent-id' was not found.",
  "instance": "/v1/tasks/non-existent-id",
  "status": 404,
  "timestamp": {
    "seconds": 1755288565,
    "nanos": 904607000
  },
  "title": "Resource Not Found",
  "traceId": "6ce00cd8-d0b7-47f1-b6f6-9fc1375c26a4",
  "type": "https://api.example.com/errors/resource-not-found"
}

Example 3: Conflict Error

curl -X POST http://localhost:8080/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"task": {
"title": "Existing Task Title"
}
}'

curl -X POST http://localhost:8080/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"task": {
"title": "Existing Task Title"
}
}'

Response

< HTTP/1.1 409 Conflict
< Content-Type: application/problem+json
{
  "detail": "Conflict creating task: A task with this title already exists",
  "instance": "/v1/tasks",
  "status": 409,
  "timestamp": {
    "seconds": 1755288593,
    "nanos": 594458000
  },
  "title": "Resource Conflict",
  "traceId": "ed2e78d2-591d-492a-8d71-6b6843ce86f7",
  "type": "https://api.example.com/errors/resource-conflict"
}

Example 4: Service Unavailable (Transient Error)

When database is down

curl http://localhost:8080/v1/tasks

Response

HTTP/1.1 503 Service Unavailable
Content-Type: application/problem+json
Retry-After: 30
{
  "type": "https://api.example.com/errors/service-unavailable",
  "title": "Service Unavailable",
  "status": 503,
  "detail": "Database connection pool exhausted. Please try again later.",
  "instance": "/v1/tasks",
  "traceId": "db-pool-001",
  "timestamp": "2025-08-15T10:30:00Z",
  "extensions": {
    "retryable": true,
    "retryAfter": "2025-08-15T10:30:30Z",
    "maxRetries": 3,
    "backoffType": "exponential",
    "backoffMs": 1000,
    "errorCategory": "database"
  }
}

Best Practices Summary

Our implementation demonstrates several key best practices:

1. Consistent Error Format

All errors follow RFC 9457 (Problem Details) format, providing:

  • Machine-readable type URIs
  • Human-readable titles and details
  • HTTP status codes
  • Request tracing
  • Extensible metadata

2. Comprehensive Validation

  • All validation errors are returned at once, not one by one
  • Clear field paths for nested objects
  • Descriptive error codes and messages
  • Support for batch operations with partial success

3. Security-Conscious Design

  • No sensitive information in error messages
  • Internal errors are logged but not exposed
  • Generic messages for authentication failures
  • Request IDs for support without exposing internals

4. Developer Experience

  • Clear, actionable error messages
  • Helpful suggestions for fixing issues
  • Consistent error codes across protocols
  • Rich metadata for debugging

5. Protocol Compatibility

  • Seamless translation between gRPC and HTTP
  • Proper status code mapping
  • Preservation of error details across protocols

6. Observability

  • Structured logging with trace IDs
  • Prometheus metrics for monitoring
  • OpenTelemetry integration
  • Error categorization for analysis

Conclusion

This comprehensive guide demonstrates how to build robust error handling for modern APIs. By treating errors as a first-class feature of our API, we’ve achieved several key benefits:

  • Consistency: All errors, regardless of their source, are presented to clients in a predictable format.
  • Clarity: Developers consuming our API get clear, actionable feedback, helping them debug and integrate faster.
  • Developer Ergonomics: Our internal service code is cleaner, as handlers focus on business logic while the middleware handles the boilerplate of error conversion.
  • Security: We have a clear separation between internal error details (for logging) and public error responses, preventing leaks.

Additional Resources

You can find the full source code for this example in this GitHub repository.

July 17, 2025

Zero-Downtime Services with Lifecycle Management on Kubernetes and Istio

Filed under: Computing,Web Services — admin @ 3:12 pm

Introduction

In the world of cloud-native applications, service lifecycle management is often an afterthought—until it causes a production outage. Whether you’re running gRPC or REST APIs on Kubernetes with Istio, proper lifecycle management is the difference between smooth deployments and 3 AM incident calls. Consider these scenarios:

  • Your service takes 45 seconds to warm up its cache, but Kubernetes kills it after 30 seconds of startup wait.
  • During deployments, clients receive connection errors as pods terminate abruptly.
  • A hiccup in a database or dependent service causes your entire service mesh to cascade fail.
  • Your service mesh sidecar shuts down before your application is terminated or drops in-flight requests.
  • A critical service receives SIGKILL during transaction processing, leaving data in inconsistent states.
  • After a regional outage, services restart but data drift goes undetected for hours.
  • Your RTO target is 15 seconds, but services take 30 seconds just to start up properly.

These aren’t edge cases—they’re common problems that proper lifecycle management solves. More critically, unsafe shutdowns can cause data corruption, financial losses, and breach compliance requirements. This guide covers what you need to know about building services that start safely, shut down gracefully, and handle failures intelligently.

The Hidden Complexity of Service Lifecycles

Modern microservices don’t exist in isolation. A typical request might flow through:

Typical Request Flow.

Each layer adds complexity to startup and shutdown sequences. Without proper coordination, you’ll experience:

  • Startup race conditions: Application tries to make network calls before the sidecar proxy is ready
  • Shutdown race conditions: Sidecar terminates while the application is still processing requests
  • Premature traffic: Load balancer routes traffic before the application is truly ready
  • Dropped connections: Abrupt shutdowns leave clients hanging
  • Data corruption: In-flight transactions get interrupted, leaving databases in inconsistent states
  • Compliance violations: Financial services may face regulatory penalties for data integrity failures

Core Concepts: The Three Types of Health Checks

Kubernetes provides three distinct probe types, each serving a specific purpose:

1. Liveness Probe: “Is the process alive?”

  • Detects deadlocks and unrecoverable states
  • Should be fast and simple (e.g., HTTP GET /healthz)
  • Failure triggers container restart
  • Common mistake: Making this check too complex

2. Readiness Probe: “Can the service handle traffic?”

  • Validates all critical dependencies are available
  • Prevents routing traffic to pods that aren’t ready
  • Should perform “deep” checks of dependencies
  • Common mistake: Using the same check as liveness

3. Startup Probe: “Is the application still initializing?”

  • Provides grace period for slow-starting containers
  • Disables liveness/readiness probes until successful
  • Prevents restart loops during initialization
  • Common mistake: Not using it for slow-starting apps

The Hidden Dangers of Unsafe Shutdowns

While graceful shutdown is ideal, it’s not always possible. Kubernetes will send SIGKILL after the termination grace period, and infrastructure failures can terminate pods instantly. This creates serious risks:

Data Corruption Scenarios

Financial Transaction Example:

// DANGEROUS: Non-atomic operation
func (s *PaymentService) ProcessPayment(req *PaymentRequest) error {
    // Step 1: Debit source account
    if err := s.debitAccount(req.FromAccount, req.Amount); err != nil {
        return err
    }
    
    // DANGER: SIGKILL here leaves money debited but not credited
    // Step 2: Credit destination account  
    if err := s.creditAccount(req.ToAccount, req.Amount); err != nil {
        // Money is lost! Source debited but destination not credited
        return err
    }
    
    // Step 3: Record transaction
    return s.recordTransaction(req)
}

E-commerce Inventory Example:

// DANGEROUS: Race condition during shutdown
func (s *InventoryService) ReserveItem(req *ReserveRequest) error {
    // Check availability
    if s.getStock(req.ItemID) < req.Quantity {
        return ErrInsufficientStock
    }
    
    // DANGER: SIGKILL here can cause double-reservation --
    // another request might see the same stock level
    
    // Reserve the item
    return s.updateStock(req.ItemID, -req.Quantity)
}

RTO/RPO Impact

Recovery Time Objective (RTO): How quickly can we restore service?

  • Poor lifecycle management increases startup time
  • Services may need manual intervention to reach consistent state
  • Cascading failures extend recovery time across the entire system

Recovery Point Objective (RPO): How much data can we afford to lose?

  • Unsafe shutdowns can corrupt recent transactions
  • Without idempotency, replay of messages may create duplicates
  • Data inconsistencies may not be detected until much later

The Anti-Entropy Solution

Since graceful shutdown isn’t always possible, production systems need reconciliation processes to detect and repair inconsistencies:

// Anti-entropy pattern for data consistency
type ReconciliationService struct {
    paymentDB    PaymentDatabase
    accountDB    AccountDatabase
    auditLog     AuditLogger
    alerting     AlertingService
}

func (r *ReconciliationService) ReconcilePayments(ctx context.Context) error {
    // Find payments without matching account entries
    orphanedPayments, err := r.paymentDB.FindOrphanedPayments(ctx)
    if err != nil {
        return err
    }
    
    for _, payment := range orphanedPayments {
        // Check if this was a partial transaction
        sourceDebit, _ := r.accountDB.GetTransaction(payment.FromAccount, payment.ID)
        destCredit, _ := r.accountDB.GetTransaction(payment.ToAccount, payment.ID)
        
        switch {
        case sourceDebit != nil && destCredit == nil:
            // Complete the transaction
            if err := r.creditAccount(payment.ToAccount, payment.Amount); err != nil {
                r.alerting.SendAlert("Failed to complete orphaned payment", payment.ID)
                continue
            }
            r.auditLog.RecordReconciliation("completed_payment", payment.ID)
            
        case sourceDebit == nil && destCredit != nil:
            // Reverse the credit
            if err := r.debitAccount(payment.ToAccount, payment.Amount); err != nil {
                r.alerting.SendAlert("Failed to reverse orphaned credit", payment.ID)
                continue
            }
            r.auditLog.RecordReconciliation("reversed_credit", payment.ID)
            
        default:
            // Both or neither exist - needs investigation
            r.alerting.SendAlert("Ambiguous payment state", payment.ID)
        }
    }
    
    return nil
}

// Run reconciliation periodically
func (r *ReconciliationService) Start(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.ReconcilePayments(ctx); err != nil {
                log.Printf("Reconciliation failed: %v", err)
            }
        }
    }
}

Building a Resilient Service: Complete Example

Let’s build a production-ready service that demonstrates all best practices. We’ll create two versions: one with anti-patterns (bad-service) and one with best practices (good-service).

Sequence diagram of a typical API with proper Kubernetes and Istio configuration.

The Application Code

//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative api/demo.proto

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "net"
    "net/http"
    "os"
    "os/signal"
    "sync/atomic"
    "syscall"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    health "google.golang.org/grpc/health/grpc_health_v1"
    "google.golang.org/grpc/status"
)

// Service represents our application with health state
type Service struct {
    isHealthy         atomic.Bool
    isShuttingDown    atomic.Bool
    activeRequests    atomic.Int64
    dependencyHealthy atomic.Bool
}

// HealthChecker implements the gRPC health checking protocol
type HealthChecker struct {
    svc *Service
}

func (h *HealthChecker) Check(ctx context.Context, req *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
    service := req.GetService()
    
    // Liveness: Simple check - is the process responsive?
    if service == "" || service == "liveness" {
        if h.svc.isShuttingDown.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        return &health.HealthCheckResponse{
            Status: health.HealthCheckResponse_SERVING,
        }, nil
    }
    
    // Readiness: Deep check - can we handle traffic?
    if service == "readiness" {
        // Check application health
        if !h.svc.isHealthy.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        
        // Check critical dependencies
        if !h.svc.dependencyHealthy.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        
        // Check if shutting down
        if h.svc.isShuttingDown.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        
        return &health.HealthCheckResponse{
            Status: health.HealthCheckResponse_SERVING,
        }, nil
    }
    
    // Synthetic readiness: Complex business logic check for monitoring
    if service == "synthetic-readiness" {
        // Simulate a complex health check that validates business logic
        // This would make actual API calls, database queries, etc.
        if !h.performSyntheticCheck(ctx) {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        return &health.HealthCheckResponse{
            Status: health.HealthCheckResponse_SERVING,
        }, nil
    }
    
    return nil, status.Errorf(codes.NotFound, "unknown service: %s", service)
}

func (h *HealthChecker) performSyntheticCheck(ctx context.Context) bool {
    // In a real service, this would:
    // 1. Create a test transaction
    // 2. Query the database
    // 3. Call dependent services
    // 4. Validate the complete flow works
    return h.svc.isHealthy.Load() && h.svc.dependencyHealthy.Load()
}

func (h *HealthChecker) Watch(req *health.HealthCheckRequest, server health.Health_WatchServer) error {
    return status.Error(codes.Unimplemented, "watch not implemented")
}

// DemoServiceServer implements your business logic
type DemoServiceServer struct {
    UnimplementedDemoServiceServer
    svc *Service
}

func (s *DemoServiceServer) ProcessRequest(ctx context.Context, req *ProcessRequest) (*ProcessResponse, error) {
    s.svc.activeRequests.Add(1)
    defer s.svc.activeRequests.Add(-1)
    
    // Simulate processing
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-time.After(100 * time.Millisecond):
        return &ProcessResponse{
            Result: fmt.Sprintf("Processed: %s", req.GetData()),
        }, nil
    }
}

func main() {
    var (
        port         = flag.Int("port", 8080, "gRPC port")
        mgmtPort     = flag.Int("mgmt-port", 8090, "Management port")
        startupDelay = flag.Duration("startup-delay", 10*time.Second, "Startup delay")
    )
    flag.Parse()
    
    svc := &Service{}
    svc.dependencyHealthy.Store(true) // Assume healthy initially
    
    // Management endpoints for testing
    mux := http.NewServeMux()
    mux.HandleFunc("/toggle-health", func(w http.ResponseWriter, r *http.Request) {
        current := svc.dependencyHealthy.Load()
        svc.dependencyHealthy.Store(!current)
        fmt.Fprintf(w, "Dependency health toggled to: %v\n", !current)
    })
    mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintf(w, "active_requests %d\n", svc.activeRequests.Load())
        fmt.Fprintf(w, "is_healthy %v\n", svc.isHealthy.Load())
        fmt.Fprintf(w, "is_shutting_down %v\n", svc.isShuttingDown.Load())
    })
    
    mgmtServer := &http.Server{
        Addr:    fmt.Sprintf(":%d", *mgmtPort),
        Handler: mux,
    }
    
    // Start management server
    go func() {
        log.Printf("Management server listening on :%d", *mgmtPort)
        if err := mgmtServer.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatalf("Management server failed: %v", err)
        }
    }()
    
    // Simulate slow startup
    log.Printf("Starting application (startup delay: %v)...", *startupDelay)
    time.Sleep(*startupDelay)
    svc.isHealthy.Store(true)
    log.Println("Application initialized and ready")
    
    // Setup gRPC server
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    
    grpcServer := grpc.NewServer()
    RegisterDemoServiceServer(grpcServer, &DemoServiceServer{svc: svc})
    health.RegisterHealthServer(grpcServer, &HealthChecker{svc: svc})
    
    // Start gRPC server
    go func() {
        log.Printf("gRPC server listening on :%d", *port)
        if err := grpcServer.Serve(lis); err != nil {
            log.Fatalf("gRPC server failed: %v", err)
        }
    }()
    
    // Wait for shutdown signal
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    sig := <-sigCh
    
    log.Printf("Received signal: %v, starting graceful shutdown...", sig)
    
    // Graceful shutdown sequence
    svc.isShuttingDown.Store(true)
    svc.isHealthy.Store(false) // Fail readiness immediately
    
    // Stop accepting new requests
    grpcServer.GracefulStop()
    
    // Wait for active requests to complete
    timeout := time.After(30 * time.Second)
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-timeout:
            log.Println("Shutdown timeout reached, forcing exit")
            os.Exit(1)
        case <-ticker.C:
            active := svc.activeRequests.Load()
            if active == 0 {
                log.Println("All requests completed")
                goto shutdown
            }
            log.Printf("Waiting for %d active requests to complete...", active)
        }
    }
    
shutdown:
    // Cleanup
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    mgmtServer.Shutdown(ctx)
    
    log.Println("Graceful shutdown complete")
}

Kubernetes Manifests: Anti-Patterns vs Best Practices

Bad Service (Anti-Patterns)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: bad-service
  namespace: demo
spec:
  replicas: 2
  selector:
    matchLabels:
      app: bad-service
  template:
    metadata:
      labels:
        app: bad-service
      # MISSING: Critical Istio annotations!
    spec:
      # DEFAULT: Only 30s grace period
      containers:
      - name: app
        image: myregistry/demo-service:latest
        ports:
        - containerPort: 8080
          name: grpc
        - containerPort: 8090
          name: mgmt
        args: ["--startup-delay=45s"]  # Longer than default probe timeout!
        
        # ANTI-PATTERN: Identical liveness and readiness probes
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080"]
          initialDelaySeconds: 10
          periodSeconds: 10
          failureThreshold: 3  # Will fail after 40s total
          
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080"]  # Same as liveness!
          initialDelaySeconds: 10
          periodSeconds: 10
        
        # MISSING: No startup probe for slow initialization
        # MISSING: No preStop hook for graceful shutdown

Good Service (Best Practices)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: good-service
  namespace: demo
spec:
  replicas: 2
  selector:
    matchLabels:
      app: good-service
  template:
    metadata:
      labels:
        app: good-service
      annotations:
        # Critical for Istio/Envoy sidecar lifecycle management
        sidecar.istio.io/holdApplicationUntilProxyStarts: "true"
        proxy.istio.io/config: |
          proxyMetadata:
            EXIT_ON_ZERO_ACTIVE_CONNECTIONS: "true"
        sidecar.istio.io/proxyCPU: "100m"
        sidecar.istio.io/proxyMemory: "128Mi"
    spec:
      # Extended grace period: preStop (15s) + app shutdown (30s) + buffer (20s)
      terminationGracePeriodSeconds: 65
      
      containers:
      - name: app
        image: myregistry/demo-service:latest
        ports:
        - containerPort: 8080
          name: grpc
        - containerPort: 8090
          name: mgmt
        args: ["--startup-delay=45s"]
        
        # Resource management for predictable performance
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
        
        # Startup probe for slow initialization
        startupProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080", "-service=readiness"]
          initialDelaySeconds: 0
          periodSeconds: 5
          failureThreshold: 24  # 5s * 24 = 120s total startup time
          successThreshold: 1
        
        # Simple liveness check
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080", "-service=liveness"]
          initialDelaySeconds: 0  # Startup probe handles initialization
          periodSeconds: 10
          failureThreshold: 3
          timeoutSeconds: 5
        
        # Deep readiness check
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080", "-service=readiness"]
          initialDelaySeconds: 0
          periodSeconds: 5
          failureThreshold: 2
          successThreshold: 1
          timeoutSeconds: 5
        
        # Graceful shutdown coordination
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 15"]  # Allow LB to drain
        
        # Environment variables for cloud provider integration
        env:
        - name: CLOUD_PROVIDER
          value: "auto-detect"  # Works with GCP, AWS, Azure
        - name: ENABLE_PROFILING
          value: "true"

Istio Service Mesh: Beyond Basic Lifecycle Management

While proper health checks and graceful shutdown are foundational, Istio adds critical production-grade capabilities that dramatically improve fault tolerance:

Automatic Retries and Circuit Breaking

apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: payment-service
  namespace: demo
spec:
  host: payment-service.demo.svc.cluster.local
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 100
        maxRequestsPerConnection: 2
    outlierDetection:          # circuit breaking: eject unhealthy hosts
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
---
# Retries are configured on the VirtualService, not the DestinationRule
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: payment-service
  namespace: demo
spec:
  hosts:
  - payment-service.demo.svc.cluster.local
  http:
  - route:
    - destination:
        host: payment-service.demo.svc.cluster.local
    retries:
      attempts: 3
      perTryTimeout: 2s
      retryOn: 5xx,gateway-error,connect-failure,refused-stream
      retryRemoteLocalities: true

Key Benefits for Production Systems

  1. Automatic Request Retries: If a pod fails or becomes unavailable, Istio automatically retries requests to healthy instances
  2. Circuit Breaking: Prevents cascading failures by temporarily cutting off traffic to unhealthy services
  3. Load Balancing: Distributes traffic intelligently across healthy pods
  4. Mutual TLS: Secures service-to-service communication without code changes
  5. Observability: Provides detailed metrics, tracing, and logging for all inter-service communication
  6. Canary Deployments: Enables safe rollouts with automatic traffic shifting
  7. Rate Limiting: Protects services from being overwhelmed
  8. Timeout Management: Prevents hanging requests with configurable timeouts

Termination Grace Period Calculation

The critical formula for calculating termination grace periods:

terminationGracePeriodSeconds = preStop delay + application shutdown timeout + buffer

Examples:
- Simple service: 10s + 20s + 5s = 35s
- Complex service: 15s + 45s + 5s = 65s
- Batch processor: 30s + 120s + 10s = 160s

Important: Services requiring more than 90-120 seconds to shut down should be re-architected using checkpoint-and-resume patterns.
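
The formula is simple enough to encode as a helper (a sketch; the numbers mirror the examples above):

```go
package main

import "fmt"

// terminationGracePeriod computes the grace period budget in seconds:
// preStop delay + application shutdown timeout + safety buffer.
func terminationGracePeriod(preStopSec, shutdownSec, bufferSec int) int {
	return preStopSec + shutdownSec + bufferSec
}

func main() {
	fmt.Println(terminationGracePeriod(10, 20, 5))   // simple service: 35
	fmt.Println(terminationGracePeriod(15, 45, 5))   // complex service: 65
	fmt.Println(terminationGracePeriod(30, 120, 10)) // batch processor: 160
}
```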

Advanced Patterns for Production

1. Idempotency: Handling Duplicate Requests

Critical for production: When pods restart or network issues occur, clients may retry requests. Without idempotency, this can cause duplicate transactions, corrupted state, or financial losses. This is mandatory for all state-modifying operations.

package idempotency

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "time"
    "sync"
    "errors"
)

var (
    ErrDuplicateRequest = errors.New("duplicate request detected")
    ErrProcessingInProgress = errors.New("request is currently being processed")
)

// IdempotencyStore tracks request execution with persistence
type IdempotencyStore struct {
    mu        sync.RWMutex
    records   map[string]*Record
    persister PersistenceLayer // Database or Redis for durability
}

type Record struct {
    Key         string
    Response    interface{}
    Error       error
    Status      ProcessingStatus
    ExpiresAt   time.Time
    CreatedAt   time.Time
    ProcessedAt *time.Time
}

type ProcessingStatus int

const (
    StatusPending ProcessingStatus = iota
    StatusProcessing
    StatusCompleted
    StatusFailed
)

// ProcessIdempotent ensures exactly-once processing semantics
func (s *IdempotencyStore) ProcessIdempotent(
    ctx context.Context,
    key string,
    ttl time.Duration,
    fn func() (interface{}, error),
) (interface{}, error) {
    // Check for an existing record and claim the key atomically under the
    // write lock, so two concurrent requests with the same key cannot both
    // start processing (a check-then-act race under RLock would allow that).
    s.mu.Lock()
    if prior, exists := s.records[key]; exists {
        switch prior.Status {
        case StatusCompleted, StatusFailed:
            if time.Now().Before(prior.ExpiresAt) {
                s.mu.Unlock()
                return prior.Response, prior.Error
            }
        case StatusProcessing:
            s.mu.Unlock()
            return nil, ErrProcessingInProgress
        }
    }
    
    // Mark as processing while still holding the lock
    record := &Record{
        Key:       key,
        Status:    StatusProcessing,
        ExpiresAt: time.Now().Add(ttl),
        CreatedAt: time.Now(),
    }
    s.records[key] = record
    s.mu.Unlock()
    
    // Persist the processing state
    if err := s.persister.Save(ctx, record); err != nil {
        return nil, err
    }
    
    // Execute the function
    response, err := fn()
    processedAt := time.Now()
    
    // Update record with result
    s.mu.Lock()
    record.Response = response
    record.Error = err
    record.ProcessedAt = &processedAt
    if err != nil {
        record.Status = StatusFailed
    } else {
        record.Status = StatusCompleted
    }
    s.mu.Unlock()
    
    // Persist the final state
    s.persister.Save(ctx, record)
    
    return response, err
}

// Example: Idempotent payment processing
func (s *PaymentService) ProcessPayment(ctx context.Context, req *PaymentRequest) (*PaymentResponse, error) {
    // Generate idempotency key from request
    key := generateIdempotencyKey(req)
    
    result, err := s.idempotencyStore.ProcessIdempotent(
        ctx,
        key,
        24*time.Hour, // Keep records for 24 hours
        func() (interface{}, error) {
            // Atomic transaction processing
            return s.processPaymentTransaction(ctx, req)
        },
    )
    
    if err != nil {
        return nil, err
    }
    return result.(*PaymentResponse), nil
}

// Atomic transaction processing
func (s *PaymentService) processPaymentTransaction(ctx context.Context, req *PaymentRequest) (*PaymentResponse, error) {
    // Use database transaction for atomicity
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()
    
    // Step 1: Validate accounts
    if err := s.validateAccounts(ctx, tx, req); err != nil {
        return nil, err
    }
    
    // Step 2: Process payment atomically
    paymentID, err := s.executePayment(ctx, tx, req)
    if err != nil {
        return nil, err
    }
    
    // Step 3: Commit transaction
    if err := tx.Commit(); err != nil {
        return nil, err
    }
    
    return &PaymentResponse{
        PaymentID: paymentID,
        Status:    "completed",
        Timestamp: time.Now(),
    }, nil
}

2. Checkpoint and Resume: Long-Running Operations

For operations that may exceed the termination grace period, implement checkpointing:

package checkpoint

import (
    "context"
    "log"
    "time"
)

type CheckpointStore interface {
    Save(ctx context.Context, id string, state interface{}) error
    Load(ctx context.Context, id string, state interface{}) error
    Delete(ctx context.Context, id string) error
}

type BatchProcessor struct {
    store          CheckpointStore
    checkpointFreq int
}

type BatchState struct {
    JobID      string    `json:"job_id"`
    TotalItems int       `json:"total_items"`
    Processed  int       `json:"processed"`
    LastItem   string    `json:"last_item"`
    StartedAt  time.Time `json:"started_at"`
}

func (p *BatchProcessor) ProcessBatch(ctx context.Context, jobID string, items []string) error {
    // Try to resume from checkpoint
    state := &BatchState{JobID: jobID}
    if err := p.store.Load(ctx, jobID, state); err == nil {
        log.Printf("Resuming job %s from item %d", jobID, state.Processed)
        items = items[state.Processed:]
    } else {
        // New job
        state = &BatchState{
            JobID:      jobID,
            TotalItems: len(items),
            Processed:  0,
            StartedAt:  time.Now(),
        }
    }
    
    // Process items with periodic checkpointing
    for i, item := range items {
        select {
        case <-ctx.Done():
            // Save progress before shutting down
            state.LastItem = item
            return p.store.Save(ctx, jobID, state)
        default:
            // Process item
            if err := p.processItem(ctx, item); err != nil {
                return err
            }
            
            state.Processed++
            state.LastItem = item
            
            // Checkpoint periodically
            if state.Processed%p.checkpointFreq == 0 {
                if err := p.store.Save(ctx, jobID, state); err != nil {
                    log.Printf("Failed to checkpoint: %v", err)
                }
            }
        }
    }
    
    // Job completed, remove checkpoint
    return p.store.Delete(ctx, jobID)
}

3. Circuit Breaker Pattern for Dependencies

Protect your service from cascading failures:

package circuitbreaker

import (
    "context"
    "errors"
    "log"
    "sync"
    "time"
)

var ErrCircuitOpen = errors.New("circuit breaker is open")

type State int

const (
    StateClosed State = iota
    StateOpen
    StateHalfOpen
)

type CircuitBreaker struct {
    mu              sync.RWMutex
    state           State
    failures        int
    successes       int
    lastFailureTime time.Time
    
    maxFailures      int
    resetTimeout     time.Duration
    halfOpenRequests int
}

func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
    cb.mu.RLock()
    state := cb.state
    cb.mu.RUnlock()
    
    if state == StateOpen {
        // Check if we should transition to half-open
        cb.mu.Lock()
        if time.Since(cb.lastFailureTime) > cb.resetTimeout {
            cb.state = StateHalfOpen
            cb.successes = 0
            state = StateHalfOpen
        }
        cb.mu.Unlock()
    }
    
    if state == StateOpen {
        return ErrCircuitOpen
    }
    
    err := fn()
    
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    if err != nil {
        cb.failures++
        cb.lastFailureTime = time.Now()
        
        if cb.failures >= cb.maxFailures {
            cb.state = StateOpen
            log.Printf("Circuit breaker opened after %d failures", cb.failures)
        }
        return err
    }
    
    if state == StateHalfOpen {
        cb.successes++
        if cb.successes >= cb.halfOpenRequests {
            cb.state = StateClosed
            cb.failures = 0
            log.Println("Circuit breaker closed")
        }
    }
    
    return nil
}

Testing Your Implementation

Manual Testing Guide

Test 1: Startup Race Condition

Setup:

# Deploy both services
kubectl apply -f k8s/bad-service.yaml
kubectl apply -f k8s/good-service.yaml

# Watch pods in separate terminal
watch kubectl get pods -n demo

Test the bad service:

# Force restart
kubectl delete pod -l app=bad-service -n demo

# Observe: Pod will enter CrashLoopBackOff due to liveness probe
# killing it before 45s startup completes

Test the good service:

# Force restart
kubectl delete pod -l app=good-service -n demo

# Observe: Pod stays in 0/1 Ready state for ~45s, then becomes ready
# No restarts occur thanks to startup probe

Test 2: Data Consistency Under Failure

Setup:

# Deploy payment service with reconciliation enabled
kubectl apply -f k8s/payment-service.yaml

# Start payment traffic generator
kubectl run payment-generator --image=payment-client:latest \
  --restart=Never --rm -it -- \
  --target=payment-service.demo.svc.cluster.local:8080 \
  --rate=10 --duration=60s

Simulate SIGKILL during transactions:

# In another terminal, kill pods abruptly
while true; do
  kubectl delete pod -l app=payment-service -n demo --force --grace-period=0
  sleep 30
done

Verify reconciliation:

# Check for data inconsistencies
kubectl logs -l app=payment-service -n demo | grep "inconsistency"

# Monitor reconciliation metrics
kubectl port-forward svc/payment-service 8090:8090
curl http://localhost:8090/metrics | grep consistency

Test 3: RTO/RPO Validation

Disaster Recovery Simulation:

# Simulate regional failure
kubectl patch deployment payment-service -n demo \
  --patch '{"spec":{"replicas":0}}'

# Measure RTO - time to restore service
start_time=$(date +%s)
kubectl patch deployment payment-service -n demo \
  --patch '{"spec":{"replicas":3}}'

# Wait for all pods to be ready
kubectl wait --for=condition=ready pod -l app=payment-service -n demo --timeout=900s
end_time=$(date +%s)
rto=$((end_time - start_time))

echo "RTO: ${rto} seconds"
if [ $rto -le 900 ]; then
  echo "PASS: RTO target met (15 minutes)"
else
  echo "FAIL: RTO target exceeded"
fi

Test 4: Istio Resilience Features

Automatic Retry Testing:

# Deploy with fault injection
kubectl apply -f istio/fault-injection.yaml

# Generate requests with chaos header
for i in {1..100}; do
  grpcurl -H "x-chaos-test: true" -plaintext \
    payment-service.demo.svc.cluster.local:8080 \
    PaymentService/ProcessPayment \
    -d '{"amount": 100, "currency": "USD"}'
done

# Check Istio metrics for retry behavior
kubectl exec -n istio-system deployment/istiod -- \
  pilot-agent request GET stats/prometheus | grep retry

Monitoring and Observability

RTO/RPO Considerations

Recovery Time Objective (RTO): the target time to restore service after an outage.
Recovery Point Objective (RPO): the maximum acceptable amount of data loss, measured in time.

Your service lifecycle design directly impacts these critical business metrics:

package monitoring

import (
    "time"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // RTO-related metrics
    ServiceStartupTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name: "service_startup_duration_seconds",
        Help: "Time from pod start to service ready",
        Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // Up to 10 minutes
    }, []string{"service", "version"})
    
    ServiceRecoveryTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name: "service_recovery_duration_seconds", 
        Help: "Time to recover from failure state",
        Buckets: []float64{1, 5, 10, 30, 60, 300, 900}, // Up to 15 minutes
    }, []string{"service", "failure_type"})
    
    // RPO-related metrics
    LastCheckpointAge = promauto.NewGaugeVec(prometheus.GaugeOpts{
        Name: "last_checkpoint_age_seconds",
        Help: "Age of last successful checkpoint",
    }, []string{"service", "checkpoint_type"})
    
    DataConsistencyChecks = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "data_consistency_checks_total",
        Help: "Total number of consistency checks performed",
    }, []string{"service", "check_type", "status"})
    
    InconsistencyDetected = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "data_inconsistencies_detected_total",
        Help: "Total number of data inconsistencies detected",
    }, []string{"service", "inconsistency_type", "severity"})
)

Grafana Dashboard

{
  "dashboard": {
    "title": "Service Lifecycle - Business Impact",
    "panels": [
      {
        "title": "RTO Compliance",
        "description": "Percentage of recoveries meeting RTO target (15 minutes)",
        "targets": [{
          "expr": "100 * (histogram_quantile(0.95, sum(rate(service_recovery_duration_seconds_bucket[5m])) by (le)) <= bool 900)"
        }],
        "thresholds": [
          {"value": 95, "color": "green"},
          {"value": 90, "color": "yellow"},
          {"value": 0, "color": "red"}
        ]
      },
      {
        "title": "RPO Risk Assessment",
        "description": "Data at risk based on checkpoint age",
        "targets": [{
          "expr": "last_checkpoint_age_seconds / 60"
        }],
        "unit": "minutes"
      },
      {
        "title": "Data Consistency Status",
        "targets": [{
          "expr": "rate(data_inconsistencies_detected_total[5m])"
        }]
      }
    ]
  }
}

Production Readiness Checklist

Before deploying to production, ensure your service meets these criteria:

Application Layer

  • [ ] Implements separate liveness and readiness endpoints
  • [ ] Readiness checks validate all critical dependencies
  • [ ] Graceful shutdown drains in-flight requests
  • [ ] Idempotency for all state-modifying operations
  • [ ] Anti-entropy/reconciliation processes implemented
  • [ ] Circuit breakers for external dependencies
  • [ ] Checkpoint-and-resume for long-running operations
  • [ ] Structured logging with correlation IDs
  • [ ] Metrics for startup, shutdown, and health status

Kubernetes Configuration

  • [ ] Startup probe for slow-initializing services
  • [ ] Distinct liveness and readiness probes
  • [ ] Calculated terminationGracePeriodSeconds based on actual shutdown time
  • [ ] PreStop hooks for load balancer draining
  • [ ] Resource requests and limits defined
  • [ ] PodDisruptionBudget for availability
  • [ ] Anti-affinity rules for high availability

Service Mesh Integration

  • [ ] Istio sidecar lifecycle annotations (holdApplicationUntilProxyStarts)
  • [ ] Istio automatic retry policies configured
  • [ ] Circuit breaker configuration in DestinationRule
  • [ ] Distributed tracing enabled
  • [ ] mTLS for service-to-service communication

Data Integrity & Recovery

  • [ ] RTO/RPO metrics tracked and alerting configured
  • [ ] Reconciliation processes tested with Game Day exercises
  • [ ] Chaos engineering tests validate failure scenarios
  • [ ] Synthetic monitoring for end-to-end business flows
  • [ ] Backup and restore procedures documented and tested

Common Pitfalls and Solutions

1. My service keeps restarting during deployment:

Symptom: Pods enter CrashLoopBackOff during rollout

Common Causes:

  • Liveness probe starts before application is ready
  • Startup time exceeds probe timeout
  • Missing startup probe

Solution:

startupProbe:
  httpGet:
    path: /healthz
    port: 8080
  failureThreshold: 30  # 30 * 10s = 5 minutes
  periodSeconds: 10

2. Data corruption during pod restarts:

Symptom: Inconsistent database state after deployments

Common Causes:

  • Non-atomic operations
  • Missing idempotency
  • No reconciliation processes

Solution:

// Implement atomic operations with database transactions
tx, err := db.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()

// All operations within transaction
if err := processPayment(tx, req); err != nil {
    return err // Automatic rollback
}

return tx.Commit()

3. Service mesh sidecar issues:

Symptom: ECONNREFUSED errors on startup

Common Causes:

  • Application starts before sidecar is ready
  • Sidecar terminates before application

Solution:

annotations:
  sidecar.istio.io/holdApplicationUntilProxyStarts: "true"
  proxy.istio.io/config: |
    proxyMetadata:
      EXIT_ON_ZERO_ACTIVE_CONNECTIONS: "true"

Conclusion

Service lifecycle management is not just about preventing outages—it’s about building systems that are predictable, observable, and resilient to the inevitable failures that occur in distributed systems. This allows:

  • Zero-downtime deployments: Services gracefully handle rollouts without data loss.
  • Improved reliability: Proper health checks prevent cascading failures.
  • Better observability: Clear signals about service state and data consistency.
  • Faster recovery: Services self-heal from transient failures.
  • Data integrity: Idempotency and reconciliation prevent corruption.
  • Compliance readiness: Meet RTO/RPO requirements for disaster recovery.
  • Financial protection: Prevent duplicate transactions and data corruption that could cost millions.

The difference between a service that “works on my machine” and one that thrives in production lies in these details. Whether you’re running on GKE, EKS, or AKS, these patterns form the foundation of production-ready microservices.

Want to test these patterns yourself? The complete code examples and deployment manifests are available on GitHub.

July 11, 2025

Building Resilient, Interactive Playbooks with Formicary

Filed under: Computing,Technology — admin @ 8:16 pm

In any complex operational environment, the most challenging processes are often those that can’t be fully automated. A CI/CD pipeline might be 99% automated, but that final push to production requires a sign-off. A disaster recovery plan might be scripted, but you need a human to make the final call to failover. These “human-in-the-loop” scenarios are where rigid automation fails and manual checklists introduce risk.

Formicary is a distributed orchestration engine designed to bridge this gap. It allows you to codify your entire operational playbook—from automated scripts to manual verification steps—into a single, version-controlled workflow. This post will guide you through Formicary’s core concepts and demonstrate how to build two powerful, real-world playbooks:

  1. A Secure CI/CD Pipeline that builds, scans, and deploys to staging, then pauses for manual approval before promoting to production.
  2. A Semi-Automated Disaster Recovery Playbook that uses mocked Infrastructure as Code (IaC) to provision a new environment and waits for an operator’s go-ahead before failing over.

Formicary Features and Architecture

Formicary combines robust workflow orchestration with practical CI/CD features, all in a self-hosted, extensible platform.

Core Features

  • Declarative Workflows: Define complex jobs as a Directed Acyclic Graph (DAG) in a single, human-readable YAML file. Your entire playbook is version-controlled code.
  • Versatile Executors: A task is not tied to a specific runtime. Use the method that fits the job: KUBERNETES, DOCKER, SHELL, or even HTTP API calls.
  • Advanced Flow Control: Go beyond simple linear stages. Use on_exit_code to branch your workflow based on a script’s result, create polling “sensor” tasks, and define robust retry logic.
  • Manual Approval Gates: Explicitly define MANUAL tasks that pause the workflow and require human intervention to proceed via the UI or API.
  • Security Built-in: Manage secrets with database-level encryption and automatic log redaction. An RBAC model controls user access.

Architecture in a Nutshell

Formicary operates on a leader-follower model. The Queen server acts as the control plane, while one or more Ant workers form the execution plane.

  • Queen Server: The central orchestrator. It manages job definitions, schedules pending jobs based on priority, and tracks the state of all workers and executions.
  • Ant Workers: The workhorses. They register with the Queen, advertising their capabilities (e.g., supported executors and tags like gpu-enabled). They pick up tasks from the message queue and execute them.
  • Backend: Formicary relies on a database (like Postgres or MySQL) for state, a message queue (like Go Channels, Redis or Pulsar) for communication, and an S3-compatible object store for artifacts.

Getting Started: A Local Formicary Environment

The quickest way to get started is with the provided Docker Compose setup.

Prerequisites

  • Docker & Docker Compose
  • A local Kubernetes cluster (like Docker Desktop’s Kubernetes, Minikube, or k3s) with its kubeconfig file correctly set up. The embedded Ant worker will use this to run Kubernetes tasks.

Installation Steps

  1. Clone the Repository: git clone https://github.com/bhatti/formicary.git && cd formicary
  2. Launch the System: docker-compose up
    This starts the Queen server, a local Ant worker, Redis, and MinIO object storage.
  3. Explore the Dashboard:
    Once the services are running, open your browser to http://localhost:7777.

Example 1: Secure CI/CD with Manual Production Deploy

Our goal is to build a CI/CD pipeline for a Go application that:

  1. Builds the application binary.
  2. Runs static analysis (gosec) and saves the report.
  3. Deploys automatically to a staging environment.
  4. Pauses for manual verification.
  5. If approved, deploys to production.

Here is the complete playbook definition:

job_type: secure-go-cicd
description: Build, scan, and deploy a Go application with a manual production gate.
tasks:
- task_type: build
  method: KUBERNETES
  container:
    image: golang:1.24-alpine
  script:
    - echo "Building Go binary..."
    - go build -o my-app ./...
  artifacts:
    paths: [ "my-app" ]
  on_completed: security-scan

- task_type: security-scan
  method: KUBERNETES
  container:
    image: securego/gosec:latest
  allow_failure: true # We want the report even if it finds issues
  script:
    - echo "Running SAST scan with gosec..."
    # The -no-fail flag prevents the task from failing the pipeline immediately.
    - gosec -fmt=sarif -out=gosec-report.sarif ./...
  artifacts:
    paths: [ "gosec-report.sarif" ]
  on_completed: deploy-staging

- task_type: deploy-staging
  method: KUBERNETES
  dependencies: [ "build" ]
  container:
    image: alpine:latest
  script:
    - echo "Deploying ./my-app to staging..."
    - sleep 5 # Simulate deployment work
    - echo "Staging deployment complete. Endpoint: http://staging.example.com"
  on_completed: verify-production-deploy

- task_type: verify-production-deploy
  method: MANUAL
  description: "Staging deployment complete. A security scan report is available as an artifact. Please verify the staging environment and the report before promoting to production."
  on_exit_code:
    APPROVED: promote-production
    REJECTED: rollback-staging

- task_type: promote-production
  method: KUBERNETES
  dependencies: [ "build" ]
  container:
    image: alpine:latest
  script:
    - echo "PROMOTING ./my-app TO PRODUCTION! This is a critical, irreversible step."
  on_completed: cleanup

- task_type: rollback-staging
  method: KUBERNETES
  container:
    image: alpine:latest
  script:
    - echo "Deployment was REJECTED. Rolling back staging environment now."
  on_completed: cleanup

- task_type: cleanup
  method: KUBERNETES
  always_run: true
  container:
    image: alpine:latest
  script:
    - echo "Pipeline finished."

Executing the Playbook

  1. Upload the Job Definition: curl -X POST http://localhost:7777/api/jobs/definitions -H "Content-Type: application/yaml" --data-binary @playbooks/secure-ci-cd.yaml
  2. Submit the Job Request: curl -X POST http://localhost:7777/api/jobs/requests -H "Content-Type: application/json" -d '{"job_type": "secure-go-cicd"}'
  3. Monitor and Approve:
    • Go to the dashboard. You will see the job run through build, security-scan, and deploy-staging.
    • The job will then enter the MANUAL_APPROVAL_REQUIRED state.
    • On the job’s detail page, you will see an “Approve” button next to the verify-production-deploy task.
    • To approve via the API, get the Job Request ID and the Task Execution ID from the UI or API, then run:

Once approved, the playbook will proceed to promote-production and run the final cleanup step.

Example 2: Semi-Automated Disaster Recovery Playbook

Now for a more critical scenario: failing over a service to a secondary region. This playbook uses mocked IaC steps and pauses for the crucial final decision.

job_type: aws-region-failover
description: A playbook to provision and failover to a secondary region.
tasks:
- task_type: check-primary-status
  method: KUBERNETES
  container:
    image: alpine:latest
  script:
    - echo "Pinging primary region endpoint... it's down! Initiating failover procedure."
    - exit 1 # Simulate failure to trigger the 'on_failed' path
  on_completed: no-op # This path is not taken in our simulation
  on_failed: provision-secondary-infra

- task_type: provision-secondary-infra
  method: KUBERNETES
  container:
    image: hashicorp/terraform:light
  script:
    - echo "Simulating 'terraform apply' to provision DR infrastructure in us-west-2..."
    - sleep 10 # Simulate time for infra to come up
    - echo "Terraform apply complete. Outputting simulated state file."
    - echo '{"aws_instance.dr_server": {"id": "i-12345dr"}}' > terraform.tfstate
  artifacts:
    paths: [ "terraform.tfstate" ]
  on_completed: verify-failover

- task_type: verify-failover
  method: MANUAL
  description: "Secondary infrastructure in us-west-2 has been provisioned. The terraform.tfstate file is available as an artifact. Please VERIFY COSTS and readiness. Approve to switch live traffic."
  on_exit_code:
    APPROVED: switch-dns
    REJECTED: teardown-secondary-infra

- task_type: switch-dns
  method: KUBERNETES
  container:
    image: amazon/aws-cli
  script:
    - echo "CRITICAL: Switching production DNS records to the us-west-2 environment..."
    - sleep 5
    - echo "DNS failover complete. Traffic is now routed to the DR region."
  on_completed: notify-completion

- task_type: teardown-secondary-infra
  method: KUBERNETES
  container:
    image: hashicorp/terraform:light
  script:
    - echo "Failover REJECTED. Simulating 'terraform destroy' for secondary infrastructure..."
    - sleep 10
    - echo "Teardown complete."
  on_completed: notify-completion

- task_type: notify-completion
  method: KUBERNETES
  always_run: true
  container:
    image: alpine:latest
  script:
    - echo "Disaster recovery playbook has concluded."

Executing the DR Playbook

The execution flow is similar to the first example. An operator would trigger this job, wait for the provision-secondary-infra task to complete, download and review the terraform.tfstate artifact, and then make the critical “Approve” or “Reject” decision.

Conclusion

Formicary helps you turn your complex operational processes into reliable, trackable workflows that run automatically. It uses containers to execute tasks and includes manual approval checkpoints, so you can automate your work with confidence. This approach reduces human mistakes while making sure people stay in charge of the important decisions.

June 14, 2025

Feature Flag Anti-Patterns: Learnings from Outages

Filed under: Computing — admin @ 9:54 pm

Feature flags are key components of modern infrastructure for shipping faster, testing in production, and reducing risk. However, they can also be a fast track to complex outages if not handled with discipline. Google’s recent major outage serves as a case study, and I’ve seen similar issues arise from missteps with feature flags. The core of Google’s incident was a new code path in their “Service Control” system that should have been protected by a feature flag but wasn’t. This path, designed for an additional quota policy check, went directly to production without flag protection. When a policy change with unintended blank fields was replicated globally within seconds, it triggered the untested code path, causing a null-pointer crash that brought down binaries worldwide. This incident perfectly illustrates why feature flags aren’t just nice-to-have—they’re essential guardrails against exactly these kinds of global outages. Google also lacked proper error handling, and the system didn’t use randomized exponential backoff, which produced a “thundering herd” effect that prolonged recovery.

Let’s dive into common anti-patterns I’ve observed and how we can avoid them:

Anti-Pattern 1: Inadequate Testing & Error Handling

This is perhaps the most common and dangerous anti-pattern. It involves deploying code behind a feature flag without comprehensively testing all states of that flag (on, off) and the various conditions that interact with the flagged feature. It also includes neglecting robust error handling within the flagged code itself and failing to default flags to “off” in production. For example, Google’s Service Control binary crashed on a null-pointer dereference when a new policy was propagated globally: the team hadn’t adequately tested the code path with empty input and hadn’t implemented proper error handling. I’ve seen similar issues where teams skipped testing the flag-protected code path in a test environment, so bugs only manifested in production. In other cases, a flag was accidentally left ON by default in production, leading to immediate issues upon deployment. The Google incident report also notes the problematic code “did not have appropriate error handling.” If the code behind your feature flag assumes perfect conditions, it’s a ticking time bomb. These issues can be remedied by:

  • Default Off in Production: Ensure all new feature flags are disabled by default in production.
  • Comprehensive Testing: Test the feature with the flag ON and OFF. Crucially, test the specific conditions, data inputs, and configurations that trigger with the new code paths enabled by the flag.
  • Robust Error Handling: Implement proper error handling within the code controlled by the flag. It should fail gracefully or revert to a safe state if an unexpected issue occurs, not bring down the service.
  • Consider Testing Costs: If testing all combinations becomes prohibitively expensive or complex, it might indicate the feature is too large for a single flag and should be broken down.

Anti-Pattern 2: Inadequate Peer Review

This anti-pattern manifests when feature flag changes occur without a proper review process. It’s like making direct database changes in production without a change request. For example, Google’s issue was a policy metadata change rather than a direct flag toggle, and that metadata replicated globally within seconds, which is analogous to flipping a critical global flag without due diligence. If that policy metadata change had been managed like a code change (e.g., via GitOps or config-as-code with a canary rollout), the issue might have been caught earlier. This can be remedied with:

  • GitOps/Config-as-Code: Manage feature flag configurations as code within your Git repository. This enforces PRs, peer reviews, and provides an auditable history.
  • Test Flag Rollback: As part of your process, ensure you can easily and reliably roll back a feature flag configuration change, just like you would with code.
  • Detect Configuration Drift: Ensure that the actual state in production does not drift from what’s expected or version-controlled.
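As an illustration of config-as-code for flags, a flag definition checked into Git might look like the following (a hypothetical schema; real systems such as LaunchDarkly or OpenFeature define their own formats). Every change to this file goes through a PR, a review, and a canary rollout like any other code:

```yaml
# flags/checkout-v2.yaml -- reviewed via pull request like any other change
flag: checkout-v2
owner: payments-team
default: off            # safe default in production
rollout:
  strategy: percentage
  steps: [1, 5, 25, 100]
expires: 2025-12-31     # forces a lifecycle review instead of a zombie flag
```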

Anti-Pattern 3: Inadequate Authorization and Auditing

This anti-pattern means not protecting the ability to enable or disable feature flags with proper permissions. Internally, if anyone can flip a production flag via a UI without a PR or a second pair of eyes, we’re exposed. Also, if there’s no clear record of who changed a flag, when, and why, incident response becomes a frantic scramble. Remedies include:

  • Strict Access Control: Implement strong Role-Based Access Control (or Relationship-Based Access Control) to limit who can modify flag states or configurations in production.
  • Comprehensive Auditing: Ensure your feature flagging system provides detailed audit logs for every change: who made the change, what was changed, and when.

Anti-Pattern 4: No Monitoring

Deploying a feature behind a flag and then flipping it on for everyone without closely monitoring its impact is like walking into a dark room and hoping you don’t trip. This can be remedied by actively monitoring feature flags and collecting metrics on your observability platform. This includes tracking not just the flag’s state (on/off) but also its real-time impact on key system metrics (error rates, latency, resource consumption) and relevant business KPIs.

Anti-Pattern 5: No Phased Rollout or Kill Switch

This means turning a new, complex feature on for 100% of users simultaneously with a single flag flip. For example, during Google’s incident, major changes to quota management settings were propagated immediately, causing a global outage. The “red-button” to disable the problematic serving path was crucial for their recovery. Remedies for this anti-pattern include:

  • Canary Releases & Phased Rollouts: Don’t enable features for everyone at once. Perform canary releases: enable for internal users, then a small percentage of production users while monitoring metrics.
  • “Red Button” Control: Have a clear “kill switch” or “red button” mechanism for quickly and globally disabling any problematic feature flag if issues arise.

Anti-Pattern 6: Thundering Herd

Enabling a feature flag can change traffic patterns for incoming requests. For example, Google’s Service Control did not implement randomized exponential backoff, which caused a “thundering herd” on the underlying infrastructure. To prevent such issues, implement exponential backoff with jitter for request retries, combined with comprehensive monitoring.
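The standard remedy is easy to get subtly wrong, so here is a minimal sketch of randomized (“full jitter”) exponential backoff. The function names are illustrative, not from Service Control:

```python
import random
import time

def backoff_delay(attempt, base=0.1, cap=30.0):
    """Full-jitter exponential backoff: pick a random delay in
    [0, min(cap, base * 2**attempt)] so retries from many clients
    spread out instead of arriving in synchronized waves."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

def call_with_retries(fn, max_attempts=5):
    """Retry fn with jittered backoff; re-raise after the last attempt."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(backoff_delay(attempt))
```

The cap matters as much as the jitter: without it, a long outage drives delays into the hours, and without jitter, every client retries at the same instant and recreates the herd.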

Anti-Pattern 7: Misusing Flags for Config or Entitlements

This anti-pattern means using feature flags as a general-purpose configuration management system or to manage complex user entitlements (e.g., free vs. premium tiers). For example, I’ve seen teams use feature flags to store API endpoints, timeout values, or rules about which customer tier gets which sub-feature. Over time, your feature flag system becomes a de-facto distributed configuration database. This can be remedied with:

  • Purposeful Flags: Use feature flags primarily for controlling the lifecycle of discrete features: progressive rollout, A/B testing, kill switches.
  • Dedicated Systems: Use proper configuration management tools for application settings and robust entitlement systems for user permissions and plans.

Anti-Pattern 8: The “Zombie Flag” Infestation

This anti-pattern means introducing feature flags but never removing them once a feature is fully rolled out or stable. I’ve seen codebases littered with if (isFeatureXEnabled) checks for features that have been live for years or were abandoned. This can be remedied with:

  • Lifecycle Management: Treat flags as having a defined lifespan.
  • Scheduled Cleanup: Regularly audit flags. Once a feature is 100% rolled out and stable (or definitively killed), schedule work to remove the flag and associated dead code.

Anti-Pattern 9: Ignoring Flagging Service Health

This means not considering how your application behaves if the feature flagging service itself experiences an outage or is unreachable. A crucial point in Google’s RCA was that their “Cloud Service Health infrastructure being down due to this outage” delayed communication. A colleague once pointed out: what happens if LaunchDarkly is down? This can be remedied with:

  • Safe Defaults in Code: When your code requests a flag’s state from the SDK (e.g., ldClient.variation("my-feature", user, false)), the provided default value is critical. For new or potentially risky features, this default must be the “safe” state (feature OFF).
  • SDK Resilience: Feature-flag SDKs are designed to cache flag values and serve them if the service is unreachable (often called “stasis”). But on a fresh app start, before any cache is populated, your coded defaults are your safety net.
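A thin wrapper that bakes in safe defaults makes the “what if the flag service is down” question boring. The sketch below is generic Python, not the LaunchDarkly SDK; the flag names and client interface are hypothetical, though the `variation(key, user, default)` shape mirrors the SDK call mentioned above:

```python
SAFE_DEFAULTS = {
    # Every flag declares its safe state; for risky new features that is OFF.
    "new-quota-checks": False,
    "checkout-v2": False,
}

def flag_enabled(client, flag_key, user):
    """Ask the flag service, but never let its outage decide for us."""
    default = SAFE_DEFAULTS.get(flag_key, False)  # unknown flags fail closed
    try:
        return client.variation(flag_key, user, default)
    except Exception:
        # SDK unreachable and no cache yet: fall back to the coded safe default
        return default
```

Centralizing the defaults in one table also gives reviewers a single place to check that every new flag ships with a safe state.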

Summary

Feature flags are incredibly valuable for modern software development. They empower teams to move faster and release with more confidence. But as the Google incident and my own experiences show, they require thoughtful implementation and ongoing discipline. By avoiding these anti-patterns – by testing thoroughly, using flags for their intended purpose, managing their lifecycle, governing changes, and planning for system failures – we can ensure feature flags remain a powerful asset.

March 24, 2025

K8 Highlander: Managing Stateful and Singleton Processes in Kubernetes

Filed under: Technology — Tags: , — admin @ 2:22 pm

Introduction

Kubernetes has revolutionized how we deploy, scale, and manage applications in the cloud. I’ve been using Kubernetes for many years to build scalable, resilient, and maintainable services. However, Kubernetes was primarily designed for stateless applications, i.e., services that can scale horizontally. While such a shared-nothing architecture is a must-have for most modern microservices, it presents challenges for use cases such as:

  1. Stateful/Singleton Processes: Applications that must run as a single instance across a cluster to avoid conflicts, race conditions, or data corruption. Examples include:
    • Legacy applications not designed for distributed operation
    • Batch processors that need exclusive access to resources
    • Job schedulers that must ensure jobs run exactly once
    • Applications with sequential ID generators
  2. Active/Passive Disaster Recovery: High-availability setups where you need a primary instance running with hot standbys ready to take over instantly if the primary fails.

Traditional Kubernetes primitives like StatefulSets provide stable network identities and ordered deployment but don’t solve the “exactly-one-active” problem. DaemonSets ensure one pod per node, but don’t address the need for a single instance across the entire cluster. This gap led me to develop K8 Highlander – a solution that ensures “there can be only one” active instance of your workloads while maintaining high availability through automatic failover.

Architecture

K8 Highlander implements distributed leader election to ensure only one controller instance is active at any time, with others ready to take over if the leader fails. The name “Highlander” refers to the tagline of the 1986 movie and its TV series: “There can be only one.”

Core Components

K8 Highlander Architecture

The system consists of several key components:

  1. Leader Election: Uses distributed locking (via Redis or a database) to ensure only one controller is active at a time. The leader periodically renews its lock, and if it fails, another controller can acquire the lock and take over.
  2. Workload Manager: Manages different types of workloads in Kubernetes, ensuring they’re running and healthy when this controller is the leader.
  3. Monitoring Server: Provides real-time metrics and status information about the controller and its workloads.
  4. HTTP Server: Serves a dashboard and API endpoints for monitoring and management.

How Leader Election Works

The leader election process follows these steps:

  1. Each controller instance attempts to acquire a distributed lock with a TTL (Time-To-Live)
  2. Only one instance succeeds and becomes the leader
  3. The leader periodically renews its lock to maintain leadership
  4. If the leader fails to renew (due to crash, network issues, etc.), the lock expires
  5. Another instance acquires the lock and becomes the new leader
  6. The new leader starts managing workloads

This approach ensures high availability while preventing split-brain scenarios where multiple instances might be active simultaneously.
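The steps above can be sketched with a TTL-based lock. For illustration only, an in-memory class stands in for Redis here; the real implementation would use an atomic operation such as Redis `SET key value NX PX ttl`:

```python
import time

class LockStore:
    """In-memory stand-in for an atomic Redis `SET key value NX PX ttl`:
    at most one holder, and the lock expires after its TTL."""
    def __init__(self):
        self.holder, self.expires_at = None, 0.0

    def try_acquire(self, instance_id, ttl):
        now = time.monotonic()
        # Acquire if the lock is free, expired, or already ours (renewal).
        if self.holder is None or now >= self.expires_at or self.holder == instance_id:
            self.holder, self.expires_at = instance_id, now + ttl
            return True
        return False

store = LockStore()
assert store.try_acquire("controller-1", ttl=0.05)      # steps 1-2: becomes leader
assert not store.try_acquire("controller-2", ttl=0.05)  # standby is blocked
time.sleep(0.06)                                        # step 4: leader fails to renew
assert store.try_acquire("controller-2", ttl=0.05)      # step 5: standby takes over
```

The TTL is the crux of the split-brain prevention: a crashed leader cannot renew, so its lock expires and exactly one standby wins the next acquisition attempt.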

Workload Types

K8 Highlander supports four types of workloads:

  1. Process Workloads: Single-instance processes running in pods
  2. CronJob Workloads: Scheduled tasks that run at specific intervals
  3. Service Workloads: Continuously running services using Deployments
  4. Persistent Workloads: Stateful applications with persistent storage using StatefulSets

Each workload type is managed to ensure exactly one instance is running across the cluster, with automatic recreation if terminated unexpectedly.

Deploying and Using K8 Highlander

Let me walk through how to deploy and use K8 Highlander for your singleton workloads.

Prerequisites

  • Kubernetes cluster (v1.16+)
  • Redis server or PostgreSQL database for leader state storage
  • kubectl configured to access your cluster

Installation Using Docker

The simplest way to install K8 Highlander is using the pre-built Docker image:

# Create a namespace for k8-highlander
kubectl create namespace k8-highlander

# Create a ConfigMap with your configuration
kubectl create configmap k8-highlander-config \
  --from-file=config.yaml=./config/config.yaml \
  -n k8-highlander

# Deploy k8-highlander
kubectl apply -f - <<EOF
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8-highlander
  namespace: k8-highlander
spec:
  replicas: 2  # Run multiple instances for HA
  selector:
    matchLabels:
      app: k8-highlander
  template:
    metadata:
      labels:
        app: k8-highlander
    spec:
      containers:
      - name: controller
        image: plexobject/k8-highlander:latest
        env:
        - name: HIGHLANDER_REDIS_ADDR
          value: "redis:6379"
        - name: HIGHLANDER_TENANT
          value: "default"
        - name: HIGHLANDER_NAMESPACE
          value: "default"
        - name: CONFIG_PATH
          value: "/etc/k8-highlander/config.yaml"
        ports:
        - containerPort: 8080
          name: http
        volumeMounts:
        - name: config-volume
          mountPath: /etc/k8-highlander
      volumes:
      - name: config-volume
        configMap:
          name: k8-highlander-config
---
apiVersion: v1
kind: Service
metadata:
  name: k8-highlander
  namespace: k8-highlander
spec:
  selector:
    app: k8-highlander
  ports:
  - port: 8080
    targetPort: 8080
EOF

This deploys K8 Highlander with your configuration, ensuring high availability with multiple replicas while maintaining the singleton behavior for your workloads.

Using K8 Highlander Locally for Testing

You can also run K8 Highlander locally for testing:

docker run -d --name k8-highlander \
  -v $(pwd)/config.yaml:/etc/k8-highlander/config.yaml \
  -e HIGHLANDER_REDIS_ADDR=redis-host:6379 \
  -p 8080:8080 \
  plexobject/k8-highlander:latest

Basic Configuration

K8 Highlander uses a YAML configuration file to define its behavior and workloads. Here’s a simple example:

id: "controller-1"
tenant: "default"
port: 8080
namespace: "default"

# Storage configuration
storageType: "redis"
redis:
  addr: "redis:6379"
  password: ""
  db: 0

# Cluster configuration
cluster:
  name: "primary"
  kubeconfig: ""  # Uses in-cluster config if empty

# Workloads configuration
workloads:
  # Process workload example
  processes:
    - name: "data-processor"
      image: "mycompany/data-processor:latest"
      script:
        commands:
          - "echo 'Starting data processor'"
          - "/app/process-data.sh"
        shell: "/bin/sh"
      env:
        DB_HOST: "postgres.example.com"
      resources:
        cpuRequest: "200m"
        memoryRequest: "256Mi"
      restartPolicy: "OnFailure"

Example Workload Configurations

Let’s look at examples for each workload type:

Process Workload

Use this for single-instance processes that need to run continuously:

processes:
  - name: "sequential-id-generator"
    image: "mycompany/id-generator:latest"
    script:
      commands:
        - "echo 'Starting ID generator'"
        - "/app/run-id-generator.sh"
      shell: "/bin/sh"
    env:
      DB_HOST: "postgres.example.com"
    resources:
      cpuRequest: "200m"
      memoryRequest: "256Mi"
    restartPolicy: "OnFailure"

CronJob Workload

For scheduled tasks that should run exactly once at specified intervals:

cronJobs:
  - name: "daily-report"
    schedule: "0 0 * * *"  # Daily at midnight
    image: "mycompany/report-generator:latest"
    script:
      commands:
        - "echo 'Generating daily report'"
        - "/app/generate-report.sh"
      shell: "/bin/sh"
    env:
      REPORT_TYPE: "daily"
    restartPolicy: "OnFailure"

Service Workload

For continuously running services that need to be singleton but highly available:

services:
  - name: "admin-api"
    image: "mycompany/admin-api:latest"
    replicas: 1
    ports:
      - name: "http"
        containerPort: 8080
        servicePort: 80
    env:
      LOG_LEVEL: "info"
    resources:
      cpuRequest: "100m"
      memoryRequest: "128Mi"

Persistent Workload

For stateful applications with persistent storage:

persistentSets:
  - name: "message-queue"
    image: "ibmcom/mqadvanced-server:latest"
    replicas: 1
    ports:
      - name: "mq"
        containerPort: 1414
        servicePort: 1414
    persistentVolumes:
      - name: "data"
        mountPath: "/var/mqm"
        size: "10Gi"
    env:
      LICENSE: "accept"
      MQ_QMGR_NAME: "QM1"

High Availability Setup

For production environments, run multiple instances of K8 Highlander to ensure high availability:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8-highlander
  namespace: k8-highlander
spec:
  replicas: 3  # Run multiple instances for HA
  selector:
    matchLabels:
      app: k8-highlander
  template:
    metadata:
      labels:
        app: k8-highlander
    spec:
      containers:
      - name: controller
        image: plexobject/k8-highlander:latest
        env:
        - name: HIGHLANDER_REDIS_ADDR
          value: "redis:6379"
        - name: HIGHLANDER_TENANT
          value: "production"

API and Monitoring Capabilities

K8 Highlander provides comprehensive monitoring, metrics, and API endpoints for observability and management.

Dashboard

Access the built-in dashboard at http://<controller-address>:8080/ to see the status of the controller and its workloads in real-time.

Leader Dashboard Screenshot

The dashboard shows:

  • Current leader status
  • Workload health and status
  • Redis/database connectivity
  • Failover history
  • Resource usage

API Endpoints

K8 Highlander exposes several HTTP endpoints for monitoring and integration:

  • GET /status: Returns the current status of the controller
  • GET /api/workloads: Lists all managed workloads and their status
  • GET /api/workloads/{name}: Gets the status of a specific workload
  • GET /healthz: Liveness probe endpoint
  • GET /readyz: Readiness probe endpoint

Example API response:

{
  "status": "success",
  "data": {
    "isLeader": true,
    "leaderSince": "2023-05-01T12:34:56Z",
    "lastLeaderTransition": "2023-05-01T12:34:56Z",
    "uptime": "1h2m3s",
    "leaderID": "controller-1",
    "workloadStatus": {
      "processes": {
        "data-processor": {
          "active": true,
          "namespace": "default"
        }
      }
    }
  }
}
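A monitoring or alerting script can consume this response by polling /status; the sketch below parses the payload shape shown above (the field names are taken from that example):

```python
import json

def parse_status(body):
    """Extract the fields an alerting rule cares about from GET /status."""
    data = json.loads(body)["data"]
    return {
        "is_leader": data["isLeader"],
        "leader_id": data["leaderID"],
        "active_processes": [
            name
            for name, st in data.get("workloadStatus", {}).get("processes", {}).items()
            if st.get("active")
        ],
    }
```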

Prometheus Metrics

K8 Highlander exposes Prometheus metrics at /metrics for monitoring and alerting:

# HELP k8_highlander_is_leader Indicates if this instance is currently the leader (1) or not (0)
# TYPE k8_highlander_is_leader gauge
k8_highlander_is_leader 1
# HELP k8_highlander_leadership_transitions_total Total number of leadership transitions
# TYPE k8_highlander_leadership_transitions_total counter
k8_highlander_leadership_transitions_total 1
# HELP k8_highlander_workload_status Status of managed workloads (1=active, 0=inactive)
# TYPE k8_highlander_workload_status gauge
k8_highlander_workload_status{name="data-processor",namespace="default",type="process"} 1

Key metrics include:

  • Leadership status and transitions
  • Workload health and status
  • Redis/database operations
  • Failover events and duration
  • System resource usage

Grafana Dashboard

A Grafana dashboard is available for visualizing K8 Highlander metrics. Import the dashboard from the dashboards directory in the repository.

Advanced Features

Multi-Tenant Support

K8 Highlander supports multi-tenant deployments, where different teams or environments can have their own isolated leader election and workload management:

# Tenant A configuration
id: "controller-1"
tenant: "tenant-a"
namespace: "tenant-a"
# Tenant B configuration
id: "controller-2"
tenant: "tenant-b"
namespace: "tenant-b"

Each tenant has its own leader election process, so one controller can be the leader for tenant A while another is the leader for tenant B.
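This isolation falls out naturally when the distributed lock key is scoped by tenant, so each tenant’s controllers contend on a different key. The key format below is illustrative, not necessarily K8 Highlander’s actual scheme:

```python
def leader_lock_key(tenant):
    # Each tenant contends on its own key, so leadership is independent per tenant.
    return f"k8-highlander:leader:{tenant}"

assert leader_lock_key("tenant-a") != leader_lock_key("tenant-b")
```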

Multi-Cluster Deployment

For disaster recovery scenarios, K8 Highlander can be deployed across multiple Kubernetes clusters with a shared Redis or database:

# Primary cluster
id: "controller-1"
tenant: "production"
cluster:
  name: "primary"
  kubeconfig: "/path/to/primary-kubeconfig"
# Secondary cluster
id: "controller-2"
tenant: "production"
cluster:
  name: "secondary"
  kubeconfig: "/path/to/secondary-kubeconfig"

If the primary cluster fails, a controller in the secondary cluster can become the leader and take over workload management.

Summary

K8 Highlander fills a critical gap in Kubernetes’ capabilities by providing reliable singleton workload management with automatic failover. It’s ideal for:

  • Legacy applications that don’t support horizontal scaling
  • Processes that need exclusive access to resources
  • Scheduled jobs that should run exactly once
  • Active/passive high-availability setups

The solution ensures high availability without sacrificing the “exactly one active” constraint that many applications require. By handling the complexity of leader election and workload management, K8 Highlander allows you to run stateful workloads in Kubernetes with confidence.

Where to Go from Here

K8 Highlander is an open-source project with MIT license, and contributions are welcome! Feel free to submit issues, feature requests, or pull requests to help improve the project.

