Shahzad Bhatti Welcome to my ramblings and rants!

September 17, 2025

Transaction Boundaries: The Foundation of Reliable Systems

Filed under: Computing,Concurrency — admin @ 11:19 am

Over the years, I have seen countless production issues due to improper transaction management. A typical example: an API requires changes to multiple database tables, and each update is wrapped in different methods without proper transaction boundaries. This works fine when everything goes smoothly, but due to database constraints or other issues, a secondary database update might fail. In too many cases, the code doesn’t handle proper rollback and just throws an error—leaving the database in an inconsistent state.

In other cases, I’ve debugged production bugs due to improper coordination between database updates and event queues, where we desperately needed atomic behavior. I used J2EE in the late 1990s and early 2000s, which provided support for two-phase commit (2PC) to coordinate multiple updates across resources. However, 2PC wasn’t a scalable solution. I then experimented with aspect-oriented programming like AspectJ to handle cross-cutting concerns like transaction management, but it resulted in more complex code that was difficult to debug and maintain.

Later, I moved to Java Spring, which provided annotations for transaction management. This was both efficient and elegant—the @Transactional annotation made transaction boundaries explicit without cluttering business logic. When I worked at a travel booking company where we had to coordinate flight reservations, hotel bookings, car rentals, and insurance through various vendor APIs, I built a transaction framework based on the command pattern and chain of responsibility. This worked well for issuing compensating transactions when a remote API call failed midway through our public API workflow.

However, when I moved to Go and Rust, I found a lack of these basic transaction management primitives. I often see bugs in Go and Rust codebases that could have been caught earlier—many implementations assume the happy path and don’t properly handle partial failures or rollback scenarios.

In this blog, I’ll share learnings from my experience across different languages and platforms. I’ll cover best practices for establishing proper transaction boundaries, from single-database ACID transactions to distributed SAGA patterns, with working examples in Java/Spring, Go, and Rust. The goal isn’t just to prevent data corruption—it’s to build systems you can reason about, debug, and trust.

The Happy Path Fallacy

Most developers write code assuming everything will work perfectly. Here’s a typical “happy path” implementation:

// This looks innocent but is fundamentally broken
public class OrderService {
    public void processOrder(Order order) {
        orderRepository.save(order);           // What if this succeeds...
        paymentService.chargeCard(order);      // ...but this fails?
        inventoryService.allocate(order);      // Now we have inconsistent state
        emailService.sendConfirmation(order);  // And this might never happen
    }
}

The problem isn’t just that operations can fail—it’s that partial failures leave your system in an undefined state. Without proper transaction boundaries, you’re essentially playing Russian roulette with your data integrity. In my experience analyzing production systems, I’ve found that most data corruption doesn’t come from dramatic failures or outages. It comes from these subtle, partial failures that happen during normal operation. A network timeout here, a service restart there, and suddenly your carefully designed system is quietly hemorrhaging data consistency.

Transaction Fundamentals

Before we dive into robust transaction management in our applications, we need to understand what databases actually provide and how they achieve consistency guarantees. Most developers treat transactions as a black box—call BEGIN, do some work, call COMMIT, and hope for the best. But understanding the underlying mechanisms is crucial for making informed decisions about isolation levels, recognizing performance implications, and debugging concurrency issues when they inevitably arise in production. Let’s examine the foundational concepts that every developer working with transactions should understand.

The ACID Foundation

Before diving into implementation patterns, let’s establish why ACID properties matter:

  • Atomicity: Either all operations in a transaction succeed, or none do
  • Consistency: The database remains in a valid state before and after the transaction
  • Isolation: Concurrent transactions don’t interfere with each other
  • Durability: Once committed, changes survive system failures

These aren’t academic concepts—they’re the guardrails that prevent your system from sliding into chaos. Let’s see how different languages and frameworks help us maintain these guarantees.
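As a baseline, here is what explicit transaction demarcation looks like with plain JDBC, before any framework support. This is a minimal sketch (the accounts table and column names are illustrative): either both updates commit together, or the rollback in the catch block undoes any partial work.

import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JdbcTransferExample {

    // Atomicity by hand: both updates commit together, or neither does
    public static void transfer(Connection conn, String fromId, String toId, BigDecimal amount)
            throws SQLException {
        boolean previousAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false); // effectively BEGIN: group the following statements
        try (PreparedStatement debit = conn.prepareStatement(
                 "UPDATE accounts SET balance = balance - ? WHERE id = ? AND balance >= ?");
             PreparedStatement credit = conn.prepareStatement(
                 "UPDATE accounts SET balance = balance + ? WHERE id = ?")) {

            debit.setBigDecimal(1, amount);
            debit.setString(2, fromId);
            debit.setBigDecimal(3, amount);
            if (debit.executeUpdate() == 0) {
                throw new SQLException("Insufficient funds or unknown account: " + fromId);
            }

            credit.setBigDecimal(1, amount);
            credit.setString(2, toId);
            credit.executeUpdate();

            conn.commit();   // durability: both changes become permanent together
        } catch (SQLException e) {
            conn.rollback(); // atomicity: undo any partial work
            throw e;
        } finally {
            conn.setAutoCommit(previousAutoCommit);
        }
    }
}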

Isolation Levels: The Hidden Performance vs Consistency Tradeoff

Most developers don’t realize that their database isn’t using the strictest isolation level by default. Most production databases default to READ COMMITTED (PostgreSQL, Oracle, SQL Server) or REPEATABLE READ (MySQL/InnoDB), not SERIALIZABLE. This creates subtle race conditions that can lead to double spending and other financial disasters.

// The double spending problem with default isolation
@Service
public class VulnerableAccountService {
    
    // This uses READ COMMITTED by default - DANGEROUS for financial operations!
    @Transactional
    public void withdrawFunds(String accountId, BigDecimal amount) {
        Account account = accountRepository.findById(accountId);
        
        // RACE CONDITION: Another transaction can modify balance here!
        if (account.getBalance().compareTo(amount) >= 0) {
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account);
        } else {
            throw new InsufficientFundsException();
        }
    }
}

// What happens with concurrent requests:
// Thread 1: Read balance = $100, check passes
// Thread 2: Read balance = $100, check passes  
// Thread 1: Withdraw $100, balance = $0
// Thread 2: Withdraw $100, balance = -$100 (DOUBLE SPENDING!)

Database Default Isolation Levels

Database     Default Isolation    Financial Safety
PostgreSQL   READ COMMITTED       Vulnerable
MySQL        REPEATABLE READ      Better, but not perfect
Oracle       READ COMMITTED       Vulnerable
SQL Server   READ COMMITTED       Vulnerable
H2/HSQLDB    READ COMMITTED       Vulnerable
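The defaults are easy to verify. Here is a small JDBC sketch (pool and driver details vary, and frameworks like Spring normally manage this for you) that reports the isolation level a connection actually starts with, then raises it for that connection:

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

public class IsolationCheck {

    // Prints whatever isolation level the pool/driver hands out by default,
    // then raises it to SERIALIZABLE for this connection only
    public static void inspectAndRaise(DataSource dataSource) throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            switch (conn.getTransactionIsolation()) {
                case Connection.TRANSACTION_READ_COMMITTED:
                    System.out.println("READ COMMITTED");
                    break;
                case Connection.TRANSACTION_REPEATABLE_READ:
                    System.out.println("REPEATABLE READ");
                    break;
                case Connection.TRANSACTION_SERIALIZABLE:
                    System.out.println("SERIALIZABLE");
                    break;
                default:
                    System.out.println("Other isolation level");
            }
            conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
        }
    }
}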

The Right Way: Database Constraints + Proper Isolation

// Method 1: Database constraints (fastest)
@Entity
@Table(name = "accounts")
public class Account {
    @Id
    private String accountId;
    
    @Column(nullable = false)
    @Check(constraints = "balance >= 0") // Database prevents negative balance
    private BigDecimal balance;
    
    @Version
    private Long version;
}

@Service
public class SafeAccountService {
    
    // Let database constraint handle the race condition
    @Transactional
    public void withdrawFundsWithConstraint(String accountId, BigDecimal amount) {
        try {
            Account account = accountRepository.findById(accountId);
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account); // Database throws exception if balance < 0
        } catch (DataIntegrityViolationException e) {
            throw new InsufficientFundsException("Withdrawal would result in negative balance");
        }
    }
    
    // Method 2: SERIALIZABLE isolation (most secure)
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void withdrawFundsSerializable(String accountId, BigDecimal amount) {
        Account account = accountRepository.findById(accountId);
        if (account.getBalance().compareTo(amount) >= 0) {
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account);
        } else {
            throw new InsufficientFundsException();
        }
        // SERIALIZABLE guarantees no other transaction can interfere
    }
    
    // Method 3: Optimistic locking (good performance)
    @Transactional
    @Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 3)
    public void withdrawFundsOptimistic(String accountId, BigDecimal amount) {
        Account account = accountRepository.findById(accountId);
        if (account.getBalance().compareTo(amount) >= 0) {
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account); // Version check prevents race conditions
        } else {
            throw new InsufficientFundsException();
        }
    }
}

MVCC

Most developers don’t realize that modern databases achieve isolation levels through Multi-Version Concurrency Control (MVCC), not traditional locking. Understanding MVCC explains why certain isolation behaviors seem counterintuitive. Instead of locking rows for reads, databases maintain multiple versions of each row with timestamps. When you start a transaction, you get a consistent snapshot of the database as it existed at that moment.

// What actually happens under MVCC
@Transactional(isolation = Isolation.REPEATABLE_READ)
public void demonstrateMVCC() {
    // T1: Transaction starts, gets snapshot at time=100
    Account account = accountRepository.findById("123"); // Reads version at time=100
    
    // T2: Another transaction modifies the account (creates version at time=101)
    
    // T1: Reads same account again
    Account sameAccount = accountRepository.findById("123"); // Still reads version at time=100!
    
    assert account.getBalance().equals(sameAccount.getBalance()); // MVCC guarantees this
}

MVCC vs Traditional Locking

-- Traditional locking approach (not MVCC)
BEGIN TRANSACTION;
SELECT * FROM accounts WHERE id = '123' FOR SHARE; -- Acquires shared lock
-- Other transactions blocked from writing until this transaction ends

-- MVCC approach (PostgreSQL, MySQL InnoDB)
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT * FROM accounts WHERE id = '123'; -- No locks, reads from snapshot
-- Other transactions can write freely, creating new versions

MVCC delivers better performance and reduces deadlock contention compared to traditional locking, but it requires background cleanup of old row versions (PostgreSQL VACUUM, MySQL purge operations). I have encountered numerous production issues where real-time queries or ETL jobs suddenly degraded due to aggressive background VACUUM activity on older PostgreSQL versions, though recent versions have significantly improved this behavior. MVCC can also lead to stale reads in long-running transactions, which keep their snapshot view even as the underlying data changes.

// MVCC write conflict example
@Transactional
@Retryable(value = {OptimisticLockingFailureException.class})
public void updateAccountMVCC(String accountId, BigDecimal newBalance) {
    Account account = accountRepository.findById(accountId);
    
    // If another transaction modified this account between our read
    // and write, MVCC will detect the conflict and retry
    account.setBalance(newBalance);
    accountRepository.save(account); // May throw OptimisticLockingFailureException
}

This is why PostgreSQL defaults to READ COMMITTED and why long-running analytical queries should use dedicated read replicas—MVCC snapshots can become expensive to maintain over time.
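One practical way to act on that advice, jumping ahead to Spring for a moment, is a routing DataSource that sends read-only transactions to a replica. The sketch below assumes separate primary and replica DataSource beans already exist elsewhere; the class and key names are illustrative:

import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public class ReplicaRoutingDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        // @Transactional(readOnly = true) work goes to the replica, everything else to the primary
        return TransactionSynchronizationManager.isCurrentTransactionReadOnly()
                ? "replica" : "primary";
    }

    // Illustrative wiring; the primary and replica DataSources are assumed to be defined elsewhere
    public static DataSource build(DataSource primary, DataSource replica) {
        ReplicaRoutingDataSource routing = new ReplicaRoutingDataSource();
        Map<Object, Object> targets = new HashMap<>();
        targets.put("primary", primary);
        targets.put("replica", replica);
        routing.setTargetDataSources(targets);
        routing.setDefaultTargetDataSource(primary);
        routing.afterPropertiesSet();
        // The lazy proxy defers fetching a physical connection until first use,
        // so the read-only flag is already set when the routing decision runs
        return new LazyConnectionDataSourceProxy(routing);
    }
}

Because Spring marks the transaction read-only before any connection is acquired, the lazy proxy lets the routing decision observe that flag and keep analytical reads off the primary.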

Java and Spring: The Gold Standard

Spring’s @Transactional annotation is probably the most elegant solution I’ve encountered for transaction management. It uses aspect-oriented programming to wrap methods in transaction boundaries, making the complexity invisible to business logic.

Basic Transaction Management

@Service
@Transactional
public class OrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private PaymentService paymentService;
    
    @Autowired
    private InventoryService inventoryService;
    
    // All operations within this method are atomic
    public Order processOrder(CreateOrderRequest request) {
        Order order = new Order(request);
        order = orderRepository.save(order);
        
        // If any of these fail, everything rolls back
        Payment payment = paymentService.processPayment(
            order.getCustomerId(), 
            order.getTotalAmount()
        );
        
        inventoryService.reserveItems(order.getItems());
        order.setPaymentId(payment.getId());
        order.setStatus(OrderStatus.CONFIRMED);
        
        return orderRepository.save(order);
    }
}

Different Transaction Types

Spring provides fine-grained control over transaction behavior:

@Service
public class OrderService {
    
    // Read-only transactions can be optimized by the database
    @Transactional(readOnly = true)
    public List<Order> getOrderHistory(String customerId) {
        return orderRepository.findByCustomerId(customerId);
    }
    
    // Long-running operations need higher timeout
    @Transactional(timeout = 300) // 5 minutes
    public void processBulkOrders(List<CreateOrderRequest> requests) {
        for (CreateOrderRequest request : requests) {
            processOrder(request);
        }
    }
    
    // Critical operations need strict isolation
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void transferInventory(String fromLocation, String toLocation, 
                                String itemId, int quantity) {
        Item fromItem = inventoryRepository.findByLocationAndItem(fromLocation, itemId);
        Item toItem = inventoryRepository.findByLocationAndItem(toLocation, itemId);
        
        if (fromItem.getQuantity() < quantity) {
            throw new InsufficientInventoryException();
        }
        
        fromItem.setQuantity(fromItem.getQuantity() - quantity);
        toItem.setQuantity(toItem.getQuantity() + quantity);
        
        inventoryRepository.save(fromItem);
        inventoryRepository.save(toItem);
    }
    
    // Some operations should create new transactions
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void logAuditEvent(String event, String details) {
        AuditLog log = new AuditLog(event, details, Instant.now());
        auditRepository.save(log);
        // This commits immediately, independent of calling transaction
    }
    
    // Handle specific rollback conditions
    @Transactional(rollbackFor = {BusinessException.class, ValidationException.class})
    public void processComplexOrder(ComplexOrderRequest request) {
        // Business logic that might throw business exceptions
        validateOrderRules(request);
        Order order = createOrder(request);
        processPayment(order);
    }
}

Nested Transactions and Propagation

Understanding nested transactions and propagation is critical for building robust systems. In some cases, you want a child transaction to succeed regardless of whether the parent transaction succeeds; these are often called “autonomous transactions” or “independent transactions.” Audit logging is the classic case: running audit writes with REQUIRES_NEW propagation creates independent transactions that commit immediately, regardless of what happens to the parent transaction. The same applies to notification services, where you typically want notifications to be sent even if the business operation partially fails; users should know that something went wrong.

@Service
public class OrderProcessingService {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private NotificationService notificationService;
    
    @Transactional
    public void processOrderWithNotification(CreateOrderRequest request) {
        // This participates in the existing transaction
        Order order = orderService.processOrder(request);
        
        // This creates a new transaction that commits independently
        notificationService.sendOrderConfirmation(order);
        
        // The notification commits in its own transaction, so even if the
        // order transaction rolls back after this point, the notification
        // record has already been persisted independently
    }
}

@Service
public class NotificationService {
    
    // Creates a new transaction - notifications are sent even if 
    // the main order processing fails later
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void sendOrderConfirmation(Order order) {
        NotificationRecord record = new NotificationRecord(
            order.getCustomerId(),
            "Order confirmed: " + order.getId(),
            NotificationType.ORDER_CONFIRMATION
        );
        notificationRepository.save(record);
        
        // Send actual notification asynchronously
        emailService.sendAsync(order.getCustomerEmail(), 
                              "Order Confirmation", 
                              generateOrderEmail(order));
    }
}

Go with GORM: Explicit Transaction Management

Go doesn’t have the luxury of annotations, so transaction management becomes more explicit. This actually has benefits—the transaction boundaries are clearly visible in the code.

Basic GORM Transactions

package services

import (
    "context"
    "fmt"
    "gorm.io/gorm"
)

type OrderService struct {
    db *gorm.DB
}

type Order struct {
    ID          uint   `gorm:"primarykey"`
    CustomerID  string
    TotalAmount int64
    Status      string
    PaymentID   string
    Items       []OrderItem `gorm:"foreignKey:OrderID"`
}

type OrderItem struct {
    ID       uint   `gorm:"primarykey"`
    OrderID  uint
    SKU      string
    Quantity int
    Price    int64
}

// Basic transaction with explicit rollback handling
func (s *OrderService) ProcessOrder(ctx context.Context, request CreateOrderRequest) (*Order, error) {
    tx := s.db.Begin()
    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
            panic(r)
        }
    }()

    order := &Order{
        CustomerID:  request.CustomerID,
        TotalAmount: request.TotalAmount,
        Status:      "PENDING",
    }

    // Save the order
    if err := tx.Create(order).Error; err != nil {
        tx.Rollback()
        return nil, fmt.Errorf("failed to create order: %w", err)
    }

    // Process payment
    paymentID, err := s.processPayment(ctx, tx, order)
    if err != nil {
        tx.Rollback()
        return nil, fmt.Errorf("payment failed: %w", err)
    }

    // Reserve inventory
    if err := s.reserveInventory(ctx, tx, request.Items); err != nil {
        tx.Rollback()
        return nil, fmt.Errorf("inventory reservation failed: %w", err)
    }

    // Update order with payment info
    order.PaymentID = paymentID
    order.Status = "CONFIRMED"
    if err := tx.Save(order).Error; err != nil {
        tx.Rollback()
        return nil, fmt.Errorf("failed to update order: %w", err)
    }

    if err := tx.Commit().Error; err != nil {
        return nil, fmt.Errorf("failed to commit transaction: %w", err)
    }

    return order, nil
}

Functional Transaction Wrapper

To reduce boilerplate, we can create a transaction wrapper:

// TransactionFunc represents a function that runs within a transaction
type TransactionFunc func(tx *gorm.DB) error

// WithTransaction wraps a function in a database transaction
func (s *OrderService) WithTransaction(fn TransactionFunc) error {
    tx := s.db.Begin()
    defer func() {
        if r := recover(); r != nil {
            tx.Rollback()
            panic(r)
        }
    }()

    if err := fn(tx); err != nil {
        tx.Rollback()
        return err
    }

    return tx.Commit().Error
}

// Now our business logic becomes cleaner
func (s *OrderService) ProcessOrderClean(ctx context.Context, request CreateOrderRequest) (*Order, error) {
    var order *Order
    
    err := s.WithTransaction(func(tx *gorm.DB) error {
        order = &Order{
            CustomerID:  request.CustomerID,
            TotalAmount: request.TotalAmount,
            Status:      "PENDING",
        }

        if err := tx.Create(order).Error; err != nil {
            return fmt.Errorf("failed to create order: %w", err)
        }

        paymentID, err := s.processPaymentInTx(ctx, tx, order)
        if err != nil {
            return fmt.Errorf("payment failed: %w", err)
        }

        if err := s.reserveInventoryInTx(ctx, tx, request.Items); err != nil {
            return fmt.Errorf("inventory reservation failed: %w", err)
        }

        order.PaymentID = paymentID
        order.Status = "CONFIRMED"
        
        return tx.Save(order).Error
    })

    return order, err
}

Context-Based Transaction Management

For more sophisticated transaction management, we can use context to pass transactions:

type contextKey string

const txKey contextKey = "transaction"

// WithTransactionContext creates a new context with a transaction
func WithTransactionContext(ctx context.Context, tx *gorm.DB) context.Context {
    return context.WithValue(ctx, txKey, tx)
}

// TxFromContext retrieves a transaction from context
func TxFromContext(ctx context.Context) (*gorm.DB, bool) {
    tx, ok := ctx.Value(txKey).(*gorm.DB)
    return tx, ok
}

// GetDB returns either the transaction from context or the main DB
func (s *OrderService) GetDB(ctx context.Context) *gorm.DB {
    if tx, ok := TxFromContext(ctx); ok {
        return tx
    }
    return s.db
}

// Now services can automatically use transactions when available
func (s *PaymentService) ProcessPayment(ctx context.Context, customerID string, amount int64) (string, error) {
    db := s.GetDB(ctx) // Uses transaction if available
    
    payment := &Payment{
        CustomerID: customerID,
        Amount:     amount,
        Status:     "PROCESSING",
    }
    
    if err := db.Create(payment).Error; err != nil {
        return "", err
    }
    
    // Simulate payment processing
    if amount > 100000 { // Reject large amounts for demo
        payment.Status = "FAILED"
        db.Save(payment)
        return "", fmt.Errorf("payment amount too large")
    }
    
    payment.Status = "COMPLETED"
    payment.TransactionID = generatePaymentID()
    
    if err := db.Save(payment).Error; err != nil {
        return "", err
    }
    
    return payment.TransactionID, nil
}

// Usage with context-based transactions
func (s *OrderService) ProcessOrderWithContext(ctx context.Context, request CreateOrderRequest) (*Order, error) {
    var order *Order
    
    return order, s.WithTransaction(func(tx *gorm.DB) error {
        // Create context with transaction
        txCtx := WithTransactionContext(ctx, tx)
        
        order = &Order{
            CustomerID:  request.CustomerID,
            TotalAmount: request.TotalAmount,
            Status:      "PENDING",
        }

        if err := tx.Create(order).Error; err != nil {
            return err
        }

        // These services will automatically use the transaction
        paymentID, err := s.paymentService.ProcessPayment(txCtx, order.CustomerID, order.TotalAmount)
        if err != nil {
            return err
        }

        if err := s.inventoryService.ReserveItems(txCtx, request.Items); err != nil {
            return err
        }

        order.PaymentID = paymentID
        order.Status = "CONFIRMED"
        
        return tx.Save(order).Error
    })
}

Read-Only and Isolation Control

// Read-only operations can be optimized
func (s *OrderService) GetOrderHistory(ctx context.Context, customerID string) ([]Order, error) {
    var orders []Order
    
    // Use read-only transaction for consistency
    err := s.db.Transaction(func(tx *gorm.DB) error {
        return tx.Raw("SELECT * FROM orders WHERE customer_id = ? ORDER BY created_at DESC", 
                     customerID).Scan(&orders).Error
    }, &sql.TxOptions{ReadOnly: true})
    
    return orders, err
}

// Operations requiring specific isolation levels
func (s *InventoryService) TransferStock(ctx context.Context, fromSKU, toSKU string, quantity int) error {
    return s.db.Transaction(func(tx *gorm.DB) error {
        var fromItem, toItem InventoryItem
        
        // Lock rows to prevent concurrent modifications
        // clause.Locking emits SELECT ... FOR UPDATE in GORM v2 (requires "gorm.io/gorm/clause")
        if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            Where("sku = ?", fromSKU).First(&fromItem).Error; err != nil {
            return err
        }
        
        if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            Where("sku = ?", toSKU).First(&toItem).Error; err != nil {
            return err
        }
        
        if fromItem.Quantity < quantity {
            return fmt.Errorf("insufficient inventory")
        }
        
        fromItem.Quantity -= quantity
        toItem.Quantity += quantity
        
        if err := tx.Save(&fromItem).Error; err != nil {
            return err
        }
        
        return tx.Save(&toItem).Error
        
    }, &sql.TxOptions{Isolation: sql.LevelSerializable})
}

Rust: Custom Transaction Annotations with Macros

Rust doesn’t have runtime annotations like Java, but we can create compile-time macros that provide similar functionality. This approach gives us zero-runtime overhead while maintaining clean syntax.

Building a Transaction Macro System

First, let’s create the macro infrastructure:

// src/transaction/mod.rs
use diesel::prelude::*;
use diesel::result::Error as DieselError;
use std::fmt;

#[derive(Debug)]
pub enum TransactionError {
    Database(DieselError),
    Business(String),
    Validation(String),
}

impl fmt::Display for TransactionError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            TransactionError::Database(e) => write!(f, "Database error: {}", e),
            TransactionError::Business(e) => write!(f, "Business error: {}", e),
            TransactionError::Validation(e) => write!(f, "Validation error: {}", e),
        }
    }
}

impl std::error::Error for TransactionError {}

// diesel's transaction() converts its own rollback/commit errors into the caller's
// error type, so our error type must implement From<diesel::result::Error>
impl From<DieselError> for TransactionError {
    fn from(e: DieselError) -> Self {
        TransactionError::Database(e)
    }
}

pub type TransactionResult<T> = Result<T, TransactionError>;

// Macro for creating transactional functions
#[macro_export]
macro_rules! transactional {
    (
        fn $name:ident($($param:ident: $param_type:ty),*) -> $return_type:ty {
            $($body:tt)*
        }
    ) => {
        fn $name(conn: &mut PgConnection, $($param: $param_type),*) -> TransactionResult<$return_type> {
            conn.transaction::<$return_type, TransactionError, _>(|conn| {
                $($body)*
            })
        }
    };
}

// Macro for read-only transactions
#[macro_export]
macro_rules! read_only {
    (
        fn $name:ident($($param:ident: $param_type:ty),*) -> $return_type:ty {
            $($body:tt)*
        }
    ) => {
        fn $name(conn: &mut PgConnection, $($param: $param_type),*) -> TransactionResult<$return_type> {
            // diesel's transaction builder can mark the transaction READ ONLY
            conn.build_transaction()
                .read_only()
                .run::<$return_type, TransactionError, _>(|conn| {
                    $($body)*
                })
        }
    };
}

Using the Transaction Macros

// src/services/order_service.rs
use diesel::prelude::*;
use crate::transaction::*;
use crate::models::*;
use crate::schema::orders::dsl::*;

pub struct OrderService;

impl OrderService {
    // Transactional order processing with automatic rollback
    transactional! {
        fn process_order(request: CreateOrderRequest) -> Order {
            // Create the order
            let new_order = NewOrder {
                customer_id: &request.customer_id,
                total_amount: request.total_amount,
                status: "PENDING",
            };
            
            let order: Order = diesel::insert_into(orders)
                .values(&new_order)
                .get_result(conn)
                .map_err(TransactionError::Database)?;

            // Process payment
            // Use a distinct local name so it doesn't shadow the `payment_id` column from the DSL
            let pay_id = Self::process_payment_internal(conn, &order)
                .map_err(|e| TransactionError::Business(format!("Payment failed: {}", e)))?;

            // Reserve inventory
            Self::reserve_inventory_internal(conn, &request.items)
                .map_err(|e| TransactionError::Business(format!("Inventory reservation failed: {}", e)))?;

            // Update order with payment info
            let updated_order = diesel::update(orders.filter(id.eq(order.id)))
                .set((
                    payment_id.eq(&pay_id),
                    status.eq("CONFIRMED"),
                ))
                .get_result(conn)
                .map_err(TransactionError::Database)?;

            Ok(updated_order)
        }
    }

    // Read-only transaction for queries
    read_only! {
        fn get_order_history(cust_id: String) -> Vec<Order> {
            let order_list = orders
                .filter(customer_id.eq(&cust_id))
                .order(created_at.desc())
                .load::<Order>(conn)
                .map_err(TransactionError::Database)?;
            
            Ok(order_list)
        }
    }

    // Helper functions that work within existing transactions
    fn process_payment_internal(conn: &mut PgConnection, order: &Order) -> Result<String, String> {
        use crate::schema::payments::dsl::*;
        
        let new_payment = NewPayment {
            customer_id: &order.customer_id,
            order_id: order.id,
            amount: order.total_amount,
            status: "PROCESSING",
        };
        
        let payment: Payment = diesel::insert_into(payments)
            .values(&new_payment)
            .get_result(conn)
            .map_err(|e| format!("Payment creation failed: {}", e))?;
        
        // Simulate payment processing logic
        if order.total_amount > 100000 {
            diesel::update(payments.filter(id.eq(payment.id)))
                .set(status.eq("FAILED"))
                .execute(conn)
                .map_err(|e| format!("Payment update failed: {}", e))?;
            
            return Err("Payment amount too large".to_string());
        }
        
        let txn_id = format!("txn_{}", uuid::Uuid::new_v4());
        
        diesel::update(payments.filter(id.eq(payment.id)))
            .set((
                status.eq("COMPLETED"),
                transaction_id.eq(&txn_id),
            ))
            .execute(conn)
            .map_err(|e| format!("Payment finalization failed: {}", e))?;
        
        Ok(txn_id)
    }

    fn reserve_inventory_internal(conn: &mut PgConnection, items: &[OrderItemRequest]) -> Result<(), String> {
        use crate::schema::inventory::dsl::*;
        
        for item in items {
            // Lock the inventory row for update
            let mut inventory_item: InventoryItem = inventory
                .filter(sku.eq(&item.sku))
                .for_update()
                .first(conn)
                .map_err(|e| format!("Inventory lookup failed: {}", e))?;
            
            if inventory_item.quantity < item.quantity {
                return Err(format!("Insufficient inventory for SKU: {}", item.sku));
            }
            
            inventory_item.quantity -= item.quantity;
            
            diesel::update(inventory.filter(sku.eq(&item.sku)))
                .set(quantity.eq(inventory_item.quantity))
                .execute(conn)
                .map_err(|e| format!("Inventory update failed: {}", e))?;
        }
        
        Ok(())
    }
}

Advanced Transaction Features in Rust

// More sophisticated transaction management with isolation levels
#[macro_export]
macro_rules! serializable_transaction {
    (
        fn $name:ident($($param:ident: $param_type:ty),*) -> $return_type:ty {
            $($body:tt)*
        }
    ) => {
        fn $name(conn: &mut PgConnection, $($param: $param_type),*) -> TransactionResult<$return_type> {
            // diesel's transaction builder sets the isolation level on BEGIN itself;
            // issuing SET TRANSACTION outside the transaction block has no effect
            conn.build_transaction()
                .serializable()
                .run::<$return_type, TransactionError, _>(|conn| {
                    $($body)*
                })
        }
    };
}

// Usage for operations requiring strict consistency
impl InventoryService {
    serializable_transaction! {
        // `qty` avoids shadowing the `quantity` column imported from the inventory DSL
        fn transfer_stock(from_sku: String, to_sku: String, qty: i32) -> (InventoryItem, InventoryItem) {
            use crate::schema::inventory::dsl::*;
            
            // Lock both items in consistent order to prevent deadlocks
            let (first_sku, second_sku) = if from_sku < to_sku {
                (&from_sku, &to_sku)
            } else {
                (&to_sku, &from_sku)
            };
            
            let mut from_item: InventoryItem = inventory
                .filter(sku.eq(first_sku))
                .for_update()
                .first(conn)
                .map_err(TransactionError::Database)?;
                
            let mut to_item: InventoryItem = inventory
                .filter(sku.eq(second_sku))
                .for_update()
                .first(conn)
                .map_err(TransactionError::Database)?;
            
            // Ensure we have the right items
            if from_item.sku != from_sku {
                std::mem::swap(&mut from_item, &mut to_item);
            }
            
            if from_item.quantity < qty {
                return Err(TransactionError::Business(
                    "Insufficient inventory for transfer".to_string()
                ));
            }
            
            from_item.quantity -= qty;
            to_item.quantity += qty;
            
            let updated_from = diesel::update(inventory.filter(sku.eq(&from_sku)))
                .set(quantity.eq(from_item.quantity))
                .get_result(conn)
                .map_err(TransactionError::Database)?;
                
            let updated_to = diesel::update(inventory.filter(sku.eq(&to_sku)))
                .set(quantity.eq(to_item.quantity))
                .get_result(conn)
                .map_err(TransactionError::Database)?;
            
            Ok((updated_from, updated_to))
        }
    }
}

Async Transaction Support

For modern Rust applications using async/await:

// src/transaction/async_transaction.rs
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::pooled_connection::bb8::Pool;

#[macro_export]
macro_rules! async_transactional {
    (
        async fn $name:ident($($param:ident: $param_type:ty),*) -> $return_type:ty {
            $($body:tt)*
        }
    ) => {
        async fn $name(pool: &Pool<AsyncPgConnection>, $($param: $param_type),*) -> TransactionResult<$return_type> {
            let mut conn = pool.get().await
                .map_err(|e| TransactionError::Business(format!("connection pool error: {}", e)))?;

            conn.transaction::<$return_type, TransactionError, _>(|conn| {
                async move {
                    $($body)*
                }.scope_boxed()
            }).await
        }
    };
}

// Usage with async operations
impl OrderService {
    async_transactional! {
        async fn process_order_async(request: CreateOrderRequest) -> Order {
            // All the same logic as before, but with async/await support
            let new_order = NewOrder {
                customer_id: &request.customer_id,
                total_amount: request.total_amount,
                status: "PENDING",
            };
            
            let order: Order = diesel::insert_into(orders)
                .values(&new_order)
                .get_result(conn)
                .await
                .map_err(TransactionError::Database)?;

            // Process payment asynchronously
            let payment_id = Self::process_payment_async(conn, &order).await
                .map_err(|e| TransactionError::Business(format!("Payment failed: {}", e)))?;

            // Continue with order processing...
            
            Ok(order)
        }
    }
}

Multi-Database Transactions: Two-Phase Commit

I used J2EE and XA transactions extensively in the late 1990s and early 2000s when these standards were being defined by Sun Microsystems with major contributions from IBM, Oracle, and BEA Systems. While these technologies provided strong consistency guarantees, they added enormous complexity to applications and resulted in significant performance issues. The fundamental problem with 2PC is that it’s a blocking protocol—if the transaction coordinator fails during the commit phase, all participating databases remain locked until the coordinator recovers. I’ve seen production systems grind to a halt for hours because of coordinator failures. There are also edge cases that 2PC simply cannot handle, such as network partitions between the coordinator and participants, which led to the development of three-phase commit (3PC). In most cases, you should avoid distributed transactions entirely and use patterns like SAGA, event sourcing, or careful service boundaries instead.

Java XA Transactions

@Configuration
@EnableTransactionManagement
public class XATransactionConfig {
    
    @Bean
    @Primary
    public DataSource orderDataSource() {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setURL("jdbc:mysql://localhost:3306/orders");
        xaDataSource.setUser("orders_user");
        xaDataSource.setPassword("orders_pass");
        return xaDataSource;
    }
    
    @Bean
    public DataSource inventoryDataSource() {
        MysqlXADataSource xaDataSource = new MysqlXADataSource();
        xaDataSource.setURL("jdbc:mysql://localhost:3306/inventory");
        xaDataSource.setUser("inventory_user");
        xaDataSource.setPassword("inventory_pass");
        return xaDataSource;
    }
    
    @Bean
    public JtaTransactionManager jtaTransactionManager() {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setTransactionManager(atomikosTransactionManager());
        jtaTransactionManager.setUserTransaction(atomikosUserTransaction());
        return jtaTransactionManager;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager atomikosTransactionManager() {
        UserTransactionManager transactionManager = new UserTransactionManager();
        transactionManager.setForceShutdown(false);
        return transactionManager;
    }
    
    @Bean
    public UserTransactionImp atomikosUserTransaction() throws SystemException {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(300);
        return userTransactionImp;
    }
}

@Service
public class DistributedOrderService {
    
    @Autowired
    @Qualifier("orderDataSource")
    private DataSource orderDataSource;
    
    @Autowired
    @Qualifier("inventoryDataSource")
    private DataSource inventoryDataSource;
    
    // XA transaction spans both databases
    @Transactional
    public void processDistributedOrder(CreateOrderRequest request) {
        // Operations on orders database
        try (Connection orderConn = orderDataSource.getConnection()) {
            PreparedStatement orderStmt = orderConn.prepareStatement(
                "INSERT INTO orders (customer_id, total_amount, status) VALUES (?, ?, ?)"
            );
            orderStmt.setString(1, request.getCustomerId());
            orderStmt.setBigDecimal(2, request.getTotalAmount());
            orderStmt.setString(3, "PENDING");
            orderStmt.executeUpdate();
        }
        
        // Operations on inventory database
        try (Connection inventoryConn = inventoryDataSource.getConnection()) {
            for (OrderItem item : request.getItems()) {
                PreparedStatement inventoryStmt = inventoryConn.prepareStatement(
                    "UPDATE inventory SET quantity = quantity - ? WHERE sku = ? AND quantity >= ?"
                );
                inventoryStmt.setInt(1, item.getQuantity());
                inventoryStmt.setString(2, item.getSku());
                inventoryStmt.setInt(3, item.getQuantity());
                
                int updatedRows = inventoryStmt.executeUpdate();
                if (updatedRows == 0) {
                    throw new InsufficientInventoryException("Not enough inventory for " + item.getSku());
                }
            }
        }
        
        // If we get here, both database operations succeeded
        // The XA transaction manager will coordinate the commit across both databases
    }
}

Go Distributed Transactions

Go doesn’t have built-in distributed transaction support, so we need to implement 2PC manually:

package distributed

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"
    
    "github.com/google/uuid"
)

type TransactionManager struct {
    resources []XAResource
}

type XAResource interface {
    Prepare(ctx context.Context, txID string) error
    Commit(ctx context.Context, txID string) error
    Rollback(ctx context.Context, txID string) error
}

type DatabaseResource struct {
    db   *sql.DB
    name string
}

func (r *DatabaseResource) Prepare(ctx context.Context, txID string) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    
    // Store transaction for later commit/rollback
    // In production, you'd need a proper transaction store
    transactionStore[txID+"-"+r.name] = tx
    
    return nil
}

func (r *DatabaseResource) Commit(ctx context.Context, txID string) error {
    tx, exists := transactionStore[txID+"-"+r.name]
    if !exists {
        return fmt.Errorf("transaction not found: %s", txID)
    }
    
    err := tx.Commit()
    delete(transactionStore, txID+"-"+r.name)
    return err
}

func (r *DatabaseResource) Rollback(ctx context.Context, txID string) error {
    tx, exists := transactionStore[txID+"-"+r.name]
    if !exists {
        return nil // Already rolled back
    }
    
    err := tx.Rollback()
    delete(transactionStore, txID+"-"+r.name)
    return err
}

// Global in-memory transaction store (not goroutine-safe; production code needs a synchronized, durable transaction log)
var transactionStore = make(map[string]*sql.Tx)

func (tm *TransactionManager) ExecuteDistributedTransaction(ctx context.Context, fn func(txID string) error) error {
    txID := uuid.New().String()
    
    // Phase 1: Prepare all resources
    for _, resource := range tm.resources {
        if err := resource.Prepare(ctx, txID); err != nil {
            // Rollback all prepared resources
            tm.rollbackAll(ctx, txID)
            return fmt.Errorf("prepare failed: %w", err)
        }
    }
    
    // Execute business logic
    if err := fn(txID); err != nil {
        tm.rollbackAll(ctx, txID)
        return fmt.Errorf("business logic failed: %w", err)
    }
    
    // Phase 2: Commit all resources
    for _, resource := range tm.resources {
        if err := resource.Commit(ctx, txID); err != nil {
            log.Printf("Commit failed for txID %s: %v", txID, err)
            // In production, you'd need a recovery mechanism here
            return fmt.Errorf("commit failed: %w", err)
        }
    }
    
    return nil
}

func (tm *TransactionManager) rollbackAll(ctx context.Context, txID string) {
    for _, resource := range tm.resources {
        if err := resource.Rollback(ctx, txID); err != nil {
            log.Printf("Rollback failed for txID %s: %v", txID, err)
        }
    }
}

// Usage example
func ProcessDistributedOrder(ctx context.Context, request CreateOrderRequest) error {
    orderDB, _ := sql.Open("mysql", "orders_connection_string")
    inventoryDB, _ := sql.Open("mysql", "inventory_connection_string")
    
    tm := &TransactionManager{
        resources: []XAResource{
            &DatabaseResource{db: orderDB, name: "orders"},
            &DatabaseResource{db: inventoryDB, name: "inventory"},
        },
    }
    
    return tm.ExecuteDistributedTransaction(ctx, func(txID string) error {
        // Business logic goes here - use the prepared transactions
        orderTx := transactionStore[txID+"-orders"]
        inventoryTx := transactionStore[txID+"-inventory"]
        
        // Create order
        _, err := orderTx.Exec(
            "INSERT INTO orders (customer_id, total_amount, status) VALUES (?, ?, ?)",
            request.CustomerID, request.TotalAmount, "PENDING",
        )
        if err != nil {
            return err
        }
        
        // Update inventory
        for _, item := range request.Items {
            result, err := inventoryTx.Exec(
                "UPDATE inventory SET quantity = quantity - ? WHERE sku = ? AND quantity >= ?",
                item.Quantity, item.SKU, item.Quantity,
            )
            if err != nil {
                return err
            }
            
            rowsAffected, _ := result.RowsAffected()
            if rowsAffected == 0 {
                return fmt.Errorf("insufficient inventory for %s", item.SKU)
            }
        }
        
        return nil
    })
}

Concurrency Control: Optimistic vs Pessimistic

Understanding when to use optimistic versus pessimistic concurrency control can make or break your application’s performance under load.

Pessimistic Locking: “Better Safe Than Sorry”

// Java/JPA pessimistic locking
@Service
public class AccountService {
    
    @Transactional
    public void transferFunds(String fromAccountId, String toAccountId, BigDecimal amount) {
        // Lock accounts in consistent order to prevent deadlocks
        String firstId = fromAccountId.compareTo(toAccountId) < 0 ? fromAccountId : toAccountId;
        String secondId = fromAccountId.compareTo(toAccountId) < 0 ? toAccountId : fromAccountId;
        
        // entityManager is @PersistenceContext-injected; find() accepts an explicit lock mode
        Account firstAccount = entityManager.find(Account.class, firstId, LockModeType.PESSIMISTIC_WRITE);
        Account secondAccount = entityManager.find(Account.class, secondId, LockModeType.PESSIMISTIC_WRITE);
        
        Account fromAccount = fromAccountId.equals(firstId) ? firstAccount : secondAccount;
        Account toAccount = fromAccountId.equals(firstId) ? secondAccount : firstAccount;
        
        if (fromAccount.getBalance().compareTo(amount) < 0) {
            throw new InsufficientFundsException();
        }
        
        fromAccount.setBalance(fromAccount.getBalance().subtract(amount));
        toAccount.setBalance(toAccount.getBalance().add(amount));
        
        accountRepository.save(fromAccount);
        accountRepository.save(toAccount);
    }
}
// Go pessimistic locking with GORM
func (s *AccountService) TransferFunds(ctx context.Context, fromAccountID, toAccountID string, amount int64) error {
    return s.WithTransaction(func(tx *gorm.DB) error {
        var fromAccount, toAccount Account
        
        // Lock accounts in consistent order
        firstID, secondID := fromAccountID, toAccountID
        if fromAccountID > toAccountID {
            firstID, secondID = toAccountID, fromAccountID
        }
        
        // Lock first account
        // clause.Locking (from "gorm.io/gorm/clause") emits SELECT ... FOR UPDATE
        if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            Where("id = ?", firstID).First(&fromAccount).Error; err != nil {
            return err
        }
        
        // Lock second account
        if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            Where("id = ?", secondID).First(&toAccount).Error; err != nil {
            return err
        }
        
        // Ensure we have the correct accounts
        if fromAccount.ID != fromAccountID {
            fromAccount, toAccount = toAccount, fromAccount
        }
        
        if fromAccount.Balance < amount {
            return fmt.Errorf("insufficient funds")
        }
        
        fromAccount.Balance -= amount
        toAccount.Balance += amount
        
        if err := tx.Save(&fromAccount).Error; err != nil {
            return err
        }
        
        return tx.Save(&toAccount).Error
    })
}

Optimistic Locking: “Hope for the Best, Handle the Rest”

// JPA optimistic locking with version fields
@Entity
public class Account {
    @Id
    private String id;
    
    private BigDecimal balance;
    
    @Version
    private Long version; // JPA automatically manages this
    
    // getters and setters...
}

@Service
public class OptimisticAccountService {
    
    @Transactional
    @Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 3)
    public void transferFunds(String fromAccountId, String toAccountId, BigDecimal amount) {
        Account fromAccount = accountRepository.findById(fromAccountId);
        Account toAccount = accountRepository.findById(toAccountId);
        
        if (fromAccount.getBalance().compareTo(amount) < 0) {
            throw new InsufficientFundsException();
        }
        
        fromAccount.setBalance(fromAccount.getBalance().subtract(amount));
        toAccount.setBalance(toAccount.getBalance().add(amount));
        
        // If either account was modified by another transaction,
        // OptimisticLockingFailureException will be thrown
        accountRepository.save(fromAccount);
        accountRepository.save(toAccount);
    }
}
// Rust optimistic locking with version fields
#[derive(Queryable, Identifiable, AsChangeset)]
#[diesel(table_name = accounts)]
pub struct Account {
    pub id: String,
    pub balance: i64,
    pub version: i32,
}

impl AccountService {
    transactional! {
        fn transfer_funds_optimistic(from_account_id: String, to_account_id: String, amount: i64) -> () {
            use crate::schema::accounts::dsl::*;
            
            // Read current versions
            let from_account: Account = accounts
                .filter(id.eq(&from_account_id))
                .first(conn)
                .map_err(TransactionError::Database)?;
                
            let to_account: Account = accounts
                .filter(id.eq(&to_account_id))
                .first(conn)
                .map_err(TransactionError::Database)?;
            
            if from_account.balance < amount {
                return Err(TransactionError::Business("Insufficient funds".to_string()));
            }
            
            // Update with version check
            let from_updated = diesel::update(
                accounts
                    .filter(id.eq(&from_account_id))
                    .filter(version.eq(from_account.version))
            )
            .set((
                balance.eq(from_account.balance - amount),
                version.eq(from_account.version + 1)
            ))
            .execute(conn)
            .map_err(TransactionError::Database)?;
            
            if from_updated == 0 {
                return Err(TransactionError::Business("Concurrent modification detected".to_string()));
            }
            
            let to_updated = diesel::update(
                accounts
                    .filter(id.eq(&to_account_id))
                    .filter(version.eq(to_account.version))
            )
            .set((
                balance.eq(to_account.balance + amount),
                version.eq(to_account.version + 1)
            ))
            .execute(conn)
            .map_err(TransactionError::Database)?;
            
            if to_updated == 0 {
                return Err(TransactionError::Business("Concurrent modification detected".to_string()));
            }
            
            Ok(())
        }
    }
}

Distributed Transactions: SAGA Pattern

When 2PC becomes too heavyweight or you’re dealing with services that don’t support XA transactions, the SAGA pattern provides an elegant alternative using compensating transactions.

Command Pattern for Compensating Transactions

I initially applied this design pattern at a travel booking company in the mid-2000s, where we had to integrate with numerous external vendors—airline companies, hotels, car rental agencies, insurance providers, and activity booking services. Each vendor had different APIs, response times, and failure modes, but we needed to present customers with a single, atomic booking experience. The command pattern worked exceptionally well for this scenario. When a customer booked a vacation package, we’d execute a chain of commands: reserve flight, book hotel, rent car, purchase insurance. If any step failed midway through, we could automatically issue compensating transactions to undo the previous successful reservations. This approach delivered both excellent performance (operations could run in parallel where possible) and high reliability.

// Base interfaces for SAGA operations
public interface SagaCommand<T> {
    T execute() throws Exception;
    void compensate(T result) throws Exception;
}

public class SagaOrchestrator {
    
    public class SagaExecution<T> {
        private final SagaCommand<T> command;
        private T result;
        private boolean executed = false;
        
        public SagaExecution(SagaCommand<T> command) {
            this.command = command;
        }
        
        public T execute() throws Exception {
            result = command.execute();
            executed = true;
            return result;
        }
        
        public void compensate() throws Exception {
            if (executed && result != null) {
                command.compensate(result);
            }
        }
    }
    
    private final List<SagaExecution<?>> executions = new ArrayList<>();
    
    public <T> T execute(SagaCommand<T> command) throws Exception {
        SagaExecution<T> execution = new SagaExecution<>(command);
        executions.add(execution);
        return execution.execute();
    }
    
    public void compensateAll() {
        // Compensate in reverse order
        for (int i = executions.size() - 1; i >= 0; i--) {
            try {
                executions.get(i).compensate();
            } catch (Exception e) {
                log.error("Compensation failed", e);
                // In production, you'd need dead letter queue handling
            }
        }
    }
}

// Concrete command implementations
public class CreateOrderCommand implements SagaCommand<Order> {
    private final OrderService orderService;
    private final CreateOrderRequest request;
    
    public CreateOrderCommand(OrderService orderService, CreateOrderRequest request) {
        this.orderService = orderService;
        this.request = request;
    }
    
    @Override
    public Order execute() throws Exception {
        return orderService.createOrder(request);
    }
    
    @Override
    public void compensate(Order order) throws Exception {
        orderService.cancelOrder(order.getId());
    }
}

public class ProcessPaymentCommand implements SagaCommand<Payment> {
    private final PaymentService paymentService;
    private final String customerId;
    private final BigDecimal amount;
    // constructor assigning these fields omitted for brevity

    @Override
    public Payment execute() throws Exception {
        return paymentService.processPayment(customerId, amount);
    }
    
    @Override
    public void compensate(Payment payment) throws Exception {
        paymentService.refundPayment(payment.getId());
    }
}

public class ReserveInventoryCommand implements SagaCommand<List<InventoryReservation>> {
    private final InventoryService inventoryService;
    private final List<OrderItem> items;
    // constructor assigning these fields omitted for brevity

    @Override
    public List<InventoryReservation> execute() throws Exception {
        return inventoryService.reserveItems(items);
    }
    
    @Override
    public void compensate(List<InventoryReservation> reservations) throws Exception {
        for (InventoryReservation reservation : reservations) {
            inventoryService.releaseReservation(reservation.getId());
        }
    }
}

// Usage in service
@Service
public class SagaOrderService {
    
    public void processOrderWithSaga(CreateOrderRequest request) throws Exception {
        SagaOrchestrator saga = new SagaOrchestrator();
        
        try {
            // Execute commands in sequence
            Order order = saga.execute(new CreateOrderCommand(orderService, request));
            
            Payment payment = saga.execute(new ProcessPaymentCommand(
                paymentService, order.getCustomerId(), order.getTotalAmount()
            ));
            
            List<InventoryReservation> reservations = saga.execute(
                new ReserveInventoryCommand(inventoryService, request.getItems())
            );
            
            // If we get here, everything succeeded
            orderService.confirmOrder(order.getId());
            
        } catch (Exception e) {
            // Compensate all executed commands
            saga.compensateAll();
            throw e;
        }
    }
}

Persistent SAGA with State Machine

// SAGA state management
@Entity
public class SagaTransaction {
    @Id
    private String id;
    
    @Enumerated(EnumType.STRING)
    private SagaStatus status;
    
    private String currentStep;
    
    @ElementCollection
    private List<String> completedSteps = new ArrayList<>();
    
    @ElementCollection
    private List<String> compensatedSteps = new ArrayList<>();
    
    private String contextData; // JSON serialized context
    
    // getters/setters...
}

public enum SagaStatus {
    STARTED, IN_PROGRESS, COMPLETED, COMPENSATING, COMPENSATED, FAILED
}
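
The orchestrator below also assumes a minimal SagaStep abstraction along these lines (a sketch with assumed method names; the original code only shows how the steps are used):

public interface SagaStep {
    String getName();
    void execute() throws Exception;
    void compensate() throws Exception;
}

The findStepByName(...) call in compensateSaga would then look steps up from a registry, for example a Map<String, SagaStep> populated at startup.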

@Component
public class PersistentSagaOrchestrator {
    
    @Autowired
    private SagaTransactionRepository sagaRepo;
    
    @Transactional
    public void executeSaga(String sagaId, List<SagaStep> steps) throws Exception {
        SagaTransaction saga = sagaRepo.findById(sagaId)
            .orElse(new SagaTransaction(sagaId));
        
        try {
            for (SagaStep step : steps) {
                if (saga.getCompletedSteps().contains(step.getName())) {
                    continue; // Already completed
                }
                
                saga.setCurrentStep(step.getName());
                saga.setStatus(SagaStatus.IN_PROGRESS);
                sagaRepo.save(saga);
                
                // Execute step
                step.execute();
                
                saga.getCompletedSteps().add(step.getName());
                sagaRepo.save(saga);
            }
            
            saga.setStatus(SagaStatus.COMPLETED);
            sagaRepo.save(saga);
            
        } catch (Exception e) {
            compensateSaga(sagaId);
            throw e;
        }
    }
    
    @Transactional
    public void compensateSaga(String sagaId) {
        SagaTransaction saga = sagaRepo.findById(sagaId)
            .orElseThrow(() -> new IllegalArgumentException("SAGA not found"));
        
        saga.setStatus(SagaStatus.COMPENSATING);
        sagaRepo.save(saga);
        
        // Compensate in reverse order
        List<String> stepsToCompensate = new ArrayList<>(saga.getCompletedSteps());
        Collections.reverse(stepsToCompensate);
        
        for (String stepName : stepsToCompensate) {
            if (saga.getCompensatedSteps().contains(stepName)) {
                continue;
            }
            
            try {
                SagaStep step = findStepByName(stepName);
                step.compensate();
                saga.getCompensatedSteps().add(stepName);
                sagaRepo.save(saga);
            } catch (Exception e) {
                log.error("Compensation failed for step: " + stepName, e);
                saga.setStatus(SagaStatus.FAILED);
                sagaRepo.save(saga);
                return;
            }
        }
        
        saga.setStatus(SagaStatus.COMPENSATED);
        sagaRepo.save(saga);
    }
}

Go SAGA Implementation

// SAGA state machine in Go
package saga

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    
    "gorm.io/gorm"
)

type SagaStatus string

const (
    StatusStarted     SagaStatus = "STARTED"
    StatusInProgress  SagaStatus = "IN_PROGRESS"
    StatusCompleted   SagaStatus = "COMPLETED"
    StatusCompensating SagaStatus = "COMPENSATING"
    StatusCompensated SagaStatus = "COMPENSATED"
    StatusFailed      SagaStatus = "FAILED"
)

type SagaTransaction struct {
    ID               string     `gorm:"primarykey"`
    Status           SagaStatus
    CurrentStep      string
    CompletedSteps   string // JSON array
    CompensatedSteps string // JSON array
    ContextData      string // JSON context
    CreatedAt        time.Time
    UpdatedAt        time.Time
}

type SagaStep interface {
    Name() string
    Execute(ctx context.Context, sagaContext map[string]interface{}) error
    Compensate(ctx context.Context, sagaContext map[string]interface{}) error
}

type SagaOrchestrator struct {
    db *gorm.DB
}

func NewSagaOrchestrator(db *gorm.DB) *SagaOrchestrator {
    return &SagaOrchestrator{db: db}
}

func (o *SagaOrchestrator) ExecuteSaga(ctx context.Context, sagaID string, steps []SagaStep, context map[string]interface{}) error {
    return o.db.Transaction(func(tx *gorm.DB) error {
        var saga SagaTransaction
        if err := tx.First(&saga, "id = ?", sagaID).Error; err != nil {
            if err == gorm.ErrRecordNotFound {
                // Create new saga
                contextJSON, _ := json.Marshal(context)
                saga = SagaTransaction{
                    ID:          sagaID,
                    Status:      StatusStarted,
                    ContextData: string(contextJSON),
                }
                if err := tx.Create(&saga).Error; err != nil {
                    return err
                }
            } else {
                return err
            }
        }
        
        // Parse completed steps
        var completedSteps []string
        if saga.CompletedSteps != "" {
            json.Unmarshal([]byte(saga.CompletedSteps), &completedSteps)
        }
        
        completedMap := make(map[string]bool)
        for _, step := range completedSteps {
            completedMap[step] = true
        }
        
        // Execute steps
        for _, step := range steps {
            if completedMap[step.Name()] {
                continue // Already completed
            }
            
            saga.CurrentStep = step.Name()
            saga.Status = StatusInProgress
            if err := tx.Save(&saga).Error; err != nil {
                return err
            }
            
            // Parse saga context
            var sagaContext map[string]interface{}
            json.Unmarshal([]byte(saga.ContextData), &sagaContext)
            
            // Execute step
            if err := step.Execute(ctx, sagaContext); err != nil {
                // Start compensation
                return o.compensateSaga(ctx, tx, sagaID, steps[:len(completedSteps)])
            }
            
            // Mark step as completed
            completedSteps = append(completedSteps, step.Name())
            completedJSON, _ := json.Marshal(completedSteps)
            saga.CompletedSteps = string(completedJSON)
            
            // Update context if modified
            updatedContext, _ := json.Marshal(sagaContext)
            saga.ContextData = string(updatedContext)
            
            if err := tx.Save(&saga).Error; err != nil {
                return err
            }
        }
        
        saga.Status = StatusCompleted
        return tx.Save(&saga).Error
    })
}

func (o *SagaOrchestrator) compensateSaga(ctx context.Context, tx *gorm.DB, sagaID string, completedSteps []SagaStep) error {
    var saga SagaTransaction
    if err := tx.First(&saga, "id = ?", sagaID).Error; err != nil {
        return err
    }
    
    saga.Status = StatusCompensating
    if err := tx.Save(&saga).Error; err != nil {
        return err
    }
    
    // Parse compensated steps
    var compensatedSteps []string
    if saga.CompensatedSteps != "" {
        json.Unmarshal([]byte(saga.CompensatedSteps), &compensatedSteps)
    }
    
    compensatedMap := make(map[string]bool)
    for _, step := range compensatedSteps {
        compensatedMap[step] = true
    }
    
    // Compensate in reverse order
    for i := len(completedSteps) - 1; i >= 0; i-- {
        step := completedSteps[i]
        
        if compensatedMap[step.Name()] {
            continue
        }
        
        var sagaContext map[string]interface{}
        json.Unmarshal([]byte(saga.ContextData), &sagaContext)
        
        if err := step.Compensate(ctx, sagaContext); err != nil {
            saga.Status = StatusFailed
            tx.Save(&saga)
            return fmt.Errorf("compensation failed for step %s: %w", step.Name(), err)
        }
        
        compensatedSteps = append(compensatedSteps, step.Name())
        compensatedJSON, _ := json.Marshal(compensatedSteps)
        saga.CompensatedSteps = string(compensatedJSON)
        
        if err := tx.Save(&saga).Error; err != nil {
            return err
        }
    }
    
    saga.Status = StatusCompensated
    return tx.Save(&saga).Error
}

// Concrete step implementations
type CreateOrderStep struct {
    orderService *OrderService
    request      CreateOrderRequest
}

func (s *CreateOrderStep) Name() string {
    return "CREATE_ORDER"
}

func (s *CreateOrderStep) Execute(ctx context.Context, sagaContext map[string]interface{}) error {
    order, err := s.orderService.CreateOrder(ctx, s.request)
    if err != nil {
        return err
    }
    
    // Store order ID in context for later steps
    sagaContext["orderId"] = order.ID
    return nil
}

func (s *CreateOrderStep) Compensate(ctx context.Context, sagaContext map[string]interface{}) error {
    if orderID, exists := sagaContext["orderId"]; exists {
        return s.orderService.CancelOrder(ctx, orderID.(uint))
    }
    return nil
}
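
// A sketch of the payment step referenced in the usage example below. The
// PaymentService methods and field types here are assumptions (they are not
// defined in the original post); the point is to show how a later step reads
// values that earlier steps stored in the shared saga context.
type ProcessPaymentStep struct {
    paymentService *PaymentService
    customerID     string
    amount         int64
}

func (s *ProcessPaymentStep) Name() string {
    return "PROCESS_PAYMENT"
}

func (s *ProcessPaymentStep) Execute(ctx context.Context, sagaContext map[string]interface{}) error {
    // Later steps consume values written by earlier ones; a real implementation
    // would attach this order ID to the charge for traceability
    if _, exists := sagaContext["orderId"]; !exists {
        return fmt.Errorf("orderId missing from saga context")
    }
    
    payment, err := s.paymentService.ProcessPayment(ctx, s.customerID, s.amount)
    if err != nil {
        return err
    }
    
    // Record the payment ID so Compensate can issue a refund
    sagaContext["paymentId"] = payment.ID
    return nil
}

func (s *ProcessPaymentStep) Compensate(ctx context.Context, sagaContext map[string]interface{}) error {
    if paymentID, exists := sagaContext["paymentId"]; exists {
        return s.paymentService.RefundPayment(ctx, paymentID.(string))
    }
    return nil
}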

// Usage
func ProcessOrderWithSaga(ctx context.Context, orchestrator *SagaOrchestrator, request CreateOrderRequest) error {
    sagaID := uuid.New().String()
    
    steps := []SagaStep{
        &CreateOrderStep{orderService, request},
        &ProcessPaymentStep{paymentService, request.CustomerID, request.TotalAmount},
        &ReserveInventoryStep{inventoryService, request.Items},
    }
    
    context := map[string]interface{}{
        "customerId": request.CustomerID,
        "requestId":  request.RequestID,
    }
    
    return orchestrator.ExecuteSaga(ctx, sagaID, steps, context)
}

The Dual-Write Problem: Database + Events

One of the most insidious transaction problems occurs when you need to both update the database and publish an event. I’ve debugged countless production issues where customers received order confirmations but no order existed in the database, or orders were created but notification events never fired.

The Anti-Pattern: Sequential Operations

// THIS IS FUNDAMENTALLY BROKEN - DON'T DO THIS
@Service
public class BrokenOrderService {
    
    @Transactional
    public void processOrder(CreateOrderRequest request) {
        Order order = orderRepository.save(new Order(request));
        
        // DANGER: Event published outside transaction boundary
        eventPublisher.publishEvent(new OrderCreatedEvent(order));
        
        // What if this line throws an exception?
        // Event is already published but transaction will rollback!
    }
    
    // ALSO BROKEN: Event first, then database
    @Transactional  
    public void processOrderEventFirst(CreateOrderRequest request) {
        Order order = new Order(request);
        
        // DANGER: Event published before persistence
        eventPublisher.publishEvent(new OrderCreatedEvent(order));
        
        // What if database save fails?
        // Event consumers will query for order that doesn't exist!
        orderRepository.save(order);
    }
}

Solution 1: Transactional Outbox Pattern

I have used the outbox pattern in a number of applications, especially for sending notifications to users: instead of sending an event directly to a queue, messages are first stored in the database and then relayed to an external service such as the Apple Push Notification service (APNs) or Firebase Cloud Messaging (FCM).

// Outbox event entity
@Entity
@Table(name = "outbox_events")
public class OutboxEvent {
    @Id
    private String id;
    
    @Column(name = "event_type")
    private String eventType;
    
    @Column(name = "payload", columnDefinition = "TEXT")
    private String payload;
    
    @Column(name = "created_at")
    private Instant createdAt;
    
    @Column(name = "processed_at")
    private Instant processedAt;
    
    @Enumerated(EnumType.STRING)
    private OutboxStatus status;
    
    // constructors, getters, setters...
}

public enum OutboxStatus {
    PENDING, PROCESSED, FAILED
}

// Outbox repository
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
    @Query("SELECT e FROM OutboxEvent e WHERE e.status = :status ORDER BY e.createdAt ASC")
    List<OutboxEvent> findByStatusOrderByCreatedAt(@Param("status") OutboxStatus status);
    
    @Modifying
    @Query("UPDATE OutboxEvent e SET e.status = :status, e.processedAt = :processedAt WHERE e.id = :id")
    void updateStatus(@Param("id") String id, @Param("status") OutboxStatus status, @Param("processedAt") Instant processedAt);
}

// Corrected order service using outbox
@Service
public class TransactionalOrderService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private OutboxEventRepository outboxRepository;
    
    @Transactional
    public void processOrder(CreateOrderRequest request) {
        // 1. Process business logic
        Order order = new Order(request);
        order = orderRepository.save(order);
        
        // 2. Store event in same transaction
        OutboxEvent event = new OutboxEvent(
            UUID.randomUUID().toString(),
            "OrderCreated",
            serializeEvent(new OrderCreatedEvent(order)),
            Instant.now(),
            OutboxStatus.PENDING
        );
        outboxRepository.save(event);
        
        // Both order and event are committed atomically!
    }
    
    private String serializeEvent(Object event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Event serialization failed", e);
        }
    }
}

// Event relay service
@Component
public class OutboxEventRelay {
    
    @Autowired
    private OutboxEventRepository outboxRepository;
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    // Delivery here is at-least-once: if the process crashes after publishing
    // but before the status update commits, the event will be re-published on
    // the next poll, so downstream consumers must be idempotent
    @Scheduled(fixedDelay = 1000) // Poll every second
    @Transactional
    public void processOutboxEvents() {
        List<OutboxEvent> pendingEvents = outboxRepository
            .findByStatusOrderByCreatedAt(OutboxStatus.PENDING);
        
        for (OutboxEvent outboxEvent : pendingEvents) {
            try {
                // Deserialize and publish the event
                Object event = deserializeEvent(outboxEvent.getEventType(), outboxEvent.getPayload());
                eventPublisher.publishEvent(event);
                
                // Mark as processed
                outboxRepository.updateStatus(
                    outboxEvent.getId(), 
                    OutboxStatus.PROCESSED, 
                    Instant.now()
                );
                
            } catch (Exception e) {
                log.error("Failed to process outbox event: " + outboxEvent.getId(), e);
                outboxRepository.updateStatus(
                    outboxEvent.getId(), 
                    OutboxStatus.FAILED, 
                    Instant.now()
                );
            }
        }
    }
}

Go Implementation: Outbox Pattern with GORM

// Outbox event model
type OutboxEvent struct {
    ID          string     `gorm:"primarykey"`
    EventType   string     `gorm:"not null"`
    Payload     string     `gorm:"type:text;not null"`
    CreatedAt   time.Time
    ProcessedAt *time.Time
    Status      OutboxStatus `gorm:"type:varchar(20);default:'PENDING'"`
}

type OutboxStatus string

const (
    OutboxStatusPending   OutboxStatus = "PENDING"
    OutboxStatusProcessed OutboxStatus = "PROCESSED" 
    OutboxStatusFailed    OutboxStatus = "FAILED"
)

// Service with outbox pattern
type OrderService struct {
    db           *gorm.DB
    eventRelay   *OutboxEventRelay
}

func (s *OrderService) ProcessOrder(ctx context.Context, request CreateOrderRequest) (*Order, error) {
    var order *Order
    
    err := s.db.Transaction(func(tx *gorm.DB) error {
        // 1. Create order
        order = &Order{
            CustomerID:  request.CustomerID,
            TotalAmount: request.TotalAmount,
            Status:      "CONFIRMED",
        }
        
        if err := tx.Create(order).Error; err != nil {
            return fmt.Errorf("failed to create order: %w", err)
        }
        
        // 2. Store event in same transaction
        eventPayload, err := json.Marshal(OrderCreatedEvent{
            OrderID:     order.ID,
            CustomerID:  order.CustomerID,
            TotalAmount: order.TotalAmount,
        })
        if err != nil {
            return fmt.Errorf("failed to serialize event: %w", err)
        }
        
        outboxEvent := &OutboxEvent{
            ID:        uuid.New().String(),
            EventType: "OrderCreated",
            Payload:   string(eventPayload),
            CreatedAt: time.Now(),
            Status:    OutboxStatusPending,
        }
        
        if err := tx.Create(outboxEvent).Error; err != nil {
            return fmt.Errorf("failed to store outbox event: %w", err)
        }
        
        return nil
    })
    
    return order, err
}

// Event relay service
type OutboxEventRelay struct {
    db            *gorm.DB
    eventPublisher EventPublisher
    ticker         *time.Ticker
    done           chan bool
}

func NewOutboxEventRelay(db *gorm.DB, publisher EventPublisher) *OutboxEventRelay {
    return &OutboxEventRelay{
        db:             db,
        eventPublisher: publisher,
        ticker:         time.NewTicker(1 * time.Second),
        done:           make(chan bool),
    }
}

func (r *OutboxEventRelay) Start(ctx context.Context) {
    go func() {
        for {
            select {
            case <-r.ticker.C:
                r.processOutboxEvents(ctx)
            case <-r.done:
                return
            case <-ctx.Done():
                return
            }
        }
    }()
}

func (r *OutboxEventRelay) processOutboxEvents(ctx context.Context) {
    var events []OutboxEvent
    
    // Find pending events
    if err := r.db.Where("status = ?", OutboxStatusPending).
        Order("created_at ASC").
        Limit(100).
        Find(&events).Error; err != nil {
        log.Printf("Failed to fetch outbox events: %v", err)
        return
    }
    
    for _, event := range events {
        if err := r.processEvent(ctx, event); err != nil {
            log.Printf("Failed to process event %s: %v", event.ID, err)
            
            // Mark as failed
            now := time.Now()
            r.db.Model(&event).Updates(OutboxEvent{
                Status:      OutboxStatusFailed,
                ProcessedAt: &now,
            })
        } else {
            // Mark as processed
            now := time.Now()
            r.db.Model(&event).Updates(OutboxEvent{
                Status:      OutboxStatusProcessed,
                ProcessedAt: &now,
            })
        }
    }
}

func (r *OutboxEventRelay) processEvent(ctx context.Context, event OutboxEvent) error {
    // Deserialize and publish the event
    switch event.EventType {
    case "OrderCreated":
        var orderEvent OrderCreatedEvent
        if err := json.Unmarshal([]byte(event.Payload), &orderEvent); err != nil {
            return err
        }
        return r.eventPublisher.Publish(ctx, orderEvent)
        
    default:
        return fmt.Errorf("unknown event type: %s", event.EventType)
    }
}

Rust Implementation: Outbox with Diesel

// Outbox event model
use diesel::prelude::*;
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};

#[derive(Debug, Clone, Queryable, Insertable, Serialize, Deserialize)]
#[diesel(table_name = outbox_events)]
pub struct OutboxEvent {
    pub id: String,
    pub event_type: String,
    pub payload: String,
    pub created_at: DateTime<Utc>,
    pub processed_at: Option<DateTime<Utc>>,
    pub status: OutboxStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize, diesel_derive_enum::DbEnum)]
#[ExistingTypePath = "crate::schema::sql_types::OutboxStatus"]
pub enum OutboxStatus {
    Pending,
    Processed,
    Failed,
}

// Service with outbox
impl OrderService {
    transactional! {
        fn process_order_with_outbox(request: CreateOrderRequest) -> Order {
            use crate::schema::orders::dsl::*;
            use crate::schema::outbox_events::dsl::*;
            
            // 1. Create order
            let new_order = NewOrder {
                customer_id: &request.customer_id,
                total_amount: request.total_amount,
                status: "CONFIRMED",
            };
            
            let order: Order = diesel::insert_into(orders)
                .values(&new_order)
                .get_result(conn)
                .map_err(TransactionError::Database)?;
            
            // 2. Create event in same transaction
            let event_payload = serde_json::to_string(&OrderCreatedEvent {
                order_id: order.id,
                customer_id: order.customer_id.clone(),
                total_amount: order.total_amount,
            }).map_err(|e| TransactionError::Business(format!("Event serialization failed: {}", e)))?;
            
            let outbox_event = OutboxEvent {
                id: uuid::Uuid::new_v4().to_string(),
                event_type: "OrderCreated".to_string(),
                payload: event_payload,
                created_at: Utc::now(),
                processed_at: None,
                status: OutboxStatus::Pending,
            };
            
            diesel::insert_into(outbox_events)
                .values(&outbox_event)
                .execute(conn)
                .map_err(TransactionError::Database)?;
            
            Ok(order)
        }
    }
}

// Event relay service
pub struct OutboxEventRelay {
    pool: Pool<ConnectionManager<PgConnection>>,
    event_publisher: Arc<dyn EventPublisher>,
}

impl OutboxEventRelay {
    pub async fn start(&self, mut shutdown: tokio::sync::broadcast::Receiver<()>) {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    if let Err(e) = self.process_outbox_events().await {
                        tracing::error!("Failed to process outbox events: {}", e);
                    }
                }
                _ = shutdown.recv() => {
                    tracing::info!("Outbox relay shutting down");
                    break;
                }
            }
        }
    }
    
    async fn process_outbox_events(&self) -> Result<(), Box<dyn std::error::Error>> {
        use crate::schema::outbox_events::dsl::*;
        
        let mut conn = self.pool.get()?;
        
        let pending_events: Vec<OutboxEvent> = outbox_events
            .filter(status.eq(OutboxStatus::Pending))
            .order(created_at.asc())
            .limit(100)
            .load(&mut conn)?;
        
        for event in pending_events {
            match self.process_single_event(&event).await {
                Ok(_) => {
                    // Mark as processed
                    let now = Utc::now();
                    diesel::update(outbox_events.filter(id.eq(&event.id)))
                        .set((
                            status.eq(OutboxStatus::Processed),
                            processed_at.eq(Some(now))
                        ))
                        .execute(&mut conn)?;
                }
                Err(e) => {
                    tracing::error!("Failed to process event {}: {}", event.id, e);
                    let now = Utc::now();
                    diesel::update(outbox_events.filter(id.eq(&event.id)))
                        .set((
                            status.eq(OutboxStatus::Failed),
                            processed_at.eq(Some(now))
                        ))
                        .execute(&mut conn)?;
                }
            }
        }
        
        Ok(())
    }
}

Solution 2: Change Data Capture (CDC)

I have used this pattern extensively for high-throughput systems where polling-based outbox patterns couldn’t keep up with the data volume. My first implementation was in the early 2000s when building an intelligent traffic management system, where I used CDC to synchronize data between two subsystems—all database changes were captured and published to a JMS queue, with consumers updating their local databases in near real-time. In subsequent projects, I used CDC to publish database changes directly to Kafka topics, enabling downstream services to build analytical systems, populate data lakes, and power real-time reporting dashboards.

Because CDC eliminates the polling overhead entirely, this approach scales to millions of transactions per day while maintaining sub-second latency for downstream consumers.

// CDC-based event publisher using Debezium
@Component
public class CDCEventHandler {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @KafkaListener(topics = "dbserver.public.orders")
    public void handleOrderChange(ConsumerRecord<String, String> record) {
        try {
            // Parse CDC event
            JsonNode changeEvent = objectMapper.readTree(record.value());
            String operation = changeEvent.get("op").asText(); // c=create, u=update, d=delete
            
            if ("c".equals(operation)) {
                JsonNode after = changeEvent.get("after");
                
                OrderCreatedEvent event = OrderCreatedEvent.builder()
                    .orderId(after.get("id").asLong())
                    .customerId(after.get("customer_id").asText())
                    .totalAmount(after.get("total_amount").asLong())
                    .build();
                
                // Publish to business event topic
                kafkaTemplate.send("order-events", 
                    event.getOrderId().toString(), 
                    objectMapper.writeValueAsString(event));
            }
            
        } catch (Exception e) {
            log.error("Failed to process CDC event", e);
        }
    }
}

Solution 3: Event Sourcing

I have often used this pattern with financial systems in conjunction with CQRS, where all changes are stored as immutable events and can be replayed if needed. This approach is particularly valuable in financial contexts because it provides a complete audit trail—every account balance change, every trade, every adjustment can be traced back to its originating event.

For systems where events are the source of truth:

// Event store as the primary persistence
@Entity
public class EventStore {
    @Id
    private String eventId;
    
    private String aggregateId;
    private String eventType;
    private String eventData;
    private Long version;
    private Instant timestamp;
    
    // getters, setters...
}

@Service
public class EventSourcedOrderService {
    
    @Transactional
    public void processOrder(CreateOrderRequest request) throws JsonProcessingException {
        String orderId = UUID.randomUUID().toString();
        
        // Store event - this IS the transaction
        OrderCreatedEvent event = new OrderCreatedEvent(orderId, request);
        
        EventStore eventRecord = new EventStore(
            UUID.randomUUID().toString(),
            orderId,
            "OrderCreated",
            objectMapper.writeValueAsString(event),
            1L,
            Instant.now()
        );
        
        eventStoreRepository.save(eventRecord);
        
        // Publish immediately - event is already persisted
        eventPublisher.publishEvent(event);
        
        // Read model is updated asynchronously via event handlers
    }
}
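
Because the event store is the source of truth, read models can be rebuilt at any time by replaying the events for an aggregate. A minimal sketch (the repository query method, ObjectMapper wiring, and the apply() methods on the read model are assumptions, not part of the code above):

@Service
public class OrderReadModelRebuilder {
    
    @Autowired
    private EventStoreRepository eventStoreRepository;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    public OrderReadModel rebuild(String orderId) throws Exception {
        // Load every event for this aggregate in version order
        List<EventStore> events = eventStoreRepository
            .findByAggregateIdOrderByVersionAsc(orderId);
        
        OrderReadModel model = new OrderReadModel();
        for (EventStore stored : events) {
            switch (stored.getEventType()) {
                case "OrderCreated":
                    model.apply(objectMapper.readValue(
                        stored.getEventData(), OrderCreatedEvent.class));
                    break;
                case "OrderStatusChanged":
                    model.apply(objectMapper.readValue(
                        stored.getEventData(), OrderStatusChangedEvent.class));
                    break;
                default:
                    // Ignore unknown event types for forward compatibility
                    break;
            }
        }
        return model;
    }
}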

Database Connection Pooling & Transaction Isolation

One of the most overlooked sources of transaction problems is the mismatch between application threads and database connections. I’ve debugged production deadlocks that turned out to be caused by connection pool starvation rather than actual database contention.

The Thread-Pool vs Connection-Pool Mismatch

// DANGEROUS CONFIGURATION - This will cause deadlocks
@Configuration
public class ProblematicDataSourceConfig {
    
    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setMaximumPoolSize(10);        // Only 10 connections
        config.setConnectionTimeout(30000);   // 30 second timeout
        return new HikariDataSource(config);
    }
    
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);         // 50 threads!
        executor.setMaxPoolSize(100);         // Up to 100 threads!
        return executor;
    }
}

// This service will cause connection pool starvation
@Service
public class ProblematicBulkService {
    
    @Async
    @Transactional
    public CompletableFuture<Void> processBatch(List<Order> orders) {
        // 100 threads trying to get 10 connections = deadlock
        for (Order order : orders) {
            // Long-running transaction holds connection
            processOrder(order);
            
            // Calling another transactional method = needs another connection
            auditService.logOrderProcessing(order); // DEADLOCK RISK!
        }
        return CompletableFuture.completedFuture(null);
    }
}

Correct Connection Pool Configuration

@Configuration
public class OptimalDataSourceConfig {
    
    @Value("${app.database.max-connections:20}")
    private int maxConnections;
    
    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        
        // Rule of thumb: connections >= active threads + buffer
        config.setMaximumPoolSize(maxConnections);
        config.setMinimumIdle(5);
        
        // Prevent connection hoarding
        config.setConnectionTimeout(5000);     // 5 seconds
        config.setIdleTimeout(300000);         // 5 minutes
        config.setMaxLifetime(1200000);        // 20 minutes
        
        // Detect leaked connections
        config.setLeakDetectionThreshold(60000); // 1 minute
        
        return new HikariDataSource(config);
    }
    
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Keep thread pool smaller than connection pool
        executor.setCorePoolSize(maxConnections - 5);
        executor.setMaxPoolSize(maxConnections);
        executor.setQueueCapacity(100);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

// Connection-aware bulk processing
@Service
public class OptimalBulkService {
    
    @Transactional
    public void processBatchOptimized(List<Order> orders) {
        int batchSize = 50; // Tune based on connection pool size
        
        for (int i = 0; i < orders.size(); i += batchSize) {
            List<Order> batch = orders.subList(i, Math.min(i + batchSize, orders.size()));
            
            // Process batch in single transaction
            processBatchInternal(batch);
            
            // Flush and clear to prevent memory issues
            entityManager.flush();
            entityManager.clear();
        }
    }
    
    private void processBatchInternal(List<Order> batch) {
        // Bulk operations to reduce connection time
        orderRepository.saveAll(batch);
        
        // Batch audit logging
        List<AuditLog> auditLogs = batch.stream()
            .map(order -> new AuditLog("ORDER_PROCESSED", order.getId()))
            .collect(Collectors.toList());
        auditRepository.saveAll(auditLogs);
    }
}

ETL Transaction Boundaries: The Performance Killer

Object-relational mapping (ORM) and automated transactions simplify development, but I have encountered countless performance issues where the same patterns were applied to ETL processing or bulk data imports. For example, I've seen code where developers used Hibernate to import millions of records, and it took hours to process data that should have completed in minutes. In other cases, I saw transactions committed after inserting each record, so an import job that should have taken an hour ended up running for days.

These performance issues stem from misunderstanding the fundamental differences between OLTP (Online Transaction Processing) and OLAP/ETL (Online Analytical Processing) workloads. OLTP patterns optimize for individual record operations with strict ACID guarantees, while ETL patterns optimize for bulk operations with relaxed consistency requirements. Care must be taken to understand these tradeoffs and choose the right approach for each scenario.

// WRONG - Each record in its own transaction (SLOW!)
@Service
public class SlowETLService {
    
    public void importOrders(InputStream csvFile) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(csvFile))) {
            String line;
            while ((line = reader.readLine()) != null) {
                processOrderRecord(line); // Each call = new transaction!
            }
        }
    }
    
    // (For Spring's proxy-based @Transactional to apply at all, this would need
    // to be a public method called through the proxy; the point stands either
    // way: one commit per record is a disaster.)
    @Transactional // New transaction per record = DISASTER
    public void processOrderRecord(String csvLine) {
        Order order = parseOrderFromCsv(csvLine);
        orderRepository.save(order);
        // Commit happens here - thousands of commits!
    }
}

// CORRECT - Batch processing with optimal transaction boundaries
@Service 
public class FastETLService {
    
    @Value("${etl.batch-size:1000}")
    private int batchSize;
    
    public void importOrdersOptimized(InputStream csvFile) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(csvFile))) {
            List<Order> batch = new ArrayList<>(batchSize);
            String line;
            
            while ((line = reader.readLine()) != null) {
                Order order = parseOrderFromCsv(line);
                batch.add(order);
                
                if (batch.size() >= batchSize) {
                    processBatch(batch);
                    batch.clear();
                }
            }
            
            // Process remaining records
            if (!batch.isEmpty()) {
                processBatch(batch);
            }
        }
    }
    
    // Must be public and invoked through the Spring proxy (e.g., from another
    // bean or a self-injected reference) for @Transactional to take effect;
    // self-invoked private methods silently run without a transaction
    @Transactional
    public void processBatch(List<Order> orders) {
        // Single transaction for entire batch
        orderRepository.saveAll(orders);
        
        // Bulk validation and error handling
        validateBatch(orders);
        
        // Bulk audit logging
        createAuditEntries(orders);
    }
}
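
One more knob worth calling out (a typical Spring/Hibernate configuration, assumed rather than shown above): saveAll() only turns into real JDBC batch inserts when Hibernate batching is enabled, for example:

spring.jpa.properties.hibernate.jdbc.batch_size=1000
spring.jpa.properties.hibernate.order_inserts=true

Without these settings, Hibernate still sends one INSERT statement per row, even inside a single transaction.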

Go Connection Pool Management

// Connection pool configuration with proper sizing
func setupDatabase() *gorm.DB {
    dsn := "host=localhost user=postgres password=secret dbname=orders port=5432 sslmode=disable"
    
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        log.Fatal("Failed to connect to database:", err)
    }
    
    sqlDB, err := db.DB()
    if err != nil {
        log.Fatal("Failed to get SQL DB:", err)
    }
    
    // Critical: Match pool size to application concurrency
    sqlDB.SetMaxOpenConns(20)        // Maximum connections
    sqlDB.SetMaxIdleConns(5)         // Idle connections
    sqlDB.SetConnMaxLifetime(time.Hour) // Connection lifetime
    sqlDB.SetConnMaxIdleTime(time.Minute * 30) // Idle timeout
    
    return db
}

// ETL with proper batching
type ETLService struct {
    db        *gorm.DB
    batchSize int
}

func (s *ETLService) ImportOrders(csvFile io.Reader) error {
    scanner := bufio.NewScanner(csvFile)
    batch := make([]*Order, 0, s.batchSize)
    
    for scanner.Scan() {
        order, err := s.parseOrderFromCSV(scanner.Text())
        if err != nil {
            log.Printf("Failed to parse order: %v", err)
            continue
        }
        
        batch = append(batch, order)
        
        if len(batch) >= s.batchSize {
            if err := s.processBatch(batch); err != nil {
                return fmt.Errorf("batch processing failed: %w", err)
            }
            batch = batch[:0] // Reset slice but keep capacity
        }
    }
    
    // Process remaining orders
    if len(batch) > 0 {
        return s.processBatch(batch)
    }
    
    return scanner.Err()
}

func (s *ETLService) processBatch(orders []*Order) error {
    return s.db.Transaction(func(tx *gorm.DB) error {
        // Batch insert for performance
        if err := tx.CreateInBatches(orders, len(orders)).Error; err != nil {
            return err
        }
        
        // Bulk audit logging in same transaction
        auditLogs := make([]*AuditLog, len(orders))
        for i, order := range orders {
            auditLogs[i] = &AuditLog{
                Action:    "ORDER_IMPORTED",
                OrderID:   order.ID,
                Timestamp: time.Now(),
            }
        }
        
        return tx.CreateInBatches(auditLogs, len(auditLogs)).Error
    })
}

Deadlock Detection and Resolution

Deadlocks are inevitable in high-concurrency systems. The key is detecting and handling them gracefully:

Database-Level Deadlock Prevention

@Service
public class DeadlockAwareAccountService {
    
    @Retryable(
        value = {CannotAcquireLockException.class, DeadlockLoserDataAccessException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 100, multiplier = 2, random = true)
    )
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public void transferFunds(String fromAccountId, String toAccountId, BigDecimal amount) {
        
        // CRITICAL: Always acquire locks in consistent order to prevent deadlocks
        String firstId = fromAccountId.compareTo(toAccountId) < 0 ? fromAccountId : toAccountId;
        String secondId = fromAccountId.compareTo(toAccountId) < 0 ? toAccountId : fromAccountId;
        
        // Lock accounts in alphabetical order
        Account firstAccount = accountRepository.findByIdForUpdate(firstId);
        Account secondAccount = accountRepository.findByIdForUpdate(secondId);
        
        // Determine which is from/to after locking
        Account fromAccount = fromAccountId.equals(firstId) ? firstAccount : secondAccount;
        Account toAccount = fromAccountId.equals(firstId) ? secondAccount : firstAccount;
        
        if (fromAccount.getBalance().compareTo(amount) < 0) {
            throw new InsufficientFundsException();
        }
        
        fromAccount.setBalance(fromAccount.getBalance().subtract(amount));
        toAccount.setBalance(toAccount.getBalance().add(amount));
        
        accountRepository.save(fromAccount);
        accountRepository.save(toAccount);
    }
}

// Custom repository with explicit locking
@Repository
public interface AccountRepository extends JpaRepository<Account, String> {
    
    @Query("SELECT a FROM Account a WHERE a.id = :id")
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Account findByIdForUpdate(@Param("id") String id);
    
    // Timeout-based locking to prevent indefinite waits
    @QueryHints({@QueryHint(name = "javax.persistence.lock.timeout", value = "5000")})
    @Query("SELECT a FROM Account a WHERE a.id = :id")
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Account findByIdForUpdateWithTimeout(@Param("id") String id);
}

Application-Level Deadlock Detection

// Deadlock-aware service with timeout and retry
type AccountService struct {
    db           *gorm.DB
    lockTimeout  time.Duration
    retryAttempts int
}

func (s *AccountService) TransferFundsWithDeadlockHandling(
    ctx context.Context, 
    fromAccountID, toAccountID string, 
    amount int64,
) error {
    
    for attempt := 0; attempt < s.retryAttempts; attempt++ {
        err := s.attemptTransfer(ctx, fromAccountID, toAccountID, amount)
        
        if err == nil {
            return nil // Success
        }
        
        // Check if it's a deadlock or timeout error
        if s.isRetryableError(err) {
            // Exponential backoff with jitter
            backoff := time.Duration(attempt+1) * 100 * time.Millisecond
            jitter := time.Duration(rand.Intn(100)) * time.Millisecond
            
            select {
            case <-time.After(backoff + jitter):
                continue // Retry
            case <-ctx.Done():
                return ctx.Err()
            }
        }
        
        return err // Non-retryable error
    }
    
    return fmt.Errorf("transfer failed after %d attempts", s.retryAttempts)
}

func (s *AccountService) attemptTransfer(
    ctx context.Context, 
    fromAccountID, toAccountID string, 
    amount int64,
) error {
    
    // Set transaction timeout
    timeoutCtx, cancel := context.WithTimeout(ctx, s.lockTimeout)
    defer cancel()
    
    return s.db.WithContext(timeoutCtx).Transaction(func(tx *gorm.DB) error {
        // Lock in consistent order
        firstID, secondID := fromAccountID, toAccountID
        if fromAccountID > toAccountID {
            firstID, secondID = toAccountID, fromAccountID
        }
        
        var fromAccount, toAccount Account
        
        // Lock first account (GORM v2: use clause.Locking from gorm.io/gorm/clause;
        // the old Set("gorm:query_option", ...) approach is ignored in v2)
        if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
            Where("id = ?", firstID).First(&fromAccount).Error; err != nil {
            return err
        }
        
        // Lock second account
        if err := tx.Clauses(clause.Locking{Strength: "UPDATE", Options: "NOWAIT"}).
            Where("id = ?", secondID).First(&toAccount).Error; err != nil {
            return err
        }
        
        // Ensure we have the right accounts
        if fromAccount.ID != fromAccountID {
            fromAccount, toAccount = toAccount, fromAccount
        }
        
        if fromAccount.Balance < amount {
            return fmt.Errorf("insufficient funds")
        }
        
        fromAccount.Balance -= amount
        toAccount.Balance += amount
        
        if err := tx.Save(&fromAccount).Error; err != nil {
            return err
        }
        
        return tx.Save(&toAccount).Error
    })
}

func (s *AccountService) isRetryableError(err error) bool {
    errStr := strings.ToLower(err.Error())
    
    // Common deadlock/timeout indicators
    retryablePatterns := []string{
        "deadlock detected",
        "lock timeout",
        "could not obtain lock",
        "serialization failure",
        "concurrent update",
    }
    
    for _, pattern := range retryablePatterns {
        if strings.Contains(errStr, pattern) {
            return true
        }
    }
    
    return false
}

CQRS: Separating Read and Write Transaction Models

I have also used the Command Query Responsibility Segregation (CQRS) pattern in some financial applications; it allows different transaction strategies for reads and writes:

Java CQRS Implementation

// Command side - strict ACID transactions
@Service
public class OrderCommandService {
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private EventPublisher eventPublisher;
    
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void createOrder(CreateOrderCommand command) {
        // Validate command
        validateOrderCommand(command);
        
        Order order = new Order(command);
        order = orderRepository.save(order);
        
        // Publish event for read model update
        eventPublisher.publish(new OrderCreatedEvent(order));
    }
    
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public void updateOrderStatus(UpdateOrderStatusCommand command) {
        Order order = orderRepository.findById(command.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException());
        
        order.updateStatus(command.getNewStatus());
        orderRepository.save(order);
        
        eventPublisher.publish(new OrderStatusChangedEvent(order));
    }
}

// Query side - optimized for reads, eventual consistency
@Service
public class OrderQueryService {
    
    @Autowired
    private OrderReadModelRepository readModelRepository;
    
    // Read-only transaction for consistency within the query
    @Transactional(readOnly = true)
    public OrderSummary getOrderSummary(String customerId) {
        List<OrderReadModel> orders = readModelRepository
            .findByCustomerId(customerId);
        
        return OrderSummary.builder()
            .totalOrders(orders.size())
            .totalAmount(orders.stream()
                .mapToLong(OrderReadModel::getTotalAmount)
                .sum())
            .recentOrders(orders.stream()
                .sorted((o1, o2) -> o2.getCreatedAt().compareTo(o1.getCreatedAt()))
                .limit(10)
                .collect(Collectors.toList()))
            .build();
    }
    
    // No transaction needed for simple lookups
    public List<OrderReadModel> searchOrders(OrderSearchCriteria criteria) {
        return readModelRepository.search(criteria);
    }
}

// Event handler updates read model asynchronously
@Component
public class OrderReadModelUpdater {
    
    @EventHandler
    @Async
    @Transactional
    public void handle(OrderCreatedEvent event) {
        OrderReadModel readModel = OrderReadModel.builder()
            .orderId(event.getOrderId())
            .customerId(event.getCustomerId())
            .totalAmount(event.getTotalAmount())
            .status(event.getStatus())
            .createdAt(event.getCreatedAt())
            .build();
        
        readModelRepository.save(readModel);
    }
    
    @EventHandler
    @Async
    @Transactional
    public void handle(OrderStatusChangedEvent event) {
        readModelRepository.updateStatus(
            event.getOrderId(), 
            event.getNewStatus(),
            event.getUpdatedAt()
        );
    }
}

Data Locality and Performance Considerations

Data locality is critical to performance. I have seen a number of issues where the data source was not colocated with the data-processing APIs, resulting in higher network latency and poor performance:

Multi-Region Database Strategy

@Configuration
public class MultiRegionDataSourceConfig {
    
    // Primary database in same region
    @Bean
    @Primary
    @ConfigurationProperties("spring.datasource.primary")
    public DataSource primaryDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    // Read replica in same availability zone
    @Bean
    @ConfigurationProperties("spring.datasource.read-replica")
    public DataSource readReplicaDataSource() {
        return DataSourceBuilder.create().build();
    }
    
    // Cross-region backup for disaster recovery
    @Bean
    @ConfigurationProperties("spring.datasource.cross-region")
    public DataSource crossRegionDataSource() {
        return DataSourceBuilder.create().build();
    }
}

@Service
public class LocalityAwareOrderService {
    
    @Autowired
    @Qualifier("primaryDataSource")
    private DataSource writeDataSource;
    
    @Autowired
    @Qualifier("readReplicaDataSource") 
    private DataSource readDataSource;
    
    // Writes go to primary in same region
    @Transactional("primaryTransactionManager")
    public void createOrder(CreateOrderRequest request) {
        // Fast writes - same AZ latency ~1ms
        Order order = new Order(request);
        orderRepository.save(order);
    }
    
    // Reads from local replica
    @Transactional(value = "readReplicaTransactionManager", readOnly = true)
    public List<Order> getOrderHistory(String customerId) {
        // Even faster reads - same rack latency ~0.1ms
        return orderRepository.findByCustomerId(customerId);
    }
    
    // Critical path optimization
    @Transactional(timeout = 5) // Fail fast if network issues
    public void processUrgentOrder(UrgentOrderRequest request) {
        // Use connection pooling and keep-alive for predictable latency
        processOrder(request);
    }
}

Go with Regional Database Selection

type RegionalDatabaseConfig struct {
    primaryDB    *gorm.DB
    readReplica  *gorm.DB
    crossRegion  *gorm.DB
}

func NewRegionalDatabaseConfig(region string) *RegionalDatabaseConfig {
    // Select database endpoints based on current region
    primaryDSN := fmt.Sprintf("host=%s-primary.db user=app", region)
    readDSN := fmt.Sprintf("host=%s-read.db user=app", region)
    backupDSN := fmt.Sprintf("host=%s-backup.db user=app", getBackupRegion(region))
    
    return &RegionalDatabaseConfig{
        primaryDB:   connectWithLatencyOptimization(primaryDSN),
        readReplica: connectWithLatencyOptimization(readDSN),
        crossRegion: connectWithLatencyOptimization(backupDSN),
    }
}

func connectWithLatencyOptimization(dsn string) *gorm.DB {
    db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
    if err != nil {
        log.Fatal("Database connection failed:", err)
    }
    
    sqlDB, _ := db.DB()
    
    // Optimize for low-latency
    sqlDB.SetMaxOpenConns(20)
    sqlDB.SetMaxIdleConns(20)        // Keep connections alive
    sqlDB.SetConnMaxIdleTime(0)      // Never close idle connections
    sqlDB.SetConnMaxLifetime(time.Hour) // Rotate connections hourly
    
    return db
}

type OrderService struct {
    config *RegionalDatabaseConfig
}

func (s *OrderService) CreateOrder(ctx context.Context, request CreateOrderRequest) error {
    // Use primary database for writes
    return s.config.primaryDB.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        order := &Order{
            CustomerID:  request.CustomerID,
            TotalAmount: request.TotalAmount,
            Status:      "PENDING",
        }
        
        return tx.Create(order).Error
    })
}

func (s *OrderService) GetOrderHistory(ctx context.Context, customerID string) ([]Order, error) {
    var orders []Order
    
    // Use read replica for queries - better performance
    err := s.config.readReplica.WithContext(ctx).
        Where("customer_id = ?", customerID).
        Order("created_at DESC").
        Find(&orders).Error
        
    return orders, err
}

Understanding Consistency: ACID vs CAP vs Linearizability

One of the most confusing aspects of distributed systems is that “consistency” means different things in different contexts. The C in ACID has nothing to do with the C in CAP, and this distinction is crucial for designing reliable systems.

The Three Faces of Consistency

// ACID Consistency: Data integrity constraints
@Entity
public class BankAccount {
    @Id
    private String accountId;
    
    @Column(nullable = false)
    @Min(0) // ACID Consistency: Balance cannot be negative
    private BigDecimal balance;
    
    @Version
    private Long version;
}

// CAP Consistency: Linearizability across distributed nodes
@Service
public class DistributedAccountService {
    
    // This requires coordination across all replicas
    @Transactional
    public void withdraw(String accountId, BigDecimal amount) {
        // All replicas must see this change at the same logical time
        // = CAP Consistency (Linearizability)
        
        BankAccount account = accountRepository.findById(accountId)
            .orElseThrow(() -> new IllegalStateException("Account not found: " + accountId));
        if (account.getBalance().compareTo(amount) < 0) {
            throw new InsufficientFundsException(); // ACID Consistency violated
        }
        account.setBalance(account.getBalance().subtract(amount));
        accountRepository.save(account); // Maintains ACID invariants
    }
}

ACID Consistency ensures your data satisfies business rules and constraints – balances can’t be negative, foreign keys must reference valid records, etc. It’s about data integrity within a single database.

CAP Consistency (Linearizability) means all nodes in a distributed system see the same data at the same time – as if there’s only one copy of the data. It’s about coordination across multiple machines.

Consistency Models in Distributed Systems

Here’s how different NoSQL systems handle consistency:

DynamoDB: Tunable Consistency

I have used DynamoDB extensively in a number of systems, especially when I worked for a large cloud provider. DynamoDB provides both strongly consistent and eventually consistent reads, but strongly consistent reads are routed to the leader node, which limits throughput and scalability and consumes twice the read capacity of eventually consistent reads. We had to evaluate each use case carefully and choose consistency settings that balanced performance requirements against cost.

Additionally, NoSQL databases like DynamoDB don't support foreign key constraints, and multi-item transactions (DynamoDB's TransactWriteItems, used later in this post) are far more limited than relational transactions, even though global secondary indexes are supported. This means you often have to handle compensating actions and maintain referential integrity in your application code.
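
For example, a foreign-key-style check can be emulated by bundling the write with a ConditionCheck on the parent item in a single TransactWriteItems call. A minimal sketch (table names, key attributes, and the method itself are assumptions, using the AWS SDK for Java v2):

// Insert an order item only if the parent order still exists
public void addOrderItem(DynamoDbClient dynamoClient, String orderId, String itemId) {
    TransactWriteItem parentExists = TransactWriteItem.builder()
        .conditionCheck(ConditionCheck.builder()
            .tableName("Orders")
            .key(Map.of("orderId", AttributeValue.builder().s(orderId).build()))
            .conditionExpression("attribute_exists(orderId)")
            .build())
        .build();
    
    TransactWriteItem insertItem = TransactWriteItem.builder()
        .put(Put.builder()
            .tableName("OrderItems")
            .item(Map.of(
                "orderId", AttributeValue.builder().s(orderId).build(),
                "itemId", AttributeValue.builder().s(itemId).build()))
            .build())
        .build();
    
    // Either both succeed or neither does; the "foreign key" lives in application code
    dynamoClient.transactWriteItems(TransactWriteItemsRequest.builder()
        .transactItems(parentExists, insertItem)
        .build());
}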

// DynamoDB allows you to choose consistency per operation
@Service
public class DynamoConsistencyService {
    
    @Autowired
    private DynamoDbClient dynamoClient; // AWS SDK for Java v2
    
    // Eventually consistent read - faster, cheaper, may be stale
    public Order getOrder(String orderId) {
        GetItemRequest request = GetItemRequest.builder()
            .tableName("Orders")
            .key(Map.of("orderId", AttributeValue.builder().s(orderId).build()))
            .consistentRead(false) // Eventually consistent
            .build();
            
        return mapToOrder(dynamoClient.getItem(request).item()); // mapping helper assumed
    }
    
    // Strongly consistent read - slower, more expensive, always latest
    public Order getOrderStrong(String orderId) {
        GetItemRequest request = GetItemRequest.builder()
            .tableName("Orders")
            .key(Map.of("orderId", AttributeValue.builder().s(orderId).build()))
            .consistentRead(true) // Strongly consistent = linearizable
            .build();
            
        return mapToOrder(dynamoClient.getItem(request).item()); // mapping helper assumed
    }
    
    // Quorum intuition: in Dynamo-style stores, W + R > N guarantees that every
    // read quorum overlaps the latest write quorum (e.g., N=3, W=2, R=2).
    // DynamoDB manages replication internally rather than exposing W/R tuning;
    // per request you only choose eventually vs. strongly consistent reads.
    public void updateOrderWithQuorum(String orderId, String newStatus) {
        
        UpdateItemRequest request = UpdateItemRequest.builder()
            .tableName("Orders")
            .key(Map.of("orderId", AttributeValue.builder().s(orderId).build()))
            .updateExpression("SET #status = :status")
            .expressionAttributeNames(Map.of("#status", "status"))
            .expressionAttributeValues(Map.of(":status", AttributeValue.builder().s(newStatus).build()))
            .build();
            
        dynamoClient.updateItem(request);
    }
}

Distributed Locking with DynamoDB

Building on my experience implementing distributed locks with DynamoDB, here’s how to prevent double spending in a distributed NoSQL environment:

// Distributed lock implementation for financial operations
@Service
public class DynamoDistributedLockService {
    
    @Autowired
    private DynamoDbClient dynamoClient;
    
    private static final String LOCK_TABLE = "distributed_locks";
    private static final int LOCK_TIMEOUT_SECONDS = 30;
    
    public boolean acquireTransferLock(String fromAccount, String toAccount, String requestId) {
        // Always lock accounts in consistent order to prevent deadlocks
        List<String> sortedAccounts = Arrays.asList(fromAccount, toAccount)
            .stream()
            .sorted()
            .collect(Collectors.toList());
        
        String lockKey = String.join(":", sortedAccounts);
        long expiryTime = System.currentTimeMillis() + (LOCK_TIMEOUT_SECONDS * 1000);
        
        try {
            PutItemRequest request = PutItemRequest.builder()
                .tableName(LOCK_TABLE)
                .item(Map.of(
                    "lock_key", AttributeValue.builder().s(lockKey).build(),
                    "owner", AttributeValue.builder().s(requestId).build(),
                    "expires_at", AttributeValue.builder().n(String.valueOf(expiryTime)).build(),
                    "accounts", AttributeValue.builder().ss(sortedAccounts).build()
                ))
                .conditionExpression("attribute_not_exists(lock_key) OR expires_at < :current_time")
                .expressionAttributeValues(Map.of(
                    ":current_time", AttributeValue.builder().n(String.valueOf(System.currentTimeMillis())).build()
                ))
                .build();
                
            dynamoClient.putItem(request);
            return true;
            
        } catch (ConditionalCheckFailedException e) {
            return false; // Lock already held
        }
    }
    
    public void releaseLock(String fromAccount, String toAccount, String requestId) {
        List<String> sortedAccounts = Arrays.asList(fromAccount, toAccount)
            .stream()
            .sorted()
            .collect(Collectors.toList());
        
        String lockKey = String.join(":", sortedAccounts);
        
        DeleteItemRequest request = DeleteItemRequest.builder()
            .tableName(LOCK_TABLE)
            .key(Map.of("lock_key", AttributeValue.builder().s(lockKey).build()))
            .conditionExpression("owner = :request_id")
            .expressionAttributeValues(Map.of(
                ":request_id", AttributeValue.builder().s(requestId).build()
            ))
            .build();
            
        try {
            dynamoClient.deleteItem(request);
        } catch (ConditionalCheckFailedException e) {
            log.warn("Failed to release lock - may have been taken by another process: {}", lockKey);
        }
    }
}

// Usage in financial service
@Service
public class DynamoFinancialService {
    
    @Autowired
    private DynamoDistributedLockService lockService;
    
    @Autowired
    private DynamoDbClient dynamoClient;
    
    public TransferResult transferFunds(String fromAccount, String toAccount, BigDecimal amount) {
        String requestId = UUID.randomUUID().toString();
        
        // Acquire distributed lock to prevent concurrent transfers
        if (!lockService.acquireTransferLock(fromAccount, toAccount, requestId)) {
            return TransferResult.rejected("Another transfer in progress for these accounts");
        }
        
        try {
            // Now safe to perform transfer with DynamoDB transactions
            List<TransactWriteItem> actions = new ArrayList<>();
            
            // Conditional update for from account
            actions.add(TransactWriteItem.builder()
                .update(Update.builder()
                    .tableName("accounts")
                    .key(Map.of("account_id", AttributeValue.builder().s(fromAccount).build()))
                    .updateExpression("SET balance = balance - :amount")
                    .conditionExpression("balance >= :amount") // Prevent negative balance
                    .expressionAttributeValues(Map.of(
                        ":amount", AttributeValue.builder().n(amount.toString()).build()
                    ))
                    .build())
                .build());
            
            // Conditional update for to account  
            actions.add(TransactWriteItem.builder()
                .update(Update.builder()
                    .tableName("accounts")
                    .key(Map.of("account_id", AttributeValue.builder().s(toAccount).build()))
                    .updateExpression("SET balance = balance + :amount")
                    .expressionAttributeValues(Map.of(
                        ":amount", AttributeValue.builder().n(amount.toString()).build()
                    ))
                    .build())
                .build());
            
            // Execute atomic transaction
            TransactWriteItemsRequest txRequest = TransactWriteItemsRequest.builder()
                .transactItems(actions)
                .build();
                
            dynamoClient.transactWriteItems(txRequest);
            return TransferResult.success();
            
        } catch (TransactionCanceledException e) {
            return TransferResult.rejected("Transfer failed - likely insufficient funds");
        } finally {
            lockService.releaseLock(fromAccount, toAccount, requestId);
        }
    }
}

The CAP Theorem in Practice

// Real-world CAP tradeoffs in microservices
@Service
public class CAPAwareOrderService {
    
    // Partition tolerance is mandatory in distributed systems
    // So you choose: Consistency OR Availability
    
    // CP System: Choose Consistency over Availability
    @CircuitBreaker(fallbackMethod = "fallbackCreateOrder")
    public OrderResponse createOrderCP(CreateOrderRequest request) {
        try {
            // All replicas must acknowledge - may fail if partition occurs
            return orderService.createOrderWithQuorum(request);
        } catch (PartitionException e) {
            // System becomes unavailable during partition
            throw new ServiceUnavailableException("Cannot guarantee consistency during partition");
        }
    }
    
    // AP System: Choose Availability over Consistency  
    public OrderResponse createOrderAP(CreateOrderRequest request) {
        try {
            // Try strong consistency first
            return orderService.createOrderWithQuorum(request);
        } catch (PartitionException e) {
            // Fall back to available replicas - may create conflicts
            log.warn("Partition detected, falling back to eventual consistency");
            return orderService.createOrderEventual(request);
        }
    }
    
    // Fallback for CP system
    public OrderResponse fallbackCreateOrder(CreateOrderRequest request, Exception ex) {
        // Return cached response or friendly error
        return OrderResponse.builder()
            .status("DEFERRED")
            .message("Order will be processed when system recovers")
            .build();
    }
}

This distinction between ACID consistency and CAP consistency is fundamental to designing distributed systems. ACID gives you data integrity within a single node, while CAP forces you to choose between strong consistency and availability when networks partition. Understanding these tradeoffs lets you make informed architectural decisions based on your business requirements.

CAP Theorem and Financial Consistency

The CAP theorem creates fundamental challenges for financial systems. You cannot have both strong consistency and availability during network partitions, which forces difficult architectural decisions:

// CP System: Prioritize Consistency over Availability
@Service
public class FinancialConsistencyService {
    
    @CircuitBreaker(fallbackMethod = "rejectTransaction")
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public TransferResult transferFunds(String fromAccount, String toAccount, BigDecimal amount) {
        // Requires consensus across all replicas
        // May become unavailable during partitions
        
        if (!distributedLockService.acquireLock(fromAccount, toAccount)) {
            throw new ServiceUnavailableException("Cannot acquire distributed lock");
        }
        
        try {
            // This operation requires strong consistency across all nodes
            Account from = accountService.getAccountWithQuorum(fromAccount);
            Account to = accountService.getAccountWithQuorum(toAccount);
            
            if (from.getBalance().compareTo(amount) < 0) {
                throw new InsufficientFundsException();
            }
            
            // Both updates must succeed on majority of replicas
            accountService.updateWithQuorum(fromAccount, from.getBalance().subtract(amount));
            accountService.updateWithQuorum(toAccount, to.getBalance().add(amount));
            
            return TransferResult.success();
            
        } finally {
            distributedLockService.releaseLock(fromAccount, toAccount);
        }
    }
    
    public TransferResult rejectTransaction(String fromAccount, String toAccount, 
                                          BigDecimal amount, Exception ex) {
        // During partitions, reject rather than risk double spending
        return TransferResult.rejected("Cannot guarantee consistency during network partition");
    }
}

NoSQL Transaction Limitations: DynamoDB and Compensating Transactions

NoSQL databases often lack ACID guarantees, requiring different strategies:

DynamoDB Optimistic Concurrency

// DynamoDB with optimistic locking using version fields
@DynamoDBTable(tableName = "Orders")
public class DynamoOrder {
    
    @DynamoDBHashKey
    private String orderId;
    
    @DynamoDBAttribute
    private String customerId;
    
    @DynamoDBAttribute
    private Long totalAmount;
    
    @DynamoDBAttribute
    private String status;
    
    @DynamoDBVersionAttribute
    private Long version; // DynamoDB handles optimistic locking
    
    // getters, setters...
}

@Service
public class DynamoOrderService {
    
    @Autowired
    private DynamoDBMapper dynamoMapper;
    
    @Autowired
    private AmazonDynamoDB dynamoClient;
    
    // Optimistic concurrency with retry
    @Retryable(
        value = {ConditionalCheckFailedException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 100, multiplier = 2)
    )
    public void updateOrderStatus(String orderId, String newStatus) {
        try {
            // Get current version
            DynamoOrder order = dynamoMapper.load(DynamoOrder.class, orderId);
            if (order == null) {
                throw new OrderNotFoundException();
            }
            
            // Update with version check
            order.setStatus(newStatus);
            dynamoMapper.save(order); // Fails if version changed
            
        } catch (ConditionalCheckFailedException e) {
            // Another process modified the record - retry
            throw e;
        }
    }
    
    // Multi-item transaction using DynamoDB transactions (limited)
    public void transferOrderItems(String fromOrderId, String toOrderId, List<String> itemIds) {
        Collection<TransactWriteItem> actions = new ArrayList<>();
        
        // Read both orders first
        DynamoOrder fromOrder = dynamoMapper.load(DynamoOrder.class, fromOrderId);
        DynamoOrder toOrder = dynamoMapper.load(DynamoOrder.class, toOrderId);
        
        // Prepare conditional updates
        fromOrder.removeItems(itemIds);
        toOrder.addItems(itemIds);
        
        actions.add(new TransactWriteItem()
            .withConditionCheck(new ConditionCheck()
                .withTableName("Orders")
                .withKey(Collections.singletonMap("orderId", new AttributeValue(fromOrderId)))
                .withConditionExpression("version = :version")
                .withExpressionAttributeValues(Collections.singletonMap(":version", 
                    new AttributeValue().withN(fromOrder.getVersion().toString())))));
        
        actions.add(new TransactWriteItem()
            .withUpdate(new Update()
                .withTableName("Orders")
                .withKey(Collections.singletonMap("orderId", new AttributeValue(fromOrderId)))
                .withUpdateExpression("SET #items = :items, version = version + :inc")
                .withConditionExpression("version = :currentVersion")
                .withExpressionAttributeNames(Collections.singletonMap("#items", "items"))
                .withExpressionAttributeValues(Map.of(
                    ":items", new AttributeValue().withSS(fromOrder.getItems()),
                    ":inc", new AttributeValue().withN("1"),
                    ":currentVersion", new AttributeValue().withN(fromOrder.getVersion().toString())
                ))));
        
        // Execute transaction
        dynamoClient.transactWriteItems(new TransactWriteItemsRequest()
            .withTransactItems(actions));
    }
}

Compensating Transactions for NoSQL

// SAGA pattern for NoSQL databases without transactions
@Service
public class NoSQLOrderSaga {
    
    @Autowired
    private DynamoOrderService orderService;
    
    @Autowired
    private MongoInventoryService inventoryService;
    
    @Autowired
    private CassandraPaymentService paymentService;
    
    public void processOrderWithCompensation(CreateOrderRequest request) {
        CompensatingTransactionManager saga = new CompensatingTransactionManager();
        
        try {
            // Step 1: Create order in DynamoDB
            String orderId = saga.execute("CREATE_ORDER", 
                () -> orderService.createOrder(request),
                (createdOrderId) -> orderService.deleteOrder(createdOrderId)
            );
            
            // Step 2: Reserve inventory in MongoDB  
            String reservationId = saga.execute("RESERVE_INVENTORY",
                () -> inventoryService.reserveItems(request.getItems()),
                (createdReservationId) -> inventoryService.releaseReservation(createdReservationId)
            );
            
            // Step 3: Process payment in Cassandra
            String paymentId = saga.execute("PROCESS_PAYMENT",
                () -> paymentService.processPayment(request.getPaymentInfo()),
                (createdPaymentId) -> paymentService.refundPayment(createdPaymentId)
            );
            
            // All steps successful - confirm order
            orderService.confirmOrder(orderId, paymentId, reservationId);
            
        } catch (Exception e) {
            // Compensate all executed steps
            saga.compensateAll();
            throw new OrderProcessingException("Order processing failed", e);
        }
    }
}

// Generic compensating transaction manager
public class CompensatingTransactionManager {
    
    private final List<CompensatingAction> executedActions = new ArrayList<>();
    
    public <T> T execute(String stepName, Supplier<T> action, Consumer<T> compensation) {
        try {
            T result = action.get();
            executedActions.add(new CompensatingAction<>(stepName, result, compensation));
            return result;
        } catch (Exception e) {
            log.error("Step {} failed: {}", stepName, e.getMessage());
            throw e;
        }
    }
    
    public void compensateAll() {
        // Compensate in reverse order
        for (int i = executedActions.size() - 1; i >= 0; i--) {
            CompensatingAction action = executedActions.get(i);
            try {
                action.compensate();
                log.info("Compensated step: {}", action.getStepName());
            } catch (Exception e) {
                log.error("Compensation failed for step: {}", action.getStepName(), e);
                // In production, you'd send to dead letter queue
            }
        }
    }
    
    @AllArgsConstructor
    private static class CompensatingAction<T> {
        private final String stepName;
        private final T result;
        private final Consumer<T> compensationAction;
        
        public void compensate() {
            compensationAction.accept(result);
        }
        
        public String getStepName() {
            return stepName;
        }
    }
}

Real-World Considerations

After implementing transaction management across dozens of production systems, here are the lessons that only come from battle scars:

Performance vs. Consistency Tradeoffs

// Different strategies for different use cases
@Service
public class OrderService {
    
    // High-consistency financial operations
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void processPayment(PaymentRequest request) {
        // Strict ACID guarantees
    }
    
    // Analytics operations can be eventually consistent
    @Transactional(readOnly = true, isolation = Isolation.READ_COMMITTED)
    public OrderAnalytics generateAnalytics(String customerId) {
        // Faster reads, acceptable if slightly stale
    }
    
    // Bulk operations need careful batching
    @Transactional
    public void processBulkOrders(List<CreateOrderRequest> requests) {
        int batchSize = 100;
        for (int i = 0; i < requests.size(); i += batchSize) {
            int end = Math.min(i + batchSize, requests.size());
            List<CreateOrderRequest> batch = requests.subList(i, end);
            
            for (CreateOrderRequest request : batch) {
                processOrder(request);
            }
            
            // Flush changes to avoid memory issues
            entityManager.flush();
            entityManager.clear();
        }
    }
}

Monitoring and Observability

Transaction boundaries are invisible until they fail. Proper monitoring is crucial:

@Component
public class TransactionMetrics {
    
    private final MeterRegistry meterRegistry;
    
    public TransactionMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
    
    @EventListener
    public void handleTransactionCommit(TransactionCommitEvent event) {
        // Record the transaction duration (assumes the event carries the measured duration)
        Timer.builder("transaction.duration")
                .tag("status", "commit")
                .tag("name", event.getTransactionName())
                .register(meterRegistry)
                .record(event.getDuration());
    }
    
    @EventListener
    public void handleTransactionRollback(TransactionRollbackEvent event) {
        Counter.builder("transaction.rollback")
                .tag("reason", event.getRollbackReason())
                .register(meterRegistry)
                .increment();
    }
}

Testing Transaction Boundaries

@TestConfiguration
static class TransactionTestConfig {
    
    @Bean
    @Primary
    public PlatformTransactionManager testTransactionManager() {
        // Use test-specific transaction manager that tracks state
        return new TestTransactionManager();
    }
}

@Test
public void testTransactionRollbackOnFailure() {
    // Given
    CreateOrderRequest request = createValidOrderRequest();
    
    // Mock payment service to fail
    when(paymentService.processPayment(any(), any()))
        .thenThrow(new PaymentException("Payment failed"));
    
    // When
    assertThatThrownBy(() -> orderService.processOrder(request))
        .isInstanceOf(PaymentException.class);
    
    // Then
    assertThat(orderRepository.count()).isEqualTo(0); // No order created
    assertThat(inventoryRepository.findBySku("TEST_SKU").getQuantity())
        .isEqualTo(100); // Inventory not decremented
}

Conclusion

Transaction management isn’t just about preventing data corruption—it’s about building systems that you can reason about, debug, and trust. Whether you’re using Spring’s elegant annotations, Go’s explicit transaction handling, or building custom macros in Rust, the key principles remain the same:

  1. Make transaction boundaries explicit – Either through annotations, function signatures, or naming conventions
  2. Fail fast and fail clearly – Don’t let partial failures create zombie states
  3. Design for compensation – In distributed systems, rollback isn’t always possible
  4. Monitor transaction health – You can’t improve what you don’t measure
  5. Test failure scenarios – Happy path testing doesn’t catch transaction bugs

The techniques we’ve covered—from basic ACID transactions to sophisticated SAGA patterns—form a spectrum of tools. Choose the right tool for your consistency requirements, performance needs, and operational complexity. Remember, the best transaction strategy is the one that lets you sleep soundly at night, knowing your data is safe and your system is predictable.

September 13, 2025

Task Scheduling Algorithms in Distributed Orchestration Systems

Filed under: Computing,Concurrency — admin @ 3:03 pm

Modern distributed systems face a fundamental challenge: how to efficiently schedule and execute thousands of tasks across heterogeneous resources while maximizing throughput, minimizing latency, and ensuring fair resource allocation. This challenge becomes even more complex when dealing with workflows, dependencies, and varying resource requirements. I have written before about building Formicary, an open-source distributed orchestration engine, and in this post I’ll explore task scheduling algorithms for executing background tasks, jobs, and workflows through the lens of Formicary. We’ll examine how theoretical scheduling concepts translate into practical implementations in a production-ready system.

Formicary Architecture Overview

Before diving into scheduling algorithms, let’s understand Formicary’s architecture. The system follows a Leader-Follower pattern with two main components:

The Queen (Leader/Server)

  • API & UI Controllers: RESTful APIs and web dashboard
  • Job Scheduler: Leader-elected service that polls for pending jobs
  • Resource Manager: Tracks available ant workers and their capabilities
  • Job Supervisor: Orchestrates job execution as a DAG
  • Task Supervisor: Manages individual task lifecycle

The Ants (Followers/Workers)

  • Executors: Support for Docker, Kubernetes, Shell, HTTP, WebSocket, and custom protocols
  • Registration System: Workers advertise capabilities via tags and methods
  • Artifact Management: Handle dependencies and outputs

Key Features Supporting Advanced Scheduling

Formicary includes several features that enable sophisticated scheduling strategies:

  • Tags & Labels: Route tasks to specific workers based on capabilities
  • Priority Levels: Jobs can have different priority levels for execution order
  • Resource Constraints: Define CPU, memory, and storage requirements
  • Tenant Isolation: Multi-tenant support with quota management
  • Cron Scheduling: Time-based job scheduling
  • Concurrent Limits: Control maximum parallel job execution
  • Dynamic Scaling: Kubernetes-based auto-scaling support

Scheduling Algorithm Decision Flow

Before diving into specific algorithms, let’s visualize how different scheduling strategies route jobs through the system:

Job Execution Lifecycle

Understanding how jobs flow through the system helps illustrate where different scheduling algorithms take effect:

Wait Time Estimation Algorithm

Formicary implements a sophisticated wait time estimation system that helps users understand queue delays and plan accordingly. The algorithm considers multiple factors:

// Simplified wait time calculation
func CalculateWaitTime(jobRequest *JobRequest, executingJobs int,
    historicalAverageMillis int64, availableWorkers int) time.Duration {
    // 1. Find position in priority-ordered queue
    queuePosition := findQueuePosition(jobRequest.Priority, jobRequest.SubmissionTime)

    // 2. Calculate jobs ahead in queue (70% of executing jobs assumed near completion)
    jobsAhead := queuePosition + int(float64(executingJobs)*0.7)

    // 3. Estimate based on historical average execution time
    var estimatedWait time.Duration
    if historicalAverageMillis > 0 && availableWorkers > 0 {
        estimatedWait = time.Duration(float64(jobsAhead)/float64(availableWorkers)) *
            time.Duration(historicalAverageMillis) * time.Millisecond
    }

    // 4. Account for scheduled future execution
    if jobRequest.ScheduledAt.After(time.Now()) {
        scheduleDiff := time.Until(jobRequest.ScheduledAt)
        if scheduleDiff > estimatedWait {
            estimatedWait = scheduleDiff
        }
    }

    return estimatedWait
}

Formicary uses JobStatsRegistry to track execution patterns:

type JobStats struct {
    SucceededJobsAverage int64  // Average execution time
    ExecutingJobs       int32   // Currently running
    AntsCapacity        int     // Available worker capacity
    AntsAvailable       bool    // Worker availability status
}

It considers worker availability and capacity constraints (a small sketch of the bottleneck calculation appears after the list):

  • Calculates minimum available workers across all required task types
  • Accounts for tag-based routing restrictions
  • Factors in Kubernetes resource quotas
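
A minimal sketch of that bottleneck calculation, assuming we already have a per-task-type count of available workers (the names here are illustrative, not Formicary’s actual API):

// Returns the bottleneck capacity for a job: the smallest number of available
// workers across all task types the job requires. availableByTaskType is assumed
// to already reflect tag routing and Kubernetes quota restrictions.
func minAvailableWorkers(availableByTaskType map[string]int) int {
	lowest := -1
	for _, available := range availableByTaskType {
		if lowest < 0 || available < lowest {
			lowest = available
		}
	}
	if lowest < 0 {
		return 0 // no task types means nothing can be scheduled
	}
	return lowest
}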

Formicary orders pending jobs by priority and submission time:

sort.Slice(pendingJobs, func(i, j int) bool {
    job1, job2 := pendingJobs[i], pendingJobs[j]
    if job1.Priority == job2.Priority {
        return job1.CreatedAt.Before(job2.CreatedAt)  // FCFS within priority
    }
    return job1.Priority > job2.Priority  // Higher priority first
})

This estimation helps with:

  • SLA Management: Predict if jobs will meet deadlines
  • Capacity Planning: Identify when to scale worker pools
  • User Experience: Provide realistic wait time expectations
  • Load Balancing: Route jobs to less congested worker pools

Task Scheduling Algorithms in Practice

Now let’s examine how various scheduling algorithms are implemented or can be achieved in Formicary:

1. First-Come First-Serve (FCFS)

FCFS processes tasks in arrival order using a simple FIFO queue. The algorithm maintains fairness by ensuring no task is starved, but suffers from the “convoy effect” where short jobs wait behind long-running ones. Its characteristics include the following (a small worked example follows the list):

  • Average waiting time: (sum of waiting times) / number of jobs
  • Turnaround time: completion_time - arrival_time
  • No preemption – jobs run to completion
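
To make these metrics concrete, here is a small illustrative calculation (not Formicary code), assuming all jobs arrive at time zero and run to completion in submission order:

// fcfsMetrics computes average waiting and turnaround times for jobs executed
// strictly in arrival order, assuming all arrive at t=0.
type fcfsJob struct {
	name  string
	burst time.Duration // estimated execution time
}

func fcfsMetrics(jobs []fcfsJob) (avgWait, avgTurnaround time.Duration) {
	if len(jobs) == 0 {
		return 0, 0
	}
	var clock, totalWait, totalTurnaround time.Duration
	for _, j := range jobs {
		totalWait += clock       // time spent waiting behind earlier jobs
		clock += j.burst         // job runs to completion (no preemption)
		totalTurnaround += clock // turnaround = completion time - arrival time
	}
	n := time.Duration(len(jobs))
	return totalWait / n, totalTurnaround / n
}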

Formicary Implementation: This is Formicary’s default behavior. When jobs are submitted, they’re placed in a PENDING state and processed by the Job Scheduler in submission order.

# Job requests are processed in submission order
job_type: data-processing
description: FCFS example - processed in submission order
tasks:
  - task_type: process-data
    method: DOCKER
    container:
      image: python:3.9
    script:
      - python process_data.py

Pros: Simple, predictable, no starvation Cons: Long-running jobs can block shorter ones, poor average wait times

2. Priority Scheduling

Each job has an assigned priority, with higher-priority jobs scheduled first. Priority assignment can be static or dynamic based on various factors. A drawback of this algorithm is starvation of low-priority jobs, but it can be addressed with the following techniques (a minimal aging sketch follows the list):

  • Aging: Gradually increase priority of waiting jobs
  • Priority Inversion Protection: Temporary priority boost for resource holders
  • Fair Share: Ensure each user/tenant gets minimum resource allocation
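
Here is a minimal aging sketch (hypothetical, not Formicary’s implementation): a job’s effective priority grows the longer it waits, bounded by a maximum boost, so low-priority work eventually runs.

// agedPriority returns the job's priority after aging: one extra point per full
// agingStep the job has waited, capped at maxBoost.
type pendingJob struct {
	priority  int
	createdAt time.Time
}

func agedPriority(job pendingJob, now time.Time, agingStep time.Duration, maxBoost int) int {
	bump := int(now.Sub(job.createdAt) / agingStep)
	if bump > maxBoost {
		bump = maxBoost
	}
	return job.priority + bump
}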

Formicary Implementation: Jobs support priority levels, and the scheduler considers priority when selecting the next job to execute.

job_type: critical-analysis
priority: 10  # Higher priority job
description: Critical security analysis
tasks:
  - task_type: vulnerability-scan
    method: KUBERNETES
    container:
      image: security-scanner:latest
# Submitting jobs with different priorities
curl -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  --data '{"job_type": "critical-analysis", "job_priority": 10}' \
  $SERVER/api/jobs/requests

curl -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  --data '{"job_type": "routine-backup", "job_priority": 1}' \
  $SERVER/api/jobs/requests

Implementation Details: The Job Scheduler queries pending jobs ordered by priority, ensuring high-priority jobs are scheduled first when resources become available.

3. Multilevel Queues – Tag-Based Routing

This algorithm partitions jobs into separate queues based on characteristics (interactive, batch, system). Each queue can use different scheduling algorithms, with inter-queue scheduling typically using fixed priorities or time slicing. Common queue classification strategies include:

  • Job Type: Interactive, batch, system, real-time
  • Resource Requirements: CPU-intensive, I/O-intensive, memory-intensive
  • Duration: Short, medium, long-running jobs
  • User Class: Premium, standard, background users

Formicary Implementation: Using tags and labels, we can effectively create multilevel queues by routing different job types to specialized worker pools.

# Short-running analysis jobs
job_type: quick-scan
tags: ["fast-worker", "analysis"]
tasks:
  - task_type: preflight-check
    method: DOCKER
    tags: ["cpu-optimized"]
    container:
      image: lightweight-scanner:latest

---
# Medium-duration static analysis
job_type: static-analysis
tags: ["medium-worker", "analysis"] 
tasks:
  - task_type: code-analysis
    method: KUBERNETES
    tags: ["memory-optimized"]
    container:
      image: static-analyzer:latest
      memory_limit: "4Gi"

---
# Long-running dynamic analysis
job_type: dynamic-analysis
tags: ["heavy-worker", "analysis"]
tasks:
  - task_type: device-testing
    method: KUBERNETES
    tags: ["gpu-enabled", "device-farm"]
    container:
      image: dynamic-analyzer:latest
      resources:
        cpu_limit: "8"
        memory_limit: "16Gi"

At a mobile security company, I implemented this pattern with three distinct worker pools:

  • Fast Workers: Preflight analysis (seconds to minutes)
  • Medium Workers: Static analysis (a few minutes)
  • Heavy Workers: Dynamic analysis on device farms (multiple minutes to hours)

4. Resource-Aware Scheduling

This algorithm makes scheduling decisions based on current and predicted resource availability (CPU, memory, storage, network). It considers both resource requirements and system capacity to prevent oversubscription:

  • Multi-dimensional: CPU, memory, storage, network, custom resources
  • Quality of Service: Guaranteed, burstable, best-effort resource classes
  • Affinity/Anti-affinity (e.g., Kubernetes Scheduler): Placement preferences and constraints

Advanced techniques include (a simple first-fit sketch follows the list):

  • Bin Packing: First-fit, best-fit, worst-fit algorithms
  • Resource Fragmentation: Avoid unusable resource leftovers
  • Overcommitment: Schedule based on statistical usage patterns
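
As a concrete example of the bin-packing idea, here is an illustrative first-fit placement over a single CPU dimension (not Formicary code; real schedulers such as Kubernetes consider multiple resources and constraints):

// firstFit assigns each task to the first worker with enough remaining CPU.
// It returns a task index -> worker index mapping, or false if some task does not fit.
func firstFit(taskCPU []float64, workerCPU []float64) (map[int]int, bool) {
	remaining := append([]float64(nil), workerCPU...) // copy so inputs stay intact
	assignment := make(map[int]int)
	for t, need := range taskCPU {
		placed := false
		for w, free := range remaining {
			if free >= need {
				remaining[w] -= need
				assignment[t] = w
				placed = true
				break
			}
		}
		if !placed {
			return nil, false
		}
	}
	return assignment, true
}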

Formicary Implementation: Integration with Kubernetes resource management and custom resource tracking.

job_type: ml-training
description: Resource-aware ML model training
tasks:
  - task_type: train-model
    method: KUBERNETES
    tags: ["gpu-node", "ml-workload"]
    container:
      image: tensorflow/tensorflow:latest-gpu
      cpu_request: "4"
      cpu_limit: "8"
      memory_request: "8Gi"
      memory_limit: "16Gi"
      ephemeral_storage_request: "10Gi"
    node_selector:
      hardware: "gpu-enabled"
      instance-type: "ml-optimized"
    tolerations:
      - key: "gpu-workload"
        operator: "Equal"
        value: "true"
        effect: "NoSchedule"

The Resource Manager tracks worker capabilities and current load, ensuring tasks are only scheduled when adequate resources are available.

5. Matchmaking Scheduler – Affinity-Based Routing

It matches jobs to workers based on capabilities, data locality, and preferences. Uses constraint satisfaction to find optimal job-worker pairings. Matching algorithms include:

  • Hungarian Algorithm: Optimal assignment for bipartite matching
  • Market-based: Economic models with bids and auctions
  • Constraint Satisfaction (Kubernetes/Apache Spark): Match job requirements to worker capabilities

Common locality considerations include:

  • Data Locality: Schedule jobs where data resides
  • Network Topology: Minimize network hops and bandwidth usage
  • Hardware Affinity: GPU jobs to GPU nodes, FPGA workloads to FPGA nodes

Formicary Implementation: Using tags, labels, and Kubernetes affinity rules to achieve data locality and capability matching.

job_type: geo-distributed-processing
description: Process data close to its source
tasks:
  - task_type: process-eu-data
    method: KUBERNETES
    tags: ["eu-region", "gdpr-compliant"]
    container:
      image: data-processor:latest
    affinity:
      node_affinity:
        required_during_scheduling_ignored_during_execution:
          node_selector_terms:
            - match_expressions:
                - key: "region"
                  operator: In
                  values: ["eu-west-1", "eu-central-1"]
                - key: "compliance"
                  operator: In
                  values: ["gdpr"]
    variables:
      DATA_REGION: "eu"
      COMPLIANCE_MODE: "strict"

At a mobile security analysis company, I used matchmaking scheduling to manage physical device farms where each device has unique characteristics. The system implemented two-phase matchmaking: first reserving devices based on requirements like platform (iOS/Android), OS version ranges, device type (phone/tablet), and capabilities (SMS, camera, GPS), then using affinity rules to route jobs to the specific reserved device.

Pros: Optimal resource matching, data locality, flexibility Cons: Complex matching logic, potential for suboptimal assignments under constraints

6. Delay Scheduler – Temporal Control

Delay scheduling deliberately postpones task execution until optimal conditions are met, such as data locality, resource availability, or specific timing requirements. The algorithm balances waiting for better conditions against potential starvation, often using configurable timeout thresholds.

Optimal_Delay = min(Max_Wait_Time, Expected_Benefit_Time)
Where:
- Max_Wait_Time = configured upper bound to prevent starvation
- Expected_Benefit_Time = estimated time until optimal conditions
- Locality_Benefit = (Remote_Cost - Local_Cost) / Transfer_Rate
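
A minimal sketch of this tradeoff, using the illustrative names from the formula above:

// optimalDelay waits for better conditions only up to a configured bound, so
// locality or resource gains never turn into starvation.
func optimalDelay(maxWaitTime, expectedBenefitTime time.Duration) time.Duration {
	if expectedBenefitTime < maxWaitTime {
		return expectedBenefitTime // better conditions expected soon: wait for them
	}
	return maxWaitTime // cap the wait to prevent starvation
}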

Common delay strategies include:

  • Data Locality Delay: Wait for data to become available on local nodes
  • Resource Availability Delay: Wait for preferred resource types to become free
  • Temporal Delay: Execute at specific times (off-peak hours, scheduled windows)
  • Condition-Based Delay: Wait for external system states or events

Formicary’s Delay Implementations:

  1. Time-Based Scheduling:
job_type: nightly-etl
cron_trigger: "0 2 * * *"  # 2 AM daily
scheduled_at: "2024-12-25T02:00:00Z"  # One-time future execution
  2. Condition-Based Polling:
job_type: external-dependency-wait
tasks:
  - task_type: wait-for-api
    method: HTTP_GET
    url: https://api.service.com/status
    delay_between_retries: 30s
    retry: 20  # Maximum 10 minutes of polling
    on_exit_code:
      "200": proceed-with-processing    # Service ready
      "404": EXECUTING                  # Keep polling
      "503": EXECUTING                  # Service temporarily unavailable
      "FAILED": abort-job               # Permanent failure
  3. Resource Availability Delay:
job_type: gpu-intensive-training
tasks:
  - task_type: training
    method: KUBERNETES
    tags: ["gpu-v100", "high-memory"]
    timeout: 6h
    # Will delay until specific GPU resources become available

Pros:

  • Improved data locality and reduced network I/O
  • Better resource utilization through temporal load balancing
  • Flexible execution timing for cost optimization
  • Support for external dependency coordination

Cons:

  • Increased job latency and scheduling complexity
  • Risk of starvation without proper timeout mechanisms
  • Difficulty in predicting optimal delay periods
  • Potential for cascading delays in dependent workflows

7. Capacity Scheduler – Resource Quotas

Capacity scheduling partitions cluster resources into hierarchical queues with guaranteed minimum capacities and configurable maximum limits. Each queue can elastically use unused capacity from other queues while respecting absolute limits and priority policies.

Queue_Capacity = (Allocated_Resources / Total_Cluster_Resources) × 100%
Effective_Capacity = min(Max_Capacity, Guaranteed_Capacity + Available_Borrowed_Capacity)
Resource_Utilization = Used_Resources / Effective_Capacity
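
A small sketch of the capacity math above for a single queue, with all quantities in the same resource unit (for example, vCPUs); the names are illustrative:

// effectiveCapacity caps guaranteed-plus-borrowed capacity at the queue maximum.
func effectiveCapacity(guaranteed, maxCapacity, borrowable float64) float64 {
	capacity := guaranteed + borrowable
	if capacity > maxCapacity {
		return maxCapacity // elastic sharing never exceeds the configured maximum
	}
	return capacity
}

// utilization reports how much of the effective capacity is currently used.
func utilization(used, effective float64) float64 {
	if effective == 0 {
		return 0
	}
	return used / effective
}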

Common principles include:

  • Capacity Guarantees: Each queue has minimum guaranteed resources
  • Elastic Sharing: Unused capacity can be borrowed by other queues
  • Preemption: Higher priority queues can reclaim borrowed resources
  • Hierarchical Organization: Nested queues for organizational structure

Common queue management strategies include:

  • FIFO within Queues: Simple first-come-first-served within capacity limits
  • Priority Ordering: High-priority jobs within queues get preference
  • Fair Share: Proportional resource distribution among queue users
  • Preemption Policies: Graceful vs. aggressive resource reclamation

Formicary Implementation Features:

# Organization-level capacity limits
tenant_limits:
  max_concurrent_jobs: 50
  max_cpu_hours_per_day: 200
  max_storage_gb: 500

# Job-level concurrency control  
job_type: batch-processing
max_concurrency: 3  # Limit concurrent instances

Capacity enforcement mechanisms include (a minimal quota-check sketch follows the list):

  • Hard Limits: Absolute maximum resource consumption
  • Soft Limits: Warning thresholds with potential throttling
  • Burst Capacity: Temporary exceeding of limits during low contention
  • Quota Reset Periods: Time-based quota renewals (daily, weekly, monthly)
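
A minimal quota-check sketch combining these mechanisms (hypothetical names, not Formicary’s API): reject at the hard limit, and allow but flag for throttling past the soft limit.

// checkQuota returns whether a new job is allowed and whether it should be
// flagged (warned or throttled) because the tenant is past its soft limit.
type tenantQuota struct {
	softLimit, hardLimit int // e.g., concurrent jobs
}

func checkQuota(q tenantQuota, currentJobs int) (allowed bool, flagged bool) {
	if currentJobs >= q.hardLimit {
		return false, true // hard limit: reject outright
	}
	return true, currentJobs >= q.softLimit // soft limit: allow but flag
}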

Pros: Predictable resource allocation, multi-tenant isolation, elastic resource sharing, hierarchical management Cons: Complex configuration, potential resource fragmentation, underutilization during low demand, administrative overhead

8. Fair Scheduler – Multi-Tenant Fairness

It ensures proportional resource sharing among users, groups, or tenants over time. Uses techniques like weighted fair queueing and deficit round-robin to achieve long-term fairness while maintaining efficiency. Common metrics include:

  • Proportional Share (Hadoop Fair Scheduler): Resources allocated based on weights/quotas
  • Max-Min Fairness (Kubernetes): Maximize minimum allocation across users
  • Deadline Fairness: Ensure SLA compliance across tenants

Advanced fair sharing includes:

  • Hierarchical Fair Sharing: Nested user groups and organizations
  • Dominant Resource Fairness: Fair allocation across multiple resource types
  • Lottery Scheduling: Probabilistic fairness using tickets

Formicary Implementation: It implements tenant isolation with quota enforcement.

// Formicary's Fair Scheduling based on actual implementation
type FairScheduler struct {
    jobStatsRegistry *JobStatsRegistry
    serverConfig     *ServerConfig
}

func (fs *FairScheduler) CheckFairSchedulingConstraints(request *JobRequest) error {
    // Multi-level concurrency checking: Organization -> User -> Job-level
    
    // Level 1: Organization concurrency limits
    userExecuting, orgExecuting := fs.jobStatsRegistry.UserOrgExecuting(request)
    
    if orgExecuting >= fs.getMaxConcurrentOrgJobs(request.OrganizationID) {
        return fs.delayJobForConcurrencyExceeded(request, "organization", orgExecuting)
    }
    
    // Level 2: User concurrency limits  
    if userExecuting >= fs.getMaxConcurrentUserJobs(request.UserID) {
        return fs.delayJobForConcurrencyExceeded(request, "user", userExecuting)
    }
    
    // Level 3: Job-type concurrency limits
    executionCount := fs.jobStatsRegistry.GetExecutionCount(request.GetUserJobTypeKey())
    if executionCount >= request.GetMaxConcurrency() {
        return fs.delayJobForConcurrencyExceeded(request, "job-type", int(executionCount))
    }
    
    return nil
}

func (fs *FairScheduler) delayJobForConcurrencyExceeded(request *JobRequest, limitType string, currentCount int) error {
    // Intelligent delay calculation based on historical data
    avgCompletionTime := fs.jobStatsRegistry.GetAverageCompletionTime(request.JobType)
    
    // Dynamic wait factor: 25% of average completion time, bounded between 15-60 seconds
    waitFactor := min(max(avgCompletionTime/4, 15*time.Second), 60*time.Second)
    
    // Randomized delay to prevent thundering herd
    randomDelay := time.Duration(rand.Intn(int(waitFactor))) + waitFactor
    
    // Reschedule with delay
    request.ScheduledAt = time.Now().Add(randomDelay)
    
    // Logarithmic priority degradation (inspired by mobile security company approach)
    if request.Priority > 0 {
        // Priority degradation: log_e(original_priority), minimum of 1
        newPriority := max(1, int(math.Log(float64(request.Priority))))
        request.Priority = newPriority
        
        // Allow zero-priority jobs to bypass concurrency limits (emergency valve)
        if newPriority <= 0 {
            return nil // Allow execution despite limits
        }
    }
    
    // Update schedule attempts counter with exponential backoff
    request.ScheduleAttempts++
    
    return fmt.Errorf("%s concurrency limit exceeded: %d jobs running, rescheduling with %v delay", 
                     limitType, currentCount, randomDelay)
}

// Enhanced concurrency tracking from mobile security company experience
func (fs *FairScheduler) trackConcurrencyMetrics(request *JobRequest) {
    // Real-time metrics for monitoring fairness
    fs.metricsRegistry.Gauge("org_concurrent_jobs", map[string]string{
        "org_id": request.OrganizationID,
        "job_type": request.JobType,
    })
    
    fs.metricsRegistry.Gauge("user_concurrent_jobs", map[string]string{
        "user_id": request.UserID, 
        "job_type": request.JobType,
    })
}

Pros: Prevents monopolization, guarantees minimum service levels Cons: May sacrifice efficiency for fairness, complex weight management

9. Earliest Deadline First (EDF)

Dynamic priority algorithm that assigns the highest priority to tasks with the earliest deadlines. It is optimal for single-processor real-time scheduling when total utilization ≤ 100% and uses the deadline as the primary scheduling criterion. The related Shortest Job First (SJF) policy instead selects the job with the smallest estimated execution time, minimizing average waiting time.

Schedulability Test: Σ(Ci/Ti) ≤ 1
Where Ci = execution time, Ti = period for periodic tasks

EDF is a dynamic priority scheduling algorithm that assigns highest priority to tasks with the earliest absolute deadlines. It’s provably optimal for single-processor real-time scheduling when total CPU utilization ≤ 100%, providing maximum schedulability under deadline constraints.

Priority(task) = 1 / (Deadline - Current_Time)
Schedulability_Test: Σ(Execution_Time_i / Period_i) ≤ 1
Laxity = Deadline - Current_Time - Remaining_Execution_Time
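
The same quantities expressed as a short sketch (illustrative only, not Formicary code):

// edfPriority grows as the deadline approaches; an already-passed or immediate
// deadline is treated as maximally urgent to avoid division by zero.
func edfPriority(deadline, now time.Time) float64 {
	remaining := deadline.Sub(now).Seconds()
	if remaining <= 0 {
		return math.MaxFloat64
	}
	return 1.0 / remaining
}

// laxity is the slack left after accounting for the remaining execution time;
// a negative value means the deadline can no longer be met.
func laxity(deadline, now time.Time, remainingWork time.Duration) time.Duration {
	return deadline.Sub(now) - remainingWork
}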

Core characteristics include:

  • Dynamic Priority: Priorities change as deadlines approach
  • Work-Conserving: Never idles processor when tasks are ready
  • Deadline-Driven: Scheduling decisions based purely on temporal constraints
  • Optimal Utilization: Achieves 100% processor utilization when schedulable

Failure modes include:

  • Domino Effect: Single deadline miss can cascade to subsequent tasks
  • Thrashing: Excessive context switching under overload conditions
  • Unpredictable Overload: Graceful degradation requires additional mechanisms

While Formicary does not natively support SJF, it can be approximated using separate queues for different job duration categories:

# Short jobs queue
job_type: quick-validation
tags: ["short-queue"]
estimated_runtime: "5m"

# Long jobs queue  
job_type: full-analysis
tags: ["long-queue"] 
estimated_runtime: "2h"

Deadline assignment strategies include:

Relative_Deadline = Period (for periodic tasks)
Absolute_Deadline = Arrival_Time + Relative_Deadline
Critical_Instant = Simultaneous release of all tasks

It can also be simulated using priority scheduling combined with deadline-aware job submission:

# Simulated EDF using priority and scheduled execution
job_type: time-critical-analysis
priority: {{.UrgencyScore}}  # Calculated based on deadline proximity
scheduled_at: "2024-12-31T23:59:59Z"
timeout: 2h
tasks:
  - task_type: urgent-processing
    method: KUBERNETES
    tags: ["priority-worker"]

Implementation Approach: Calculate priority dynamically based on (current_time - deadline) / estimated_runtime to ensure jobs closer to their deadlines receive higher priority.
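
One hypothetical way to compute such an urgency score before submitting the job (the thresholds are illustrative, not part of Formicary):

// urgencyScore maps remaining slack (time to deadline minus estimated runtime)
// to a job priority: the less slack, the higher the priority.
func urgencyScore(deadline time.Time, estimatedRuntime time.Duration, now time.Time) int {
	slack := deadline.Sub(now) - estimatedRuntime
	switch {
	case slack <= 0:
		return 10 // already at risk of missing the deadline
	case slack < estimatedRuntime:
		return 8
	case slack < 4*estimatedRuntime:
		return 5
	default:
		return 1
	}
}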

Pros: Optimal schedulability, maximum resource utilization, simple algorithm, responsive to urgent tasks Cons: Domino effect failures, requires accurate execution time estimates, poor overload behavior, high context switching overhead

10. Speculative Scheduler

It launches multiple instances of slow-running tasks to reduce tail latency, using statistical analysis of execution times to detect stragglers and to decide when speculative execution is worthwhile, balancing resource cost against latency improvement. Detection strategies include (a small sketch follows the list):

  • Progress-based: Monitor task completion percentage
  • Time-based: Tasks running longer than percentile threshold
  • Resource-based: Launch backup only if resources available
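
A small straggler-detection sketch (hypothetical, not implemented in Formicary): one simple variant compares elapsed time against a multiple of the historical average rather than a true percentile, and only speculates when spare capacity exists.

// shouldSpeculate decides whether to launch a backup copy of a running task.
func shouldSpeculate(elapsed, historicalAvg time.Duration, slowdownFactor float64,
	freeWorkers, minFreeWorkers int) bool {
	if historicalAvg <= 0 || freeWorkers < minFreeWorkers {
		return false // no baseline yet, or no spare capacity for a backup
	}
	return float64(elapsed) > slowdownFactor*float64(historicalAvg)
}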

Pros: Reduces tail latency, improves user experience, fault tolerance Cons: Resource waste, coordination overhead, may not help heterogeneous workloads

Formicary Status: Not implemented, but the system provides a foundation through:

  • Task execution monitoring
  • Historical performance data collection
  • Resource availability tracking

Conceptual Implementation:

job_type: speculative-execution
tasks:
  - task_type: main-task
    method: KUBERNETES
    timeout: 30m
    speculative_backup:
      enabled: true
      delay_threshold: "150%"  # Start backup if 50% slower than average
      resource_threshold: 0.3  # Only if 30%+ resources available

11. Polling and Sensors

Beyond time-based delays, Formicary supports condition-based scheduling through polling sensors that wait for external conditions to be met:

job_type: sensor-job
description: Wait for external conditions before proceeding
tasks:
  - task_type: wait-for-resource
    method: HTTP_GET
    url: https://api.example.com/resource/123
    delay_between_retries: 15s
    retry: 20  # Poll up to 20 times (5 minutes total)
    timeout: 15s
    on_exit_code:
      "200": process-resource    # Success - proceed
      "404": EXECUTING          # Resource not ready - poll again  
      "FAILED": FATAL           # Server error - abort job

  - task_type: process-resource
    method: DOCKER
    container:
      image: data-processor:latest
    script:
      - echo "Resource is now available, processing..."

The key insight is using EXECUTING as an exit code value, which keeps the task in a polling loop rather than completing or failing.

12. Gang Scheduling

Gang scheduling coordinates simultaneous execution of related tasks that need to communicate or synchronize. Instead of scheduling tasks independently, the system reserves resources for all tasks in a “gang” and schedules them together to avoid partial execution and resource deadlocks. Key principles include:

  • All-or-Nothing (MPI Applications): Either all tasks in the gang get scheduled or none do
  • Synchronized Start: Tasks begin execution simultaneously
  • Resource Reservation (Kubernetes Jobs): Pre-allocate resources for the entire task group
  • Communication Optimization: Minimize synchronization delays between related tasks

Gang_Size = max(task_count, critical_path_parallelism)
Resource_Requirement = Σ(individual_task_resources) for all gang members
Schedulability = available_resources >= Resource_Requirement

Formicary’s Gang Scheduling Implementation: Formicary implements gang scheduling at the job level through its Resource Manager. When a job is scheduled, resources are pre-allocated for ALL tasks before execution begins:

// Core gang scheduling logic from ResourceManager
func (rm *ManagerImpl) doReserveJobResources(
	requestID string,
	def *types.JobDefinition,
	dryRun bool) (reservations map[string]*common.AntReservation, err error) {
	
	reservations = make(map[string]*common.AntReservation)
	var alloc *common.AntReservation
	
	// Try to reserve resources for each task
	for _, task := range def.Tasks {
		alloc, err = rm.doReserve(requestID, task.TaskType, task.Method, task.Tags, dryRun)
		if err == nil {
			reservations[task.TaskType] = alloc
		} else {
			if !dryRun {
				// ALL-OR-NOTHING: Release all allocations and fail entire job
				_ = rm.ReleaseJobResources(requestID)
			}
			return nil, err
		}
	}
	return reservations, nil
}

Two-Phase Gang Scheduling Process:

  1. Resource Check Phase (Dry Run):
// Check if all job resources are available without allocating
func (rm *ManagerImpl) CheckJobResources(job *types.JobDefinition) ([]*common.AntReservation, error) {
	if reservationsByTask, err := rm.doReserveJobResources("", job, true); err != nil {
		return nil, err // Gang scheduling not possible
	}
	// All tasks can be scheduled - proceed to actual reservation
}
  2. Resource Reservation Phase (Actual Allocation):
// Atomically reserve resources for all tasks
func (rm *ManagerImpl) ReserveJobResources(requestID string, def *types.JobDefinition) (map[string]*common.AntReservation, error) {
	return rm.doReserveJobResources(requestID, def, false)
}

Scheduler Integration with Gang Scheduling:

The Job Scheduler uses gang scheduling through a two-step verification process:

// Step 1: Check if gang scheduling is possible
if err = jobStateMachine.CheckAntResourcesAndConcurrencyForJob(); err != nil {
	// Gang scheduling failed - implement backoff strategy
	scheduleAttempts := request.ScheduleAttempts + 1
	scheduleSecs := math.Min(maxWait.Seconds(), float64(scheduleAttempts*5))
	
	// Exponential backoff with priority degradation
	if scheduleAttempts >= 5 && scheduleAttempts%5 == 0 && request.JobPriority > 5 {
		request.JobPriority-- // Degrade priority every 5th attempt
	}
	
	request.ScheduledAt = request.ScheduledAt.Add(time.Duration(scheduleSecs) * time.Second)
	return fmt.Errorf("gang scheduling failed - will retry")
}

// Step 2: Perform actual gang scheduling
if err = jobStateMachine.ReserveJobResources(); err != nil {
	// Even after check, allocation failed - very rare race condition
	return fmt.Errorf("gang allocation failed after successful check")
}

Example Job with Gang Scheduling:

job_type: distributed-ml-training
description: Gang scheduled ML training requiring coordinated execution
tasks:
  - task_type: parameter-server
    method: KUBERNETES
    tags: ["ml-cluster", "coordinator"]
    container:
      image: tensorflow:latest
      cpu_request: "4"
      memory_request: "8Gi"
    
  - task_type: worker-node-1
    method: KUBERNETES  
    tags: ["ml-cluster", "gpu-enabled"]
    container:
      image: tensorflow:gpu
      cpu_request: "8"
      memory_request: "16Gi"
      
  - task_type: worker-node-2
    method: KUBERNETES
    tags: ["ml-cluster", "gpu-enabled"]
    container:
      image: tensorflow:gpu
      cpu_request: "8" 
      memory_request: "16Gi"

  - task_type: aggregator
    method: KUBERNETES
    tags: ["ml-cluster"]
    dependencies: ["parameter-server", "worker-node-1", "worker-node-2"]

Formicary’s Gang Scheduling Features:

  1. Atomic Resource Allocation: All task resources are reserved simultaneously
  2. Automatic Rollback: Failed gang allocation releases all previously reserved resources
  3. Backoff Strategy: Jobs that can’t be gang-scheduled use exponential backoff
  4. Priority Degradation: Long-waiting jobs have priority reduced to prevent resource hogging
  5. Resource Fragmentation Prevention: Avoids partial allocations that waste resources

Formicary tracks gang scheduling effectiveness through metrics:

  • scheduler_failed_total: Jobs that couldn’t be gang-scheduled
  • scheduler_no_more_jobs_total: Scheduler iterations with no schedulable jobs
  • Schedule attempts per job to identify resource contention patterns

Pros:

  • Eliminates partial execution and resource deadlocks
  • Optimal for tightly-coupled distributed workloads
  • Automatic retry with intelligent backoff
  • Priority-based fairness with degradation

Cons:

  • Can lead to resource underutilization
  • Higher latency for large job gangs
  • Complex resource accounting and cleanup
  • May cause convoy effects for large jobs

Integration with Other Algorithms: Gang scheduling in Formicary works alongside:

  • Priority Scheduling: Higher priority gangs get resources first
  • Resource-Aware Scheduling: Considers total gang resource requirements
  • Fair Scheduling: Gang resource consumption counted toward tenant quotas
  • Capacity Scheduling: Gangs compete for available cluster capacity

Advanced Scheduling Patterns

Hybrid Scheduling Strategy

In practice, Formicary often combines multiple scheduling algorithms:

job_type: hybrid-ml-pipeline
priority: 8                    # Priority scheduling
max_concurrency: 3             # Capacity scheduling  
tags: ["gpu-cluster"]          # Matchmaking
cron_trigger: "0 */6 * * *"    # Delay scheduling
tasks:
  - task_type: data-preprocessing
    tags: ["cpu-optimized"]     # Multilevel queues
    method: KUBERNETES
    container:
      cpu_request: "2"          # Resource-aware
      memory_request: "4Gi"
  
  - task_type: model-training
    tags: ["gpu-optimized"]
    method: KUBERNETES
    container:
      image: tensorflow:gpu
    dependencies: ["data-preprocessing"]

Fork-Join Pattern for Parallel Processing

Formicary supports sophisticated parallel execution patterns:

job_type: parallel-video-encoding
description: Parallel processing with fork-join
tasks:
  - task_type: split-video
    method: DOCKER
    container:
      image: ffmpeg:latest
    script:
      - ffmpeg -i input.mp4 -f segment segment_%03d.mp4
    on_completed: fork-encode

  - task_type: fork-encode
    method: FORK_JOB
    fork_job_type: encode-segment
    variables:
      segment_count: 8
    on_completed: await-encoding

  - task_type: await-encoding  
    method: AWAIT_FORKED_JOB
    await_forked_tasks: ["fork-encode"]
    on_completed: merge-video

  - task_type: merge-video
    method: DOCKER
    container:
      image: ffmpeg:latest
    script:
      - ffmpeg -f concat -i segments.txt -c copy output.mp4

Performance Optimizations

Cache-Aware Scheduling

Formicary supports dependency caching to improve scheduling efficiency:

job_type: node-build
tasks:
  - task_type: install-deps
    method: DOCKER
    container:
      image: node:16
    cache:
      key: "node-deps-{{checksum 'package-lock.json'}}"
      paths:
        - node_modules
    script:
      - npm ci

Artifact-Based Dependencies

Smart scheduling based on artifact availability:

job_type: deployment-pipeline
tasks:
  - task_type: build
    artifacts:
      paths: ["dist/"]
    
  - task_type: test
    dependencies: ["build"]  # Waits for build artifacts
    
  - task_type: deploy
    dependencies: ["test"]
    method: KUBERNETES

Monitoring and Observability

Effective scheduling requires comprehensive monitoring:

# Built-in metrics and alerts
metrics:
  - queue_depth_by_priority
  - average_wait_time_by_tag
  - resource_utilization_by_worker
  - job_completion_rate_by_tenant

alerts:
  - name: high_queue_depth
    condition: queue_depth > 1000
    action: scale_workers
    
  - name: poor_resource_utilization  
    condition: cpu_utilization < 30%
    action: consolidate_workers

Real-World Case Study: Mobile Security Analysis Platform

At a mobile security company, I implemented a similar system with three-tier scheduling:

Tier 1: Preflight Analysis (Fast Queue)

  • Duration: 30 seconds – 2 minutes
  • Workers: CPU-optimized containers
  • Algorithm: Priority + FCFS
  • Use Case: Basic file validation, metadata extraction

Tier 2: Static Analysis (Medium Queue)

  • Duration: 5 minutes – 1 hour
  • Workers: Memory-optimized containers
  • Algorithm: Resource-aware + Fair scheduling
  • Use Case: Code analysis, vulnerability scanning

Tier 3: Dynamic Analysis (Heavy Queue)

  • Duration: 1 – 8 hours
  • Workers: GPU-enabled device farm
  • Algorithm: Matchmaking + Capacity scheduling
  • Use Case: Runtime behavior analysis, ML inference

This architecture processed over 100,000 mobile apps daily with 99.9% availability and optimal resource utilization.

Best Practices and Lessons Learned

1. Start Simple, Scale Gradually

Begin with FCFS and basic priority scheduling. Add complexity as your workload characteristics become clear.

2. Observability

Track queue depths, wait times, resource utilization, and job completion rates by different dimensions (tenant, job type, worker pool).

3. Design for Elasticity

Use Kubernetes HPA and custom metrics to automatically scale worker pools based on queue depth and resource utilization.

4. Implement Circuit Breakers

Prevent cascading failures when downstream services are unavailable.

5. Use Dead Letter Queues

Handle persistently failing tasks gracefully:

error_handling:
  max_retries: 3
  dead_letter_queue: "failed-jobs"
  alert_on_dlq: true

6. Match the Algorithm to the Workload

Different job classes benefit from different scheduling strategies:

  • Earliest Deadline First: For time-sensitive workflows
  • Speculative Execution: For fault tolerance and performance
  • Gang Scheduling: For tightly-coupled parallel jobs

Conclusion

Formicary demonstrates how theoretical scheduling algorithms translate into practical distributed systems. It combines multiple strategies (priority scheduling, resource awareness, fair sharing, and intelligent routing) to handle diverse workloads while maintaining predictable performance. The key insight is that real-world schedulers rarely use a single algorithm. Instead, they combine multiple approaches, leveraging the strengths of each for different aspects of the scheduling problem. Tags and labels provide the flexibility to implement sophisticated routing logic, while Kubernetes integration enables resource-aware scheduling at scale.

Whether you’re building CI/CD pipelines, data processing workflows, or ML training systems, understanding these scheduling patterns and their trade-offs is crucial for designing systems that scale efficiently and reliably.


Formicary is open source and available at github.com/bhatti/formicary. Try it out for your next workflow automation project!

September 12, 2025

The Byzantine Generals Problem: A Modern Performance Analysis in Elixir, Erlang, and Rust

Filed under: Computing,Concurrency — admin @ 2:35 pm

Introduction

In 2007, I wrote about implementing Leslie Lamport’s Byzantine Generals Problem algorithm across several programming languages. At the time, this seemed like an interesting theoretical exercise in distributed computing. I didn’t realize that a year later, Satoshi Nakamoto would publish the Bitcoin whitepaper, introducing a decentralized, Sybil-resistant digital currency that solved Byzantine fault tolerance at unprecedented scale.

Nearly two decades later, I’m returning to the Byzantine Generals Problem with the perspective that only hindsight provides. This updated post implements the algorithm in modern languages—Rust, Elixir, and contemporary Erlang.

The Byzantine Generals Problem: A Refresher

The Byzantine Generals Problem, first formalized by Leslie Lamport, addresses a fundamental challenge in distributed computing: how can distributed parties reach consensus when some parties may be unreliable or malicious? For example, imagine several divisions of the Byzantine army camped outside an enemy city, each commanded by a general. The generals must coordinate to either attack or retreat, but they can only communicate by messenger. The challenge: some generals might be traitors who will try to confuse the others by sending conflicting messages. For a solution to work, two conditions must be met:

  1. IC1: All loyal lieutenants obey the same order
  2. IC2: If the commanding general is loyal, then every loyal lieutenant obeys the order he sends

One of the most striking results is that no solution exists with fewer than 3m + 1 generals to handle m traitors. With only three generals, no algorithm can handle even a single traitor.

Why This Matters

When I originally wrote about this problem in 2007, Bitcoin didn’t exist. Satoshi Nakamoto’s whitepaper was published in 2008, and the first Bitcoin block wasn’t mined until 2009. Bitcoin’s proof-of-work consensus mechanism essentially solves the Byzantine Generals Problem in a novel way:

  • Generals = Miners: Each miner is like a general trying to reach consensus
  • Orders = Transactions: The “order” is which transactions to include in the next block
  • Traitors = Malicious Miners: Some miners might try to double-spend or create invalid blocks
  • Solution = Longest Chain: The network accepts the longest valid chain as truth

Bitcoin’s brilliant insight was using computational work (proof-of-work) as a way to make it economically expensive to be a “traitor.” As long as honest miners control more than 50% of the computing power, the system remains secure.

Modern Applications Beyond Blockchain

The Byzantine Generals Problem isn’t just about cryptocurrency. It’s fundamental to many critical systems:

  • Aircraft Control Systems: Multiple redundant computers must agree on flight controls
  • Satellite Networks: Space-based systems need fault tolerance against radiation-induced failures
  • Missile Defense: Critical decisions must be made reliably even with component failures
  • Distributed Databases: Systems like Apache Cassandra and MongoDB must reach agreement across replicas while tolerating node failures
  • Container Orchestration: Kubernetes relies on etcd, which uses Raft consensus to keep cluster state consistent across failures
  • Central Bank Digital Currencies (CBDCs): Many countries are exploring blockchain-based national currencies
  • Cross-Border Payments: Systems like Ripple use Byzantine fault-tolerant consensus

Implementation: Modern Languages for a Classic Problem

Let’s implement the Byzantine Generals Problem in three modern languages: Rust, Elixir, and updated Erlang. Each brings different strengths to distributed computing.

Why These Languages?

  • Rust: Memory safety without garbage collection, excellent for systems programming
  • Elixir: Built on the Actor model, designed for fault-tolerant distributed systems
  • Erlang: The original Actor model language, battle-tested in telecom systems

Core Algorithm

We’ll implement the OM(m) algorithm (Oral Messages with m traitors) that works for 3m + 1 or more generals.
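
For reference, the textbook OM(m) definition is recursive: the commander sends its value to every lieutenant, each lieutenant then acts as commander in OM(m-1) to relay what it received, and everyone decides by majority. A compact Rust sketch of that recursion (illustrative only; the message-passing implementations below are iterative, round-based versions of the same idea, and a faulty general here simply alternates orders to stand in for arbitrary behavior):

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Order { Attack, Retreat }

fn majority(votes: &[Order]) -> Order {
    let attacks = votes.iter().filter(|&&o| o == Order::Attack).count();
    if attacks * 2 > votes.len() { Order::Attack } else { Order::Retreat }
}

// Returns the order each lieutenant in `lieutenants` decides on when
// `commander` runs OM(m) with `order`. `faulty` marks the traitors.
fn om(m: usize, commander: usize, order: Order,
      lieutenants: &[usize], faulty: &[usize]) -> Vec<Order> {
    let sent: Vec<Order> = (0..lieutenants.len()).map(|i| {
        if faulty.contains(&commander) {
            // A traitorous commander sends conflicting orders.
            if i % 2 == 0 { Order::Attack } else { Order::Retreat }
        } else { order }
    }).collect();

    if m == 0 {
        return sent; // OM(0): each lieutenant uses the value it received directly.
    }

    // OM(m): every lieutenant relays its value to the others via OM(m-1),
    // then decides by majority over everything it heard.
    lieutenants.iter().enumerate().map(|(i, &li)| {
        let mut votes = vec![sent[i]];
        for (j, &lj) in lieutenants.iter().enumerate() {
            if j == i { continue; }
            let others: Vec<usize> =
                lieutenants.iter().copied().filter(|&x| x != lj).collect();
            let relayed = om(m - 1, lj, sent[j], &others, faulty);
            let pos = others.iter().position(|&x| x == li).unwrap();
            votes.push(relayed[pos]);
        }
        majority(&votes)
    }).collect()
}

fn main() {
    // 4 generals, 1 traitorous lieutenant (general 2): the loyal lieutenants still agree.
    let decisions = om(1, 0, Order::Attack, &[1, 2, 3], &[2]);
    println!("lieutenant decisions: {:?}", decisions);
}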

Rust Implementation

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use tracing::{debug, info};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Value {
    Zero,
    One,
    Retreat, // Default value
}

impl std::fmt::Display for Value {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Value::Zero => write!(f, "ZERO"),
            Value::One => write!(f, "ONE"),
            Value::Retreat => write!(f, "RETREAT"),
        }
    }
}

#[derive(Clone, Debug)]
pub struct Configuration {
    pub source: usize,
    pub num_rounds: usize,
    pub num_processes: usize,
}

pub struct ByzantineEngine {
    config: Configuration,
    processes: Vec<Arc<Mutex<Process>>>,
    message_count: Arc<Mutex<usize>>,
}

pub struct Process {
    id: usize,
    config: Configuration,
    values: HashMap<String, Value>,
    is_faulty: bool,
}

impl Process {
    pub fn new(id: usize, config: Configuration) -> Self {
        let is_faulty = id == config.source || id == 2; // Configure faulty processes
        Process {
            id,
            config,
            values: HashMap::new(),
            is_faulty,
        }
    }

    pub fn receive_message(&mut self, path: String, value: Value) {
        debug!("Process {} received message: path={}, value={:?}", self.id, path, value);
        self.values.insert(path, value);
    }

    pub fn send_messages(&self, round: usize, processes: &[Arc<Mutex<Process>>], 
                        message_count: Arc<Mutex<usize>>) {
        if round == 0 && self.id == self.config.source {
            self.send_initial_messages(processes, message_count);
        } else if round > 0 {
            self.relay_messages(round, processes, message_count);
        }
    }

    fn send_initial_messages(&self, processes: &[Arc<Mutex<Process>>], 
                           message_count: Arc<Mutex<usize>>) {
        let base_value = Value::Zero;
        
        for (i, process) in processes.iter().enumerate() {
            if i != self.id {
                let value = if self.is_faulty {
                    // Faulty commander sends different values to different processes
                    if i % 2 == 0 { Value::Zero } else { Value::One }
                } else {
                    base_value.clone()
                };
                
                let value_for_log = value.clone(); // Clone for logging
                let mut proc = process.lock().unwrap();
                proc.receive_message(self.id.to_string(), value);
                *message_count.lock().unwrap() += 1;
                
                debug!("Commander {} sent {:?} to process {}", self.id, value_for_log, i);
            }
        }
    }

    fn relay_messages(&self, round: usize, processes: &[Arc<Mutex<Process>>], 
                     message_count: Arc<Mutex<usize>>) {
        let paths = self.get_paths_for_round(round - 1);
        
        for path in paths {
            if let Some(value) = self.values.get(&path) {
                let new_value = self.transform_value(value.clone());
                let new_path = format!("{}{}", path, self.id);
                
                for (i, process) in processes.iter().enumerate() {
                    if i != self.id && !self.path_contains_process(&new_path, i) {
                        let mut proc = process.lock().unwrap();
                        proc.receive_message(new_path.clone(), new_value.clone());
                        *message_count.lock().unwrap() += 1;
                        
                        debug!("Process {} relayed {:?} to process {} with path {}", 
                               self.id, new_value, i, new_path);
                    }
                }
            }
        }
    }

    fn transform_value(&self, value: Value) -> Value {
        if self.is_faulty && self.id == 2 {
            Value::One // Process 2 always sends One when faulty
        } else {
            value
        }
    }

    fn get_paths_for_round(&self, round: usize) -> Vec<String> {
        if round == 0 {
            vec![self.config.source.to_string()]
        } else {
            self.values.keys()
                .filter(|path| path.len() == round + 1)
                .cloned()
                .collect()
        }
    }

    fn path_contains_process(&self, path: &str, process_id: usize) -> bool {
        path.contains(&process_id.to_string())
    }

    pub fn decide(&self) -> Value {
        if self.id == self.config.source {
            // Source process uses its own value
            return if self.is_faulty { Value::One } else { Value::Zero };
        }

        self.majority_vote()
    }

    fn majority_vote(&self) -> Value {
        let mut counts = HashMap::new();
        counts.insert(Value::Zero, 0);
        counts.insert(Value::One, 0);
        counts.insert(Value::Retreat, 0);

        // Count values from the final round paths
        let final_paths: Vec<_> = self.values.keys()
            .filter(|path| path.len() == self.config.num_rounds + 1)
            .collect();

        if final_paths.is_empty() {
            // Count all available values if no final round paths
            for value in self.values.values() {
                *counts.entry(value.clone()).or_insert(0) += 1;
            }
        } else {
            for path in final_paths {
                if let Some(value) = self.values.get(path) {
                    *counts.entry(value.clone()).or_insert(0) += 1;
                }
            }
        }

        debug!("Process {} vote counts: {:?}", self.id, counts);

        // Find majority
        let total_votes: usize = counts.values().sum();
        if total_votes == 0 {
            return Value::Retreat;
        }

        let majority_threshold = total_votes / 2;
        
        for (value, count) in counts {
            if count > majority_threshold {
                return value;
            }
        }

        Value::Retreat // Default if no majority
    }

    pub fn is_faulty(&self) -> bool {
        self.is_faulty
    }

    pub fn is_source(&self) -> bool {
        self.id == self.config.source
    }
}

impl ByzantineEngine {
    pub fn new(source: usize, num_rounds: usize, num_processes: usize) -> Self {
        let config = Configuration { source, num_rounds, num_processes };
        let processes: Vec<Arc<Mutex<Process>>> = (0..num_processes)
            .map(|id| Arc::new(Mutex::new(Process::new(id, config.clone()))))
            .collect();

        ByzantineEngine {
            config,
            processes,
            message_count: Arc::new(Mutex::new(0)),
        }
    }

    pub fn run(&self) -> (Duration, usize) {
        info!("Starting Byzantine Generals algorithm with {} processes, {} rounds", 
              self.config.num_processes, self.config.num_rounds);
        
        let start = Instant::now();
        
        for round in 0..self.config.num_rounds {
            debug!("Starting round {}", round);
            
            let handles: Vec<_> = self.processes.iter().enumerate().map(|(_id, process)| {
                let process = Arc::clone(process);
                let processes = self.processes.clone();
                let message_count = Arc::clone(&self.message_count);
                
                thread::spawn(move || {
                    let proc = process.lock().unwrap();
                    proc.send_messages(round, &processes, message_count);
                })
            }).collect();

            for handle in handles {
                // Add timeout to prevent hanging
                if handle.join().is_err() {
                    eprintln!("Warning: Thread failed in round {}", round);
                }
            }
            
            debug!("Completed round {}", round);
            // Small delay to ensure message ordering
            thread::sleep(Duration::from_millis(10));
        }

        let duration = start.elapsed();
        let messages = *self.message_count.lock().unwrap();
        
        info!("Algorithm completed in {:.2}ms with {} messages", 
              duration.as_millis(), messages);
        
        self.print_results();
        
        (duration, messages)
    }

    fn print_results(&self) {
        println!("\nByzantine Generals Results:");
        println!("===========================");
        
        for (id, process) in self.processes.iter().enumerate() {
            let proc = process.lock().unwrap();
            if proc.is_source() {
                print!("Source ");
            }
            print!("Process {}", id);
            if proc.is_faulty() {
                println!(" is faulty");
            } else {
                println!(" decides on value {}", proc.decide());
            }
        }
        println!();
    }
}

pub fn benchmark_comprehensive(max_processes: usize) {
    let test_cases = generate_test_cases(max_processes);
    
    for (processes, rounds) in test_cases {
        if processes < 4 {
            continue; // Skip invalid cases
        }
        
        let source = processes / 3;
        let engine = ByzantineEngine::new(source, rounds, processes);
        
        let start_memory = get_memory_usage();
        let start = Instant::now();
        let (duration, messages) = engine.run();
        let _total_duration = start.elapsed();
        let end_memory = get_memory_usage();
        
        let memory_used = end_memory.saturating_sub(start_memory);
        
        println!("Rust,{},{},{},{:.2},{:.2}", 
                processes, rounds, messages, 
                duration.as_millis(), memory_used as f64 / 1024.0 / 1024.0);
    }
}

fn generate_test_cases(max_processes: usize) -> Vec<(usize, usize)> {
    let mut cases = Vec::new();
    
    for n in (4..=max_processes).step_by(3) {
        for m in 1..=3 {
            if 3 * m + 1 <= n {
                cases.push((n, m));
            }
        }
    }
    
    cases
}

fn get_memory_usage() -> usize {
    // Simplified memory usage - would need platform-specific code for accurate measurement
    std::process::id() as usize * 1024 // Placeholder
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_value_display() {
        assert_eq!(format!("{}", Value::Zero), "ZERO");
        assert_eq!(format!("{}", Value::One), "ONE");
        assert_eq!(format!("{}", Value::Retreat), "RETREAT");
    }

    #[test]
    fn test_process_creation() {
        let config = Configuration {
            source: 0,
            num_rounds: 2,
            num_processes: 4,
        };
        
        let process = Process::new(0, config.clone());
        assert!(process.is_source());
        assert!(process.is_faulty()); // Source is faulty in our test setup
        
        let process2 = Process::new(1, config);
        assert!(!process2.is_source());
        assert!(!process2.is_faulty());
    }

    #[test]
    fn test_engine_creation() {
        let engine = ByzantineEngine::new(0, 2, 4);
        assert_eq!(engine.config.source, 0);
        assert_eq!(engine.config.num_rounds, 2);
        assert_eq!(engine.config.num_processes, 4);
        assert_eq!(engine.processes.len(), 4);
    }

    #[test]
    fn test_minimum_byzantine_case() {
        let engine = ByzantineEngine::new(0, 1, 4);
        let (duration, messages) = engine.run();
        
        assert!(duration.as_nanos() > 0);
        assert!(messages > 0);
    }
}

Elixir Implementation

defmodule ByzantineGenerals do
  @moduledoc """
  Byzantine Generals Problem implementation in Elixir
  Leverages the Actor model for natural distributed computing
  """

  require Logger

  defmodule Configuration do
    @moduledoc "Configuration for Byzantine Generals algorithm"
    defstruct [:source, :num_rounds, :num_processes]

    @type t :: %__MODULE__{
      source: non_neg_integer(),
      num_rounds: non_neg_integer(),
      num_processes: non_neg_integer()
    }
  end

  defmodule Process do
    @moduledoc "Individual process (general) in the Byzantine Generals algorithm"
    use GenServer
    require Logger

    defstruct [:id, :config, :values, :is_faulty, :message_count, :processes]

    @type value :: :zero | :one | :retreat
    @type path :: String.t()

    # Client API

    def start_link(%{id: id, config: config}) do
      GenServer.start_link(__MODULE__, %{id: id, config: config}, name: :"process_#{id}")
    end

    def receive_message(pid, path, value) do
      GenServer.call(pid, {:receive_message, path, value}, 10_000)
    end

    def send_messages(pid, round, processes) do
      GenServer.call(pid, {:send_messages, round, processes}, 10_000)
    end

    def decide(pid) do
      GenServer.call(pid, :decide, 5_000)
    end

    def is_faulty?(pid) do
      GenServer.call(pid, :is_faulty, 1_000)
    end

    def is_source?(pid) do
      GenServer.call(pid, :is_source, 1_000)
    end

    def get_message_count(pid) do
      GenServer.call(pid, :get_message_count, 1_000)
    end

    def get_values(pid) do
      GenServer.call(pid, :get_values, 1_000)
    end

    # Server callbacks

    @impl true
    def init(%{id: id, config: config}) do
      is_faulty = id == config.source || id == 2
      
      state = %__MODULE__{
        id: id,
        config: config,
        values: %{},
        is_faulty: is_faulty,
        message_count: 0,
        processes: []
      }
      
      Logger.debug("Process #{id} initialized, faulty: #{is_faulty}")
      {:ok, state}
    end

    @impl true
    def handle_call({:receive_message, path, value}, _from, state) do
      Logger.debug("Process #{state.id} received message: path=#{path}, value=#{value}")
      
      new_values = Map.put(state.values, path, value)
      new_count = state.message_count + 1
      
      {:reply, :ok, %{state | values: new_values, message_count: new_count}}
    end

    @impl true
    def handle_call({:send_messages, round, processes}, _from, state) do
      new_state = %{state | processes: processes}
      
      cond do
        round == 0 && state.id == state.config.source ->
          send_initial_messages(new_state)
        round > 0 ->
          relay_messages(new_state, round)
        true ->
          {:reply, :ok, new_state}
      end
    end

    @impl true
    def handle_call(:decide, _from, state) do
      decision = if state.id == state.config.source do
        # Source process uses its own value
        if state.is_faulty, do: :one, else: :zero
      else
        majority_vote(state)
      end
      
      {:reply, decision, state}
    end

    @impl true
    def handle_call(:is_faulty, _from, state) do
      {:reply, state.is_faulty, state}
    end

    @impl true
    def handle_call(:is_source, _from, state) do
      {:reply, state.id == state.config.source, state}
    end

    @impl true
    def handle_call(:get_message_count, _from, state) do
      {:reply, state.message_count, state}
    end

    @impl true
    def handle_call(:get_values, _from, state) do
      {:reply, state.values, state}
    end

    # Private functions

    defp send_initial_messages(state) do
      base_value = :zero
      
      Enum.each(state.processes, fn {id, pid} ->
        if id != state.id do
          value = if state.is_faulty do
            # Faulty commander sends different values
            if rem(id, 2) == 0, do: :zero, else: :one
          else
            base_value
          end
          
          receive_message(pid, Integer.to_string(state.id), value)
          Logger.debug("Commander #{state.id} sent #{value} to process #{id}")
        end
      end)
      
      {:reply, :ok, state}
    end

    defp relay_messages(state, round) do
      paths = get_paths_for_round(state, round - 1)
      
      Enum.each(paths, fn path ->
        case Map.get(state.values, path) do
          nil -> 
            :ok
          value ->
            new_value = transform_value(state, value)
            new_path = path <> Integer.to_string(state.id)
            
            Enum.each(state.processes, fn {id, pid} ->
              if id != state.id && !String.contains?(new_path, Integer.to_string(id)) do
                receive_message(pid, new_path, new_value)
                Logger.debug("Process #{state.id} relayed #{new_value} to #{id}, path: #{new_path}")
              end
            end)
        end
      end)
      
      {:reply, :ok, state}
    end

    defp transform_value(state, value) do
      if state.is_faulty && state.id == 2 do
        :one
      else
        value
      end
    end

    defp get_paths_for_round(state, round) do
      if round == 0 do
        [Integer.to_string(state.config.source)]
      else
        state.values
        |> Map.keys()
        |> Enum.filter(&(String.length(&1) == round + 1))
      end
    end

    defp majority_vote(state) do
      counts = Enum.reduce(state.values, %{zero: 0, one: 0, retreat: 0}, fn {_path, value}, acc ->
        Map.update!(acc, value, &(&1 + 1))
      end)
      
      Logger.debug("Process #{state.id} vote counts: #{inspect(counts)}")
      
      total_votes = Map.values(counts) |> Enum.sum()
      
      if total_votes == 0 do
        :retreat
      else
        majority_threshold = div(total_votes, 2)
        
        case Enum.find(counts, fn {_value, count} -> count > majority_threshold end) do
          {value, _count} -> value
          nil -> :retreat
        end
      end
    end
  end

  defmodule Engine do
    @moduledoc "Engine that orchestrates the Byzantine Generals algorithm"
    
    require Logger

    def run(source, num_rounds, num_processes, opts \\ []) do
      config = %Configuration{
        source: source,
        num_rounds: num_rounds,
        num_processes: num_processes
      }

      verbose = Keyword.get(opts, :verbose, true)
      
      if verbose do
        Logger.info("Starting Byzantine Generals: #{num_processes} processes, #{num_rounds} rounds, source: #{source}")
      end

      # Start processes
      processes = start_processes(config)
      
      start_time = :os.system_time(:millisecond)
      
      # Run algorithm rounds
      run_rounds(processes, num_rounds)
      
      end_time = :os.system_time(:millisecond)
      duration = end_time - start_time

      # Collect results
      {results, total_messages} = collect_results(processes, config, verbose)
      
      # Clean up
      cleanup_processes(processes)

      {duration, total_messages, results}
    end

    defp start_processes(config) do
      for id <- 0..(config.num_processes - 1) do
        {:ok, pid} = Process.start_link(%{id: id, config: config})
        {id, pid}
      end
    end

    defp run_rounds(processes, num_rounds, timeout \\ 30_000) do
      for round <- 0..(num_rounds - 1) do
        Logger.debug("Starting round #{round}")
        
        tasks = Enum.map(processes, fn {_id, pid} ->
          Task.async(fn -> 
            Process.send_messages(pid, round, processes)
          end)
        end)
        
        try do
          Task.await_many(tasks, timeout)
          # Small delay to ensure message ordering
          :timer.sleep(10)
        catch
          :exit, {:timeout, _} -> 
            Logger.error("Round #{round} timed out")
            throw(:timeout)
        end
      end
      :ok
    end

    defp collect_results(processes, _config, verbose) do
      total_messages = Enum.sum(Enum.map(processes, fn {_id, pid} ->
        Process.get_message_count(pid)
      end))

      results = Enum.map(processes, fn {id, pid} ->
        is_source = Process.is_source?(pid)
        is_faulty = Process.is_faulty?(pid)
        decision = if is_faulty, do: nil, else: Process.decide(pid)
        
        result = %{
          id: id,
          is_source: is_source,
          is_faulty: is_faulty,
          decision: decision
        }
        
        if verbose do
          print_process_result(result)
        end
        
        result
      end)

      {results, total_messages}
    end

    defp print_process_result(%{id: id, is_source: is_source, is_faulty: is_faulty, decision: decision}) do
      prefix = if is_source, do: "Source ", else: ""
      
      if is_faulty do
        IO.puts("#{prefix}Process #{id} is faulty")
      else
        IO.puts("#{prefix}Process #{id} decides on value #{decision}")
      end
    end

    defp cleanup_processes(processes) do
      Enum.each(processes, fn {_id, pid} -> 
        GenServer.stop(pid, :normal, 1000)
      end)
    end

    def benchmark(max_processes, opts \\ []) do
      verbose = Keyword.get(opts, :verbose, true)
      
      if verbose do
        IO.puts("Elixir Byzantine Generals Benchmark")
        IO.puts("===================================")
        IO.puts("Language,Processes,Rounds,Messages,Time(ms)")
      end
      
      test_cases = generate_test_cases(max_processes)
      
      results = Enum.map(test_cases, fn {processes, rounds} ->
        source = div(processes, 3)
        {time, messages, _results} = run(source, rounds, processes, verbose: false)
        
        result = %{
          language: "Elixir",
          processes: processes,
          rounds: rounds,
          messages: messages,
          time_ms: time
        }
        
        if verbose do
          IO.puts("Elixir,#{processes},#{rounds},#{messages},#{time}")
        end
        
        result
      end)
      
      results
    end

    defp generate_test_cases(max_processes) do
      for n <- 4..max_processes, rem(n - 1, 3) == 0 do
        for m <- 1..3, 3 * m + 1 <= n do
          {n, m}
        end
      end
      |> List.flatten()
    end
  end

  # Main module functions

  def run(source, num_rounds, num_processes, opts \\ []) do
    Engine.run(source, num_rounds, num_processes, opts)
  end

  def benchmark(max_processes \\ 20, opts \\ []) do
    Engine.benchmark(max_processes, opts)
  end

  def quick_test do
    IO.puts("Running quick test with 4 processes, 1 round...")
    {time, messages, results} = run(0, 1, 4)
    
    IO.puts("\nTest Results:")
    IO.puts("Time: #{time}ms")
    IO.puts("Messages: #{messages}")
    IO.puts("Processes reached consensus: #{length(results)}")
    IO.puts("? Test completed successfully")
    
    :ok
  end
end

defmodule ByzantineGenerals.Application do
  @moduledoc false
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Add supervised processes here if needed
    ]

    opts = [strategy: :one_for_one, name: ByzantineGenerals.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Erlang Implementation

-module(byzantine_generals).
-export([run/3, benchmark/1, quick_test/0, start/0, stop/0]).
-include_lib("kernel/include/logger.hrl").

-record(config, {source, num_rounds, num_processes}).
-record(process_state, {id, config, values, is_faulty, message_count, processes}).

%% Public API

%% Start the application
start() ->
    application:start(byzantine_generals).

%% Stop the application  
stop() ->
    application:stop(byzantine_generals).

%% Run the Byzantine Generals algorithm
run(Source, NumRounds, NumProcesses) ->
    ?LOG_INFO("Starting Byzantine Generals: ~p processes, ~p rounds, source: ~p", 
              [NumProcesses, NumRounds, Source]),
    
    Config = #config{source = Source, num_rounds = NumRounds, num_processes = NumProcesses},
    
    % Validate configuration
    case validate_config(Config) of
        ok -> 
            run_algorithm(Config);
        {error, Reason} ->
            ?LOG_ERROR("Invalid configuration: ~p", [Reason]),
            {error, Reason}
    end.

%% Run benchmark with different configurations
benchmark(MaxProcesses) ->
    ?LOG_INFO("Running Erlang Byzantine Generals Benchmark up to ~p processes", [MaxProcesses]),
    
    io:format("Erlang Byzantine Generals Benchmark~n"),
    io:format("===================================~n"),
    io:format("Language,Processes,Rounds,Messages,Time(ms)~n"),
    
    TestCases = generate_test_cases(MaxProcesses),
    
    Results = lists:map(fun({Processes, Rounds}) ->
        Source = Processes div 3,
        case run(Source, Rounds, Processes) of
            {ok, Time, Messages, _ProcessResults} ->
                io:format("Erlang,~p,~p,~p,~p~n", [Processes, Rounds, Messages, Time]),
                #{language => erlang, processes => Processes, rounds => Rounds, 
                  messages => Messages, time_ms => Time};
            {error, _Reason} ->
                #{error => true, processes => Processes, rounds => Rounds}
        end
    end, TestCases),
    
    Results.

%% Quick test function
quick_test() ->
    io:format("Running quick test with 4 processes, 1 round...~n"),
    case run(0, 1, 4) of
        {ok, Time, Messages, _Results} ->
            io:format("~nTest Results:~n"),
            io:format("Time: ~pms~n", [Time]),
            io:format("Messages: ~p~n", [Messages]),
            io:format("? Test completed successfully~n"),
            ok;
        {error, Reason} ->
            io:format("? Test failed: ~p~n", [Reason]),
            error
    end.

%% Internal functions

validate_config(#config{source = Source, num_rounds = NumRounds, num_processes = NumProcesses}) ->
    if
        NumProcesses < 4 ->
            {error, "Need at least 4 processes for Byzantine Generals Problem"};
        Source >= NumProcesses ->
            {error, "Source must be less than number of processes"};
        NumRounds < 1 ->
            {error, "Need at least 1 round"};
        true ->
            ok
    end.

run_algorithm(Config) ->
    % Start message counter
    CounterPid = spawn_link(fun() -> counter_loop(0) end),
    register(message_counter, CounterPid),
    
    StartTime = erlang:system_time(millisecond),
    
    try
        % Start processes
        ProcessPids = start_processes(Config),
        
        % Initialize processes with neighbor information
        initialize_processes(ProcessPids, Config),
        
        % Run algorithm rounds
        run_rounds(ProcessPids, Config),
        
        % Wait for completion
        timer:sleep(100),
        
        EndTime = erlang:system_time(millisecond),
        Duration = EndTime - StartTime,
        
        % Collect results
        {TotalMessages, ProcessResults} = collect_results(ProcessPids, Config),
        
        % Cleanup
        cleanup_processes(ProcessPids),
        unregister(message_counter),
        exit(CounterPid, normal),
        
        {ok, Duration, TotalMessages, ProcessResults}
        
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR("Algorithm failed: ~p:~p~n~p", [Class, Reason, Stacktrace]),
            % Cleanup on error
            catch unregister(message_counter),
            catch exit(CounterPid, kill),
            {error, {Class, Reason}}
    end.

start_processes(Config) ->
    NumProcesses = Config#config.num_processes,
    lists:map(fun(Id) -> 
        Pid = spawn_link(fun() -> process_loop(Id, Config) end),
        {Id, Pid}
    end, lists:seq(0, NumProcesses - 1)).

initialize_processes(ProcessPids, Config) ->
    lists:foreach(fun({_Id, Pid}) -> 
        Pid ! {init, ProcessPids, Config}
    end, ProcessPids).

run_rounds(ProcessPids, Config) ->
    NumRounds = Config#config.num_rounds,
    lists:foreach(fun(Round) ->
        ?LOG_DEBUG("Starting round ~p", [Round]),
        
        % Send messages for this round
        lists:foreach(fun({_Id, Pid}) ->
            Pid ! {send_messages, Round, self()}
        end, ProcessPids),
        
        % Wait for all processes to complete the round
        lists:foreach(fun({_Id, _Pid}) ->
            receive
                {round_complete, Round} -> ok
            after 5000 ->
                ?LOG_WARNING("Timeout waiting for round ~p completion", [Round])
            end
        end, ProcessPids),
        
        % Small delay between rounds
        timer:sleep(10)
    end, lists:seq(0, NumRounds - 1)).

collect_results(ProcessPids, Config) ->
    % Get total message count
    TotalMessages = get_message_count(),
    
    % Get process results
    ProcessResults = lists:map(fun({Id, Pid}) ->
        Pid ! {get_result, self()},
        receive
            {result, Id, Result} -> 
                print_process_result(Id, Result, Config#config.source),
                Result
        after 2000 ->
            ?LOG_WARNING("Timeout getting result from process ~p", [Id]),
            #{id => Id, error => timeout}
        end
    end, ProcessPids),
    
    {TotalMessages, ProcessResults}.

print_process_result(Id, Result, Source) ->
    Prefix = case Id of
        Source -> "Source ";
        _ -> ""
    end,
    
    case maps:get(is_faulty, Result, false) of
        true ->
            io:format("~sProcess ~p is faulty~n", [Prefix, Id]);
        false ->
            Decision = maps:get(decision, Result, retreat),
            io:format("~sProcess ~p decides on value ~p~n", [Prefix, Id, Decision])
    end.

cleanup_processes(ProcessPids) ->
    lists:foreach(fun({_Id, Pid}) -> 
        Pid ! stop,
        % Don't wait for exit - let them clean up
        ok
    end, ProcessPids).

generate_test_cases(MaxProcesses) ->
    lists:flatten([
        [{N, M} || M <- lists:seq(1, 3), 3 * M + 1 =< N]
        || N <- lists:seq(4, MaxProcesses, 3)
    ]).

%% Process implementation

process_loop(Id, Config) ->
    IsFaulty = (Id =:= Config#config.source) orelse (Id =:= 2),
    State = #process_state{
        id = Id, 
        config = Config, 
        values = #{}, 
        is_faulty = IsFaulty,
        message_count = 0,
        processes = []
    },
    ?LOG_DEBUG("Process ~p initialized, faulty: ~p", [Id, IsFaulty]),
    process_loop(State).

process_loop(State) ->
    receive
        {init, ProcessPids, Config} ->
            NewState = State#process_state{processes = ProcessPids, config = Config},
            process_loop(NewState);
            
        {receive_message, Path, Value} ->
            NewValues = maps:put(Path, Value, State#process_state.values),
            NewState = State#process_state{
                values = NewValues,
                message_count = State#process_state.message_count + 1
            },
            increment_message_count(),
            ?LOG_DEBUG("Process ~p received message: path=~s, value=~p", 
                      [State#process_state.id, Path, Value]),
            process_loop(NewState);
            
        {send_messages, Round, From} ->
            NewState = handle_send_messages(State, Round),
            From ! {round_complete, Round},
            process_loop(NewState);
            
        {get_result, From} ->
            Result = create_result(State),
            From ! {result, State#process_state.id, Result},
            process_loop(State);
            
        stop ->
            ?LOG_DEBUG("Process ~p stopping", [State#process_state.id]),
            ok;
            
        Other ->
            ?LOG_WARNING("Process ~p received unexpected message: ~p", 
                        [State#process_state.id, Other]),
            process_loop(State)
    end.

handle_send_messages(State, Round) ->
    Id = State#process_state.id,
    Config = State#process_state.config,
    
    if 
        Round =:= 0 andalso Id =:= Config#config.source ->
            send_initial_messages(State);
        Round > 0 ->
            relay_messages(State, Round);
        true ->
            State
    end.

send_initial_messages(State) ->
    BaseValue = zero,
    ProcessPids = State#process_state.processes,
    
    lists:foreach(fun({Id, Pid}) ->
        if Id =/= State#process_state.id ->
            Value = case State#process_state.is_faulty of
                true -> 
                    % Faulty commander sends different values
                    case Id rem 2 of
                        0 -> zero;
                        1 -> one
                    end;
                false -> 
                    BaseValue
            end,
            
            Pid ! {receive_message, integer_to_list(State#process_state.id), Value},
            ?LOG_DEBUG("Commander ~p sent ~p to process ~p", 
                      [State#process_state.id, Value, Id]);
        true -> 
            ok
        end
    end, ProcessPids),
    
    State.

relay_messages(State, Round) ->
    Paths = get_paths_for_round(State, Round - 1),
    ProcessPids = State#process_state.processes,
    
    lists:foreach(fun(Path) ->
        case maps:get(Path, State#process_state.values, undefined) of
            undefined -> 
                ok;
            Value ->
                NewValue = transform_value(State, Value),
                NewPath = Path ++ integer_to_list(State#process_state.id),
                
                lists:foreach(fun({Id, Pid}) ->
                    IdStr = integer_to_list(Id),
                    case Id =/= State#process_state.id of
                        true ->
                            case string:str(NewPath, IdStr) of
                                0 -> % IdStr not found in NewPath
                                    Pid ! {receive_message, NewPath, NewValue},
                                    ?LOG_DEBUG("Process ~p relayed ~p to ~p, path: ~s", 
                                              [State#process_state.id, NewValue, Id, NewPath]);
                                _ -> % IdStr found in NewPath, skip
                                    ok
                            end;
                        false -> % Same process, skip
                            ok
                    end
                end, ProcessPids)
        end
    end, Paths),
    
    State.

transform_value(State, Value) ->
    if State#process_state.is_faulty andalso State#process_state.id =:= 2 ->
        one;
    true ->
        Value
    end.

get_paths_for_round(State, Round) ->
    if Round =:= 0 ->
        [integer_to_list((State#process_state.config)#config.source)];
    true ->
        maps:fold(fun(Path, _Value, Acc) ->
            case length(Path) of
                Len when Len =:= Round + 1 -> [Path | Acc];
                _ -> Acc
            end
        end, [], State#process_state.values)
    end.

create_result(State) ->
    Decision = if State#process_state.id =:= (State#process_state.config)#config.source ->
        % Source process uses its own value
        case State#process_state.is_faulty of
            true -> one;
            false -> zero
        end;
    true ->
        majority_vote(State)
    end,
    
    #{
        id => State#process_state.id,
        is_source => State#process_state.id =:= (State#process_state.config)#config.source,
        is_faulty => State#process_state.is_faulty,
        decision => Decision,
        message_count => State#process_state.message_count
    }.

majority_vote(State) ->
    Values = maps:values(State#process_state.values),
    Counts = lists:foldl(fun(Value, Acc) ->
        maps:update_with(Value, fun(Count) -> Count + 1 end, 1, Acc)
    end, #{zero => 0, one => 0, retreat => 0}, Values),
    
    ?LOG_DEBUG("Process ~p vote counts: ~p", [State#process_state.id, Counts]),
    
    TotalVotes = maps:fold(fun(_Value, Count, Sum) -> Sum + Count end, 0, Counts),
    
    if TotalVotes =:= 0 ->
        retreat;
    true ->
        MajorityThreshold = TotalVotes div 2,
        case maps:fold(fun(Value, Count, Acc) ->
            if Count > MajorityThreshold -> Value;
            true -> Acc
            end
        end, retreat, Counts) of
            retreat -> retreat;
            Value -> Value
        end
    end.

%% Message counter implementation

counter_loop(Count) ->
    receive
        increment ->
            counter_loop(Count + 1);
        {get_count, From} ->
            From ! {count, Count},
            counter_loop(Count);
        reset ->
            counter_loop(0);
        stop ->
            ok;
        _ ->
            counter_loop(Count)
    end.

increment_message_count() ->
    case whereis(message_counter) of
        undefined -> ok;
        Pid -> Pid ! increment
    end.

get_message_count() ->
    case whereis(message_counter) of
        undefined -> 0;
        Pid ->
            Pid ! {get_count, self()},
            receive
                {count, Count} -> Count
            after 1000 -> 0
            end
    end.

Performance Analysis and Benchmarking

To properly benchmark these implementations, we need to consider several factors:

Metrics to Measure

  1. Execution Time: How long does the algorithm take?
  2. Message Count: How many messages are exchanged?
  3. Memory Usage: Peak memory consumption
  4. Scalability: How performance degrades with increasing generals
  5. CPU Utilization: How efficiently the languages use system resources

Modern Benchmarking Approach

// Example comprehensive benchmark
pub struct BenchmarkResults {
    pub language: String,
    pub num_processes: usize,
    pub num_rounds: usize,
    pub execution_time_ms: f64,
    pub messages_sent: usize,
    pub memory_peak_mb: f64,
    pub cpu_utilization: f64,
}

pub fn comprehensive_benchmark() {
    let test_cases = vec![
        (4, 1),   // Minimum viable case
        (7, 2),   // Small scale
        (10, 3),  // Medium scale
        (16, 5),  // Larger scale
    ];

    for (processes, rounds) in test_cases {
        // Rust benchmark
        let rust_result = benchmark_rust_detailed(processes, rounds);
        
        // Elixir benchmark (would call via Port)
        let elixir_result = benchmark_elixir_detailed(processes, rounds);
        
        // Erlang benchmark (would call via Port)
        let erlang_result = benchmark_erlang_detailed(processes, rounds);
        
        compare_results(vec![rust_result, elixir_result, erlang_result]);
    }
}

Real-World Implications

The performance characteristics matter significantly in different contexts:

Blockchain Applications

  • Latency-Critical: Rust’s performance advantage matters for high-frequency trading
  • Node Count: Elixir/Erlang’s superior scaling helps with large blockchain networks
  • Fault Tolerance: Actor model languages excel at handling network partitions

IoT and Edge Computing

  • Resource Constraints: Rust’s low memory footprint is crucial
  • Device Coordination: Byzantine fault tolerance becomes critical for autonomous systems

Financial Systems

  • Regulatory Requirements: Provable consensus algorithms are increasingly required
  • High Availability: Erlang’s fault tolerance model aligns with financial system needs

Future Directions

Looking ahead, several trends will likely shape how we think about Byzantine fault tolerance:

  • Quantum Computing: Post-quantum cryptography will change how we implement Byzantine fault-tolerant signatures and may require new consensus mechanisms.
  • Climate Considerations: Energy-efficient consensus mechanisms (like Proof of Stake) are becoming increasingly important as environmental concerns grow.
  • Regulatory Frameworks: Government regulations around cryptocurrencies and distributed systems may influence which Byzantine fault-tolerant algorithms are acceptable in different contexts.
  • Edge and IoT: As computing moves to the edge, Byzantine fault tolerance becomes crucial for coordinating potentially millions of small, unreliable devices.

Performance Analysis

To compare the implementations, I measured complete wall-clock execution time including language runtime startup and algorithm execution across different process counts (10 to 2000 processes) with 1 round each. Each configuration was tested 3 times to ensure consistency. These benchmarks focus on demonstrating algorithmic correctness and relative performance characteristics rather than highly optimized production implementations.

All source code is available at https://github.com/bhatti/byz-sample for those interested in running or improving these implementations.

Results Summary

Complete Execution Time (Wall-Clock) – Updated Results:

  1. Elixir: 535ms average (range: 455-762ms)
  2. Rust: 577ms average (range: 521-667ms)
  3. Erlang: 1460ms average (range: 1401-1629ms)

Detailed Performance Breakdown

Configuration   | Elixir (ms) | Rust (ms) | Erlang (ms) | Messages
10 processes    | 471         | 533       | 1407        | 8
50 processes    | 476         | 545       | 1406        | 47
100 processes   | 528         | 587       | 1420        | 91
200 processes   | 482         | 550       | 1425        | 199
1000 processes  | 568         | 591       | 1497        | 998
2000 processes  | 687         | 661       | 1610        | 1999

Key Findings

  • Elixir maintained consistent performance across different process counts, showing good scalability characteristics
  • Rust delivered predictable performance with minimal variance, demonstrating excellent memory safety guarantees
  • Erlang showed significantly higher execution times but maintained reliability across all test configurations
  • Message counts remained consistent across languages for equivalent configurations, confirming algorithmic correctness

The results show that as process count increases from 10 to 2000:

  • Elixir scales relatively well, with execution time increasing by ~45%
  • Rust shows similar scaling characteristics, with ~24% increase
  • Erlang maintains consistent performance overhead regardless of scale

Note: These benchmarks measure wall-clock time including runtime startup overhead. The performance differences may be influenced by implementation patterns (GenServer vs raw message passing) and language-specific optimizations rather than fundamental runtime capabilities.

Try It Yourself

The complete implementation is available at https://github.com/bhatti/byz-sample with:

# Clone and run benchmarks
git clone https://github.com/bhatti/byz-sample
cd byz-sample
make benchmark

Disclaimer: The above implementation of the Byzantine Generals Problem serves as a case study for evaluating distributed computing approaches across different programming paradigms, rather than as a benchmark of highly optimized implementations in these languages.

Conclusion

The Byzantine Generals Problem exemplifies how fundamental computer science research can unexpectedly become the foundation for revolutionary technology. What began as an abstract theoretical exercise in 1982 became the backbone of Bitcoin in 2008 and continues to be crucial for modern distributed systems. My 2007 exploration of this problem was motivated by curiosity about distributed computing and language performance. Today, understanding Byzantine fault tolerance is essential for anyone working with blockchain technology, distributed databases, or fault-tolerant systems.

Try the implementations yourself: https://github.com/bhatti/byz-sample


October 10, 2022

Implementing Distributed Locks (Mutex and Semaphore) with Databases

Filed under: Concurrency,Rust,Uncategorized — admin @ 10:55 pm

Overview

I recently needed a way to control access to shared resources in a distributed system. You can use Consul or ZooKeeper (or low-level Raft/Paxos implementations if you are brave) for managing distributed locks, but I wanted to reuse an existing database without adding another dependency to my tech stack. Most databases support transactions or conditional updates with varying degrees of transactional guarantees, but those alone don’t give you a distributed lock when the business logic you need to protect resides outside the database. I found a lock client library for AWS databases, but it didn’t support semaphores, and its implementation tightly coupled lock management with database access, which made it hard to extend. For example, the following diagram shows the cyclic dependencies in the code:

[Class diagram: cyclic dependencies in the existing lock client library]

Due to these deficiencies in existing solutions, I decided to build my own distributed locks library in Rust with the following capabilities:

  • Allow creating lease-based locks that can either protect a single shared resource with a Mutex lock or protect a finite set of shared resources with a Semaphore lock.
  • Allow renewing leases at periodic intervals so that stale locks can be reclaimed by other users.
  • Allow releasing Mutex and semaphore locks explicitly after the user performs a critical action.
  • CRUD APIs to manage Mutex and Semaphore entities in the database.
  • Multi-tenancy support for different clients in case the database is shared by multiple users.
  • Fair locks that grant access in first-come, first-served order when multiple clients acquire the same lock concurrently.
  • Scalable support for tens of thousands of concurrent mutexes and semaphores.
  • Support for multiple data stores: relational databases such as MySQL, PostgreSQL, and SQLite, as well as NoSQL/cache stores such as AWS DynamoDB and Redis.

High-level Design

I chose Rust to build the library for managing distributed locks due to strict performance and correctness requirements. The following diagram shows the high-level components of the new library:

LockManager Interface

The client interacts with the LockManager, which defines the following operations to acquire, release, and renew lock leases and to manage the lifecycle of Mutexes and Semaphores:

#[async_trait]
pub trait LockManager {
    // Attempts to acquire a lock until it either acquires the lock, or a specified additional_time_to_wait_for_lock_ms is
    // reached. This method will poll database based on the refresh_period. If it does not see the lock in database, it
    // will immediately return the lock to the caller. If it does see the lock, it will note the lease expiration on the lock. If
    // the lock is deemed stale, (that is, there is no heartbeat on it for at least the length of its lease duration) then this
    // will acquire and return it. Otherwise, if it waits for as long as additional_time_to_wait_for_lock_ms without acquiring the
    // lock, then it will return LockError::NotGranted.
    //
    async fn acquire_lock(&self, opts: &AcquireLockOptions) -> LockResult<MutexLock>;

    // Releases the given lock if the current user still has it, returning true if the lock was
    // successfully released, and false if someone else already stole the lock. Deletes the
    // lock item if it is released and delete_lock_item_on_close is set.
    async fn release_lock(&self, opts: &ReleaseLockOptions) -> LockResult<bool>;

    // Sends a heartbeat to indicate that the given lock is still being worked on.
    // This method will also set the lease duration of the lock to the given value.
    // This will also either update or delete the data from the lock, as specified in the options
    async fn send_heartbeat(&self, opts: &SendHeartbeatOptions) -> LockResult<MutexLock>;

    // Creates mutex if doesn't exist
    async fn create_mutex(&self, mutex: &MutexLock) -> LockResult<usize>;

    // Deletes mutex lock if not locked
    async fn delete_mutex(&self,
                          other_key: &str,
                          other_version: &str,
                          other_semaphore_key: Option<String>) -> LockResult<usize>;

    // Finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the
    // given lock. If the client currently has the lock, it will return the lock, and operations such as release_lock will work.
    // However, if the client does not have the lock, then operations like releaseLock will not work (after calling get_lock, the
    // caller should check mutex.expired() to figure out if it currently has the lock.)
    async fn get_mutex(&self, mutex_key: &str) -> LockResult<MutexLock>;

    // Creates or updates semaphore with given max size
    async fn create_semaphore(&self, semaphore: &Semaphore) -> LockResult<usize>;

    // Returns semaphore for the key
    async fn get_semaphore(&self, semaphore_key: &str) -> LockResult<Semaphore>;

    // find locks by semaphore
    async fn get_semaphore_mutexes(&self,
                                   other_semaphore_key: &str,
    ) -> LockResult<Vec<MutexLock>>;

    // Deletes semaphore if all associated locks are not locked
    async fn delete_semaphore(&self,
                              other_key: &str,
                              other_version: &str,
    ) -> LockResult<usize>;
}

The LockManager interacts with a LockStore to access mutexes and semaphores, which in turn delegates to mutex and semaphore repository implementations for lock management. The library defines two implementations of LockStore: DefaultLockStore, which supports mutexes and semaphores, where mutexes acquire a singular lock and semaphores acquire a lock from a finite set of shared resources; and FairLockStore, which uses a Redis-specific implementation of fair semaphores to manage lease-based semaphores in first-come, first-served order. If a lock is not immediately available, the LockManager can wait for it by periodically polling for the mutex or semaphore to become free. Due to this periodic polling, the fair semaphore algorithm won’t preserve strict FIFO order if a new client requests a lock while a previous request is waiting for the next polling interval.

Create Lock Manager

You can instantiate a Lock Manager with relational database store as follows:

let config = LocksConfig::new("test_tenant");
let mutex_repo = factory::build_mutex_repository(RepositoryProvider::Rdb, &config)
	.await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Rdb, &config)
	.await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry()).expect("failed to initialize lock manager");

Alternatively, you can choose AWS Dynamo DB as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Or Redis based data-store as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Note: The AWS DynamoDB store uses the strongly consistent reads feature, since DynamoDB defaults to eventually consistent reads.

Acquiring a Mutex Lock

You will need to build the acquire options with a key name and lease duration, and then acquire the lock:

let opts = AcquireLockOptionsBuilder::new("mylock")
	.with_lease_duration_secs(10).build();
let lock = lock_manager.acquire_lock(&opts)
	.expect("should acquire lock");

The acquire_lock operation will automatically create the mutex if it doesn’t exist; otherwise, it will wait up to the lease period if the lock is not immediately available. It returns a mutex lock structure such as:

{
  "mutex_key":"one",
  "tenant_id":"local-host-name",
  "version":"258d513e-bae4-4d91-8608-5d500be27593",
  "lease_duration_ms":15000,
  "locked":true,
  "expires_at":"2022-10-11T03:04:43.126542"
}

Renewing the lease of Lock

A lock is only held for the duration specified by lease_duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .with_lease_duration_secs(15)
                .build();

let updated_lock = lock_manager.send_heartbeat(&opts)
					.expect("should renew lock");

Note: The lease renewal will also update the version of lock so you will need to use the updated version to renew or release the lock.
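
For illustration, a renewal loop might look like the sketch below. It assumes a Tokio runtime and that MutexLock exposes its version as a public field (matching the JSON shown earlier); the helper name and renewal interval are my own choices rather than part of the library’s API, and imports from the library are omitted as in the snippets above:

// Hedged sketch: renew the lease periodically, always carrying forward the
// updated version returned by the previous heartbeat.
async fn keep_renewing(
    lock_manager: &impl LockManager,
    key: &str,
    mut version: String,
) -> LockResult<()> {
    loop {
        // Renew well before the lease expires (interval chosen for illustration).
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        let opts = SendHeartbeatOptionsBuilder::new(key, &version)
            .with_lease_duration_secs(15)
            .build();
        let renewed = lock_manager.send_heartbeat(&opts).await?;
        // The heartbeat rotates the lock version; the next renewal (or the final
        // release) must use this new version.
        version = renewed.version.clone();
    }
}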

Releasing the lease of Lock

You can build the release options from the lock returned by the API above, and then release it:

let opts = ReleaseLockOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .build();
lock_manager.release_lock(&opts)
				.expect("should release lock");

Acquiring a Semaphore based Lock

Semaphores allow you to define a set of locks for a resource with a maximum size. Acquiring a semaphore is similar to acquiring a regular lock except that you also specify the semaphore size, e.g.:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = lock_manager.acquire_lock(&opts)
				.expect("should acquire semaphore lock");

The acquire_lock operation will automatically create the semaphore if it doesn’t exist, then check for an available lock and wait if all the locks are busy. It returns a lock structure such as:

{
  "mutex_key":"one_0000000000",
  "tenant_id":"local-host-name",
  "version":"5ad557df-dbe6-439d-8a31-dc367e32eab9",
  "lease_duration_ms":15000,
  "semaphore_key":"one",
  "locked":true,
  "expires_at":"2022-10-11T04:03:33.662484"
}

The semaphore lock will create mutexes internally that will be numbered from 0 to max-size (exclusive). You can get semaphore details using:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "4ff77432-ed84-48b5-9831-8e53f56c2620",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": false,
}

Or, fetch state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  {
    "mutex_key": "one_0000000000",
    "tenant_id": "local-host-name",
    "version": "ba5a62e5-80f1-474e-a895-c4a18d252cb9",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "749b4ded-e356-4ef5-a23b-73a4984130c8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...

Renewing the lease of Semaphore Lock

A lock is only held for the duration specified by lease_duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new(
  				"one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = lock_manager.send_heartbeat(&opts)
					.expect("should renew lock");

Note: The lease renewal also updates the version of the lock, so you will need to use the updated version to renew or release the lock.

Releasing the lease of Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new("one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_opt_semaphore_key("one")
                .build();

lock_manager.release_lock(&opts)
				.expect("should release lock");

Acquiring a Fair Semaphore

Fair semaphores are only available for Redis due to the internal implementation, and they must be enabled via the fair_semaphore configuration option; otherwise their usage is similar to the operations above, e.g.:

let mut config = LocksConfig::new("test_tenant");
config.fair_semaphore = Some(true); // enable fair semaphores

let fair_semaphore_repo = factory::build_fair_semaphore_repository(
  	RepositoryProvider::Redis, &config)
	.await.expect("failed to create fair semaphore");
let store = Box::new(FairLockStore::new(&config, fair_semaphore_repo));
let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry())
	.expect("failed to initialize lock manager");

Then acquire lock similar to the semaphore syntax as before:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = lock_manager.acquire_lock(&opts)
			.expect("should acquire semaphore lock");

The acquire_lock operation automatically creates the semaphore if it doesn’t exist; it then checks for available locks and waits if all the locks are busy. It returns a lock structure that includes:

{
  "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "tenant_id": "local-host-name",
  "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

The fair semaphore lock does not use mutexes internally, but for API compatibility it builds a mutex with a key based on the combination of the semaphore key and version. You can then query the semaphore state as follows:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "5779b01f-eaea-4043-8ae0-9f8b942c2727",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": true,
}

Or, fetch state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  [
  {
    "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "tenant_id": "local-host-name",
    "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
    "expires_at": "2022-10-11T04:41:43.845711",
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...

Note: The mutex_key will be slightly different for unlocked mutexes because the mutex key isn’t needed by the internal implementation.

Renewing the lease of Fair Semaphore Lock

You can renew the lease of a fair semaphore lock using the same syntax as for regular semaphores, e.g.:

let opts = SendHeartbeatOptionsBuilder::new(
  			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = lock_manager.send_heartbeat(&opts)
					.expect("should renew lock");

Note: Due to internal implementation of fair semaphore, the version won’t be changed upon lease renewal.

Releasing the lease of Fair Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new(
    			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_opt_semaphore_key("one")
                .build();

lock_manager.release_lock(&opts)
				.expect("should release lock");

Command Line Interface

In addition to the Rust API, the distributed locks library also provides a command-line interface for managing mutex and semaphore based locks, e.g.:

Mutexes and Semaphores based Distributed Locks with databases.

Usage: db-locks [OPTIONS] [PROVIDER] <COMMAND>

Commands:
  acquire

  heartbeat

  release

  get-mutex

  delete-mutex

  create-mutex

  create-semaphore

  get-semaphore

  delete-semaphore

  get-semaphore-mutexes

  help
          Print this message or the help of the given subcommand(s)

Arguments:
  [PROVIDER]
          Database provider [default: rdb] [possible values: rdb, ddb, redis]

Options:
  -t, --tenant <TENANT>
          tenant-id for the database [default: local-host-name]
  -f, --fair-semaphore <FAIR_SEMAPHORE>
          fair semaphore lock [default: false] [possible values: true, false]
  -j, --json-output <JSON_OUTPUT>
          json output of result from action [default: false] [possible values: true, false]
  -c, --config <FILE>
          Sets a custom config file
  -h, --help
          Print help information
  -V, --version
          Print version information

For example, you can acquire a fair semaphore lock as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis acquire --key one --semaphore-max-size 10

Which would return:

{
  "mutex_key": "one_69816448-7080-40f3-8416-ede1b0d90e80",
  "tenant_id": "local-host-name",
  "version": "69816448-7080-40f3-8416-ede1b0d90e80",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

You can run the following command to renew the above lock:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis heartbeat --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

And then release it as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis release --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

Summary

I was able to meet the initial goals for implementing distributed locks, though this library is still early in development. You can download and try it from https://github.com/bhatti/db-locks. Feel free to send your feedback or contribute to this library.

May 12, 2022

Applying Laws of Scalability to Technology and People

As businesses grow with a larger customer base and hire more employees, they face challenges in meeting customer demands: scaling their systems and maintaining rapid product development with bigger teams. The businesses aim to scale systems linearly with additional computing and human resources. However, system architectures such as a monolith or a big ball of mud make scaling systems linearly onerous. Similarly, teams become less efficient as they grow in size and become siloed. A general solution to scaling business or technical problems is divide and conquer: partition the problem into multiple sub-problems. A number of factors affect the scalability of software architectures and organizations, such as the interactions among system components or the communication between teams. For example, coordination, communication, and data/knowledge coherence among system components and teams become disproportionately expensive with growth in size. Software engineering and business management have developed a number of laws and principles that can be used to evaluate the constraints and trade-offs related to these scalability challenges. Following is a list of a few laws from the technology and business domains for scaling software architectures and business organizations:

Amdahl’s Law

Amdahl’s Law is named after Gene Amdahl and is used to predict the speedup of a task when it’s scaled to run on multiple processors. It states that the maximum speedup is limited by the serial fraction of the task, which becomes a source of resource contention:

Speed up (P, N) = 1 / [ (1 - P) + P / N ]

Where P is the fraction of the task that can run in parallel on N processors. When N becomes large, P / N approaches 0, so the speedup is restricted to 1 / (1 – P), where the serial fraction (1 – P) becomes a source of contention due to data coherence, state synchronization, memory access, I/O, or other shared resources.
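As a rough worked example (illustrative numbers, not from any benchmark), suppose 95% of a task can be parallelized (P = 0.95). Even with 16 processors the speedup is only:

Speed up (0.95, 16) = 1 / [ (1 - 0.95) + 0.95 / 16 ] ≈ 9.1

and no matter how many processors you add, the speedup can never exceed 1 / (1 - 0.95) = 20.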

Amdahl’s law can also be described in terms of throughput using:

N / [ 1 + a (N - 1) ]

Where a is the serial fraction between 0 and 1. In parallel computing, a class of problems known as embarrassingly parallel workloads have little or no dependency among tasks, so their value of a will be 0 because they don’t require any inter-task communication.
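Using the same illustrative numbers as above, a serial fraction of a = 0.05 on N = 16 processors yields a relative throughput of 16 / [1 + 0.05 x 15] ≈ 9.1, matching the speedup computed earlier, while an embarrassingly parallel workload with a = 0 scales linearly to 16.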

Amdahl’s law can also be applied to scaling teams as an organization grows: teams can be organized as small, cross-functional groups to parallelize feature work for different product lines or business domains, but the maximum speedup will still be limited by the serial fraction of the work. The serial work can be: build and deployment pipelines; reviewing and merging changes; communication and coordination between teams; and dependencies on deliverables from other teams. Fred Brooks described in his book The Mythical Man-Month how adding people to a highly divisible task can reduce the overall task duration, but other tasks are not so easily divisible: while it takes one woman nine months to make one baby, “nine women can’t make a baby in one month”.

The theoretical speedup of the latency of the execution of a program according to Amdahl’s law (credit wikipedia).

Brooks’s Law

Brooks’s law, coined by Fred Brooks, states that adding manpower to a late software project makes it later due to ramp-up time. As the size of a team increases, the ramp-up time for new employees also increases due to the quadratic growth in communication overhead among team members, e.g.

Number of communication channels = N x (N - 1) / 2

Organizations can build small teams, such as two-pizza/single-threaded teams, so that communication channels within each team do not explode, and the cross-functional nature of the teams requires less communication with and fewer dependencies on other teams. Brooks’s law can be equally applied to technology when designing distributed services or components: each service is designed as a loosely coupled module around a business domain to minimize communication with other services, and services only communicate through well-designed interfaces.
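To see how quickly the overhead grows (a simple worked example), a team of 5 has 5 x 4 / 2 = 10 communication channels, while a team of 50 has 50 x 49 / 2 = 1,225, which is why a single large team spends far more of its time on coordination than several small ones.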

Universal Scalability Law

The Universal Scalability Law is used for capacity planning and was derived from Amdahl’s law by Dr. Neil Gunther. It describes relative capacity in terms of concurrency, contention and coherency:

C(N) = N / [1 + a(N – 1) + B x N(N – 1)]

Where C(N) is the relative capacity, a is the serial fraction between 0 and 1 due to resource contention, and B is the delay for data coherency or consistency. As the data coherency term (B) is quadratic in N, it becomes more expensive as N increases; e.g., using a consensus algorithm such as Paxos to reach state consistency among a large set of servers is impractical because it requires additional communication between all servers. Instead, large-scale distributed storage services generally use sharding/partitioning and gossip protocols with a leader-based consensus algorithm to minimize peer-to-peer communication.
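As an illustration with made-up coefficients, take a = 0.05 and B = 0.01:

C(10) = 10 / [1 + 0.05 x 9 + 0.01 x 10 x 9] ≈ 4.3

C(16) = 16 / [1 + 0.05 x 15 + 0.01 x 16 x 15] ≈ 3.9

C(64) = 64 / [1 + 0.05 x 63 + 0.01 x 64 x 63] ≈ 1.4

Unlike Amdahl’s law, capacity doesn’t just plateau: it peaks (here around N ≈ 10) and then degrades as the quadratic coherency cost dominates.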

The Universal Scalability Law can be applied to scaling teams, similar to Amdahl’s law, where a models serial work or dependencies between teams and B models communication and consistent understanding among team members. The cost of B can be minimized by building small cross-functional teams so that teams can make progress independently. You can also apply this model to any decision-making process by keeping the set of stakeholders or decision makers small, so that they can reach agreement without grinding to a halt.

Gossip protocols also apply to people: they can be used along with a writing culture, lunch & learns, and osmotic communication to spread knowledge and learnings from one team to other teams.

Little’s Law

Little’s Law was developed by John Little to predict the number of items in a queue for a stable, non-preemptive system. It is part of queueing theory and is described mathematically as:

L = A W

Where L is the average number of items within the system or queue, A is the average arrival rate of items, and W is the average time an item spends in the system. Little’s law and queueing theory can be used for capacity planning of computing servers and for minimizing the number of items waiting in the queue (L).

Little’s law can be applied to predicting task completion rates in an agile process, where L represents work-in-progress (WIP) for a sprint; A represents the arrival and departure rate, or throughput/capacity, of tasks; and W represents lead time, the average amount of time a task spends in the system.

WIP = Throughput x Lead-Time

Lead-Time = WIP / Throughput

You can use this relationship to reduce work in progress or lead time and improve the throughput of task completion. Little’s law suggests that you can accomplish more by keeping work-in-progress or inventory small. You will also be able to respond better to unpredictable delays if you keep a buffer in your capacity and avoid 100% utilization.
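For example (illustrative numbers), if a team completes 5 tasks per week (throughput) and the average lead time is 2 weeks, then WIP = 5 x 2 = 10 tasks; conversely, capping WIP at 5 tasks with the same throughput forces the average lead time down to 1 week.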

Kingman’s Formula

Kingman’s formula expands Little’s law by adding utilization and variability to predict the wait time before a request is served:

E(Wq) ≈ [p / (1 - p)] x [(ca^2 + cs^2) / 2] x T
(credit wikipedia)

where T is the mean service time, m (= 1/T) is the service rate, A is the mean arrival rate, p = A/m is the utilization, ca is the coefficient of variation for arrivals, and cs is the coefficient of variation for service times. Kingman’s formula shows that queue sizes grow toward infinity as you approach 100% utilization, and that queues get longer with greater variability of work. These insights can be applied to both technical and business processes so that you can build systems with greater predictability of processing time, a smaller wait time E(Wq), and higher throughput.
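A quick worked example with moderately variable work (ca = cs = 1, so the variability term is 1) and a mean service time of one day: at 50% utilization the expected wait is (0.5 / 0.5) x 1 x 1 = 1 day, at 90% it is (0.9 / 0.1) x 1 x 1 = 9 days, and at 95% it is 19 days. Pushing utilization from 90% to 95% doubles the wait.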

Note: See Erlang analysis for serving requests in a system without a queue where new requests are blocked or rejected if there is not sufficient capacity in the system.

Gustafson’s Law

Gustafson’s law refines Amdahl’s law with the observation that parallel computing enables solving larger problems, with computations on very large data sets, in a fixed amount of time. It is defined as:

S = s + p x N

S = s + (1 – s) x N

S = N + (1 – N) x s

where S is the theoretical speedup with parallelism, N is the number of processors, s is the serial fraction, and p is the parallel fraction such that s + p = 1.

Gustafson’s law shows that limitations imposed by the sequential fraction of a program may be countered by increasing the total amount of computation. This allows solving bigger technical and business problems with greater computing and human resources.
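For instance (a rough illustration), with a serial fraction s = 0.05 and N = 16 processors, the scaled speedup is S = 16 + (1 – 16) x 0.05 = 15.25, because the parallel portion of the workload is allowed to grow with the number of processors instead of staying fixed as in Amdahl’s law.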

Conway’s Law

Conway’s law states that an organization that designs a system will produce a design whose structure is a copy of the organization’s communication structure. It means that the architecture of a system is derived from the team structures of an organization; however, you can also use the architecture to derive the team structures. This allows building teams along architecture boundaries so that each team is small, cross-functional, and cohesive. A study by the Harvard Business School found that large co-located teams tended to produce more tightly coupled and monolithic codebases, whereas small distributed teams produced more modular codebases. These lessons can be applied to scaling teams and architecture so that teams and system modules are built around organizational boundaries and independent concerns to promote autonomy and reduce tight coupling.

Pareto Principle

The Pareto principle states that for many outcomes, roughly 80% of consequences come from 20% of causes. This principle shows up in numerous technical and business problems, such as: 20% of the code has 80% of the errors; customers use 20% of the functionality 80% of the time; 80% of optimization improvements come from 20% of the effort, etc. It can also be used to identify hotspots or critical paths when scaling, as some microservices or teams may receive disproportionate demands. While scaling computing resources is relatively easy, scaling a team beyond an organizational boundary is hard. You will have to apply other management tools such as prioritization, planning, metrics, automation, and better communication to manage critical work.

Metcalfe’s Law

Metcalfe’s law states that if there are N users of a telecommunications network, the value of the network is proportional to N^2. It’s also referred to as the network effect and applies to social networking sites.

Number of possible pair connections = N * (N – 1) / 2

Reed’s Law expanded this law and observed that the utility of large networks can scale exponentially with the size of the network.

Number of possible subgroups of a network = 2^N – N – 1

This law explains the popularity of social networking services via viral communication. These laws can be applied to model information flow between teams or message exchange between services, and to avoid peer-to-peer communication within an extremely large group of people or set of nodes. A common alternative is to use a gossip protocol, or to designate a partition leader for each group that communicates with other leaders and then disseminates information within its own group.
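For a sense of the growth rates (worked numbers only): a network of 10 nodes has 10 x 9 / 2 = 45 possible pair connections and 2^10 – 10 – 1 = 1,013 possible subgroups; at 50 nodes there are 1,225 pairs but roughly 1.1 x 10^15 subgroups, which is why fully connected communication stops scaling long before the group does.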

Dunbar Number

Dunbar’s number is a suggested cognitive limit to the number of people with whom one can maintain stable social relationships. It has a commonly used value of 150 and can be used to limit direct communication connections within an organization.

Wirth’s Law and Parkinson’s Law

Wirth’s Law, named after Niklaus Wirth, posits that software tends to become slower at a rate that outpaces the speed at which hardware becomes faster. Despite significant advancements in processor speeds as predicted by Moore’s Law, software complexity increases correspondingly: developers often leverage these hardware improvements to create more intricate and feature-rich software, which can negate the hardware gains. Additionally, the use of programming languages and tools that do not prioritize efficiency can lead to bloated code.

In the realm of software development, there are similar principles, such as Parkinson’s law, which suggests that work expands to fill the time allotted for its completion. This implies that given more time, software projects may become more complex or extended than initially necessary. Moreover, Hofstadter’s Law offers a cautionary perspective, stating, “It always takes longer than you expect, even when you take into account Hofstadter’s Law.” This highlights the often-unexpected delays in software development timelines. Brooks’s Law further adds to these insights with the adage, “Adding manpower to a late software project makes it later.” These laws collectively emphasize that the demand upon a resource tends to expand to match the supply of the resource, and that adding resources late also poses challenges due to the complexity of software development and project management.

Principle of Priority Inversion

In modern operating systems, the concept of priority inversion arises when a high-priority process needs resources or data from a low-priority process, but the low-priority process never gets a chance to execute due to its lower priority. This creates a deadlock or inefficiency where the high-priority process is blocked indefinitely. To avoid this, schedulers in modern operating systems adjust the priority of the lower-priority process to ensure it can complete its task and release the necessary resources, allowing the high-priority process to continue.

This same principle applies to organizational dynamics when scaling teams and projects. Imagine a high-priority initiative that requires collaboration from another team whose priorities do not align. Without proper coordination, the team working on the high-priority initiative may never get the support they need, leading to delays or blockages. Just as in operating systems, where a priority adjustment is needed to keep processes running smoothly, organizations must also ensure alignment across teams by managing a global list of priorities. A solution is to maintain a global prioritized list of projects that is visible to all teams. This ensures that the most critical initiatives are recognized and appropriately supported by every team, regardless of their individual workloads. This centralized prioritization ensures that teams working on essential projects can quickly receive the help or resources they need, avoiding bottlenecks or deadlock-like situations where progress stalls because of misaligned priorities.

Load Balancing (Round Robin, Least Connection)

Load balancing algorithms distribute tasks across multiple servers to optimize resource utilization and prevent any single server from becoming overwhelmed. Common strategies include round-robin (distributing tasks evenly across servers) and least connection (directing new tasks to the server with the fewest active connections).

Load balancing can be applied to distribute work among teams or individuals. For instance, round-robin can ensure that tasks are equally assigned to team members, while the least-connection principle can help assign tasks to those with the lightest workload, ensuring no one is overloaded. This leads to more efficient task management, better resource allocation, and balanced work distribution.

MapReduce

MapReduce splits a large task into smaller sub-tasks (map step) that can be processed in parallel, then aggregates the results (reduce step) to provide the final output. In a large project, teams or individuals can be assigned sub-tasks that they can work on independently. Once all the sub-tasks are complete, the results can be aggregated to deliver the final outcome. This fosters parallelism, reduces bottlenecks, and allows for scalable team collaboration, especially for large or complex projects.

Deadlock Prevention (Banker’s Algorithm)

The Banker’s Algorithm is used to prevent deadlocks by allocating resources in such a way that there is always a safe sequence of executing processes, avoiding circular wait conditions. In managing interdependent teams or tasks, it’s important to avoid deadlocks where teams wait on each other indefinitely. By proactively ensuring that resources (e.g., knowledge, tools, approvals) are available before committing teams to work, project managers can prevent deadlock scenarios. Prioritizing resource allocation and anticipating dependencies can ensure steady progress without one team stalling another.

Consensus Algorithms (Paxos, Raft)

Consensus algorithms ensure that distributed systems agree on a single data value or decision, despite potential failures. Paxos and Raft are used to maintain consistency across distributed nodes. In projects involving multiple stakeholders or teams, reaching consensus on decisions can be challenging, especially with different priorities and viewpoints. Consensus-building techniques, inspired by these algorithms, could involve ensuring that key stakeholders agree before any significant action is taken, much like how Paxos ensures agreement across distributed systems. This avoids misalignment and fosters collaboration and trust across teams.

Rate Limiting

Rate limiting controls the number of requests or operations that can be performed in a given timeframe to prevent overloading a system. This concept applies to managing expectations, particularly in teams with multiple incoming requests. Rate limiting can be applied to protect teams from being overwhelmed by too many requests at once. By limiting how many tasks or requests a team can handle at a time, project managers can ensure a sustainable work pace and prevent burnout, much like how rate limiting helps protect system stability.

Summary

The above laws offer strategies for optimizing both technical systems and team dynamics. Amdahl’s Law and the Universal Scalability Law highlight the challenges of parallelizing work, emphasizing the need to manage coordination and communication overhead as bottlenecks when scaling teams or systems. Brooks’s and Metcalfe’s Laws reveal how quickly communication paths grow, suggesting that effective team scaling requires managing these paths to avoid coordination paralysis. Little’s Law and Kingman’s Formula suggest limiting work in progress and preventing 100% resource utilization to ensure reliable throughput, while Conway’s Law underscores the alignment between team structures and system architecture. Teams and their responsibilities should mirror modular architectures, fostering autonomy and reducing cross-team dependencies.

The Pareto Principle can guide teams to make small but impactful changes in architecture or processes that yield significant productivity improvements. Wirth’s Law and Parkinson’s Law serve as reminders to prevent work bloat and unnecessary complexity by setting clear deadlines and objectives. Dunbar’s Number highlights the human cognitive limit in maintaining external relationships, suggesting that team dependencies should be kept minimal to maintain effective collaboration. The consensus algorithms used in distributed systems can be applied to decision-making and collaboration, ensuring alignment among teams. Error correction algorithms are useful for feedback loops, helping teams iteratively improve. Similarly, techniques like load balancing strategies can optimize task distribution and workload management across teams.

Before applying these laws, it is essential to have clear goals, metrics, and KPIs to measure baselines and improvements. Prematurely implementing these scalability strategies can exacerbate issues rather than resolve them. The focus should be on global optimization of the entire organization or system, rather than focusing on local optimizations that don’t align with broader goals.

August 15, 2021

Structured Concurrency with Swift

Filed under: Concurrency,Uncategorized — Tags: , , , — admin @ 6:19 pm

I wrote about support for structured concurrency in Javascript/Typescript, Erlang/Elixir, Go, Rust, Kotlin and Swift last year (Part-I, Part-II, Part-III, Part-IV), but the Swift language’s async/await and actors support was still in development. Swift 5.5 will finally make these new concurrency features available, which are described below:

Async/Await

As described in Part-IV, Swift APIs previously used completion handlers for asynchronous methods that suffered from:

  • Poor error handling, because there was no single way to handle errors/exceptions; instead, separate callbacks were needed for errors.
  • Difficulty cancelling an asynchronous operation or exiting early after a timeout.
  • The need for global reasoning about shared state in order to prevent race conditions.
  • Stack traces from the asynchronous thread don’t include the originating request, so the code becomes hard to debug.
  • As the Swift/Objective-C runtime uses native threads, creating a lot of background tasks results in expensive thread resources and may cause excessive context switching.
  • Nested use of completion handlers turned the code into a callback hell.

The following example shows poor control flow and deficient error handling when using completion handlers:

func fetchThumbnails(for ids: [String],
    completion handler: @escaping ([String: UIImage]?, Error?) -> Void) {
    guard let id = ids.first else { return handler([:], nil) }
    let request = thumbnailURLRequest(for: id)
    URLSession.shared.dataTask(with: request) { data, response, error in
        guard let response = response,
              let data = data else { return handler(nil, error) } // Poor error handling
        UIImage(data: data)?.prepareThumbnail(of: thumbSize) { image in
            guard let image = image else { return handler(nil, ThumbnailError()) }
        }
        fetchThumbnails(for: Array(ids.dropFirst())) { thumbnails, error in
            // cannot use loop
            ...
        }
    }
}

Though the use of Promise libraries helps a bit, it still suffers from the dichotomy of control flow and error handling. Here is the equivalent code using async/await:

func fetchThumbnails(for ids: [String]) async throws -> [String: UIImage] {
	var thumbnails: [String: UIImage] = [:]
    for id in ids {
    	let request = thumbnailURLRequest(for: id)
        let (data, response) = try await URLSession.shared.dataTask(for: request)
        try validateResponse(response)
        guard let image = await UIImage(data: data)?.byPreparingThumbnail(ofSize: thumbSize) else { throw ThumbnailError() }
        thumbnails[id] = image
    }
    return thumbnails
}

As you can see, the above code not only improves control flow and adds uniform error handling, but also enhances readability by removing the nested structure of completion handlers.

Tasks Hierarchy, Priority and Cancellation

When a new task is created using async/await, it inherits the priority and task-local values of the parent task, which are then passed down the entire hierarchy of child tasks. When a parent task is cancelled, the Swift runtime automatically cancels all child tasks; however, Swift uses cooperative cancellation, so child tasks must check the cancellation state, otherwise they may continue to execute (though the results from cancelled tasks are discarded).
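For example, here is a minimal sketch (assuming a downloadImage helper like the one shown later in this post) of a child task that cooperatively checks for cancellation before doing more work:

func downloadAll(ids: [String]) async throws -> [UIImage] {
    var images: [UIImage] = []
    for id in ids {
        // Cooperative cancellation: throws CancellationError if this task
        // (or its parent) has already been cancelled.
        try Task.checkCancellation()
        images.append(try await downloadImage(id: id))
    }
    return images
}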

Continuations and Scheduling

Swift previously used native threads to schedule background work, where new threads were automatically created when a thread was blocked or waiting for another resource. The new Swift runtime creates native threads based on the number of cores, and background tasks use continuations to schedule work on those native threads. When a task is blocked, its state is saved on the heap and another task is scheduled on the thread. The await syntax suspends the current task and releases the thread until the child task is completed. This cooperative scheduling requires runtime support for non-blocking I/O operations and system APIs so that native threads are not blocked and can continue to work on other background tasks. This also restricts background tasks from using semaphores and locks, which are discussed below.

async function

In the above example, when a thread is working on a background task “updateDatabase” that starts child tasks “add” or “save”, it saves the tasks as continuations on the heap. However, if the current task is suspended, the thread can work on other tasks.

Multiple Asynchronous Tasks

Async/await in Swift also allows scheduling multiple asynchronous tasks and then awaiting them later, e.g.

struct MarketData {
    let symbol: String
    let price: Int
    let volume: Int
}

struct HistoryData {
    let symbol: String
    let history: [Int]
    func sum() -> Int {
      history.reduce(0, +)
    }
}

func fetchMarketData(symbol: String) async throws -> MarketData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(MarketData(symbol: symbol, price: 10, volume: 200)))
        }
    }
}

func fetchHistoryData(symbol: String) async throws -> HistoryData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(HistoryData(symbol: symbol, history: [5, 10, 15, 20])))
        }
    }
}

func getMovingAverage(symbol: String) async throws -> Int {
    async let marketData = fetchMarketData(symbol: symbol)
    async let historyData = fetchHistoryData(symbol: symbol)
    let sum = try await marketData.price + historyData.sum()
    return try await sum / (historyData.history.count+1)
}

The async let syntax is called concurrent binding where the child task executes in parallel to the parent task.

Task Groups

Task groups allow dispatching multiple background tasks that execute concurrently, and Swift automatically cancels all child tasks when the parent task is cancelled. The following example demonstrates the use of the group API:

func downloadImage(id: String) async throws -> UIImage {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(UIImage(data: [])))
        }
    }
}

func downloadImages(ids: [String]) async throws -> [String: UIImage] {
    var images: [String: UIImage] = [:]
    try await withThrowingTaskGroup(of: (String, UIImage).self) { group in
        for id in ids {
            group.addTask(priority: .background) {
                return (id, try await downloadImage(id: id))
            }
        }
        for try await (id, image) in group {
            images[id] = image
        }
    }
    return images
}

As these features are still in development, Swift recently changed the group.async API to group.addTask. In the above example, images are downloaded in parallel and the for try await loop then gathers the results.

Data Races

The Swift compiler will warn you if you try to mutate shared state from multiple background tasks. In the above example, each asynchronous task returns a tuple of image-id and image instead of mutating a shared dictionary. The parent task then mutates the dictionary using the results from the child tasks in the for try await loop.

Cancellation

You can also cancel a background task using the cancel API, or cancel all child tasks of a group using group.cancelAll(), e.g.

group.cancelAll()

The Swift runtime automatically cancels all child tasks if any of the background tasks fails. You can store a reference to a child task in an instance variable if you need to cancel the task from a different method, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]

    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
              ...
            }
    }
    func collectionView(_ collectionView: UICollectionView,
        didEndDisplaying cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
        imageTasks[item]?.cancel()
    }
}

As cancellation in Swift is cooperative, you must check the cancellation state explicitly, otherwise the task will continue to execute (though Swift will discard its results), e.g.

if Task.isCancelled {
    return // return early
}

Timeout

The Task and async/await APIs don’t directly support timeouts, so you must implement them manually, similar to cooperative cancellation.
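One way to approximate a timeout is to race the actual work against Task.sleep inside a task group and cancel whichever child loses. The following is a minimal sketch, not a standard library API; withTimeout and TimeoutError are hypothetical helpers and fetchThumbnails is the function from the earlier example:

struct TimeoutError: Error {}

func withTimeout<T>(seconds: Double,
                    operation: @escaping @Sendable () async throws -> T) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        group.addTask {
            try await operation()
        }
        group.addTask {
            // Sleep for the timeout period and then fail; if the real work
            // finishes first, this child is cancelled by cancelAll() below.
            try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            throw TimeoutError()
        }
        // Whichever child finishes (or throws) first decides the outcome.
        let result = try await group.next()!
        group.cancelAll()
        return result
    }
}

let thumbnails = try await withTimeout(seconds: 2.0) {
    try await fetchThumbnails(for: ["a", "b", "c"])
}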

Semaphores and Locks

Swift does not recommend using semaphores and locks with background tasks because tasks are suspended when waiting for an external resource and can later be resumed on a different thread. The following example shows an incorrect use of semaphores with background tasks:

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void) {
  let semaphore = DispatchSemaphore(value: 0)
  Task {
    await asyncUpdateDatabase()
    semaphore.signal()
  }
  semaphore.wait() // Do not use unsafe primitives to wait across task boundaries
}

TaskLocal

You can annotate certain properties with @TaskLocal; they are stored in the context of the Task and are available to the task and all of its children, e.g.

enum TracingExample {
    @TaskLocal
    static let traceID: TraceID?
}
...
guard let traceID = TracingExample.traceID else {
  print("no trace id")
  return
}
print(traceID)

Detached Tasks (Unstructured)

The above Task and async/await APIs are based on structured concurrency, where the parent task does not complete until all child background tasks are done with their work. However, Swift allows launching detached tasks that continue to execute in the background without the parent waiting for their results, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]
    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
                defer { imageTasks[item] = nil }
                let images = try await getImages(for: ids)
                Task.detached(priority: .background) {
                    await withThrowingTaskGroup(of: Void.self) { g in
                        g.addTask { try await addImageCache(for: images) }
                        g.addTask { try await logImages(for: images) }
                    }
                }
                display(images, in: cell)
            }
    }
}

Legacy APIs

Legacy code that uses completion handlers can use the following continuation APIs to support async/await syntax:

func persistPosts() async throws -> [Post] {
    typealias PostContinuation = CheckedContinuation<[Post], Error>
    return try await withCheckedThrowingContinuation { (continuation: PostContinuation) in
        self.getPersistentPosts { posts, error in
            if let error = error {
                continuation.resume(throwing: error)
            } else {
                continuation.resume(returning: posts)
            }
        }
    }
}

In the above example, the getPersistentPosts method uses a completion handler and the persistPosts method provides a bridge so that you can use async/await syntax. The resume method can only be called once on a continuation.

You may also save the continuation in an instance variable when you need to resume it in another method, e.g.

class MyViewController: UIViewController {
    private var activeContinuation: CheckedContinuation<[Post], Error>?
    func sharePostsFromPeer() async throws -> [Post] {
        try await withCheckedThrowingContinuation { continuation in
            self.activeContinuation = continuation
            self.peerManager.syncSharedPosts()
        }
    }
}
extension MyViewController: PeerSyncDelegate {
    func peerManager(_ manager: PeerManager, received posts: [Post]) {
        self.activeContinuation?.resume(returning: posts)
        self.activeContinuation = nil
    }
    func peerManager(_ manager: PeerManager, hadError error: Error) {
        self.activeContinuation?.resume(throwing: error)
        self.activeContinuation = nil
    }
}

Implementing WebCrawler Using Async/Await

The following example shows an implementation of the WebCrawler, described in Part I of the concurrency series, using async/await:

import Foundation
struct Request {
    let url: String
    let depth: Int
    let deadline: DispatchTime
}
enum CrawlError: Error {
    case timeoutError(String)
}
let MAX_DEPTH = 4
let MAX_URLS = 11
let DOMAINS = [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "gh.com",
  "hi.com",
  "ij.com",
  "jk.com",
  "kl.com",
  "lm.com",
  "mn.com",
  "no.com",
  "op.com",
  "pq.com",
  "qr.com",
  "rs.com",
  "st.com",
  "tu.com",
  "uv.com",
  "vw.com",
  "wx.com",
  "xy.com",
  "yz.com",
];
public func crawl(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawl(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}
public func crawlWithActors(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawlWithActors(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}

///////////////// PRIVATE METHODS ////////////////
func doCrawl(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    try await withThrowingTaskGroup(of: (Request, Int).self) { group in
        for req in requests {
	    group.addTask(priority: .background) {
	        return (req, try await handleRequest(req))
	    }
        }
        for try await (req, childURLs) in group {
	    if totalChildURLs % 10 == 0 {
		print("received request \(req)")
	    }
	    totalChildURLs += childURLs
        }
    }
    return totalChildURLs
}
func doCrawlWithActors(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    let crawler = CrawlActor()
    for req in requests {
     	let childURLs = try await crawler.handle(req)
	totalChildURLs += childURLs
    }
    return totalChildURLs
}
func handleRequest(_ request: Request) async throws -> Int {
    let contents = try await download(request.url)
    let newContents = try await jsrender(request.url, contents)
    if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
        try await index(request.url, newContents)
        let urls = try await parseURLs(request.url, newContents)
        let childURLs = try await doCrawl(urls: urls, depth: request.depth + 1, deadline: request.deadline)
        return childURLs + 1
    } else {
        return 0
    }
}
func download(_ url: String) async throws -> String {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    return randomString(100)
}
func jsrender(_ url: String, _ contents: String) async throws -> String {
    // for SPA apps that use javascript for rendering contents
    return contents
}
func index(_ url: String, _ contents: String) async throws {
    // apply standardize, stem, ngram, etc for indexing
}
func parseURLs(_ url: String, _ contents: String) async throws -> [String] {
    // tokenize contents and extract href/image/script urls
    var urls = [String]()
    for _ in 0..<MAX_URLS {
        urls.append(randomUrl())
    }
    return urls
}
func hasContentsChanged(_ url: String, _ contents: String) -> Bool {
    return true
}
func isSpam(_ url: String, _ contents: String) -> Bool {
    return false
}
func randomUrl() -> String {
    let number = Int.random(in: 0..<DOMAINS.count)
    return "https://" + DOMAINS[number] + "/" + randomString(20)
}
func randomString(_ length: Int) -> String {
  let letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  return String((0..<length).map{ _ in letters.randomElement()! })
}

The crawl method takes a list of URLs with a deadline and invokes doCrawl, which crawls the URLs in parallel and then waits for the results using the try await keyword. The doCrawl method recursively crawls child URLs up to the MAX_DEPTH limit. The main crawl method defines the boundary of concurrency and returns the count of child URLs.

Following are major features of the structured concurrency in Swift:

  • Concurrency scope: async/await defines a scope of concurrency where all child background tasks must be completed before returning from the asynchronous function.
  • Composition: the async methods in the above implementation show how asynchronous code can be easily composed.
  • Error handling: async/await uses the normal try/catch syntax for error checking instead of the specialized syntax of Promises or callback functions.
  • The Swift runtime schedules asynchronous tasks on a fixed number of native threads and automatically suspends tasks when they wait for I/O or other resources.

Following are the major shortcomings in Swift for its support of structured concurrency:

  • The most glaring omission in the above implementation is timeout support, which is not provided by Swift’s implementation.
  • The Swift runtime manages the scheduling of tasks, and you cannot pass your own execution dispatcher for scheduling background tasks.

Actors

The Actor Model is a classic abstraction from the 1970s for managing concurrency, where an actor keeps its internal state private and uses message passing for interacting with its state and behavior. An actor can only work on one message at a time, which prevents data races when it is accessed from multiple background tasks. I previously described actors in Part II of the concurrency series when covering Erlang and Elixir.

actor

Instead of creating a thread-safe type by hand using a dispatch queue, such as:

final class Counter {
    private var queue = DispatchQueue(label: "counter.queue")
    private var _count : Int = 0
    var count: Int {
        queue.sync {
            _count
        }
    }

    func incr() {
        queue.async(flags: .barrier) {
            self._count += 1
        }
    }
    func decr() {
        queue.async(flags: .barrier) {
            self._count -= 1
        }
    }
}

The actor syntax simplifies such an implementation and removes all the boilerplate, e.g.

actor Counter {
    var count: Int = 0
    func incr() {
        count += 1
    }
    func decr() {
        count -= 1
    }
}

The above syntax prevents direct access to the internal state, and you must use the await syntax to access the actor’s state or behavior, e.g.

Task {
	let c = Counter()
    await withTaskGroup(of: Void.self) { group in
        for i in 0..<100 {
            group.async {
                await c.incr()
            }
        }
    }
    print("count \(await c.count)")
}

Priority Inversion Principle

The dispatch queue API handles priority inversion, where a high-priority task is stuck behind low-priority tasks, by bumping up the priority of the low-priority tasks ahead of it in the queue. The runtime then executes the high-priority task after completing those low-priority tasks. The actor API, in contrast, can pick the high-priority task directly from the actor’s queue without waiting for the completion of the low-priority tasks ahead of it.

Actor Reentrancy

If an actor invokes another actor or awaits a background task within one of its functions, it may get suspended until that work is completed. In the meantime, another client may invoke the actor and modify its state, so you need to re-check your assumptions after an await before changing internal state. Because a continuation may be resumed on a different thread after the work resumes, you cannot rely on DispatchSemaphore, NSLock, NSRecursiveLock, etc. for synchronization.

Following code from WWDC-2021 shows how reentrancy can be handled safely:

actor ImageDownloader {
    private enum CacheEntry {
        case inProgress(Task.Handle<Image, Error>)
        case ready(Image)
    }
    private var cache: [URL: CacheEntry] = [:]
    func downloadAndCache(from url: URL) async throws -> Image? {
        if let cached = cache[url] {
            switch cached {
            case .ready(let image):
                return image
            case .inProgress(let handle):
                return try await handle.get()
            }
        }
        let handle = async {
            try await downloadImage(from: url)
        }
        cache[url] = .inProgress(handle)
        do {
            let image = try await handle.get()
            cache[url] = .ready(image)
            return image
        } catch {
            cache[url] = nil
            throw error
        }
    }
}

The ImageDownloader actor in the above example downloads and caches images. The actor may be suspended while it’s downloading an image, and another client could reenter the downloadAndCache method and download the same image. The above code prevents duplicate requests and reuses the existing in-progress request to serve multiple concurrent clients.

Actor Isolation

Actors in Swift prevent invoking their methods directly from outside, but you can annotate methods with nonisolated if you need to call them synchronously; those methods cannot mutate actor state, e.g.

actor Account {
    let id: String
    var balance: Double = 0
    init(id: String) {
        self.id = id
    }
}
extension Account: Hashable {
    nonisolated func hash(into hasher: inout Hasher) {
        hasher.combine(id)
   }
   static func == (lhs: Account, rhs: Account) -> Bool {
        return lhs.id == rhs.id
   }
}

Sendable

Actors require that any data structures used in their internal state are thread-safe and conform to the Sendable protocol, such as:

  • Value types
  • Actors
  • Immutable classes
  • Synchronized classes
  • @Sendable Functions

struct Book: Sendable {
    var title: String
    var authors: [Author]
}

@MainActor

UI apps require that all UI updates are performed on the main thread, and previously you had to dispatch UI work to the DispatchQueue.main queue. Swift now allows marking functions, classes, or structs with the special @MainActor annotation so that the functions are automatically executed on the main thread, e.g.

@MainActor func checkedOut(_ books: [Book]) {
  booksView.checkedOutBooks = books
}
...
await checkedOut(booksOnLoan)

The following example shows how a view controller can be annotated with the @MainActor annotation:

@MainActor class MyViewController: UIViewController {
  func onPress() .... // implicitly on main-thread
  nonisolated func fetch() async {
    ...

In the above example, all methods of MyViewController are executed on the main thread; however, you can exclude certain methods via the nonisolated keyword.

@globalActor

The @globalActor annotation defines a singleton global actor and @MainActor is a kind of global actor. You can also define your own global actor such as:

@globalActor
public struct GlobalSettings {
  public actor SettingsActor {
     func rememberPassword() -> Bool {
        return UserDefaults.standard.bool(forKey: "rememberPassword")
     }
  }

  public static let shared = SettingsActor()
}

...
let rememberPassword = await GlobalSettings.shared.rememberPassword()

Message Pattern Matching

As actors in Swift use method calls to invoke operations on an actor, they don’t support pattern matching like Erlang/Elixir, which lets you select the next message to process by comparing one or more fields in the message.

Local only

Unlike actors in Erlang or Elixir, actors in Swift can only communicate with other actors in the same process or application and they don’t support distributed communication to remote actors.

Actor Executor/Dispatcher

The Actor protocol defines the following property for accessing its executor:

var unownedExecutor: UnownedSerialExecutor

However, unownedExecutor is a read-only property that cannot be changed at this time.

Implementing WebCrawler Using Actors and Tasks

The following example shows an implementation of the WebCrawler, described in Part I of the concurrency series, using actors and tasks:

import Foundation
actor CrawlActor {
    public func handle(_ request: Request) async throws -> Int {
	let contents = try await download(request.url)
	let newContents = try await jsrender(request.url, contents)
  	if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
    	    try await index(request.url, newContents)
    	    let urls = try await parseURLs(request.url, newContents)
    	    let childURLs = try await doCrawlWithActors(urls: urls, depth: request.depth + 1, deadline: request.deadline)
    	    return childURLs + 1
  	} else {
    	    return 0
  	}
    }
}

The above implementation uses an actor for processing crawl requests but shares the other code for parsing and downloading web pages. As an actor provides serialized access to its state and behavior, you can’t use a single actor to implement a highly concurrent web crawler. Instead, you may divide the web domains that need to be crawled among a pool of actors that can share the work.

Performance Comparison

The following table from Part-IV summarizes the runtime of various implementations of the web crawler when crawling 19K URLs, which resulted in about 76K messages to the asynchronous methods/coroutines/actors discussed in this blog series:

Language   | Design                  | Runtime (secs)
Typescript | Async/Await             | 0.638
Erlang     | Spawning Process        | 4.636
Erlang     | PMAP                    | 4.698
Elixir     | Spawning OTP Children   | 43.5
Elixir     | Task async/await        | 187
Elixir     | Worker-pool with queue  | 97
GO         | Go-routine/channels     | 1.2
Rust       | Async/Await             | 4.3
Kotlin     | Async/Await             | 0.736
Kotlin     | Coroutine               | 0.712
Swift      | Async/Await             | 63
Swift      | Actors/Async/Await      | 65
Note: The purpose of the above results was not to run micro-benchmarks but to show the rough cost of spawning thousands of asynchronous tasks.

You can download full code for Swift example from https://github.com/bhatti/concurency-katas/tree/main/swift.

Overall, Swift’s new features for structured concurrency, including async/await and actors, are a welcome addition to its platform. On the downside, the Swift concurrency APIs lack support for timeouts and customized dispatchers/executors, and micro-benchmarks showed higher overhead than expected. On the positive side, the Swift runtime catches errors due to data races, and the new async/await/actors syntax prevents bugs that were previously caused by the incorrect use of completion handlers and error handling. This will help developers write more robust and responsive apps in the future.

December 26, 2020

Applying Structured Concurrency patterns in GO with case-study of Gitlab-runner

Filed under: Concurrency,GO — admin @ 9:31 pm

I recently wrote a series of blogs on structured concurrency (Part-I, Part-II, Part-III, Part-IV) and how it improves the readability, concurrency scope, composition, and flow control of concurrent code, and adds better support for error handling, cancellation, and timeouts. I have been using GO on a number of projects over the last few years, and I will share a few concurrency patterns that I have used or seen in other projects such as gitlab-runner. I demonstrated in the above series how structured concurrency considers the GO statement harmful, similar to GOTO in structured programming. Just as structured programming replaced GOTO with control-flow primitives such as single-entry/exit, if-then, loops, and function calls, structured concurrency provides a scope of concurrency where the parent waits for all asynchronous code. I will show how common concurrency patterns in GO can take advantage of structured concurrency.

Asynchronous Tasks

The primary purpose of goroutines is to perform asynchronous tasks such as requesting data from a remote API or database. For example, here is a sample code from gitlab-runner that uses a goroutine to wait for the completion of an SSH command:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
	waitCh := make(chan error)
	go func() {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
		waitCh <- err
	}()

	select {
	case <-ctx.Done():
		_ = session.Signal(ssh.SIGKILL)
		_ = session.Close()
		return <-waitCh

	case err := <-waitCh:
		return err
	}
}

Here is an async/await-style version based on async_await.go that performs a similar task using structured concurrency:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
    timeout := ..
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
        err := session.Wait()
        if _, ok := err.(*ssh.ExitError); ok {
            err = &ExitError{Inner: err}
        }
        return nil, err
    }
    abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
        _ = session.Signal(ssh.SIGKILL)
        _ = session.Close()
        return nil, nil
    }
    _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
    return err
}

The above code defines the scope of concurrency and adds support for timeouts while making the code easier to comprehend.

Note: due to the lack of generics in GO, interface{} is used to accept the different types that may be passed to asynchronous tasks.

Racing Asynchronous Tasks

A common use of goroutines is to spawn multiple asynchronous tasks and take the result of the first task that completes. For example, here is a sample code from gitlab-runner that uses goroutines to copy the stdout/stderr and stdin streams:

	stdoutErrCh := make(chan error)
	go func() {
		_, errCopy := stdcopy.StdCopy(output, output, hijacked.Reader)
		stdoutErrCh <- errCopy
	}()

	// Write the input to the container and close its STDIN to get it to finish
	stdinErrCh := make(chan error)
	go func() {
		_, errCopy := io.Copy(hijacked.Conn, input)
		_ = hijacked.CloseWrite()
		if errCopy != nil {
			stdinErrCh <- errCopy
		}
	}()

	// Wait until either:
	// - the job is aborted/cancelled/deadline exceeded
	// - stdin has an error
	// - stdout returns an error or nil, indicating the stream has ended and
	//   the container has exited
	select {
	case <-ctx.Done():
		err = errors.New("aborted")
	case err = <-stdinErrCh:
	case err = <-stdoutErrCh:
	}

The above code creates a stdoutErrCh channel to capture errors from copying stdout and a stdinErrCh channel to capture errors from copying stdin, and then waits for either to finish.

Here is equivalent code that uses structured concurrency with async/await primitives from async_racer.go:

    ctx := context.Background()
    timeout := ..
    handler1 := func(ctx context.Context) (interface{}, error) {
        _, err := stdcopy.StdCopy(output, output, hijacked.Reader)
        return nil, err
    }
    handler2 := func(ctx context.Context) (interface{}, error) {
        defer hijacked.CloseWrite()
        _, err := io.Copy(hijacked.Conn, input)
        return nil, err
    }
    future, _ := async.ExecuteRacer(ctx, handler1, handler2)
    _, err := future.Await(ctx, timeout)

The above code uses async/await syntax to define the scope of concurrency and clarifies the intent of the business logic without the distraction of concurrency logic.

Performing cleanup when a task is aborted or cancelled

If a goroutine spawns an external process for background work, you may need to kill that process if the goroutine's task is cancelled or times out. For example, here is a sample code from gitlab-runner that calls the KillAndWait function to terminate the external process when context.Done() fires:

   func (c *command) Run() error {
       err := c.cmd.Start()
       if err != nil {
           return fmt.Errorf("failed to start command: %w", err)
       }

       go c.waitForCommand()

       select {
       case err = <-c.waitCh:
           return err

       case <-c.context.Done():
           return newProcessKillWaiter(c.logger, c.gracefulKillTimeout, c.forceKillTimeout).
               KillAndWait(c.cmd, c.waitCh)
       }
   }

In the above code, the Run method starts a command, waits for its completion in a goroutine, and then listens for a response on the waitCh and context.Done channels.

Here is how the async/await primitives from async_await.go can apply structured concurrency to the above code:

   func (c *command) Run() error {
       timeout := ...
       ctx := context.Background()
       handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
         return nil, c.cmd.Run()
       }
       abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
         return nil, KillAndWait(c.cmd, c.waitCh)
       }
       _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
       return err
   }

Using GO Channels as data pipe/queue

GO channels are based on CSP rendezvous semantics where both the sender and the receiver must be ready to exchange a message. However, you can add buffering to turn a channel into a bounded queue (providing back-pressure). Here is an example code from gitlab-runner that uses channels to stream log messages:

 func (l *kubernetesLogProcessor) scan(ctx context.Context, logs io.Reader) (*bufio.Scanner, <-chan string) {
     logsScanner := bufio.NewScanner(logs)

     linesCh := make(chan string)
     go func() {
         defer close(linesCh)

         // This goroutine will exit when the calling method closes the logs stream or the context is cancelled
         for logsScanner.Scan() {
             select {
             case <-ctx.Done():
                 return
             case linesCh <- logsScanner.Text():
             }
         }
     }()

     return logsScanner, linesCh
 }

The above code creates an unbuffered channel linesCh and then starts a goroutine that reads logs and sends them to the linesCh channel. As mentioned, this blocks the sender until the receiver is ready, and you may lose log messages if the receiver is slow and the goroutine is killed before the logs can be read. Structured concurrency cannot help in this case, but you can use an event bus or a local message queue to stream these logs.
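As a minimal illustration (not taken from gitlab-runner, and all names are made up), here is how a buffered channel can act as a bounded queue: the buffer absorbs bursts from the producer, and once it fills up, the producer applies an explicit back-pressure policy, here simply dropping the line, instead of blocking forever:

package main

import "fmt"

func main() {
	lines := make(chan string, 100) // bounded queue with capacity 100

	// producer: try to enqueue without blocking forever on a slow consumer
	enqueue := func(line string) {
		select {
		case lines <- line:
			// accepted into the bounded queue
		default:
			// queue is full: apply a back-pressure policy such as dropping
			// or spilling to disk; here we simply drop the line
			fmt.Println("dropping log line:", line)
		}
	}

	enqueue("starting job")
	enqueue("step 1 completed")
	close(lines)

	// consumer drains whatever made it into the queue
	for line := range lines {
		fmt.Println("received:", line)
	}
}

The capacity and the drop policy are only illustrative choices; if log loss is unacceptable, the overflow case would spill to a persistent queue instead.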

WaitGroup to wait for completion of goroutines

The GO language provides sync.WaitGroup to wait for the completion of goroutines, but it is redundant if you are also using channels to receive replies. For example, here is a sample code from gitlab-runner that uses WaitGroup to wait for the completion of goroutines:

 func (e *executor) waitForServices() {
...
     // wait for all services to come up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
         wg := sync.WaitGroup{}
         for _, service := range e.services {
             wg.Add(1)
             go func(service *types.Container) {
                 _ = e.waitForServiceContainer(service, time.Duration(waitForServicesTimeout)*time.Second)
                 wg.Done()
             }(service)
         }
         wg.Wait()
     }
 }

Here is how the above code can be replaced with async/await syntax from my async_await.go:

 func (e *executor) waitForServices() {
...
     // wait for all services to come up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
        ctx := context.Background()
        timeout := time.Duration(waitForServicesTimeout) * time.Second
        futures := make([]async.Awaiter, len(e.services))
        handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
          return nil, e.waitForServiceContainer(payload.(*types.Container), timeout)
        }
        for i := 0; i < len(e.services); i++ {
          futures[i] = async.Execute(ctx, handler, async.NoAbort, e.services[i])
        }
        async.AwaitAll(ctx, timeout, futures...)
     }
 }

The above code is not shorter than the original, but it is more readable and eliminates subtle bugs where you might use WaitGroup incorrectly and end up with a deadlock.
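As a minimal sketch of one such bug (not from gitlab-runner), the workers below send their results on an unbuffered channel, but the parent calls wg.Wait() before draining the channel; every worker blocks on its send, Done() is never reached, and the program deadlocks:

package main

import "sync"

func main() {
	var wg sync.WaitGroup
	results := make(chan int) // unbuffered: every send needs a ready receiver

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			results <- n * n // blocks forever: nobody is receiving yet
		}(i)
	}

	// BUG: Wait() runs before anyone drains the channel, so the workers are
	// stuck on their sends, Done() is never called, and Wait() never returns.
	wg.Wait()
	close(results)

	for r := range results {
		_ = r
	}
}

The usual fix is to buffer the channel or to move the wg.Wait()/close() pair into its own goroutine so that the parent can drain the channel first, which is exactly what the cleanupServices example below does.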

Fork-Join based Asynchronous Tasks

One common use of goroutines is to spawn multiple asynchronous tasks and wait for their completion, similar to the fork-join pattern. For example, here is a sample code from gitlab-runner that uses goroutines to clean up multiple services and sends the results back via a channel:

func (s *executor) cleanupServices() {
	ch := make(chan serviceDeleteResponse)
	var wg sync.WaitGroup
	wg.Add(len(s.services))

	for _, service := range s.services {
		go s.deleteKubernetesService(service.ObjectMeta.Name, ch, &wg)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	for res := range ch {
		if res.err != nil {
			s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.serviceName, res.err))
		}
	}
}

func (s *executor) deleteKubernetesService(serviceName string, ch chan<- serviceDeleteResponse, wg *sync.WaitGroup) {
	defer wg.Done()

	err := s.kubeClient.CoreV1().
		Services(s.configurationOverwrites.namespace).
		Delete(serviceName, &metav1.DeleteOptions{})
	ch <- serviceDeleteResponse{serviceName: serviceName, err: err}
}

The cleanupServices method above goes through a collection of services and calls deleteKubernetesService in a goroutine for each, which invokes the Kubernetes API to remove the given service. It waits for all background goroutines using a WaitGroup and then receives any errors from the error channel and logs them.

Here is how you can use the async/await code from async_await.go, which applies structured concurrency by abstracting the low-level goroutines and channels:

func (s *executor) cleanupServices() {
    ctx := context.Background()
    timeout := 5 * time.Second
    futures := make([]async.Awaiter, len(s.services))
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
        serviceName := payload.(string)
        err := s.kubeClient.CoreV1().
            Services(s.configurationOverwrites.namespace).
            Delete(serviceName, &metav1.DeleteOptions{})
        return nil, err
    }
    for i := 0; i < len(s.services); i++ {
        futures[i] = async.Execute(ctx, handler, async.NoAbort, s.services[i].ObjectMeta.Name)
    }
    results := async.AwaitAll(ctx, timeout, futures...)
    for _, res := range results {
        if res.Err != nil {
            s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.serviceName, res.Err))
        }
    }
}

The above code uses structured concurrency by defining the scope of the asynchronous code in cleanupServices and adds better support for cancellation, timeouts and error handling. You also no longer need a WaitGroup to wait for completion.

Polling Asynchronous Task

In some cases, you may need to poll a background task to check its status or wait for its completion. For example, here is a sample code from gitlab-runner that waits for a pod until it's running:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
	pollInterval := config.GetPollInterval()
	pollAttempts := config.GetPollAttempts()
	for i := 0; i <= pollAttempts; i++ {
		select {
		case r := <-triggerPodPhaseCheck(c, pod, out):
			if !r.done {
				time.Sleep(time.Duration(pollInterval) * time.Second)
				continue
			}
			return r.phase, r.err
		case <-ctx.Done():
			return api.PodUnknown, ctx.Err()
		}
	}
	return api.PodUnknown, errors.New("timed out waiting for pod to start")
}

func triggerPodPhaseCheck(c *kubernetes.Clientset, pod *api.Pod, out io.Writer) <-chan podPhaseResponse {
	errc := make(chan podPhaseResponse)
	go func() {
		defer close(errc)
		errc <- getPodPhase(c, pod, out)
	}()
	return errc
}

The waitForPodRunning method above repeatedly calls triggerPodPhaseCheck, which creates a goroutine that invokes the Kubernetes API to get the pod status. It then returns the pod status on a channel that waitForPodRunning listens to.

Here is equivalent code using async/await from async_polling.go:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
	timeout := config.getPodRunningTimeout()
	pollInterval := config.GetPollInterval()
	handler := func(ctx context.Context, payload interface{}) (bool, interface{}, error) {
		r := getPodPhase(c, pod, out)
		return r.done, r.phase, r.err
	}
	_, phase, err := async.ExecutePolling(ctx, handler, async.NoAbort, 0, pollInterval).
		Await(ctx, timeout)
	return phase.(api.PodPhase), err
}

The above code removes the complexity of manually polling and managing goroutines/channels.

Background Task with watchdog

Another concurrency pattern in GO involves starting a background task and then launching another goroutine to monitor the task or its runtime environment, so that the background task can be terminated if the watchdog finds any errors. For example, here is a sample code from gitlab-runner that executes a command inside a Kubernetes pod and launches another goroutine to monitor the pod status:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
    containerName = ...
    containerCommand = ...
...

	podStatusCh := s.watchPodStatus(ctx)

	select {
	case err := <-s.runInContainer(containerName, containerCommand):
		s.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
		var terminatedError *commandTerminatedError
		if err != nil && errors.As(err, &terminatedError) {
			return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
		}

		return err
	case err := <-podStatusCh:
		return &common.BuildError{Inner: err}
	case <-ctx.Done():
		return fmt.Errorf("build aborted")
	}
}

func (s *executor) watchPodStatus(ctx context.Context) <-chan error {
	// Buffer of 1 in case the context is cancelled while the timer tick case is being executed
	// and the consumer is no longer reading from the channel while we try to write to it
	ch := make(chan error, 1)

	go func() {
		defer close(ch)

		t := time.NewTicker(time.Duration(s.Config.Kubernetes.GetPollInterval()) * time.Second)
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				err := s.checkPodStatus()
				if err != nil {
					ch <- err
					return
				}
			}
		}
	}()

	return ch
}

func (s *executor) runInContainer(name string, command []string) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)

		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		err := retryable.Run()
		if err != nil {
			errCh <- err
		}

		exitStatus := <-s.remoteProcessTerminated
		if *exitStatus.CommandExitCode == 0 {
			errCh <- nil
			return
		}

		errCh <- &commandTerminatedError{exitCode: *exitStatus.CommandExitCode}
	}()

	return errCh
}

In the above code, runWithAttach calls watchPodStatus to monitor the status of the pod in a background goroutine and then executes the command via runInContainer.

Here is equivalent code using async/await from async_watchdog.go:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
	containerName = ...
	containerCommand = ...
	poll := ...
	timeout := ...
...
	handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		return nil, s.runInContainer(containerName, containerCommand)
	}
	watchdogHandler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		return nil, s.checkPodStatus()
	}

	_, err := async.ExecuteWatchdog(ctx, handler, watchdogHandler, async.NoAbort, nil, poll).
		Await(ctx, timeout)
	if err != nil {
		var terminatedError *commandTerminatedError
		if errors.As(err, &terminatedError) {
			return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
		}
		return &common.BuildError{Inner: err}
	}
	return nil
}

func (s *executor) runInContainer(name string, command []string) error {
		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		return retryable.Run()
}

The above code removes the extraneous complexity of concurrent code interleaved with functional code, and makes it easier to comprehend with improved support for concurrency scope, error handling, timeouts and cancellation.

Other accidental complexity in Gitlab-Runner

Besides concurrency, here are a few other design choices that add accidental complexity to gitlab-runner:

Abstraction

The primary goal of gitlab-runner is to abstract the executor framework so that it can use different platforms such as Docker, Kubernetes, SSH and Shell to execute processes. However, it doesn't define an interface for common behavior such as managing runtime containers or executing a command across these executors.
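For example, here is a hypothetical sketch of such an interface; none of these types exist in gitlab-runner and the names are only illustrative:

// Hypothetical sketch (these types do not exist in gitlab-runner): a common
// interface that each executor (Docker, Kubernetes, SSH, Shell) could
// implement, so that callers depend on behavior rather than on a concrete
// platform.
package executor

import (
	"context"
	"io"
)

// Container is a platform-neutral handle to a runtime container or process.
type Container struct {
	ID   string
	Name string
}

// Executor captures the common behavior discussed above: managing runtime
// containers and executing commands inside them.
type Executor interface {
	// StartContainer provisions a container (pod, docker container, or shell session).
	StartContainer(ctx context.Context, image string) (*Container, error)
	// Exec runs a command inside the container and streams its output to out.
	Exec(ctx context.Context, c *Container, command []string, out io.Writer) error
	// Cleanup releases any resources created by the executor.
	Cleanup(ctx context.Context, c *Container) error
}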

Adapter/Gateway pattern

A common pattern to abstract a third-party library or API is to use an adapter or gateway, but gitlab-runner mixes external APIs with internal executor logic. The Kubernetes executor in gitlab-runner defines logic both for interacting with the external Kubernetes server and for managing Kubernetes pods and executing processes inside those pods. For example, my initial intent when looking at gitlab-runner was to adopt its APIs for interacting with Kubernetes, but I could not reuse any code as a library and instead had to copy the relevant code for my use case.

Separation of Concerns

The gitlab-runner is not only riddled with concurrency primitives such as goroutines and channels, but it also mixes in other aspects such as configuration, feature flags, logging and monitoring. For example, it uses configuration for defining containers, services and volumes for Kubernetes, but it hard-codes various internal configurations for the build, helper and monitoring containers instead of injecting them via external configuration. Similarly, it hard-codes volumes for the repo, cache, logging, etc.

A common design practice is to use layers of abstraction, or separation of concerns, where each layer or module is responsible for a single concern (the single-responsibility principle). For example, you could divide executors into three layers: an adapter layer, a middle layer and a high-level layer, where the adapter layer strictly interacts with the underlying platform such as Kubernetes and no other layer needs to know Kubernetes internal types or APIs. The middle layer uses the adapter layer to delegate any executor-specific behavior and defines methods to configure containers. The high-level layer is responsible for injecting dependent services, containers and configuration to execute jobs within the gitlab environment, as sketched below. I understand some of these abstractions might become leaky if there are significant differences among executors, but such a design allows broader reuse of the lower-level layers in other contexts.
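Here is a hypothetical sketch of that layering; none of these types exist in gitlab-runner. Only the adapter layer imports Kubernetes types (using the same Delete call as in the earlier excerpt), while the middle layer depends on a small interface that can be reused or tested without a real cluster:

// Hypothetical sketch of the layering described above; these types are
// illustrative and do not exist in gitlab-runner.
package layers

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// Adapter layer: the only code that talks to the Kubernetes API directly
// (same Delete call as in the gitlab-runner excerpt above).
type KubernetesAdapter struct {
	Client    *kubernetes.Clientset
	Namespace string
}

func (a *KubernetesAdapter) DeleteService(name string) error {
	return a.Client.CoreV1().
		Services(a.Namespace).
		Delete(name, &metav1.DeleteOptions{})
}

// Middle layer: executor behavior written against a small interface instead
// of Kubernetes types.
type ServiceCleaner interface {
	DeleteService(name string) error
}

type ExecutorServices struct {
	Cleaner ServiceCleaner
}

func (e *ExecutorServices) CleanupServices(names []string) []error {
	var errs []error
	for _, name := range names {
		if err := e.Cleaner.DeleteService(name); err != nil {
			errs = append(errs, err)
		}
	}
	return errs
}

A high-level layer would then wire configuration, services and volumes on top of ExecutorServices without ever touching Kubernetes types.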

Summary

Modern scalable and performant applications demand concurrency, but sprinkling low-level concurrency patterns all over your code adds significant accidental or extraneous complexity. In this blog, I demonstrated how encapsulating concurrent code with a library of structured concurrency patterns can simplify your code. Concurrency is hard, especially with lower-level primitives such as WaitGroup, Mutex, goroutines and channels in GO, and incorrect use of these primitives can lead to deadlocks or race conditions. Using a small library that abstracts common concurrency patterns can reduce the probability of concurrency-related bugs. Finally, due to the lack of immutability, strong ownership, generics or a scope of concurrency in GO, you still have to manage race conditions and analyze your code carefully for deadlocks, but applying structured concurrency can help you manage concurrent code better.
