
November 7, 2025

Three Decades of Remote Calls: My Journey from COBOL Mainframes to AI Agents

Filed under: Computing,Web Services — admin @ 9:50 pm

Introduction

I started writing network code in the early 1990s on IBM mainframes, armed with nothing but Assembly and COBOL. Today, I build distributed AI agents using gRPC, RAG pipelines, and serverless functions. Between these worlds lie decades of technological evolution and an uncomfortable realization: we keep relearning the same lessons. Over the years, I’ve seen simple ideas triumph over complex ones. The technology keeps changing, but the problems stay the same. Network latency hasn’t shrunk relative to CPU speed. Distributed systems are still hard. Complexity still kills projects. And every new generation has to learn that abstractions leak. I’ll show you the technologies I’ve used, the mistakes I’ve made, and most importantly, what the past teaches us about building better systems in the future.

The Mainframe Era

CICS and 3270 Terminals

I started my career on IBM mainframes running CICS, which was used to build online applications accessed through 3270 “green screen” terminals. It used the LU6.2 (Logical Unit 6.2) protocol, part of IBM’s Systems Network Architecture (SNA), to provide peer-to-peer communication. Here’s what a typical CICS application looked like in COBOL:

IDENTIFICATION DIVISION.
PROGRAM-ID. CUSTOMER-INQUIRY.

DATA DIVISION.
WORKING-STORAGE SECTION.
01  CUSTOMER-REC.
    05  CUST-ID        PIC 9(8).
    05  CUST-NAME      PIC X(30).
    05  CUST-BALANCE   PIC 9(7)V99.

LINKAGE SECTION.
01  DFHCOMMAREA.
    05  COMM-CUST-ID   PIC 9(8).

PROCEDURE DIVISION.
    EXEC CICS
        RECEIVE MAP('CUSTMAP')
        MAPSET('CUSTSET')
        INTO(CUSTOMER-REC)
    END-EXEC.
    
    EXEC CICS
        READ FILE('CUSTFILE')
        INTO(CUSTOMER-REC)
        RIDFLD(COMM-CUST-ID)
    END-EXEC.
    
    EXEC CICS
        SEND MAP('RESULTMAP')
        MAPSET('CUSTSET')
        FROM(CUSTOMER-REC)
    END-EXEC.
    
    EXEC CICS RETURN END-EXEC.

The CICS environment handled all the complexity—transaction management, terminal I/O, file access, and inter-system communication. For the user interface, I used Basic Mapping Support (BMS), which was notoriously finicky. You had to define screen layouts in a rigid format specifying exactly where each field appeared on the 24×80 character grid:

CUSTMAP  DFHMSD TYPE=&SYSPARM,                                    X
               MODE=INOUT,                                        X
               LANG=COBOL,                                        X
               CTRL=FREEKB
         DFHMDI SIZE=(24,80)
CUSTID   DFHMDF POS=(05,20),                                      X
               LENGTH=08,                                         X
               ATTRB=(UNPROT,NUM),                                X
               INITIAL='________'
CUSTNAME DFHMDF POS=(07,20),                                      X
               LENGTH=30,                                         X
               ATTRB=PROT

This was so painful that I wrote my own tool to convert simple text-based UI templates into BMS format. Looking back, this was my first foray into creating developer tools. The key lesson I learned from the mainframe era was that developer experience matters: cumbersome tools slow down development and introduce errors.

Moving to UNIX

Berkeley Sockets

After working on mainframes for a couple of years, I could see they were already in decline, so I transitioned to C and UNIX systems, which I had studied in college. I learned Berkeley Sockets, which was far more powerful and gave you complete control over the network. Here’s a simple TCP server in C using Berkeley Sockets:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8080
#define BUFFER_SIZE 1024

int main() {
    int server_fd, client_fd;
    struct sockaddr_in server_addr, client_addr;
    socklen_t client_len = sizeof(client_addr);
    char buffer[BUFFER_SIZE];
    
    // Create socket
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    // Set socket options to reuse address
    int opt = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, 
                   &opt, sizeof(opt)) < 0) {
        perror("setsockopt failed");
        exit(EXIT_FAILURE);
    }
    
    // Bind to address
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = INADDR_ANY;
    server_addr.sin_port = htons(PORT);
    
    if (bind(server_fd, (struct sockaddr *)&server_addr, 
             sizeof(server_addr)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    
    // Listen for connections
    if (listen(server_fd, 10) < 0) {
        perror("listen failed");
        exit(EXIT_FAILURE);
    }
    
    printf("Server listening on port %d\n", PORT);
    
    while (1) {
        // Accept connection
        client_fd = accept(server_fd, 
                          (struct sockaddr *)&client_addr, 
                          &client_len);
        if (client_fd < 0) {
            perror("accept failed");
            continue;
        }
        
        // Read request
        ssize_t bytes_read = recv(client_fd, buffer, 
                                  BUFFER_SIZE - 1, 0);
        if (bytes_read > 0) {
            buffer[bytes_read] = '\0';
            printf("Received: %s\n", buffer);
            
            // Send response
            const char *response = "Message received\n";
            send(client_fd, response, strlen(response), 0);
        }
        
        close(client_fd);
    }
    
    close(server_fd);
    return 0;
}

As you can see, you had to handle a lot of housekeeping: socket creation, binding, listening, accepting, reading, writing, and meticulous error handling at every step. Resource management was entirely manual: forget to close() a file descriptor and you’d leak resources; make a mistake with recv() buffer sizes and you’d overflow memory. I also experimented with Fast Sockets from UC Berkeley, which used kernel bypass techniques for lower latency and offered better performance.

The key lesson I learned was that low-level control comes at a steep cost. The cognitive load of managing these details makes it nearly impossible to focus on business logic.

Sun RPC and XDR

While working for a physics lab whose large computing facility consisted of Sun workstations running Solaris on SPARC processors, I discovered Sun RPC (Remote Procedure Call) with XDR (External Data Representation). XDR solved a critical problem: how do you exchange data between machines with different architectures? A SPARC processor uses big-endian byte ordering, while x86 uses little-endian. XDR provided a canonical, architecture-neutral format for representing data. Here’s an XDR definition file (types.x):

/* Define a structure for customer data */
struct customer {
    int customer_id;
    string name<30>;
    float balance;
};

/* Define the RPC program */
program CUSTOMER_PROG {
    version CUSTOMER_VERS {
        int ADD_CUSTOMER(customer) = 1;
        customer GET_CUSTOMER(int) = 2;
    } = 1;
} = 0x20000001;

You’d run rpcgen on this file:

$ rpcgen types.x

This generated the client stub, server stub, and XDR serialization code automatically. Here’s what the server implementation looked like:

#include "types.h"

int *add_customer_1_svc(customer *cust, struct svc_req *rqstp) {
    static int result;
    
    // Add customer to database
    printf("Adding customer: %s (ID: %d)\n", 
           cust->name, cust->customer_id);
    
    result = 1;  // Success
    return &result;
}

customer *get_customer_1_svc(int *cust_id, struct svc_req *rqstp) {
    static customer result;
    
    // Fetch from database
    result.customer_id = *cust_id;
    result.name = strdup("John Doe");
    result.balance = 1000.50;
    
    return &result;
}

And the client:

#include "types.h"

int main(int argc, char *argv[]) {
    CLIENT *clnt;
    customer cust;
    int *result;
    
    clnt = clnt_create("localhost", CUSTOMER_PROG, 
                       CUSTOMER_VERS, "tcp");
    if (clnt == NULL) {
        clnt_pcreateerror("localhost");
        exit(1);
    }
    
    // Call remote procedure
    cust.customer_id = 123;
    cust.name = "Alice Smith";
    cust.balance = 5000.00;
    
    result = add_customer_1(&cust, clnt);
    if (result == NULL) {
        clnt_perror(clnt, "call failed");
    }
    
    clnt_destroy(clnt);
    return 0;
}

This was my first introduction to Interface Definition Languages (IDLs), and I found that defining the contract once and generating code automatically reduces errors. This pattern would reappear in CORBA, Protocol Buffers, and gRPC.
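
The endianness problem XDR solved is easy to demonstrate with a few lines of modern Java: the same 32-bit integer has a different byte layout depending on byte order, which is why a canonical wire format matters. A minimal sketch, illustrative only, using the JDK’s ByteBuffer:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class EndianDemo {
    public static void main(String[] args) {
        int value = 123;

        // Big-endian (network byte order, which XDR mandates): 00 00 00 7B
        byte[] big = ByteBuffer.allocate(4)
            .order(ByteOrder.BIG_ENDIAN).putInt(value).array();

        // Little-endian (x86 native order): 7B 00 00 00
        byte[] little = ByteBuffer.allocate(4)
            .order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();

        System.out.println(Arrays.toString(big));     // [0, 0, 0, 123]
        System.out.println(Arrays.toString(little));  // [123, 0, 0, 0]
    }
}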

Parallel Computing

During my graduate and post-graduate studies in the mid-1990s, while working full time, I researched parallel and distributed computing. I worked with MPI (Message Passing Interface) and IBM’s MPL on SP1/SP2 systems. MPI provided collective operations like broadcast, scatter, gather, and reduce (a predecessor of Hadoop-style map/reduce). Here’s a simple MPI example that computes the sum of an array in parallel:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

#define ARRAY_SIZE 1000

int main(int argc, char** argv) {
    int rank, size;
    int data[ARRAY_SIZE];
    int local_sum = 0, global_sum = 0;
    int chunk_size, start, end;
    
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    
    // Initialize data on root
    if (rank == 0) {
        for (int i = 0; i < ARRAY_SIZE; i++) {
            data[i] = i + 1;
        }
    }
    
    // Broadcast data to all processes
    MPI_Bcast(data, ARRAY_SIZE, MPI_INT, 0, MPI_COMM_WORLD);
    
    // Each process computes sum of its chunk
    chunk_size = ARRAY_SIZE / size;
    start = rank * chunk_size;
    end = (rank == size - 1) ? ARRAY_SIZE : start + chunk_size;
    
    for (int i = start; i < end; i++) {
        local_sum += data[i];
    }
    
    // Reduce all local sums to global sum
    MPI_Reduce(&local_sum, &global_sum, 1, MPI_INT, 
               MPI_SUM, 0, MPI_COMM_WORLD);
    
    if (rank == 0) {
        printf("Global sum: %d\n", global_sum);
    }
    
    MPI_Finalize();
    return 0;
}

For my post-graduate project, I built JavaNOW (Java on Networks of Workstations), which was inspired by Linda’s tuple spaces and MPI’s collective operations, but implemented in pure Java for portability. The key innovation was our Actor-inspired model. Instead of heavyweight processes communicating through message passing, I used lightweight Java threads with an Entity Space (distributed associative memory) where “actors” could put and get entities asynchronously. Here’s a simple example:

public class SumTask extends ActiveEntity {
    public Object execute(Object arg, JavaNOWAPI api) {
        Integer myId = (Integer) arg;
        EntitySpace workspace = new EntitySpace("RESULTS");
        
        // Compute partial sum
        int partialSum = 0;
        for (int i = myId * 100; i < (myId + 1) * 100; i++) {
            partialSum += i;
        }
        
        // Store result in EntitySpace
        return new Integer(partialSum);
    }
}

// Main application
public class ParallelSum extends JavaNOWApplication {
    public void master() {
        EntitySpace workspace = new EntitySpace("RESULTS");
        
        // Spawn parallel tasks
        for (int i = 0; i < 10; i++) {
            ActiveEntity task = new SumTask(new Integer(i));
            getJavaNOWAPI().eval(workspace, task, new Integer(i));
        }
        
        // Collect results
        int totalSum = 0;
        for (int i = 0; i < 10; i++) {
            Entity result = getJavaNOWAPI().get(
                workspace, new Entity(new Integer(i)));
            totalSum += ((Integer)result.getEntityValue()).intValue();
        }
        
        System.out.println("Total sum: " + totalSum);
    }
    
    public void slave(int id) {
        // Slave nodes wait for work
    }
}

Since then, the Actor model has gained wide adoption. For example, today’s serverless functions (AWS Lambda, Azure Functions, Google Cloud Functions) and modern frameworks like Akka, Orleans, and Dapr all embrace Actor-inspired patterns.
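
As a rough illustration of that resemblance, here is a minimal AWS Lambda handler in Java. It assumes the aws-lambda-java-core library; the handler class and payload shape are hypothetical. Like an actor, it keeps no shared state and processes one event at a time:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import java.util.Map;

// Hypothetical handler: sums an integer range passed in the event payload
public class SumRangeHandler implements RequestHandler<Map<String, Integer>, Integer> {

    @Override
    public Integer handleRequest(Map<String, Integer> input, Context context) {
        // Each invocation works on its own isolated state, much like an
        // actor processing a single message from its mailbox
        int from = input.getOrDefault("from", 0);
        int to = input.getOrDefault("to", 0);

        int sum = 0;
        for (int i = from; i <= to; i++) {
            sum += i;
        }
        return sum;
    }
}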

Novell and CGI

I also briefly worked with Novell’s IPX (Internetwork Packet Exchange) protocol, which had painful APIs. Here’s a taste of IPX socket programming (simplified):

#include <nwcalls.h>
#include <nwipxspx.h>

int main() {
    IPXAddress server_addr;
    IPXPacket packet;
    WORD socket_number = 0x4000;
    
    // Open IPX socket
    IPXOpenSocket(socket_number, 0);
    
    // Setup address
    memset(&server_addr, 0, sizeof(IPXAddress));
    memcpy(server_addr.network, target_network, 4);
    memcpy(server_addr.node, target_node, 6);
    server_addr.socket = htons(socket_number);
    
    // Send packet
    packet.packetType = 4;  // IPX packet type
    memcpy(packet.data, "Hello", 5);
    IPXSendPacket(socket_number, &server_addr, &packet);
    
    IPXCloseSocket(socket_number);
    return 0;
}

Early Web Development with CGI

When the web emerged in the early 1990s, I built applications using CGI (Common Gateway Interface) with Perl and C. I deployed these on Apache HTTP Server, which was the first production-quality open source web server and quickly became the dominant web server of the 1990s. Apache used process-driven concurrency: it forked a new process for each request or maintained a pool of pre-forked processes. CGI was conceptually simple: the web server launched a new UNIX process for every request, passing input via environment variables and stdin and receiving output via stdout. Here’s a simple Perl CGI script:

#!/usr/bin/perl
use strict;
use warnings;
use CGI;

my $cgi = CGI->new;

print $cgi->header('text/html');
print "<html><body>\n";
print "<h1>Hello from CGI!</h1>\n";

my $name = $cgi->param('name') || 'Guest';
print "<p>Welcome, $name!</p>\n";

# Simulate database query
my $user_count = 42;
print "<p>Total users: $user_count</p>\n";

print "</body></html>\n";

And in C:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int main() {
    char *query_string = getenv("QUERY_STRING");
    
    printf("Content-Type: text/html\n\n");
    printf("<html><body>\n");
    printf("<h1>CGI in C</h1>\n");
    
    if (query_string) {
        printf("<p>Query string: %s</p>\n", query_string);
    }
    
    printf("</body></html>\n");
    return 0;
}

Later, I migrated to more performant servers: Tomcat for Java servlets, Jetty as an embedded server, and Netty for building custom high-performance network applications. These servers used asynchronous I/O and lightweight threads (or even non-blocking event loops in Netty’s case).

The key lesson I learned was that scalability matters. The CGI model’s inability to maintain persistent connections or share state made it unsuitable for modern web applications. The shift from process-per-request to thread pools, and then to async I/O, represented fundamental improvements in how we handle concurrency.
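
That progression is visible in a few dozen lines of plain Java NIO, roughly the non-blocking style that Jetty and Netty built on: a single thread multiplexes many connections with a selector instead of dedicating a process or thread to each one. A minimal, illustrative echo server sketch:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioEchoServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();  // block until at least one channel is ready
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();

                if (key.isAcceptable()) {
                    // New connection: register it for reads, no dedicated thread needed
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int read = client.read(buffer);
                    if (read == -1) {
                        client.close();
                    } else {
                        buffer.flip();
                        client.write(buffer);  // echo the bytes back
                    }
                }
            }
        }
    }
}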

Java Adoption

When Java was released in 1995, I adopted it wholeheartedly. It freed developers from manual memory management and from debugging malloc() and free() errors. Network programming became far more approachable:

import java.io.*;
import java.net.*;

public class SimpleServer {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            System.out.println("Server listening on port " + port);
            
            while (true) {
                try (Socket clientSocket = serverSocket.accept();
                     BufferedReader in = new BufferedReader(
                         new InputStreamReader(clientSocket.getInputStream()));
                     PrintWriter out = new PrintWriter(
                         clientSocket.getOutputStream(), true)) {
                    
                    String request = in.readLine();
                    System.out.println("Received: " + request);
                    
                    out.println("Message received");
                }
            }
        }
    }
}

Java Threads

I had previously used pthreads in C, which were hard to get right; Java’s threading model was far simpler:

public class ConcurrentServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        
        while (true) {
            Socket clientSocket = serverSocket.accept();
            
            // Spawn thread to handle client
            new Thread(new ClientHandler(clientSocket)).start();
        }
    }
    
    static class ClientHandler implements Runnable {
        private Socket socket;
        
        public ClientHandler(Socket socket) {
            this.socket = socket;
        }
        
        public void run() {
            try (BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()));
                 PrintWriter out = new PrintWriter(
                     socket.getOutputStream(), true)) {
                
                String request = in.readLine();
                // Process request
                out.println("Response");
                
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try { socket.close(); } catch (IOException e) {}
            }
        }
    }
}
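
Spawning a raw thread per connection stops scaling once you have thousands of clients, which is why servlet containers moved to bounded thread pools. Here is a minimal sketch of the same server using an ExecutorService; the pool size and handler are illustrative:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PooledServer {
    public static void main(String[] args) throws IOException {
        // A bounded pool caps concurrency and reuses threads across requests
        ExecutorService pool = Executors.newFixedThreadPool(50);
        
        try (ServerSocket serverSocket = new ServerSocket(8080)) {
            while (true) {
                Socket clientSocket = serverSocket.accept();
                pool.execute(() -> handle(clientSocket));
            }
        }
    }
    
    private static void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            
            String request = in.readLine();
            out.println("Response to: " + request);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}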

Java’s synchronized keyword simplified thread-safe programming:

public class ThreadSafeCounter {
    private int count = 0;
    
    public synchronized void increment() {
        count++;
    }
    
    public synchronized int getCount() {
        return count;
    }
}

This was so much easier than managing mutexes, condition variables, and semaphores in C!

Java RMI: Remote Objects Made Practical

When Java added RMI (1997), distributed objects became practical. You could invoke methods on objects running on remote machines almost as if they were local. Define a remote interface:

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface Calculator extends Remote {
    int add(int a, int b) throws RemoteException;
    int multiply(int a, int b) throws RemoteException;
}

Implement it:

import java.rmi.server.UnicastRemoteObject;
import java.rmi.RemoteException;

public class CalculatorImpl extends UnicastRemoteObject 
                            implements Calculator {
    
    public CalculatorImpl() throws RemoteException {
        super();
    }
    
    public int add(int a, int b) throws RemoteException {
        return a + b;
    }
    
    public int multiply(int a, int b) throws RemoteException {
        return a * b;
    }
}

Server:

import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;

public class Server {
    public static void main(String[] args) {
        try {
            LocateRegistry.createRegistry(1099);
            Calculator calc = new CalculatorImpl();
            Naming.rebind("Calculator", calc);
            System.out.println("Server ready");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Client:

import java.rmi.Naming;

public class Client {
    public static void main(String[] args) {
        try {
            Calculator calc = (Calculator) Naming.lookup(
                "rmi://localhost/Calculator");
            
            int result = calc.add(5, 3);
            System.out.println("5 + 3 = " + result);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

I found RMI constraining: everything had to extend Remote, and you were stuck with Java-to-Java communication. The key lesson I learned was that abstractions that feel natural to developers get adopted.

JINI: RMI with Service Discovery

At a travel booking company in the mid 2000s, I used JINI, which Sun Microsystems pitched as “RMI on steroids.” JINI extended RMI with automatic service discovery, leasing, and distributed events. The core idea: services could join a network, advertise themselves, and be discovered by clients without hardcoded locations. Here’s a JINI service interface and registration:

import net.jini.core.lookup.ServiceRegistrar;
import net.jini.discovery.LookupDiscovery;
import net.jini.lease.LeaseRenewalManager;
import java.rmi.Remote;
import java.rmi.RemoteException;

// Service interface
public interface BookingService extends Remote {
    String searchFlights(String origin, String destination) 
        throws RemoteException;
    boolean bookFlight(String flightId, String passenger) 
        throws RemoteException;
}

// Service provider
public class BookingServiceProvider implements DiscoveryListener {

    private final LeaseRenewalManager leaseManager = new LeaseRenewalManager();

    public void discovered(DiscoveryEvent event) {
        ServiceRegistrar[] registrars = event.getRegistrars();
        
        for (ServiceRegistrar registrar : registrars) {
            try {
                BookingService service = new BookingServiceImpl();
                Entry[] attributes = new Entry[] {
                    new Name("FlightBookingService")
                };
                
                ServiceItem item = new ServiceItem(null, service, attributes);
                ServiceRegistration reg = registrar.register(
                    item, Lease.FOREVER);
                
                // Auto-renew lease
                leaseManager.renewUntil(reg.getLease(), Lease.FOREVER, null);
                
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Client discovery and usage:

public class BookingClient implements DiscoveryListener {
    
    public void discovered(DiscoveryEvent event) {
        ServiceRegistrar[] registrars = event.getRegistrars();
        
        for (ServiceRegistrar registrar : registrars) {
            try {
                ServiceTemplate template = new ServiceTemplate(
                    null, new Class[] { BookingService.class }, null);
                
                ServiceItem item = registrar.lookup(template);
                
                if (item != null) {
                    BookingService booking = (BookingService) item.service;
                    String flights = booking.searchFlights("SFO", "NYC");
                    booking.bookFlight("FL123", "John Smith");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

JINI provided automatic discovery, leasing, and location transparency, but it was too complex and supported only the Java ecosystem. The ideas were sound, though, and reappeared later in service registries (Consul, Eureka) and Kubernetes service discovery. I learned that service discovery is essential for dynamic systems, but the implementation must be simple.
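
The modern incarnation of that discovery idea is often just an HTTP call. Here is a minimal sketch that asks a local Consul agent where a service lives; it assumes Consul’s default port 8500 and a hypothetical service name:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ServiceLookup {
    public static void main(String[] args) throws Exception {
        // Ask the local Consul agent for instances of "booking-service"
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8500/v1/catalog/service/booking-service"))
            .GET()
            .build();
        
        HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());
        
        // The body is a JSON array of nodes with addresses and ports
        System.out.println(response.body());
    }
}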

CORBA

I used CORBA (Common Object Request Broker Architecture) for many years in the 1990s while building intelligent traffic systems. CORBA promised language-independent, platform-independent distributed objects. You could write a service in C++, invoke it from Java, and have clients in Python using the same IDL. Here’s a simple CORBA IDL definition:

module TrafficMonitor {
    struct SensorData {
        long sensor_id;
        float speed;
        long timestamp;
    };
    
    typedef sequence<SensorData> SensorDataList;
    
    interface TrafficService {
        void reportData(in SensorData data);
        SensorDataList getRecentData(in long minutes);
        float getAverageSpeed();
    };
};

Run the IDL compiler:

$ idl traffic.idl

This generated client stubs and server skeletons for your target language. I built a message-oriented middleware (MOM) system with CORBA that collected traffic data from road sensors and provided real-time traffic information.

C++ server implementation:

#include "TrafficService_impl.h"
#include <iostream>
#include <vector>
#include <ctime>

class TrafficServiceImpl : public POA_TrafficMonitor::TrafficService {
private:
    std::vector<TrafficMonitor::SensorData> data_store;
    
public:
    void reportData(const TrafficMonitor::SensorData& data) {
        data_store.push_back(data);
        std::cout << "Received data from sensor " 
                  << data.sensor_id << std::endl;
    }
    
    TrafficMonitor::SensorDataList* getRecentData(CORBA::Long minutes) {
        TrafficMonitor::SensorDataList* result = 
            new TrafficMonitor::SensorDataList();
        
        // Filter data from last N minutes
        time_t cutoff = time(NULL) - (minutes * 60);
        for (const auto& entry : data_store) {
            if (entry.timestamp >= cutoff) {
                result->length(result->length() + 1);
                (*result)[result->length() - 1] = entry;
            }
        }
        return result;
    }
    
    CORBA::Float getAverageSpeed() {
        if (data_store.empty()) return 0.0;
        
        float sum = 0.0;
        for (const auto& entry : data_store) {
            sum += entry.speed;
        }
        return sum / data_store.size();
    }
};

Java client:

import org.omg.CORBA.*;
import TrafficMonitor.*;

public class TrafficClient {
    public static void main(String[] args) {
        try {
            // Initialize ORB
            ORB orb = ORB.init(args, null);
            
            // Get reference to service
            org.omg.CORBA.Object obj = 
                orb.string_to_object("corbaname::localhost:1050#TrafficService");
            TrafficService service = TrafficServiceHelper.narrow(obj);
            
            // Report sensor data
            SensorData data = new SensorData();
            data.sensor_id = 101;
            data.speed = 65.5f;
            data.timestamp = (int)(System.currentTimeMillis() / 1000);
            
            service.reportData(data);
            
            // Get average speed
            float avgSpeed = service.getAverageSpeed();
            System.out.println("Average speed: " + avgSpeed + " mph");
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

However, the CORBA specification was massive, and different ORB (Object Request Broker) implementations like Orbix, ORBacus, and TAO couldn’t reliably interoperate despite claiming CORBA compliance. The binary protocol, IIOP, had subtle incompatibilities. CORBA did introduce valuable concepts:

  • Interceptors for cross-cutting concerns (authentication, logging, monitoring)
  • IDL-first design that forced clear interface definitions
  • Language-neutral protocols that actually worked (sometimes)

I learned that standards designed by committee are often over-engineered. CORBA and SOAP tried to solve every problem for everyone and ended up being optimal for no one.

SOAP and WSDL

In the early 2000s, I used SOAP (Simple Object Access Protocol) and WSDL (Web Services Description Language) on a number of projects; they had emerged as the standard for web services. The pitch: XML-based, platform-neutral, and “simple.” Here’s a WSDL definition:

<?xml version="1.0"?>
<definitions name="CustomerService"
   targetNamespace="http://example.com/customer"
   xmlns="http://schemas.xmlsoap.org/wsdl/"
   xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
   xmlns:tns="http://example.com/customer"
   xmlns:xsd="http://www.w3.org/2001/XMLSchema">
   
   <types>
      <xsd:schema targetNamespace="http://example.com/customer">
         <xsd:complexType name="Customer">
            <xsd:sequence>
               <xsd:element name="id" type="xsd:int"/>
               <xsd:element name="name" type="xsd:string"/>
               <xsd:element name="balance" type="xsd:double"/>
            </xsd:sequence>
         </xsd:complexType>
      </xsd:schema>
   </types>
   
   <message name="GetCustomerRequest">
      <part name="customerId" type="xsd:int"/>
   </message>
   
   <message name="GetCustomerResponse">
      <part name="customer" type="tns:Customer"/>
   </message>
   
   <portType name="CustomerPortType">
      <operation name="getCustomer">
         <input message="tns:GetCustomerRequest"/>
         <output message="tns:GetCustomerResponse"/>
      </operation>
   </portType>
   
   <binding name="CustomerBinding" type="tns:CustomerPortType">
      <soap:binding transport="http://schemas.xmlsoap.org/soap/http"/>
      <operation name="getCustomer">
         <soap:operation soapAction="getCustomer"/>
         <input>
            <soap:body use="literal"/>
         </input>
         <output>
            <soap:body use="literal"/>
         </output>
      </operation>
   </binding>
   
   <service name="CustomerService">
      <port name="CustomerPort" binding="tns:CustomerBinding">
         <soap:address location="http://example.com/customer"/>
      </port>
   </service>
</definitions>

A SOAP request looked like this:

<?xml version="1.0"?>
<soap:Envelope 
    xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns:cust="http://example.com/customer">
  <soap:Header>
    <cust:Authentication>
      <cust:username>john</cust:username>
      <cust:password>secret</cust:password>
    </cust:Authentication>
  </soap:Header>
  <soap:Body>
    <cust:getCustomer>
      <cust:customerId>12345</cust:customerId>
    </cust:getCustomer>
  </soap:Body>
</soap:Envelope>

The response:

<?xml version="1.0"?>
<soap:Envelope 
    xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
    xmlns:cust="http://example.com/customer">
  <soap:Body>
    <cust:getCustomerResponse>
      <cust:customer>
        <cust:id>12345</cust:id>
        <cust:name>John Smith</cust:name>
        <cust:balance>5000.00</cust:balance>
      </cust:customer>
    </cust:getCustomerResponse>
  </soap:Body>
</soap:Envelope>

Look at all that XML overhead! A simple request became hundreds of bytes of markup. Because SOAP was designed by committee (IBM, Oracle, Microsoft), it tried to solve every possible enterprise problem: transactions, security, reliability, routing, orchestration. I learned that simplicity beats features; SOAP collapsed under its own weight.

Java Servlets and Filters

Java 1.1 added support for servlets, which provided a much better model than CGI. Instead of spawning a process per request, servlets were Java classes instantiated once and reused across requests:

import javax.servlet.*;
import javax.servlet.http.*;
import java.io.*;

public class CustomerServlet extends HttpServlet {
    
    protected void doGet(HttpServletRequest request, 
                        HttpServletResponse response)
            throws ServletException, IOException {
        
        String customerId = request.getParameter("id");
        
        response.setContentType("application/json");
        PrintWriter out = response.getWriter();
        
        // Fetch customer data
        Customer customer = getCustomerFromDatabase(customerId);
        
        if (customer != null) {
            out.println(String.format(
                "{\"id\": \"%s\", \"name\": \"%s\", \"balance\": %.2f}",
                customer.getId(), customer.getName(), customer.getBalance()
            ));
        } else {
            response.setStatus(HttpServletResponse.SC_NOT_FOUND);
            out.println("{\"error\": \"Customer not found\"}");
        }
    }
    
    protected void doPost(HttpServletRequest request, 
                         HttpServletResponse response)
            throws ServletException, IOException {
        
        BufferedReader reader = request.getReader();
        StringBuilder json = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            json.append(line);
        }
        
        // Parse JSON and create customer
        Customer customer = parseJsonToCustomer(json.toString());
        saveCustomerToDatabase(customer);
        
        response.setStatus(HttpServletResponse.SC_CREATED);
        response.setContentType("application/json");
        PrintWriter out = response.getWriter();
        out.println(json.toString());
    }
}

Servlet Filters

The Servlet Filter API was quite powerful: it supported a chain-of-responsibility pattern for handling cross-cutting concerns:

import javax.servlet.*;
import javax.servlet.http.*;
import java.io.IOException;

public class AuthenticationFilter implements Filter {
    
    public void doFilter(ServletRequest request, 
                        ServletResponse response,
                        FilterChain chain) 
            throws IOException, ServletException {
        
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;
        
        // Check for authentication token
        String token = httpRequest.getHeader("Authorization");
        
        if (token == null || !isValidToken(token)) {
            httpResponse.setStatus(HttpServletResponse.SC_UNAUTHORIZED);
            httpResponse.getWriter().println("{\"error\": \"Unauthorized\"}");
            return;
        }
        
        // Pass to next filter or servlet
        chain.doFilter(request, response);
    }
    
    private boolean isValidToken(String token) {
        // Validate token
        return token.startsWith("Bearer ") && 
               validateJWT(token.substring(7));
    }
}

Configuration in web.xml:

<filter>
    <filter-name>AuthenticationFilter</filter-name>
    <filter-class>com.example.AuthenticationFilter</filter-class>
</filter>

<filter-mapping>
    <filter-name>AuthenticationFilter</filter-name>
    <url-pattern>/api/*</url-pattern>
</filter-mapping>

You could chain filters for compression, logging, transformation, and rate limiting, getting clean separation of concerns without touching business logic. I had previously used CORBA interceptors to inject cross-cutting logic, and the filter pattern solved a similar problem. This pattern would reappear in service meshes and API gateways.
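
For example, adding request timing next to authentication is just another Filter class plus a second filter-mapping entry in web.xml; a minimal sketch (the class name and log format are illustrative):

import javax.servlet.*;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;

public class RequestTimingFilter implements Filter {
    
    public void init(FilterConfig config) throws ServletException {}
    
    public void doFilter(ServletRequest request, 
                         ServletResponse response,
                         FilterChain chain) 
            throws IOException, ServletException {
        
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        long start = System.currentTimeMillis();
        
        // Continue down the chain (e.g., AuthenticationFilter, then the servlet)
        chain.doFilter(request, response);
        
        long elapsed = System.currentTimeMillis() - start;
        System.out.println(httpRequest.getMethod() + " " + 
                           httpRequest.getRequestURI() + " took " + elapsed + " ms");
    }
    
    public void destroy() {}
}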

Enterprise Java Beans

In the late 1990s and early 2000s, I used Enterprise JavaBeans (EJB), which attempted to make distributed objects transparent. The key idea: write regular Java objects and let the application server handle distribution, persistence, transactions, and security. Here’s what an EJB 2.x entity bean looked like:

// Remote interface
public interface Customer extends EJBObject {
    String getName() throws RemoteException;
    void setName(String name) throws RemoteException;
    double getBalance() throws RemoteException;
    void setBalance(double balance) throws RemoteException;
}

// Home interface
public interface CustomerHome extends EJBHome {
    Customer create(Integer id, String name) throws CreateException, RemoteException;
    Customer findByPrimaryKey(Integer id) throws FinderException, RemoteException;
}

// Bean implementation
public class CustomerBean implements EntityBean {
    private Integer id;
    private String name;
    private double balance;
    
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getBalance() { return balance; }
    public void setBalance(double balance) { this.balance = balance; }
    
    // Container callbacks
    public void ejbActivate() {}
    public void ejbPassivate() {}
    public void ejbLoad() {}
    public void ejbStore() {}
    public void setEntityContext(EntityContext ctx) {}
    public void unsetEntityContext() {}
    
    public Integer ejbCreate(Integer id, String name) {
        this.id = id;
        this.name = name;
        this.balance = 0.0;
        return null;
    }
    
    public void ejbPostCreate(Integer id, String name) {}
}

The N+1 Selects Problem and Network Fallacy

The fatal flaw: EJB pretended network calls were free. I watched teams write code like this:

CustomerHome home = // ... lookup
Customer customer = home.findByPrimaryKey(customerId);

// Each getter is a remote call!
String name = customer.getName();        // Network call
double balance = customer.getBalance();  // Network call

Worse, I saw code that made remote calls in loops:

Collection customers = home.findAll();
double totalBalance = 0.0;
for (Iterator it = customers.iterator(); it.hasNext();) {
    Customer customer = (Customer) it.next();
    // Remote call for EVERY iteration!
    totalBalance += customer.getBalance();
}

This violated the Fallacies of Distributed Computing: the network is not reliable, and latency is not zero. What looked like simple property access actually made remote calls over the network. I had previously built distributed and parallel applications, so I understood network latency. But it blindsided most developers because EJB deliberately hid it.

I learned that you can’t hide distribution. Network calls are fundamentally different from local calls. Latency, failure modes, and semantics are different. Transparency is a lie.
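
The standard remedy at the time was the Session Façade / Transfer Object pattern: expose one coarse-grained remote method that returns a serializable snapshot, so the client pays for a single round trip instead of one per getter. A minimal sketch, with illustrative interface and class names:

import java.io.Serializable;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.List;

// Coarse-grained value object, copied across the wire in one call
public class CustomerSummary implements Serializable {
    private final String name;
    private final double balance;
    
    public CustomerSummary(String name, double balance) {
        this.name = name;
        this.balance = balance;
    }
    
    // Plain local getters; no network calls here
    public String getName() { return name; }
    public double getBalance() { return balance; }
}

// Remote facade: one network round trip returns everything the client needs
interface CustomerFacade extends Remote {
    CustomerSummary getCustomerSummary(Integer customerId) throws RemoteException;
    List<CustomerSummary> getAllCustomerSummaries() throws RemoteException;
}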

REST Standard

Before REST became mainstream, I experimented with “Plain Old XML” (POX) over HTTP by just sending XML documents via HTTP POST without all the SOAP ceremony:

import requests
import xml.etree.ElementTree as ET

# Create XML request
root = ET.Element('getCustomer')
ET.SubElement(root, 'customerId').text = '12345'
xml_data = ET.tostring(root, encoding='utf-8')

# Send HTTP POST
response = requests.post(
    'http://api.example.com/customer',
    data=xml_data,
    headers={'Content-Type': 'application/xml'}
)

# Parse response
response_tree = ET.fromstring(response.content)
name = response_tree.find('name').text

This was simpler than SOAP, but still ad hoc. Then REST (Representational State Transfer), based on Roy Fielding’s 2000 dissertation, offered a principled approach:

  • Use HTTP methods semantically (GET, POST, PUT, DELETE)
  • Resources have URLs
  • Stateless communication
  • Hypermedia as the engine of application state (HATEOAS)

Here’s a RESTful API in Python with Flask:

from flask import Flask, jsonify, request

app = Flask(__name__)

# In-memory data store
customers = {
    '12345': {'id': '12345', 'name': 'John Smith', 'balance': 5000.00}
}

@app.route('/customers/<customer_id>', methods=['GET'])
def get_customer(customer_id):
    customer = customers.get(customer_id)
    if customer:
        return jsonify(customer), 200
    return jsonify({'error': 'Customer not found'}), 404

@app.route('/customers', methods=['POST'])
def create_customer():
    data = request.get_json()
    customer_id = data['id']
    customers[customer_id] = data
    return jsonify(data), 201

@app.route('/customers/<customer_id>', methods=['PUT'])
def update_customer(customer_id):
    if customer_id not in customers:
        return jsonify({'error': 'Customer not found'}), 404
    
    data = request.get_json()
    customers[customer_id].update(data)
    return jsonify(customers[customer_id]), 200

@app.route('/customers/<customer_id>', methods=['DELETE'])
def delete_customer(customer_id):
    if customer_id in customers:
        del customers[customer_id]
        return '', 204
    return jsonify({'error': 'Customer not found'}), 404

if __name__ == '__main__':
    app.run(debug=True)

Client code became trivial:

import requests

# GET customer
response = requests.get('http://localhost:5000/customers/12345')
if response.status_code == 200:
    customer = response.json()
    print(f"Customer: {customer['name']}")

# Create new customer
new_customer = {
    'id': '67890',
    'name': 'Alice Johnson',
    'balance': 3000.00
}
response = requests.post(
    'http://localhost:5000/customers',
    json=new_customer
)

# Update customer
update_data = {'balance': 3500.00}
response = requests.put(
    'http://localhost:5000/customers/67890',
    json=update_data
)

# Delete customer
response = requests.delete('http://localhost:5000/customers/67890')

Hypermedia and HATEOAS

True REST embraced hypermedia—responses included links to related resources:

{
  "id": "12345",
  "name": "John Smith",
  "balance": 5000.00,
  "_links": {
    "self": {"href": "/customers/12345"},
    "orders": {"href": "/customers/12345/orders"},
    "transactions": {"href": "/customers/12345/transactions"}
  }
}

In practice, most APIs called “REST” weren’t truly RESTful; they didn’t implement HATEOAS or use HTTP status codes consistently. But even “REST-ish” APIs were far simpler than SOAP. The key lesson I learned was that REST succeeded because it built on HTTP, something every platform already supported. No new protocols, no complex tooling. Just URLs, HTTP verbs, and JSON.

JSON Replaces XML

With the adoption of REST, I watched XML web services (JAX-WS) decline, and I used JAX-RS for REST services with JSON payloads. XML required verbose markup:

<?xml version="1.0"?>
<customer>
    <id>12345</id>
    <name>John Smith</name>
    <balance>5000.00</balance>
    <orders>
        <order>
            <id>001</id>
            <date>2024-01-15</date>
            <total>99.99</total>
        </order>
        <order>
            <id>002</id>
            <date>2024-02-20</date>
            <total>149.50</total>
        </order>
    </orders>
</customer>

The same data in JSON:

{
  "id": "12345",
  "name": "John Smith",
  "balance": 5000.00,
  "orders": [
    {
      "id": "001",
      "date": "2024-01-15",
      "total": 99.99
    },
    {
      "id": "002",
      "date": "2024-02-20",
      "total": 149.50
    }
  ]
}
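
Producing that JSON with the JAX-RS services mentioned above takes little more than a few annotations. A minimal sketch, assuming a JAX-RS runtime such as Jersey with a JSON provider such as Jackson; the resource and bean classes are illustrative:

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/customers")
public class CustomerResource {
    
    @GET
    @Path("/{id}")
    @Produces(MediaType.APPLICATION_JSON)
    public Customer getCustomer(@PathParam("id") String id) {
        // The JSON provider serializes the returned object automatically
        return new Customer(id, "John Smith", 5000.00);
    }
}

class Customer {
    public String id;
    public String name;
    public double balance;
    
    Customer(String id, String name, double balance) {
        this.id = id;
        this.name = name;
        this.balance = balance;
    }
}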

JSON does have limitations. It doesn’t natively support references or circular structures, making recursive relationships awkward:

{
  "id": "A",
  "children": [
    {
      "id": "B",
      "parent_id": "A"
    }
  ]
}

You have to encode references manually, unlike some XML schemas that support IDREF.
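
In Java, serialization libraries can paper over this. Jackson’s @JsonIdentityInfo, for example, writes the first occurrence of an object in full and later occurrences as just its id, which keeps parent/child cycles serializable. A small sketch with an illustrative Node class:

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

// The "id" field doubles as the object identity in the JSON output
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "id")
class Node {
    public String id;
    public Node parent;
    public List<Node> children = new ArrayList<>();
    
    Node() {}
    Node(String id) { this.id = id; }
}

public class JsonReferenceDemo {
    public static void main(String[] args) throws Exception {
        Node a = new Node("A");
        Node b = new Node("B");
        b.parent = a;
        a.children.add(b);
        
        // Without identity handling, this parent/child cycle would recurse forever
        System.out.println(new ObjectMapper().writeValueAsString(a));
    }
}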

Erlang/OTP

I learned about the actor model in college and built a framework based on actors and the Linda memory model. In the mid-2000s, I encountered Erlang, which used actors for building distributed systems. Erlang was designed in the 1980s at Ericsson for building telecom switches and is based on the following design principles:

  • “Let it crash” philosophy
  • No shared memory between processes
  • Lightweight processes (not OS threads—Erlang processes)
  • Supervision trees for fault recovery
  • Hot code swapping for zero-downtime updates

Here’s what an Erlang actor (process) looks like:

-module(customer_server).
-export([start/0, init/0, get_customer/1, update_balance/2]).

% Start the server
start() ->
    Pid = spawn(customer_server, init, []),
    register(customer_server, Pid),
    Pid.

% Initialize with empty state
init() ->
    State = #{},  % Empty map
    loop(State).

% Main loop - handle messages
loop(State) ->
    receive
        {get_customer, CustomerId, From} ->
            Customer = maps:get(CustomerId, State, not_found),
            From ! {customer, Customer},
            loop(State);
        
        {update_balance, CustomerId, NewBalance, From} ->
            Customer = maps:get(CustomerId, State),
            UpdatedCustomer = Customer#{balance => NewBalance},
            NewState = maps:put(CustomerId, UpdatedCustomer, State),
            From ! {ok, updated},
            loop(NewState);
        
        {add_customer, CustomerId, Customer, From} ->
            NewState = maps:put(CustomerId, Customer, State),
            From ! {ok, added},
            loop(NewState);
        
        stop ->
            ok;
        
        _ ->
            loop(State)
    end.

% Client functions
get_customer(CustomerId) ->
    customer_server ! {get_customer, CustomerId, self()},
    receive
        {customer, Customer} -> Customer
    after 5000 ->
        timeout
    end.

update_balance(CustomerId, NewBalance) ->
    customer_server ! {update_balance, CustomerId, NewBalance, self()},
    receive
        {ok, updated} -> ok
    after 5000 ->
        timeout
    end.

Erlang made concurrency simple through message passing between actors.

The Supervision Tree

A key innovation of Erlang was supervision trees. You organized processes in a hierarchy, and supervisors would restart crashed children:

-module(customer_supervisor).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    % Supervisor strategy
    SupFlags = #{
        strategy => one_for_one,  % Restart only failed child
        intensity => 5,            % Max 5 restarts
        period => 60               % Per 60 seconds
    },
    
    % Child specifications
    ChildSpecs = [
        #{
            id => customer_server,
            start => {customer_server, start, []},
            restart => permanent,   % Always restart
            shutdown => 5000,
            type => worker,
            modules => [customer_server]
        },
        #{
            id => order_server,
            start => {order_server, start, []},
            restart => permanent,
            shutdown => 5000,
            type => worker,
            modules => [order_server]
        }
    ],
    
    {ok, {SupFlags, ChildSpecs}}.

If a process crashed, the supervisor automatically restarted it and the system self-healed. A key lesson I learned from the actor model and Erlang was that shared mutable state is the enemy. Message passing with isolated state is simpler, more reliable, and easier to reason about. Today, AWS Lambda, Azure Durable Functions, and frameworks like Akka all embrace the Actor model.
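
For comparison, here is roughly what the same isolated-state, message-passing style looks like with Akka’s classic Java API; a minimal sketch with illustrative actor and message names:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

// State lives inside the actor; the only way to touch it is by sending messages
public class BalanceActor extends AbstractActor {
    private double balance = 0.0;
    
    public static class Deposit {
        public final double amount;
        public Deposit(double amount) { this.amount = amount; }
    }
    
    public static class GetBalance {}
    
    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Deposit.class, msg -> { balance += msg.amount; })
            .match(GetBalance.class, msg -> getSender().tell(balance, getSelf()))
            .build();
    }
    
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("bank");
        ActorRef account = system.actorOf(Props.create(BalanceActor.class), "account");
        account.tell(new Deposit(100.0), ActorRef.noSender());
    }
}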

Distributed Erlang

Erlang made distributed computing almost trivial. Processes on different nodes communicated identically to local processes:

% On node1@host1
RemotePid = spawn('node2@host2', module, function, [args]),
RemotePid ! {message, data}.

% On node2@host2 - receives the message
receive
    {message, Data} -> 
        io:format("Received: ~p~n", [Data])
end.

The VM handled all the complexity of node discovery, connection management, and message routing. Today’s serverless functions look a lot like actors, and Kubernetes pods are supervised processes.

Asynchronous Messaging

As systems grew more complex, asynchronous messaging became essential. I worked extensively with Oracle Tuxedo, IBM MQSeries, WebLogic JMS, WebSphere MQ, and later ActiveMQ, MQTT / AMQP, ZeroMQ and RabbitMQ primarily for inter-service communication and asynchronous processing. Here’s a JMS producer in Java:

import javax.jms.*;
import javax.naming.*;

public class OrderProducer {
    public static void main(String[] args) throws Exception {
        Context ctx = new InitialContext();
        ConnectionFactory factory = 
            (ConnectionFactory) ctx.lookup("ConnectionFactory");
        Queue queue = (Queue) ctx.lookup("OrderQueue");
        
        Connection connection = factory.createConnection();
        Session session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(queue);
        
        // Create message
        TextMessage message = session.createTextMessage();
        message.setText("{ \"orderId\": \"12345\", " +
                       "\"customerId\": \"67890\", " +
                       "\"amount\": 99.99 }");
        
        // Send message
        producer.send(message);
        System.out.println("Order sent: " + message.getText());
        
        connection.close();
    }
}

JMS consumer:

import javax.jms.*;
import javax.naming.*;

public class OrderConsumer implements MessageListener {
    
    public static void main(String[] args) throws Exception {
        Context ctx = new InitialContext();
        ConnectionFactory factory = 
            (ConnectionFactory) ctx.lookup("ConnectionFactory");
        Queue queue = (Queue) ctx.lookup("OrderQueue");
        
        Connection connection = factory.createConnection();
        Session session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queue);
        
        consumer.setMessageListener(new OrderConsumer());
        connection.start();
        
        System.out.println("Waiting for messages...");
        Thread.sleep(Long.MAX_VALUE);  // Keep running
    }
    
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Received order: " + 
                             textMessage.getText());
            
            // Process order
            processOrder(textMessage.getText());
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    
    private void processOrder(String orderJson) {
        // Business logic here
    }
}

Asynchronous messaging is essential for building resilient, scalable systems. It decouples producers from consumers, provides natural backpressure, and enables event-driven architectures.

Spring Framework and Aspect-Oriented Programming

In the early 2000s, I used aspect-oriented programming (AOP) to inject cross-cutting concerns like logging, security, and monitoring. Here is a typical example:

@Aspect
@Component
public class LoggingAspect {
    
    private static final Logger logger = 
        LoggerFactory.getLogger(LoggingAspect.class);
    
    @Before("execution(* com.example.service.*.*(..))")
    public void logBefore(JoinPoint joinPoint) {
        logger.info("Executing: " + 
                   joinPoint.getSignature().getName());
    }
    
    @AfterReturning(
        pointcut = "execution(* com.example.service.*.*(..))",
        returning = "result")
    public void logAfterReturning(JoinPoint joinPoint, Object result) {
        logger.info("Method " + 
                   joinPoint.getSignature().getName() + 
                   " returned: " + result);
    }
    
    @Around("@annotation(com.example.Monitored)")
    public Object measureTime(ProceedingJoinPoint joinPoint) 
            throws Throwable {
        long start = System.currentTimeMillis();
        Object result = joinPoint.proceed();
        long time = System.currentTimeMillis() - start;
        logger.info(joinPoint.getSignature().getName() + 
                   " took " + time + " ms");
        return result;
    }
}

I later adopted the Spring Framework, which revolutionized Java development with dependency injection and aspect-oriented programming (AOP):

// Spring configuration
@Configuration
public class AppConfig {
    
    @Bean
    public CustomerService customerService() {
        return new CustomerServiceImpl(customerRepository());
    }
    
    @Bean
    public CustomerRepository customerRepository() {
        return new DatabaseCustomerRepository(dataSource());
    }
    
    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource ds = new DriverManagerDataSource();
        ds.setDriverClassName("com.mysql.jdbc.Driver");
        ds.setUrl("jdbc:mysql://localhost/mydb");
        return ds;
    }
}

// Service class
@Service
public class CustomerServiceImpl implements CustomerService {
    private final CustomerRepository repository;
    
    @Autowired
    public CustomerServiceImpl(CustomerRepository repository) {
        this.repository = repository;
    }
    
    @Transactional
    public void updateBalance(String customerId, double newBalance) {
        Customer customer = repository.findById(customerId);
        customer.setBalance(newBalance);
        repository.save(customer);
    }
}

Spring Remoting

Spring added its own remoting protocols. HTTP Invoker serialized Java objects over HTTP:

// Server configuration
@Configuration
public class ServerConfig {
    
    @Bean
    public HttpInvokerServiceExporter customerService() {
        HttpInvokerServiceExporter exporter = 
            new HttpInvokerServiceExporter();
        exporter.setService(customerServiceImpl());
        exporter.setServiceInterface(CustomerService.class);
        return exporter;
    }
}

// Client configuration
@Configuration
public class ClientConfig {
    
    @Bean
    public HttpInvokerProxyFactoryBean customerService() {
        HttpInvokerProxyFactoryBean proxy = 
            new HttpInvokerProxyFactoryBean();
        proxy.setServiceUrl("http://localhost:8080/customer");
        proxy.setServiceInterface(CustomerService.class);
        return proxy;
    }
}

I learned that AOP addressed cross-cutting concerns elegantly for monoliths. But in microservices, these concerns moved to the infrastructure layer: service meshes, API gateways, and sidecars.

Proprietary Protocols

When working for large companies like Amazon, I encountered Amazon Coral, a proprietary RPC framework influenced by CORBA. Coral used an IDL to define service interfaces and supported multiple languages:

// Coral IDL
namespace com.amazon.example

structure CustomerData {
    1: required integer customerId
    2: required string name
    3: optional double balance
}

service CustomerService {
    CustomerData getCustomer(1: integer customerId)
    void updateCustomer(1: CustomerData customer)
    list<CustomerData> listCustomers()
}

The IDL compiler generated client and server code for Java, C++, and other languages. Coral handled serialization, versioning, and service discovery. When I later worked for AWS, I used Smithy, the successor to Coral, which Amazon open-sourced. Here is a similar contract in Smithy:

namespace com.example

service CustomerService {
    version: "2024-01-01"
    operations: [
        GetCustomer
        UpdateCustomer
        ListCustomers
    ]
}

@readonly
operation GetCustomer {
    input: GetCustomerInput
    output: GetCustomerOutput
    errors: [CustomerNotFound]
}

structure GetCustomerInput {
    @required
    customerId: String
}

structure GetCustomerOutput {
    @required
    customer: Customer
}

structure Customer {
    @required
    customerId: String
    
    @required
    name: String
    
    balance: Double
}

@error("client")
structure CustomerNotFound {
    @required
    message: String
}

I learned that IDL-first design remains valuable; Smithy learned from CORBA, Protocol Buffers, and Thrift.

Long Polling, WebSockets, and Real-Time

In the late 2000s, I built real-time applications for streaming financial charts and technical data. I used long polling, where the client made a request that the server held open until data was available:

// Client-side long polling
function pollServer() {
    fetch('/api/events')
        .then(response => response.json())
        .then(data => {
            console.log('Received event:', data);
            updateUI(data);
            
            // Immediately poll again
            pollServer();
        })
        .catch(error => {
            console.error('Polling error:', error);
            // Retry after delay
            setTimeout(pollServer, 5000);
        });
}

pollServer();

Server-side (Node.js):

const express = require('express');
const app = express();

let pendingRequests = [];

app.get('/api/events', (req, res) => {
    // Hold request open
    pendingRequests.push(res);
    
    // Timeout after 30 seconds
    setTimeout(() => {
        const index = pendingRequests.indexOf(res);
        if (index !== -1) {
            pendingRequests.splice(index, 1);
            res.json({ type: 'heartbeat' });
        }
    }, 30000);
});

// When an event occurs
function broadcastEvent(event) {
    pendingRequests.forEach(res => {
        res.json(event);
    });
    pendingRequests = [];
}

WebSockets

I also used WebSockets for real-time applications that needed true bidirectional communication. However, earlier browsers didn’t fully support them, so I fell back to long polling when WebSockets weren’t available:

// Server (Node.js with ws library)
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
    console.log('Client connected');
    
    // Send initial data
    ws.send(JSON.stringify({
        type: 'INIT',
        data: getInitialData()
    }));
    
    // Handle messages
    ws.on('message', (message) => {
        const msg = JSON.parse(message);
        
        if (msg.type === 'SUBSCRIBE') {
            subscribeToSymbol(ws, msg.symbol);
        }
    });
    
    ws.on('close', () => {
        console.log('Client disconnected');
        unsubscribeAll(ws);
    });
});

// Stream live data
function streamPriceUpdate(symbol, price) {
    wss.clients.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
            if (isSubscribed(client, symbol)) {
                client.send(JSON.stringify({
                    type: 'PRICE_UPDATE',
                    symbol: symbol,
                    price: price,
                    timestamp: Date.now()
                }));
            }
        }
    });
}

Client:

let ws;

function connectWebSocket() {
    ws = new WebSocket('ws://localhost:8080');

    ws.onopen = () => {
        console.log('Connected to server');

        // Subscribe to symbols
        ws.send(JSON.stringify({
            type: 'SUBSCRIBE',
            symbol: 'AAPL'
        }));
    };

    ws.onmessage = (event) => {
        const message = JSON.parse(event.data);

        switch (message.type) {
            case 'INIT':
                initializeChart(message.data);
                break;
            case 'PRICE_UPDATE':
                updateChart(message.symbol, message.price);
                break;
        }
    };

    ws.onerror = (error) => {
        console.error('WebSocket error:', error);
    };

    ws.onclose = () => {
        console.log('Disconnected, attempting reconnect...');
        setTimeout(connectWebSocket, 1000);
    };
}

connectWebSocket();

I learned that different problems need different protocols. REST works for request-response. WebSockets excel for real-time bidirectional communication.

Vert.x and Hazelcast for High-Performance Streaming

For a production streaming chart system handling high-volume market data, I used Vert.x with Hazelcast. Vert.x is a reactive toolkit built on Netty that excels at handling thousands of concurrent connections with minimal resources. Hazelcast provided distributed caching and coordination across multiple Vert.x instances. Market data flowed into Hazelcast distributed topics; Vert.x instances subscribed to those topics and pushed updates to connected WebSocket clients. If WebSocket wasn't supported, we fell back to long polling automatically.

import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.ServerWebSocket;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;

public class MarketDataServer {
    private final Vertx vertx;
    private final HazelcastInstance hazelcast;
    private final ConcurrentHashMap<String, Set<ServerWebSocket>> subscriptions;
    
    public MarketDataServer() {
        this.vertx = Vertx.vertx();
        this.hazelcast = Hazelcast.newHazelcastInstance();
        this.subscriptions = new ConcurrentHashMap<>();
        
        // Subscribe to market data topic
        ITopic<MarketData> topic = hazelcast.getTopic("market-data");
        topic.addMessageListener(new MessageListener<MarketData>() {
            public void onMessage(Message<MarketData> message) {
                broadcastToSubscribers(message.getMessageObject());
            }
        });
    }
    
    public void start() {
        HttpServer server = vertx.createHttpServer();
        
        server.webSocketHandler(ws -> {
            String path = ws.path();
            
            if (path.startsWith("/stream/")) {
                String symbol = path.substring(8);
                handleWebSocketConnection(ws, symbol);
            } else {
                ws.reject();
            }
        });
        
        // Long polling fallback
        server.requestHandler(req -> {
            if (req.path().startsWith("/poll/")) {
                String symbol = req.path().substring(6);
                handleLongPolling(req, symbol);
            }
        });
        
        server.listen(8080, result -> {
            if (result.succeeded()) {
                System.out.println("Market data server started on port 8080");
            }
        });
    }
    
    private void handleWebSocketConnection(ServerWebSocket ws, String symbol) {
        subscriptions.computeIfAbsent(symbol, k -> ConcurrentHashMap.newKeySet())
                     .add(ws);
        
        ws.closeHandler(v -> {
            Set<ServerWebSocket> sockets = subscriptions.get(symbol);
            if (sockets != null) {
                sockets.remove(ws);
            }
        });
        
        // Send initial snapshot from Hazelcast cache
        IMap<String, MarketData> cache = hazelcast.getMap("market-snapshot");
        MarketData data = cache.get(symbol);
        if (data != null) {
            ws.writeTextMessage(data.toJson());
        }
    }
    
    private void handleLongPolling(HttpServerRequest req, String symbol) {
        String lastEventId = req.getParam("lastEventId");
        
        // Hold request until data available or timeout
        long timerId = vertx.setTimer(30000, id -> {
            req.response()
               .putHeader("Content-Type", "application/json")
               .end("{\"type\":\"heartbeat\"}");
        });
        
        // Register a one-time listener; PollHandler (not shown) wraps the
        // pending request and timer, and in a real system it would live in
        // its own map rather than alongside the WebSocket subscriptions
        subscriptions.computeIfAbsent(symbol + ":poll", 
            k -> ConcurrentHashMap.newKeySet())
            .add(new PollHandler(req, timerId));
    }
    
    private void broadcastToSubscribers(MarketData data) {
        String symbol = data.getSymbol();
        
        // WebSocket subscribers
        Set<ServerWebSocket> sockets = subscriptions.get(symbol);
        if (sockets != null) {
            String json = data.toJson();
            sockets.forEach(ws -> {
                if (!ws.isClosed()) {
                    ws.writeTextMessage(json);
                }
            });
        }
        
        // Update Hazelcast cache for new subscribers
        IMap<String, MarketData> cache = hazelcast.getMap("market-snapshot");
        cache.put(symbol, data);
    }
    
    public static void main(String[] args) {
        new MarketDataServer().start();
    }
}

Publishing market data to Hazelcast from the data feed:

public class MarketDataPublisher {
    private final HazelcastInstance hazelcast;
    
    public MarketDataPublisher(HazelcastInstance hazelcast) {
        this.hazelcast = hazelcast;
    }
    
    public void publishUpdate(String symbol, double price, long volume) {
        MarketData data = new MarketData(symbol, price, volume, 
                                         System.currentTimeMillis());
        
        // Publish to topic - all Vert.x instances receive it
        ITopic<MarketData> topic = hazelcast.getTopic("market-data");
        topic.publish(data);
    }
}

This architecture provided:

  • Vert.x Event Loop: Non-blocking I/O handled 10,000+ concurrent WebSocket connections per instance
  • Hazelcast Distribution: Market data shared across multiple Vert.x instances without a central message broker
  • Horizontal Scaling: Adding Vert.x instances automatically joined the Hazelcast cluster
  • Low Latency: Sub-millisecond message propagation within the cluster
  • Automatic Fallback: Clients detected WebSocket support; older browsers used long polling

Facebook Thrift and Google Protocol Buffers

I experimented with Facebook Thrift and Google Protocol Buffers, both of which provide IDL-based RPC with support for multiple languages and wire protocols. Here is an example Protocol Buffers definition:

syntax = "proto3";

package customer;

message Customer {
    int32 customer_id = 1;
    string name = 2;
    double balance = 3;
}

service CustomerService {
    rpc GetCustomer(GetCustomerRequest) returns (Customer);
    rpc UpdateBalance(UpdateBalanceRequest) returns (UpdateBalanceResponse);
    rpc ListCustomers(ListCustomersRequest) returns (CustomerList);
}

message GetCustomerRequest {
    int32 customer_id = 1;
}

message UpdateBalanceRequest {
    int32 customer_id = 1;
    double new_balance = 2;
}

message UpdateBalanceResponse {
    bool success = 1;
}

message ListCustomersRequest {}

message CustomerList {
    repeated Customer customers = 1;
}

Python server with gRPC (which uses Protocol Buffers):

import grpc
from concurrent import futures
import customer_pb2
import customer_pb2_grpc

class CustomerServicer(customer_pb2_grpc.CustomerServiceServicer):
    
    def GetCustomer(self, request, context):
        return customer_pb2.Customer(
            customer_id=request.customer_id,
            name="John Doe",
            balance=5000.00
        )
    
    def UpdateBalance(self, request, context):
        print(f"Updating balance for {request.customer_id} " +
              f"to {request.new_balance}")
        return customer_pb2.UpdateBalanceResponse(success=True)
    
    def ListCustomers(self, request, context):
        customers = [
            customer_pb2.Customer(customer_id=1, name="Alice", balance=1000),
            customer_pb2.Customer(customer_id=2, name="Bob", balance=2000),
        ]
        return customer_pb2.CustomerList(customers=customers)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    customer_pb2_grpc.add_CustomerServiceServicer_to_server(
        CustomerServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

I learned that binary protocols offer significant efficiency gains. JSON is human-readable and convenient for debugging, but in high-performance scenarios, binary protocols like Protocol Buffers reduce payload size and serialization overhead.

Serverless and Lambda: Functions as a Service

Around 2015, AWS Lambda introduced serverless computing where you wrote functions, and AWS handled all the infrastructure:

// Lambda function (Node.js)
exports.handler = async (event) => {
    const customerId = event.queryStringParameters.customerId;
    
    // Query DynamoDB
    const AWS = require('aws-sdk');
    const dynamodb = new AWS.DynamoDB.DocumentClient();
    
    const result = await dynamodb.get({
        TableName: 'Customers',
        Key: { customerId: customerId }
    }).promise();
    
    if (result.Item) {
        return {
            statusCode: 200,
            body: JSON.stringify(result.Item)
        };
    } else {
        return {
            statusCode: 404,
            body: JSON.stringify({ error: 'Customer not found' })
        };
    }
};

Serverless was powerful: no servers to manage, automatic scaling, and pay-per-invocation pricing. It felt like the Actor model I had worked with in my research: small, stateless, event-driven functions.

However, I also encountered several problems with serverless:

  • Cold starts: First invocation could be slow (though it has improved with recent updates)
  • Timeouts: Functions had maximum execution time (15 minutes for Lambda)
  • State management: Functions were stateless; you needed external state stores
  • Orchestration: Coordinating multiple functions was complex

The ping-pong anti-pattern emerged where Lambda A calls Lambda B, which calls Lambda C, which calls Lambda D. This created hard-to-debug systems with unpredictable costs. AWS Step Functions and Azure Durable Functions addressed orchestration:

{
  "Comment": "Order processing workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ValidateOrder",
      "Next": "CheckInventory"
    },
    "CheckInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:CheckInventory",
      "Next": "ChargeCustomer"
    },
    "ChargeCustomer": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ChargeCustomer",
      "Catch": [{
        "ErrorEquals": ["PaymentError"],
        "Next": "PaymentFailed"
      }],
      "Next": "ShipOrder"
    },
    "ShipOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ShipOrder",
      "End": true
    },
    "PaymentFailed": {
      "Type": "Fail",
      "Cause": "Payment processing failed"
    }
  }
}

gRPC: Modern RPC

In early 2020s, I started using gRPC extensively. It combined the best ideas from decades of RPC evolution:

  • Protocol Buffers for IDL
  • HTTP/2 for transport (multiplexing, header compression, flow control)
  • Strong typing with code generation
  • Streaming support (unary, server streaming, client streaming, bidirectional)

Here’s a gRPC service definition:

syntax = "proto3";

package customer;

service CustomerService {
    rpc GetCustomer(GetCustomerRequest) returns (Customer);
    rpc UpdateCustomer(Customer) returns (UpdateResponse);
    rpc StreamOrders(StreamOrdersRequest) returns (stream Order);
    rpc BidirectionalChat(stream ChatMessage) returns (stream ChatMessage);
}

message Customer {
    int32 customer_id = 1;
    string name = 2;
    double balance = 3;
}

message GetCustomerRequest {
    int32 customer_id = 1;
}

message UpdateResponse {
    bool success = 1;
    string message = 2;
}

message StreamOrdersRequest {
    int32 customer_id = 1;
}

message Order {
    int32 order_id = 1;
    double amount = 2;
    string status = 3;
}

message ChatMessage {
    string user = 1;
    string message = 2;
    int64 timestamp = 3;
}

Go server implementation:

package main

import (
    "context"
    "fmt"
    "log"
    "net"
    "time"
    
    "google.golang.org/grpc"
    pb "example.com/customer"
)

type server struct {
    pb.UnimplementedCustomerServiceServer
}

func (s *server) GetCustomer(ctx context.Context, req *pb.GetCustomerRequest) (*pb.Customer, error) {
    return &pb.Customer{
        CustomerId: req.CustomerId,
        Name:       "John Doe",
        Balance:    5000.00,
    }, nil
}

func (s *server) UpdateCustomer(ctx context.Context, customer *pb.Customer) (*pb.UpdateResponse, error) {
    log.Printf("Updating customer %d", customer.CustomerId)
    
    return &pb.UpdateResponse{
        Success: true,
        Message: "Customer updated successfully",
    }, nil
}

func (s *server) StreamOrders(req *pb.StreamOrdersRequest, stream pb.CustomerService_StreamOrdersServer) error {
    orders := []*pb.Order{
        {OrderId: 1, Amount: 99.99, Status: "shipped"},
        {OrderId: 2, Amount: 149.50, Status: "processing"},
        {OrderId: 3, Amount: 75.25, Status: "delivered"},
    }
    
    for _, order := range orders {
        if err := stream.Send(order); err != nil {
            return err
        }
        time.Sleep(time.Second)  // Simulate delay
    }
    
    return nil
}

func (s *server) BidirectionalChat(stream pb.CustomerService_BidirectionalChatServer) error {
    for {
        msg, err := stream.Recv()
        if err != nil {
            return err
        }
        
        log.Printf("Received: %s from %s", msg.Message, msg.User)
        
        // Echo back with server prefix
        response := &pb.ChatMessage{
            User:      "Server",
            Message:   fmt.Sprintf("Echo: %s", msg.Message),
            Timestamp: time.Now().Unix(),
        }
        
        if err := stream.Send(response); err != nil {
            return err
        }
    }
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    
    s := grpc.NewServer()
    pb.RegisterCustomerServiceServer(s, &server{})
    
    log.Println("Server listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}

Go client:

package main

import (
    "context"
    "io"
    "log"
    "time"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    pb "example.com/customer"
)

func main() {
    conn, err := grpc.Dial("localhost:50051", 
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("Failed to connect: %v", err)
    }
    defer conn.Close()
    
    client := pb.NewCustomerServiceClient(conn)
    ctx := context.Background()
    
    // Unary call
    customer, err := client.GetCustomer(ctx, &pb.GetCustomerRequest{
        CustomerId: 12345,
    })
    if err != nil {
        log.Fatalf("GetCustomer failed: %v", err)
    }
    log.Printf("Customer: %v", customer)
    
    // Server streaming
    stream, err := client.StreamOrders(ctx, &pb.StreamOrdersRequest{
        CustomerId: 12345,
    })
    if err != nil {
        log.Fatalf("StreamOrders failed: %v", err)
    }
    
    for {
        order, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Fatalf("Receive error: %v", err)
        }
        log.Printf("Order: %v", order)
    }
}

The Load Balancing Challenge

gRPC had one major gotcha in Kubernetes: connection persistence breaks load balancing. I documented this exhaustively in my blog post The Complete Guide to gRPC Load Balancing in Kubernetes and Istio. HTTP/2 multiplexes multiple requests over a single TCP connection. Once that connection is established to one pod, all requests go there. Kubernetes Service load balancing happens at L4 (TCP), so it sees only one long-lived connection rather than individual gRPC calls. I used Istio's Envoy sidecar, which operates at L7 and routes each gRPC call independently:

apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: grpc-service
spec:
  host: grpc-service
  trafficPolicy:
    connectionPool:
      http:
        http2MaxRequests: 100
        maxRequestsPerConnection: 10  # Force connection rotation
    loadBalancer:
      simple: LEAST_REQUEST  # Better than ROUND_ROBIN
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s

I learned that modern protocols solve old problems but introduce new ones. gRPC is excellent, but you must understand how it interacts with infrastructure. Production systems require deep integration between application protocol and deployment environment.

Modern Messaging and Streaming

I have been using Apache Kafka for many years, and it has transformed how we think about data. It's not just a message queue; it's a distributed commit log:

from kafka import KafkaProducer, KafkaConsumer
import json
import time

# Producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

order = {
    'order_id': '12345',
    'customer_id': '67890',
    'amount': 99.99,
    'timestamp': time.time()
}

producer.send('orders', value=order)
producer.flush()

# Consumer
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='order-processors'
)

for message in consumer:
    order = message.value
    print(f"Processing order: {order['order_id']}")
    # Process order

Kafka provides:

  • Durability: Messages are persisted to disk
  • Replayability: Consumers can reprocess historical events
  • Partitioning: Horizontal scalability through partitions
  • Consumer groups: Multiple consumers can process in parallel

Key Lesson: Event-driven architectures enable loose coupling and temporal decoupling. Systems can be rebuilt from the event log. This is Event Sourcing—a powerful pattern that Kafka makes practical at scale.
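
To make that concrete, here is a minimal sketch (not taken from any production system) of rebuilding state by replaying an ordered event log; the event types and fields are purely illustrative:

from dataclasses import dataclass

@dataclass
class Event:
    type: str        # e.g. "OrderPlaced" or "OrderRefunded" (illustrative)
    order_id: str
    amount: float

def replay(events):
    """Rebuild current state purely from the event log."""
    balance = 0.0
    open_orders = {}
    for event in events:
        if event.type == "OrderPlaced":
            open_orders[event.order_id] = event.amount
            balance += event.amount
        elif event.type == "OrderRefunded":
            balance -= open_orders.pop(event.order_id, 0.0)
    return balance, open_orders

log = [
    Event("OrderPlaced", "12345", 99.99),
    Event("OrderPlaced", "12346", 25.00),
    Event("OrderRefunded", "12346", 25.00),
]
print(replay(log))   # (99.99, {'12345': 99.99})

In a Kafka-backed system the same replay simply runs against a topic consumed from the earliest offset.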

Agentic RPC: MCP and Agent-to-Agent Protocol

Over the last year, I have been building agentic AI applications using the Model Context Protocol (MCP) and, more recently, the Agent-to-Agent (A2A) protocol. Both use JSON-RPC 2.0 underneath. After decades of RPC evolution, from Sun RPC to CORBA to gRPC, we've come full circle to JSON-RPC for AI agents.

Service Discovery

A2A immediately reminded me of Sun's Network Information Service (NIS), originally called Yellow Pages, which I used in the early 1990s. NIS provided a centralized directory service for Unix systems to look up user accounts, host names, and configuration data across a network. I saw this pattern repeated throughout the decades:

  • CORBA Naming Service (1990s): Objects registered themselves with a hierarchical naming service, and clients discovered them by name
  • JINI (late 1990s): Services advertised themselves via multicast, and clients discovered them through lookup registrars (as I described earlier in the JINI section)
  • UDDI (2000s): Universal Description, Discovery, and Integration for web services—a registry where SOAP services could be published and discovered
  • Consul, Eureka, etcd (2010s): Modern service discovery for microservices
  • Kubernetes DNS/Service Discovery (2010s-present): Built-in service registry and DNS-based discovery

Model Context Protocol (MCP)

MCP lets AI agents discover and invoke tools provided by servers. I recently built a daily minutes assistant that aggregates information from multiple sources into a morning briefing. Here’s the MCP server that exposes tools to the AI agent:

from mcp.server import Server
import mcp.types as types
from typing import Any
import asyncio
import json

class DailyMinutesServer:
    def __init__(self):
        self.server = Server("daily-minutes")
        self.setup_handlers()
        
    def setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            return [
                types.Tool(
                    name="get_emails",
                    description="Fetch recent emails from inbox",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "hours": {
                                "type": "number",
                                "description": "Hours to look back"
                            },
                            "limit": {
                                "type": "number", 
                                "description": "Max emails to fetch"
                            }
                        }
                    }
                ),
                types.Tool(
                    name="get_hackernews",
                    description="Fetch top Hacker News stories",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "limit": {
                                "type": "number",
                                "description": "Number of stories"
                            }
                        }
                    }
                ),
                types.Tool(
                    name="get_rss_feeds",
                    description="Fetch latest RSS feed items",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "feed_urls": {
                                "type": "array",
                                "items": {"type": "string"}
                            }
                        }
                    }
                ),
                types.Tool(
                    name="get_weather",
                    description="Get current weather forecast",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "location": {"type": "string"}
                        }
                    }
                )
            ]
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str, 
            arguments: dict[str, Any]
        ) -> list[types.TextContent]:
            if name == "get_emails":
                result = await email_connector.fetch_recent(
                    hours=arguments.get("hours", 24),
                    limit=arguments.get("limit", 10)
                )
            elif name == "get_hackernews":
                result = await hn_connector.fetch_top_stories(
                    limit=arguments.get("limit", 10)
                )
            elif name == "get_rss_feeds":
                result = await rss_connector.fetch_feeds(
                    feed_urls=arguments["feed_urls"]
                )
            elif name == "get_weather":
                result = await weather_connector.get_forecast(
                    location=arguments["location"]
                )
            else:
                raise ValueError(f"Unknown tool: {name}")
            
            return [types.TextContent(
                type="text",
                text=json.dumps(result, indent=2)
            )]

Each connector is a simple async module. Here’s the Hacker News connector:

import aiohttp
from typing import List, Dict

class HackerNewsConnector:
    BASE_URL = "https://hacker-news.firebaseio.com/v0"
    
    async def fetch_top_stories(self, limit: int = 10) -> List[Dict]:
        async with aiohttp.ClientSession() as session:
            # Get top story IDs
            async with session.get(f"{self.BASE_URL}/topstories.json") as resp:
                story_ids = await resp.json()
            
            # Fetch details for top N stories
            stories = []
            for story_id in story_ids[:limit]:
                async with session.get(
                    f"{self.BASE_URL}/item/{story_id}.json"
                ) as resp:
                    story = await resp.json()
                    stories.append({
                        "title": story.get("title"),
                        "url": story.get("url"),
                        "score": story.get("score"),
                        "by": story.get("by"),
                        "time": story.get("time")
                    })
            
            return stories

RSS and weather connectors follow the same pattern—simple, focused modules that the MCP server orchestrates.

JSON-RPC Under the Hood

The appeal of MCP is that it's just JSON-RPC 2.0 over stdio or HTTP. Here's what a tool call looks like on the wire:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "get_emails",
    "arguments": {
      "hours": 12,
      "limit": 5
    }
  }
}

Response:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "[{\"from\": \"john@example.com\", \"subject\": \"Q4 Review\", ...}]"
      }
    ]
  }
}

After using Sun RPC, CORBA, SOAP, and gRPC, I appreciate MCP’s simplicity. It solves a specific problem: letting AI agents discover and invoke tools.
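
To show how little machinery is involved, here is a sketch that sends the tools/call request above to an MCP-style endpoint over HTTP using only the Python standard library. The endpoint URL is hypothetical; in practice an MCP client library handles the transport (often stdio) for you.

import json
import urllib.request

def call_tool(endpoint, name, arguments, request_id=1):
    """Send a JSON-RPC 2.0 tools/call request and return the result."""
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        body = json.loads(resp.read().decode("utf-8"))
    if "error" in body:
        raise RuntimeError(body["error"])
    return body["result"]

# Hypothetical endpoint exposing the daily-minutes tools over HTTP
# result = call_tool("http://localhost:8000/rpc", "get_emails", {"hours": 12, "limit": 5})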

The Agent Workflow

My daily minutes agent follows this workflow:

  1. Agent calls get_emails to fetch recent messages
  2. Agent calls get_hackernews for tech news
  3. Agent calls get_rss_feeds for blog updates
  4. Agent calls get_weather for local forecast
  5. Agent synthesizes everything into a concise morning briefing

The AI decides which tools to call, in what order, based on the user’s preferences. I don’t hardcode the workflow.
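
A rough sketch of that loop from the agent's side: plan_tools stands in for the LLM's tool-selection step, and call_tool is assumed to issue the MCP tools/call request (none of these names come from the actual assistant).

import json

def plan_tools(preferences):
    """Stand-in for the model's planning step: decide which tools to call.
    In the real assistant the LLM produces this list from the user's preferences."""
    tools = [
        ("get_emails", {"hours": 12, "limit": 5}),
        ("get_hackernews", {"limit": 10}),
        ("get_weather", {"location": preferences["location"]}),
    ]
    if preferences.get("feeds"):
        tools.append(("get_rss_feeds", {"feed_urls": preferences["feeds"]}))
    return tools

def build_briefing(preferences, call_tool):
    """Invoke each planned tool and collect raw results for the synthesis step."""
    sections = {}
    for name, arguments in plan_tools(preferences):
        sections[name] = call_tool(name, arguments)   # MCP tools/call under the hood
    # The final step asks the model to turn `sections` into prose (omitted here).
    return json.dumps(sections, indent=2)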

Agent-to-Agent Protocol (A2A)

While MCP focuses on tool calling, A2A addresses agent-to-agent discovery and communication. It’s the modern equivalent of NIS/Yellow Pages for agents. Agents register their capabilities in a directory, and other agents discover and invoke them. A2A also uses JSON-RPC 2.0, but adds a discovery layer. Here’s how an agent registers itself:

from a2a import Agent, Capability

class ResearchAgent(Agent):
    def __init__(self):
        super().__init__(
            agent_id="research-agent-01",
            name="Research Agent",
            description="Performs web research and summarization"
        )
        
        # Register capabilities
        self.register_capability(
            Capability(
                name="web_search",
                description="Search the web for information",
                input_schema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "max_results": {"type": "integer", "default": 10}
                    },
                    "required": ["query"]
                },
                output_schema={
                    "type": "object",
                    "properties": {
                        "results": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "title": {"type": "string"},
                                    "url": {"type": "string"},
                                    "snippet": {"type": "string"}
                                }
                            }
                        }
                    }
                }
            )
        )
    
    async def handle_request(self, capability: str, params: dict):
        if capability == "web_search":
            return await self.perform_web_search(
                query=params["query"],
                max_results=params.get("max_results", 10)
            )
    
    async def perform_web_search(self, query: str, max_results: int):
        # Actual search implementation
        results = await search_engine.search(query, limit=max_results)
        return {"results": results}

Another agent discovers and invokes the research agent:

class CoordinatorAgent(Agent):
    def __init__(self):
        super().__init__(
            agent_id="coordinator-01",
            name="Coordinator Agent"
        )
        self.directory = AgentDirectory()
    
    async def research_topic(self, topic: str):
        # Discover agents with web_search capability
        agents = await self.directory.find_agents_with_capability("web_search")
        
        if not agents:
            raise Exception("No research agents available")
        
        # Select an agent (load balancing, availability, etc.)
        research_agent = agents[0]
        
        # Invoke the capability via JSON-RPC
        result = await research_agent.invoke(
            capability="web_search",
            params={
                "query": topic,
                "max_results": 20
            }
        )
        
        return result

The JSON-RPC exchange looks like this:

Discovery request:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "directory.find_agents",
  "params": {
    "capability": "web_search",
    "filters": {
      "availability": "online"
    }
  }
}

Discovery response:

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "agents": [
      {
        "agent_id": "research-agent-01",
        "name": "Research Agent",
        "endpoint": "http://agent-service:8080/rpc",
        "capabilities": ["web_search"],
        "metadata": {
          "load": 0.3,
          "response_time_ms": 150
        }
      }
    ]
  }
}

The Security Problem

Though I appreciate the simplicity of MCP and A2A, here's what worries me: both protocols largely ignore decades of hard-won lessons about security. The Salesloft breach in 2024 showed exactly what happens: their AI chatbot stored authentication tokens for hundreds of services. MCP and A2A give us standard protocols for tool calling and agent coordination, which is valuable. But they create a false sense of security while ignoring fundamentals we solved decades ago:

  • Authentication: How do we verify an agent’s identity?
  • Authorization: What capabilities should this agent have access to?
  • Credential rotation: How do we handle token expiration and renewal?
  • Observability: How do we trace agent interactions for debugging and auditing?
  • Principle of least privilege: How do we ensure agents only access what they need?
  • Rate limiting: How do we prevent a misbehaving agent from overwhelming services?

The community needs to address this before A2A and MCP see widespread enterprise adoption.
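
Neither protocol prescribes any of this today, but here is a sketch of the kind of guardrail I would put in front of every tool call: an authorization check against per-agent scopes, a simple rate limit, and an audit log line. All names and policies are illustrative.

import logging
import time

logging.basicConfig(level=logging.INFO)
audit = logging.getLogger("tool-audit")

# Illustrative policy: which tools each agent identity may call
AGENT_SCOPES = {
    "research-agent-01": {"web_search"},
    "briefing-agent": {"get_emails", "get_weather"},
}
RATE_LIMIT = 30          # calls per minute per agent
_recent_calls = {}       # agent_id -> list of call timestamps

def guarded_call(agent_id, tool, arguments, dispatch):
    """Enforce least privilege and rate limits before dispatching a tool call."""
    if tool not in AGENT_SCOPES.get(agent_id, set()):
        raise PermissionError(f"{agent_id} is not authorized to call {tool}")

    now = time.time()
    window = [t for t in _recent_calls.get(agent_id, []) if now - t < 60]
    if len(window) >= RATE_LIMIT:
        raise RuntimeError(f"{agent_id} exceeded {RATE_LIMIT} calls/minute")
    _recent_calls[agent_id] = window + [now]

    audit.info("agent=%s tool=%s args=%s", agent_id, tool, arguments)
    return dispatch(tool, arguments)   # the actual MCP/A2A invocation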

Lessons Learned

1. Complexity is the Enemy

Every failed technology I’ve used failed because of complexity. CORBA, SOAP, EJB—they all collapsed under their own weight. Successful technologies like REST, gRPC, Kafka focused on doing one thing well.

Implication: Be suspicious of solutions that try to solve every problem. Prefer composable, focused tools.

2. Network Calls Are Expensive

The first Fallacy of Distributed Computing, "the network is reliable," haunts us still: in reality the network is not reliable, not zero-latency, not infinite-bandwidth, and not secure. I've watched this lesson be relearned in every generation:

  • EJB entity beans made chatty network calls
  • Microservices make chatty REST calls
  • GraphQL makes chatty database queries

Implication: Design APIs to minimize round trips. Batch operations. Cache aggressively. Monitor network latency religiously. (See my blog on fault tolerance in microservices for details.)
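
For example, collapsing N individual lookups into one batched call, with a small cache in front, removes most chatty round trips. The client and its get_customers batch endpoint below are hypothetical:

# Chatty: one network round trip per customer (N calls)
def load_customers_chatty(client, customer_ids):
    return [client.get_customer(cid) for cid in customer_ids]

# Batched: one round trip for the whole set, plus a local cache
_cache = {}

def load_customers_batched(client, customer_ids):
    missing = [cid for cid in customer_ids if cid not in _cache]
    if missing:
        # Hypothetical batch endpoint returning {customer_id: customer}
        _cache.update(client.get_customers(ids=missing))
    return [_cache[cid] for cid in customer_ids]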

3. Statelessness Scales

Stateless services scale horizontally. But real applications need state: session data, shopping carts, user preferences. The solution isn't to make services stateful; it's to externalize state:

  • Session stores (Redis, Memcached)
  • Databases (PostgreSQL, DynamoDB)
  • Event logs (Kafka)
  • Distributed caches

Implication: Keep service logic stateless. Push state to specialized systems designed for it.
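
A minimal sketch of externalizing a shopping cart to Redis with redis-py, so any instance of a stateless service can serve any request; the key format and TTL are illustrative:

import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
SESSION_TTL = 1800  # seconds

def save_cart(session_id, cart):
    # The service stays stateless: the cart lives in Redis, keyed by session
    r.setex(f"cart:{session_id}", SESSION_TTL, json.dumps(cart))

def load_cart(session_id):
    raw = r.get(f"cart:{session_id}")
    return json.loads(raw) if raw else {"items": []}

save_cart("sess-42", {"items": [{"sku": "ABC", "qty": 2}]})
print(load_cart("sess-42"))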

4. The Actor Model Is Underappreciated

My research with actors and the Linda memory model convinced me that the Actor model simplifies concurrent and distributed systems. Today's serverless functions are essentially actors. Frameworks like Akka, Orleans, and Dapr embrace it. Actors eliminate shared mutable state, which is the source of most concurrency bugs.

Implication: For event-driven systems, consider Actor-based frameworks. They map naturally to distributed problems.

5. Observability

Modern distributed systems require extensive instrumentation. You need:

  • Structured logging with correlation IDs
  • Metrics for performance and health
  • Distributed tracing to follow requests across services
  • Alarms with proper thresholds

Implication: Instrument your services from day one. Observability is infrastructure, not a nice-to-have. (See my blog posts on fault tolerance and load shedding for specific metrics.)
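
A sketch of structured JSON logging with a propagated correlation ID, using only the standard logging module; the field names are illustrative:

import json
import logging
import uuid

class JsonFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
            "service": "order-service",
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("order-service")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def handle_order(payload, request_id=None):
    # Reuse the caller's correlation ID, or mint one at the edge
    request_id = request_id or str(uuid.uuid4())
    logger.info("processing order", extra={"request_id": request_id})
    # ... business logic; pass request_id on every downstream call ...
    logger.info("order complete", extra={"request_id": request_id})

handle_order({"order_id": "12345"})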

6. Throttling and Load Shedding

Every production system eventually faces traffic spikes or DDoS attacks. Without throttling and load shedding, your system will collapse. Key techniques:

  • Rate limiting by client/user/IP
  • Admission control based on queue depth
  • Circuit breakers to fail fast
  • Backpressure to slow down producers

Implication: Build throttling and load shedding into your architecture early. They’re harder to retrofit. (See my comprehensive blog post on this topic.)
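
A sketch combining a per-client token bucket with queue-depth admission control; the rates and thresholds are illustrative:

import time

class TokenBucket:
    """Allow up to `rate` requests per second per client, with a small burst."""
    def __init__(self, rate, burst):
        self.rate, self.burst = rate, burst
        self.tokens, self.last = float(burst), time.monotonic()

    def allow(self):
        now = time.monotonic()
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

buckets = {}
MAX_QUEUE_DEPTH = 100   # shed load once the in-flight queue is this deep

def admit(client_id, queue_depth):
    if queue_depth > MAX_QUEUE_DEPTH:
        return False, "shed: server overloaded"        # load shedding
    bucket = buckets.setdefault(client_id, TokenBucket(rate=5, burst=10))
    if not bucket.allow():
        return False, "throttled: client over rate limit"
    return True, "ok"

print(admit("client-a", queue_depth=3))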

7. Idempotency

Network failures mean requests may be retried. If your operations aren't idempotent, you'll process payments twice, create duplicate orders, and corrupt data (see my blog post on idempotency). Make operations idempotent:

  • Use idempotency keys
  • Check if operation already succeeded
  • Design APIs to be safely retryable

Implication: Every non-read operation should be idempotent. It saves you from a world of hurt.
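
A sketch of the idempotency-key pattern: the first call records its result under the key, and retries return that recorded result instead of charging twice. The in-memory dict stands in for a durable store such as a database or Redis.

import uuid

_processed = {}   # idempotency_key -> result (use a durable store in production)

def charge_payment(idempotency_key, customer_id, amount):
    # A retried request with the same key returns the original result
    if idempotency_key in _processed:
        return _processed[idempotency_key]

    result = {"charge_id": str(uuid.uuid4()), "customer_id": customer_id,
              "amount": amount, "status": "charged"}
    _processed[idempotency_key] = result
    return result

key = str(uuid.uuid4())                        # generated once by the client
first = charge_payment(key, "67890", 99.99)
retry = charge_payment(key, "67890", 99.99)    # network retry
assert first == retry                          # charged exactly once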

8. External and Internal APIs Should Differ

I have learned that external APIs need good UX and developer empathy: they should be intuitive, consistent, and well-documented. Internal APIs can optimize for performance, reliability, and operational needs. Don't expose your internal architecture to external consumers. Use API gateways to translate between external contracts and internal services.

Implication: Design external APIs for developers using them. Design internal APIs for operational excellence.

9. Standards Beat Proprietary Solutions

Novell IPX failed because it was proprietary. Sun RPC succeeded as an open standard. REST thrived because it built on HTTP. gRPC uses open standards (HTTP/2, Protocol Buffers).

Implication: Prefer open standards. If you must use proprietary tech, understand the exit strategy.

10. Developer Experience Matters

Technologies with great developer experience get adopted. Java succeeded because it was easier than C++. REST beat SOAP because it was simpler. Kubernetes won because it offered a powerful abstraction.

Implication: Invest in developer tools, documentation, and ergonomics. Friction kills momentum.

Upcoming Trends

WebAssembly: The Next Runtime

WebAssembly (Wasm) is emerging as a universal runtime. Code written in Rust, Go, C, or AssemblyScript compiles to Wasm and runs anywhere. Platforms like wasmCloud, Fermyon, and Lunatic are building Actor-based systems on Wasm. Combined with the Component Model and WASI (WebAssembly System Interface), Wasm offers near-native performance, strong sandboxing, and portability. It might replace Docker containers for some workloads. Solomon Hykes, creator of Docker, famously said:

“If WASM+WASI existed in 2008, we wouldn’t have needed to create Docker. That’s how important it is. WebAssembly on the server is the future of computing. A standardized system interface was the missing link. Let’s hope WASI is up to the task!” — Solomon Hykes, March 2019

WebAssembly isn’t ready yet. Critical gaps:

  • WASI maturity: Still evolving (Preview 2 in development)
  • Async I/O: Limited compared to native runtimes
  • Database drivers: Many don’t support WASM
  • Networking: WASI sockets still experimental
  • Ecosystem tooling: Debugging, profiling still primitive

Service Meshes

Istio, Linkerd, Dapr move cross-cutting concerns out of application code:

  • Authentication/authorization
  • Rate limiting
  • Circuit breaking
  • Retries with exponential backoff
  • Distributed tracing
  • Metrics collection

Tradeoff: Complexity shifts from application code to infrastructure. Teams need deep Kubernetes and service mesh expertise.

The Edge Is Growing

Edge computing brings computation closer to users. CDNs like Cloudflare Workers and Fastly Compute@Edge run code globally with single-digit millisecond latency. This requires new thinking like eventual consistency, CRDTs (Conflict-free Replicated Data Types), and geo-distributed state management.
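
As a concrete example, a grow-only counter (G-Counter) is one of the simplest CRDTs: each replica increments only its own slot, and merging takes the element-wise maximum, so replicas converge no matter what order updates arrive in. A minimal sketch:

class GCounter:
    """Grow-only counter CRDT: one slot per replica, merge by element-wise max."""
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.counts = {}

    def increment(self, n=1):
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + n

    def merge(self, other):
        for rid, count in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), count)

    def value(self):
        return sum(self.counts.values())

# Two edge locations count page views independently, then sync
us, eu = GCounter("us-east"), GCounter("eu-west")
us.increment(3)
eu.increment(5)
us.merge(eu); eu.merge(us)       # merges can happen in any order, any number of times
assert us.value() == eu.value() == 8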

AI Agents and Multi-Agent Systems

I'm currently building agentic AI systems using LangGraph, RAG, and MCP. These systems are inherently distributed: agents communicate asynchronously, maintain local state, and coordinate through message passing. It's the Actor model again.

What’s Missing

Despite all this progress, we still struggle with:

  • Distributed transactions: Two-phase commit doesn’t scale; SAGA patterns are complex
  • Testing distributed systems: Mocking services, simulating failures, and reproducing production bugs remain hard. I have written a number of tools for mock testing.
  • Observability at scale: Tracing millions of requests generates too much data
  • Cost management: Cloud bills spiral as systems grow
  • Cognitive load: Modern systems require expertise in dozens of technologies

Conclusion

I’ve been writing network code for decades and have used dozens of protocols, frameworks, and paradigms. Here is what I have learned:

  • Simplicity beats complexity (SOAP died, REST thrived)
  • Network calls aren’t free (EJB entity beans, chatty microservices)
  • State is hard; externalize it (Erlang, serverless functions)
  • Observability is essential (You can’t fix what you can’t see)
  • Developer experience matters (Java beat C++, REST beat SOAP)
  • Make It Work, Then Make It Fast
  • Design for Failure from Day One (Systems built with circuit breakers, retries, timeouts, and graceful degradation from the start).

Other tips from the evolution of remote services include:

  • Design systems as message-passing actors from the start. Whether that’s Erlang processes, Akka actors, Orleans grains, or Lambda functions—embrace isolated state and message passing.
  • Invest in Observability with structured logging with correlation IDs, instrumented metrics, distributed tracing and alarms.
  • Separate External and Internal APIs. Use REST or GraphQL for external APIs (with versioning) and use gRPC or Thrift for internal communication (efficient).
  • Build Throttling and Load Shedding by rate limiting by client/user/IP at the edge and implement admission control at the service level (See my blog on Effective Load Shedding and Throttling).
  • Make Everything Idempotent as networks fail and requests get retried. Use idempotency keys for all mutations.
  • Choose Boring Technology (See Choose Boring Technology). For your core infrastructure, use proven tech (PostgreSQL, Redis, Kafka).
  • Test for Failure. Most code only handles the happy path. Production is all about unhappy paths.
  • Learn about the Fallacies of Distributed Computing and read A Note on Distributed Computing (1994).
  • Make chaos engineering part of CI/CD and use property-based testing (See my blog on property-based testing).

The technologies change: mainframes to serverless, Assembly to Go, CICS to Kubernetes. But the underlying principles remain constant. We oscillate between extremes:

  • Monoliths -> Microservices -> (now) Modular Monoliths
  • Strongly typed IDLs (CORBA) -> Untyped JSON -> Strongly typed again (gRPC)
  • Centralized -> Distributed -> Edge -> (soon) Peer-to-peer?
  • Synchronous RPC -> Asynchronous messaging -> Reactive streams

Each swing teaches us something. CORBA was too complex, but IDL-first design is valuable. REST was liberating, but binary protocols are more efficient. Microservices enable agility, but operational complexity explodes. The sweet spot is usually in the middle. Modular monoliths with clear boundaries. REST for external APIs, gRPC for internal communication. Some synchronous calls, some async messaging.

Here are a few trends that I see becoming prevalent:

  1. WebAssembly may replace containers for some workloads: Faster startup, better security with platforms like wasmCloud and Fermyon.
  2. Service meshes are becoming invisible: Currently they are too complex. Ambient mesh (no sidecars) and eBPF-based routing are gaining wider adoption.
  3. The Actor model will eat the world: Serverless functions are actors and durable functions are actor orchestration.
  4. Edge computing will force new patterns: We can’t rely on centralized state and may need CRDTs and eventual consistency.
  5. AI agents will need distributed coordination. Multi-agent systems = distributed systems and may need message passing between agents.

The best engineers don't just learn the latest framework; they study the history, understand the trade-offs, and recognize when old ideas solve new problems. The future of distributed systems won't be built by inventing entirely new paradigms; it'll be built by taking the best ideas from the past, learning from the failures, and applying them with better tools.


Check out my other blog posts:

August 30, 2025

Bridging HTTP and gRPC: A Standardized Approach to Header Mapping in Microservices

Filed under: Computing,Web Services — admin @ 10:49 pm

Modern microservices architectures often require supporting both HTTP REST APIs and gRPC services simultaneously. While Google’s gRPC-Gateway provides HTTP and gRPC transcoding capabilities, the challenge of bidirectional header mapping between these protocols remains a common source of inconsistency, bugs, and maintenance overhead across services. This article explores the technical challenges of HTTP-gRPC header mapping, examines current approaches and their limitations, and presents a standardized middleware solution that addresses these issues.

Understanding gRPC AIP and HTTP/gRPC Transcoding

Google's API Improvement Proposals (AIP) define how to build consistent, intuitive APIs. For example, AIP-127: HTTP and gRPC Transcoding enables a single service implementation to serve both HTTP REST and gRPC traffic through protocol transcoding.

How gRPC-Gateway Transcoding Works

The gRPC-Gateway acts as a reverse proxy that translates HTTP requests into gRPC calls:

HTTP Client  →  gRPC-Gateway  →  gRPC Server
     ↓               ↓               ↓
REST Request    Proto Message    gRPC Service

Following is the transcoding process:

  1. URL Path to RPC Method: HTTP paths map to gRPC service methods
  2. HTTP Body to Proto Message: JSON payloads become protobuf messages
  3. Query Parameters to Fields: URL parameters populate message fields
  4. HTTP Headers to gRPC Metadata: Headers become gRPC metadata key-value pairs

The Header Mapping Challenge

While gRPC-Gateway handles most transcoding automatically, header mapping requires explicit configuration. Consider this common scenario:

HTTP Request:

POST /v1/users
Authorization: Bearer abc123
X-Request-ID: req-456
X-User-Role: admin
Content-Type: application/json

Desired gRPC Metadata:

metadata.MD{
    "authorization": []string{"Bearer abc123"},
    "request-id":    []string{"req-456"}, 
    "user-role":     []string{"admin"},
}

Response Headers Needed:

X-Request-ID: req-456
X-Processing-Time: 150ms
X-Server-Version: v1.2.0

Without proper configuration, headers are lost, inconsistently mapped, or require custom code in each service.

Current Problems and Anti-Patterns

Problem 1: Fragmented Header Mapping Solutions

Most services implement header mapping ad-hoc:

// Service A approach
func (s *ServiceA) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    md, _ := metadata.FromIncomingContext(ctx)
    authHeader := md.Get("authorization")
    userID := md.Get("x-user-id")
    // ... custom mapping logic
}

// Service B approach  
func (s *ServiceB) GetOrder(ctx context.Context, req *pb.GetOrderRequest) (*pb.Order, error) {
    // Different header names, different extraction logic
    md, _ := metadata.FromIncomingContext(ctx)
    auth := md.Get("auth")  // Different from Service A!
    requestID := md.Get("request_id")  // Different format!
}

This leads to:

  • Inconsistent header naming across services
  • Duplicated mapping logic in every service
  • Maintenance burden when headers change
  • Testing complexity due to custom implementations

Problem 2: Context Abuse and Memory Issues

I have often observed misuse of Go's context for storing large amounts of data, which puts the service at risk of being OOM-killed:

// ANTI-PATTERN: Storing large objects in context
type UserContext struct {
    User        *User           // Large user object
    Permissions []Permission    // Array of permissions  
    Preferences *UserPrefs      // User preferences
    AuditLog    []AuditEntry   // Historical data
}

func StoreUserInContext(ctx context.Context, user *UserContext) context.Context {
    return context.WithValue(ctx, "user", user)  // BAD: Large object in context
}

Why This Causes Problems:

  1. Memory Leaks: Contexts are passed through the entire request chain and may not be garbage collected promptly
  2. Performance Degradation: Large context objects increase allocation pressure
  3. Goroutine Overhead: Each concurrent request carries this memory burden
  4. Service Instability: Under load, memory usage can spike and cause OOM kills

Proper Pattern:

// GOOD: Store only identifiers in context  
func StoreUserIDInContext(ctx context.Context, userID string) context.Context {
    return context.WithValue(ctx, "user_id", userID)  // Small string only
}

// Fetch data when needed from database/cache
func GetUserFromContext(ctx context.Context) (*User, error) {
    userID := ctx.Value("user_id").(string)
    return userService.GetUser(userID)  // Fetch from datastore
}

Problem 3: Inconsistent Response Header Handling

Setting response headers requires different approaches across the stack:

// gRPC: Set headers via metadata
grpc.SendHeader(ctx, metadata.New(map[string]string{
    "x-server-version": "v1.2.0",
}))

// HTTP: Set headers on ResponseWriter  
w.Header().Set("X-Server-Version", "v1.2.0")

// gRPC-Gateway: Headers must be set in specific metadata format
grpc.SetHeader(ctx, metadata.New(map[string]string{
    "grpc-metadata-x-server-version": "v1.2.0",  // Prefix required
}))

This complexity leads to missing response headers and inconsistent client experiences.

Solution: Standardized Header Mapping Middleware

The solution is a dedicated middleware that handles bidirectional header mapping declaratively, allowing services to focus on business logic while ensuring consistent header handling across the entire API surface.

Core Architecture

HTTP Request  →  Gateway Middleware  →  gRPC Interceptor  →  Service
     ↓                  ↓                      ↓                ↓
HTTP Headers  →  Metadata Annotation  →  Context Metadata  →  Business Logic
                                                                ↓
HTTP Response  ←  Response Modifier  ←  Header Metadata  ←  Service Response

The middleware operates at two key points:

  1. Gateway Level: Maps HTTP headers to gRPC metadata for incoming requests
  2. Interceptor Level: Processes metadata and manages response header mapping

Configuration-Driven Approach

Instead of custom code, header mapping is configured declaratively:

mapper := headermapper.NewBuilder().
    // Authentication headers
    AddIncomingMapping("Authorization", "authorization").WithRequired(true).
    AddIncomingMapping("X-API-Key", "api-key").
    
    // Request tracking (bidirectional)  
    AddBidirectionalMapping("X-Request-ID", "request-id").
    AddBidirectionalMapping("X-Trace-ID", "trace-id").
    
    // Response headers
    AddOutgoingMapping("processing-time", "X-Processing-Time").
    AddOutgoingMapping("server-version", "X-Server-Version").
    
    // Transformations
    AddIncomingMapping("Authorization", "auth-token").
    WithTransform(headermapper.ChainTransforms(
        headermapper.TrimSpace,
        headermapper.RemovePrefix("Bearer "),
    )).
    
    Build()

This configuration drives all header mapping behavior without requiring service-specific code.

How The Middleware Works: Step-by-Step

Step 1: HTTP Request Processing

When an HTTP request arrives at the gRPC-Gateway:

POST /v1/users HTTP/1.1
Authorization: Bearer abc123
X-Request-ID: req-456
X-User-Role: admin
Content-Type: application/json

The MetadataAnnotator processes configured incoming mappings:

func (hm *HeaderMapper) MetadataAnnotator() func(context.Context, *http.Request) metadata.MD {
    return func(ctx context.Context, req *http.Request) metadata.MD {
        md := metadata.New(map[string]string{})
        
        for _, mapping := range hm.config.Mappings {
            if mapping.Direction == Outgoing {
                continue  // Skip outgoing-only mappings
            }
            
            headerValue := req.Header.Get(mapping.HTTPHeader)
            if headerValue != "" {
                // Apply transformations if configured
                if mapping.Transform != nil {
                    headerValue = mapping.Transform(headerValue)
                }
                md.Set(mapping.GRPCMetadata, headerValue)
            }
        }
        return md
    }
}

Result: HTTP headers become gRPC metadata:

metadata.MD{
    "authorization": []string{"Bearer abc123"},
    "auth-token":    []string{"abc123"},        // Transformed  
    "request-id":    []string{"req-456"},
    "user-role":     []string{"admin"},
}

Step 2: gRPC Interceptor Processing

The gRPC unary interceptor receives the enhanced context:

func (hm *HeaderMapper) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // Context already contains mapped metadata from Step 1
        
        // Call the actual service method
        resp, err := handler(ctx, req)
        
        // Response headers are handled by ResponseModifier
        return resp, err
    }
}

Step 3: Service Implementation

The service method accesses headers through standard gRPC metadata APIs:

func (s *UserService) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
    md, _ := metadata.FromIncomingContext(ctx)
    
    // Headers are consistently available
    authToken := getFirstValue(md, "auth-token")      // "abc123" (transformed)
    requestID := getFirstValue(md, "request-id")      // "req-456"  
    userRole := getFirstValue(md, "user-role")        // "admin"
    
    // Set response headers
    grpc.SetHeader(ctx, metadata.New(map[string]string{
        "processing-time": "150",
        "server-version": "v1.2.0",  
        "request-id": requestID,     // Echo back request ID
    }))
    
    return &pb.User{...}, nil
}

Step 4: Response Header Processing

The ResponseModifier maps gRPC metadata to HTTP response headers:

func (hm *HeaderMapper) ResponseModifier() func(context.Context, http.ResponseWriter, proto.Message) error {
    return func(ctx context.Context, w http.ResponseWriter, msg proto.Message) error {
        md, ok := runtime.ServerMetadataFromContext(ctx)
        if !ok {
            return nil
        }
        
        for _, mapping := range hm.config.Mappings {
            if mapping.Direction == Incoming {
                continue  // Skip incoming-only mappings  
            }
            
            values := md.HeaderMD.Get(mapping.GRPCMetadata)
            if len(values) > 0 {
                headerValue := values[0]
                
                // Apply transformations
                if mapping.Transform != nil {
                    headerValue = mapping.Transform(headerValue)  
                }
                
                w.Header().Set(mapping.HTTPHeader, headerValue)
            }
        }
        return nil
    }
}

Final HTTP Response:

HTTP/1.1 200 OK
X-Request-ID: req-456
X-Processing-Time: 150ms  
X-Server-Version: v1.2.0
Content-Type: application/json

{"user": {...}}

Advanced Features

Header Transformations

The middleware supports header value transformations:

// Extract JWT tokens
AddIncomingMapping("Authorization", "jwt-token").
WithTransform(headermapper.ChainTransforms(
    headermapper.TrimSpace,
    headermapper.RemovePrefix("Bearer "),
    headermapper.Truncate(100),  // Prevent large tokens
))

// Sanitize user agents
AddIncomingMapping("User-Agent", "client-info").  
WithTransform(headermapper.RegexReplace(`\d+\.\d+(\.\d+)*`, "x.x.x"))

// Format timestamps
AddOutgoingMapping("response-time", "X-Response-Time").
WithTransform(headermapper.AddSuffix("ms"))

Configuration from Files

For complex deployments, configuration can be externalized:

# header-mapping.yaml
mappings:
  - http_header: "Authorization"
    grpc_metadata: "authorization" 
    direction: 0  # Incoming
    required: true
    
  - http_header: "X-Request-ID"
    grpc_metadata: "request-id"
    direction: 2  # Bidirectional
    default_value: "auto-generated"

skip_paths:
  - "/health"
  - "/metrics"
  
debug: false

config, err := headermapper.LoadConfigFromFile("header-mapping.yaml")
if err != nil {
    log.Fatal("Failed to load config:", err)
}

mapper := headermapper.NewHeaderMapper(config)

Path-Based Filtering

Skip header processing for specific endpoints:

mapper := headermapper.NewBuilder().
    AddIncomingMapping("Authorization", "authorization").
    SkipPaths("/health", "/metrics", "/debug").  // No auth required
    Build()

Integration Guide

Basic Integration

package main

import (
    "github.com/your-org/grpc-header-mapper/headermapper"
    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
)

func main() {
    // Create header mapper
    mapper := headermapper.NewBuilder().
        AddIncomingMapping("Authorization", "authorization").
        AddBidirectionalMapping("X-Request-ID", "request-id").
        Build()
    
    // Configure gRPC server
    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(mapper.UnaryServerInterceptor()),
    )
    
    // Configure HTTP gateway
    mux := headermapper.CreateGatewayMux(mapper)
    
    // Register services...
}

Production Deployment

func createProductionMapper() *headermapper.HeaderMapper {
    return headermapper.NewBuilder().
        // Authentication
        AddIncomingMapping("Authorization", "authorization").WithRequired(true).
        AddIncomingMapping("X-API-Key", "api-key").
        
        // Request correlation
        AddBidirectionalMapping("X-Request-ID", "request-id").
        AddBidirectionalMapping("X-Correlation-ID", "correlation-id"). 
        AddBidirectionalMapping("X-Trace-ID", "trace-id").
        
        // Client information
        AddIncomingMapping("User-Agent", "user-agent").
        AddIncomingMapping("X-Client-Version", "client-version").
        
        // Response headers
        AddOutgoingMapping("processing-time-ms", "X-Processing-Time").
        AddOutgoingMapping("server-version", "X-Server-Version").
        AddOutgoingMapping("rate-limit-remaining", "X-RateLimit-Remaining").
        
        // Security headers
        AddOutgoingMapping("content-security-policy", "Content-Security-Policy").
        WithDefault("default-src 'self'").
        
        // Skip system endpoints
        SkipPaths("/health", "/metrics", "/debug", "/admin").
        
        // Production settings
        Debug(false).
        OverwriteExisting(true).
        Build()
}

Performance and Reliability Benefits

Consistent Memory Usage

By standardizing header extraction and avoiding context abuse, services maintain predictable memory profiles:

// Before: Inconsistent, potentially large context values
ctx = context.WithValue(ctx, "user", largeUserObject)      // BAD
ctx = context.WithValue(ctx, "permissions", permissionList) // BAD

// After: Consistent, minimal context usage  
// Headers extracted to standard metadata, large objects fetched on-demand
func GetUserFromContext(ctx context.Context) (*User, error) {
    userID := getMetadata(ctx, "user-id")
    return userCache.Get(userID)  // Cached lookup
}

Reduced Code Duplication

Header mapping logic is centralized, eliminating hand-rolled per-service implementations.
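
To make the point concrete, here is a rough sketch of the boilerplate each service would otherwise repeat (the requestIDFromContext helper below is hypothetical and not part of the middleware's API):

// Hand-rolled extraction that every service tends to copy-paste:
func requestIDFromContext(ctx context.Context) string {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return ""
    }
    if vals := md.Get("x-request-id"); len(vals) > 0 {
        return vals[0]
    }
    return ""
}

// With the header mapper installed, handlers simply read the normalized key:
//   requestID := getMetadata(ctx, "request-id")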

Improved Observability

Consistent header handling enables better monitoring:

// All services automatically have request correlation
func (s *AnyService) AnyMethod(ctx context.Context, req *AnyRequest) (*AnyResponse, error) {
    requestID := getMetadata(ctx, "request-id")  // Always available
    log.WithField("request_id", requestID).Info("Processing request")
    
    // Business logic...
    
    return response, nil
}

Testing Benefits

Standardized header mapping simplifies integration testing:

func TestServiceWithHeaders(t *testing.T) {
    // Headers work consistently across all services
    client := pb.NewUserServiceClient(conn)
    
    ctx := metadata.NewOutgoingContext(context.Background(), metadata.New(map[string]string{
        "authorization": "Bearer test-token",
        "request-id":    "test-req-123",
    }))
    
    // Capture response headers with the grpc.Header call option
    var header metadata.MD
    resp, err := client.CreateUser(ctx, &pb.CreateUserRequest{...}, grpc.Header(&header))
    
    // Response headers are consistently available
    requestID := header.Get("request-id")  // []string{"test-req-123"}
}

Security Considerations

Header Validation

The middleware supports header validation and sanitization:

mapper := headermapper.NewBuilder().
    AddIncomingMapping("Authorization", "authorization").
    WithTransform(headermapper.ChainTransforms(
        headermapper.TrimSpace,
        headermapper.Truncate(512),  // Prevent oversized headers
        validateJWTFormat,           // Custom validation
    )).
    Build()

func validateJWTFormat(token string) string {
    if !strings.HasPrefix(token, "Bearer ") {
        return "invalid"  // Reject malformed tokens
    }
    return token
}

Sensitive Data Handling

Headers containing sensitive data can be masked in logs:

AddIncomingMapping("Authorization", "authorization").
WithTransform(headermapper.MaskSensitive(4)).  // Show first/last 4 chars

Rate Limiting Integration

Response headers can include rate limiting information:

AddOutgoingMapping("rate-limit-remaining", "X-RateLimit-Remaining").
AddOutgoingMapping("rate-limit-reset", "X-RateLimit-Reset").

Monitoring and Debugging

Debug Mode

Enable debug logging to verify header mapping:

mapper := headermapper.NewBuilder().
    Debug(true).  // Enable detailed logging
    Build()

mapper.SetLogger(customLogger)  // Use your logging framework

Debug Output:

[DEBUG] [HeaderMapper] Mapped incoming headers: map[authorization:[Bearer abc123] request-id:[req-456]]
[DEBUG] [HeaderMapper] Mapped outgoing headers to response  

Metrics Integration

The middleware can integrate with monitoring systems:

stats := mapper.GetStats()
prometheus.IncomingHeadersMappedCounter.Add(stats.IncomingMappings)
prometheus.OutgoingHeadersMappedCounter.Add(stats.OutgoingMappings)
prometheus.MappingErrorsCounter.Add(stats.FailedMappings)

Why This Matters

Microservices Consistency

In large microservices architectures, inconsistent header handling creates operational overhead:

  • Debugging becomes difficult when services use different header names
  • Client libraries must handle different header formats per service
  • Security policies cannot be uniformly enforced
  • Observability suffers from inconsistent request correlation

Standardized header mapping addresses these issues by ensuring consistency across the entire service mesh.

Developer Productivity

Developers spend significant time on infrastructure concerns rather than business logic. This middleware eliminates:

  • Boilerplate code for header extraction and response setting
  • Testing complexity around header handling edge cases
  • Documentation overhead for service-specific header requirements
  • Bug investigation related to missing or malformed headers

Operational Excellence

Standard header mapping enables:

  • Automated monitoring with consistent request correlation
  • Security scanning with predictable header formats
  • Performance analysis across service boundaries
  • Compliance auditing with standardized access logging

Conclusion

HTTP and gRPC transcoding is a powerful pattern for modern APIs, but header mapping complexity has been a persistent challenge. The gRPC Header Mapper middleware presented in this article provides a solution that enables true bidirectional header mapping between HTTP and gRPC protocols.

By providing a standardized, configuration-driven middleware solution available at github.com/bhatti/grpc-header-mapper, teams can:

  1. Eliminate inconsistencies across services with bidirectional header mapping
  2. Reduce maintenance burden through centralized configuration
  3. Improve reliability by avoiding context misuse and memory leaks
  4. Enhance developer productivity by removing boilerplate code
  5. Support complex transformations with built-in and custom transformation functions

The middleware’s bidirectional mapping capability means that headers flow seamlessly in both directions – HTTP requests to gRPC metadata for service processing, and gRPC metadata back to HTTP response headers for client consumption. This eliminates the common problem where request headers are available to services but response headers are lost or inconsistently handled.

The complete implementation, examples, and documentation are available at github.com/bhatti/grpc-header-mapper.

August 16, 2025

The Complete Guide to gRPC Load Balancing in Kubernetes and Istio

Filed under: Computing,Web Services — Tags: , — admin @ 12:05 pm

TL;DR – The Test Results Matrix

Configuration                    Load Balancing   Why
Local gRPC                       None             Single server instance
Kubernetes + gRPC                None             Connection-level LB only
Kubernetes + Istio               Perfect          L7 proxy with request-level LB
Client-side LB                   Limited          Requires multiple endpoints
kubectl port-forward + Istio     None             Bypasses service mesh

The complete test suite is available at github.com/bhatti/grpc-lb-test.


Introduction: The gRPC Load Balancing Problem

When you deploy a gRPC service in Kubernetes with multiple replicas, you expect load balancing. You won’t get it. This guide tests every possible configuration to prove why, and shows exactly how to fix it. According to the official gRPC documentation:

“gRPC uses HTTP/2, which multiplexes multiple calls on a single TCP connection. This means that once the connection is established, all gRPC calls will go to the same backend.”


Complete Test Matrix

We’ll test 6 different configurations:

  1. Baseline: Local Testing (Single server)
  2. Kubernetes without Istio (Standard deployment)
  3. Kubernetes with Istio (Service mesh solution)
  4. Client-side Load Balancing (gRPC built-in)
  5. Advanced Connection Testing (Multiple connections)
  6. Real-time Monitoring (Live traffic analysis)

Prerequisites

git clone https://github.com/bhatti/grpc-lb-test
cd grpc-lb-test

# Build all components
make build

Test 1: Baseline – Local Testing

Purpose: Establish baseline behavior with a single server.

# Terminal 1: Start local server
./bin/server

# Terminal 2: Test with basic client
./bin/client -target localhost:50051 -requests 50

Expected Result:

? Load Distribution Results:
Server: unknown-1755316152
Pod: unknown (IP: unknown)
Requests: 50 (100.0%)
????????????????????
? Total servers hit: 1
?? WARNING: All requests went to a single server!
This indicates NO load balancing is happening.

Analysis: This confirms our client implementation works correctly and establishes the baseline.


Test 2: Kubernetes Without Istio

Purpose: Prove that standard Kubernetes doesn’t provide gRPC request-level load balancing.

Deploy the Service

# Deploy 5 replicas without Istio
./scripts/test-without-istio.sh

The k8s/without-istio/deployment.yaml creates:

  • 5 gRPC server replicas
  • Standard Kubernetes Service
  • No Istio annotations

Test Results

???? Load Distribution Results:
================================
Server: grpc-echo-server-5b657689db-gh5z5-1755316388
  Pod: grpc-echo-server-5b657689db-gh5z5 (IP: 10.1.4.148)
  Requests: 30 (100.0%)
  ????????????????????

???? Total servers hit: 1
??  WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.

???? Connection Analysis:
Without Istio, gRPC maintains a single TCP connection to the Kubernetes Service IP.
The kube-proxy performs L4 load balancing, but gRPC reuses the same connection.

???? Cleaning up...
deployment.apps "grpc-echo-server" deleted
service "grpc-echo-service" deleted
./scripts/test-without-istio.sh: line 57: 17836 Terminated: 15   
kubectl port-forward service/grpc-echo-service 50051:50051 > /dev/null 2>&1

??  RESULT: No load balancing observed - all requests went to single pod!

Why This Happens

The Kubernetes Service documentation explains:

“For each Service, kube-proxy installs iptables rules which capture traffic to the Service’s clusterIP and port, and redirect that traffic to one of the Service’s backend endpoints.”

Kubernetes Services perform L4 (connection-level) load balancing, but gRPC maintains persistent connections.
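
To see why, consider the simplest possible Go client: it dials the Service name once and then reuses that single connection for every call, so kube-proxy's connection-level balancing happens exactly once. A minimal sketch follows; the EchoService client and request types are placeholders for the demo's generated code:

// One ClientConn means one long-lived HTTP/2 connection to one backend pod.
conn, err := grpc.Dial("grpc-echo-service:50051",
    grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

client := pb.NewEchoServiceClient(conn) // placeholder generated client
for i := 0; i < 50; i++ {
    // Every call is multiplexed over the same TCP connection, so all 50
    // requests land on whichever pod kube-proxy picked at connect time.
    _, _ = client.Echo(context.Background(), &pb.EchoRequest{Message: "hi"})
}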

Connection Analysis

Run the analysis tool to see connection behavior:

./bin/analysis -target localhost:50051 -requests 100 -test-scenarios true

Result:

? NO LOAD BALANCING: All requests to single server

???? Connection Reuse Analysis:
  Average requests per connection: 1.00
  ??  Low connection reuse (many short connections)

? Connection analysis complete!

Test 3: Kubernetes With Istio

Purpose: Demonstrate how Istio’s L7 proxy solves the load balancing problem.

Install Istio

./scripts/install-istio.sh

This follows Istio’s official installation guide:

istioctl install --set profile=demo -y
kubectl label namespace default istio-injection=enabled

Deploy With Istio

./scripts/test-with-istio.sh

The k8s/with-istio/deployment.yaml includes:

annotations:
  sidecar.istio.io/inject: "true"
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: grpc-echo-service
spec:
  host: grpc-echo-service
  trafficPolicy:
    connectionPool:
      http:
        http2MaxRequests: 100
        maxRequestsPerConnection: 10
    loadBalancer:
      simple: ROUND_ROBIN

Critical Testing Gotcha

Wrong way (what most people do):

kubectl port-forward service/grpc-echo-service 50051:50051
./bin/client -target localhost:50051 -requests 50
# Result: Still no load balancing!

According to Istio’s architecture docs, kubectl port-forward bypasses the Envoy sidecar proxy.

Correct Testing Method

Test from inside the service mesh:

./scripts/test-with-istio.sh

Test Results With Istio

???? Load Distribution Results:
================================

Server: grpc-echo-server-579dfbc76b-m2v7x-1755357769
  Pod: grpc-echo-server-579dfbc76b-m2v7x (IP: 10.1.4.237)
  Requests: 10 (20.0%)
  ????????

Server: grpc-echo-server-579dfbc76b-fkgkk-1755357769
  Pod: grpc-echo-server-579dfbc76b-fkgkk (IP: 10.1.4.240)
  Requests: 10 (20.0%)
  ????????

Server: grpc-echo-server-579dfbc76b-bsjdv-1755357769
  Pod: grpc-echo-server-579dfbc76b-bsjdv (IP: 10.1.4.241)
  Requests: 10 (20.0%)
  ????????

Server: grpc-echo-server-579dfbc76b-dw2m7-1755357770
  Pod: grpc-echo-server-579dfbc76b-dw2m7 (IP: 10.1.4.236)
  Requests: 10 (20.0%)
  ????????

Server: grpc-echo-server-579dfbc76b-x85jm-1755357769
  Pod: grpc-echo-server-579dfbc76b-x85jm (IP: 10.1.4.238)
  Requests: 10 (20.0%)
  ????????

???? Total unique servers: 5

? Load balancing detected across 5 servers!
   Expected requests per server: 10.0
   Distribution variance: 0.00

How Istio Solves This

From Istio’s traffic management documentation:

“Envoy proxies are deployed as sidecars to services, logically augmenting the services with traffic management capabilities… Envoy proxies are the only Istio components that interact with data plane traffic.”

Istio’s solution:

  1. Envoy sidecar intercepts all traffic
  2. Performs L7 (application-level) load balancing
  3. Maintains connection pools to all backends
  4. Routes each request independently

Test 4: Client-Side Load Balancing

Purpose: Test gRPC’s built-in client-side load balancing capabilities.

Standard Client-Side LB

./scripts/test-client-lb.sh

The cmd/client-lb/main.go implements gRPC’s native load balancing:

conn, err := grpc.Dial(
    "dns:///"+target,
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)

Results and Limitations

 Load Distribution Results:
================================
Server: grpc-echo-server-5b657689db-g9pbw-1755359830
  Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
  Requests: 10 (100.0%)
  ????????????????????

???? Total servers hit: 1
??  WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.
? Normal client works - service is accessible

???? Test 2: Client-side round-robin (from inside cluster)
?????????????????????????????????????????????????????
Creating test pod inside cluster for proper DNS resolution...
pod "client-lb-test" deleted
./scripts/test-client-lb.sh: line 71: 48208 Terminated: 15          kubectl port-forward service/grpc-echo-service 50051:50051 > /dev/null 2>&1

??  Client-side LB limitation explanation:
   gRPC client-side round-robin expects multiple A records
   But Kubernetes Services return only one ClusterIP
   Result: 'no children to pick from' error

???? What happens with client-side LB:
   1. Client asks DNS for: grpc-echo-service
   2. DNS returns: 10.105.177.23 (single IP)
   3. gRPC round-robin needs: multiple IPs for load balancing
   4. Result: Error 'no children to pick from'

? This proves client-side LB doesn't work with K8s Services!

???? Test 3: Demonstrating the DNS limitation
?????????????????????????????????????????????
What gRPC client-side LB sees:
   Service name: grpc-echo-service:50051
   DNS resolution: 10.105.177.23:50051
   Available endpoints: 1 (needs multiple for round-robin)

What gRPC client-side LB needs:
   Multiple A records from DNS, like:
   grpc-echo-service ? 10.1.4.241:50051
   grpc-echo-service ? 10.1.4.240:50051
   grpc-echo-service ? 10.1.4.238:50051
   (But Kubernetes Services don't provide this)

???? Test 4: Alternative - Multiple connections
????????????????????????????????????????????
Testing alternative approach with multiple connections...

???? Configuration:
   Target: localhost:50052
   API: grpc.Dial
   Load Balancing: round-robin
   Multi-endpoint: true
   Requests: 20

???? Using multi-endpoint resolver

???? Sending 20 unary requests...

? Request 1 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 2 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 3 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 4 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 5 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 6 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 7 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 8 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 9 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 10 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
? Request 11 -> Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)

? Successful requests: 20/20

???? Load Distribution Results:
================================

Server: grpc-echo-server-5b657689db-g9pbw-1755359830
  Pod: grpc-echo-server-5b657689db-g9pbw (IP: 10.1.4.242)
  Requests: 20 (100.0%)
  ????????????????????????????????????????

???? Total unique servers: 1

??  WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.
   This is expected for gRPC without Istio or special configuration.
? Multi-connection approach works!
   (This simulates multiple endpoints for testing)

???????????????????????????????????????????????????????????????
                         SUMMARY
???????????????????????????????????????????????????????????????

? KEY FINDINGS:
   • Standard gRPC client: Works (uses single connection)
   • Client-side round-robin: Fails (needs multiple IPs)
   • Kubernetes DNS: Returns single ClusterIP only
   • Alternative: Multiple connections can work

???? CONCLUSION:
   Client-side load balancing doesn't work with standard
   Kubernetes Services because they provide only one IP address.
   This proves why Istio (L7 proxy) is needed for gRPC load balancing!

Why this fails: Kubernetes Services provide a single ClusterIP, not multiple IPs for DNS resolution.

From the gRPC load balancing documentation:

“The gRPC client will use the list of IP addresses returned by the name resolver and distribute RPCs among them.”

Alternative: Multiple Connections

Start five instances of servers with different ports:

# Terminal 1
GRPC_PORT=50051 ./bin/server

# Terminal 2  
GRPC_PORT=50052 ./bin/server

# Terminal 3
GRPC_PORT=50053 ./bin/server

# Terminal 4
GRPC_PORT=50054 ./bin/server

# Terminal 5
GRPC_PORT=50055 ./bin/server

The cmd/client-v2/main.go implements manual connection management:

./bin/client-v2 -target localhost:50051 -requests 50 -multi-endpoint

Results:

???? Load Distribution Results:
================================

Server: unknown-1755360953
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ????????

Server: unknown-1755360963
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ????????

Server: unknown-1755360970
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ????????

Server: unknown-1755360980
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ????????

Server: unknown-1755360945
  Pod: unknown (IP: unknown)
  Requests: 10 (20.0%)
  ????????

???? Total unique servers: 5

? Load balancing detected across 5 servers!
   Expected requests per server: 10.0
   Distribution variance: 0.00
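
One way to build such a multi-endpoint client in Go is gRPC's manual resolver combined with the round_robin policy. The following is a sketch of the general technique under that assumption, not necessarily how cmd/client-v2 is implemented:

import (
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/resolver"
    "google.golang.org/grpc/resolver/manual"
)

func dialMultiEndpoint() (*grpc.ClientConn, error) {
    // Register the five local servers as explicit endpoints.
    r := manual.NewBuilderWithScheme("multi")
    r.InitialState(resolver.State{Addresses: []resolver.Address{
        {Addr: "localhost:50051"},
        {Addr: "localhost:50052"},
        {Addr: "localhost:50053"},
        {Addr: "localhost:50054"},
        {Addr: "localhost:50055"},
    }})

    // round_robin now has several sub-connections to pick from, which is
    // exactly what a single ClusterIP from Kubernetes DNS never provides.
    return grpc.Dial(r.Scheme()+":///unused",
        grpc.WithResolvers(r),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
}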

Test 5: Advanced Connection Testing

Purpose: Analyze connection patterns and performance implications.

Multiple Connection Strategy

./bin/advanced-client \
  -target localhost:50051 \
  -requests 1000 \
  -clients 10 \
  -connections 5

Results:

???? Detailed Load Distribution Results:
=====================================
Test Duration: 48.303709ms
Total Requests: 1000
Failed Requests: 0
Requests/sec: 20702.34

Server Distribution:

Server: unknown-1755360945
  Pod: unknown (IP: unknown)
  Requests: 1000 (100.0%)
  First seen: 09:18:51.842
  Last seen: 09:18:51.874
  ????????????????????????????????????????

???? Analysis:
Total unique servers: 1
Average requests per server: 1000.00
Standard deviation: 0.00

??  WARNING: All requests went to a single server!
   This indicates NO load balancing is happening.
   This is expected behavior for gRPC without Istio.

Even sophisticated connection pooling can’t overcome the fundamental issue:
• Multiple connections to SAME endpoint = same server
• Advanced client techniques ≠ load balancing
• Connection management ≠ request distribution

Performance Comparison

./scripts/benchmark.sh

Key Insights:
• Single server: High performance, no load balancing
• Multiple connections: Same performance, still no LB
• Kubernetes: Small overhead, still no LB
• Istio: Small additional overhead, but enables LB
• Client-side LB: Complex setup, limited effectiveness


Official Documentation References

gRPC Load Balancing

From the official gRPC blog:

“Load balancing within gRPC happens on a per-call basis, not a per-connection basis. In other words, even if all requests come from a single client, we want to distribute them across all servers.”

The problem: Standard deployments don’t achieve per-call balancing.

Istio’s Solution

From Istio’s service mesh documentation:

“Istio’s data plane is composed of a set of intelligent proxies (Envoy) deployed as sidecars. These proxies mediate and control all network communication between microservices.”

Kubernetes Service Limitations

From Kubernetes networking concepts:

“kube-proxy… only supports TCP and UDP… doesn’t understand HTTP and doesn’t provide load balancing for HTTP requests.”


Complete Test Results Summary

After running comprehensive tests across all possible gRPC load balancing configurations, here are the definitive results that prove the fundamental limitations and solutions:

Core Test Matrix Results

Configuration                  Load Balancing   Servers Hit      Distribution        Key Insight
Local gRPC                     None             1/1 (100%)       Single server       Baseline behavior confirmed
Kubernetes + gRPC              None             1/5 (100%)       Single pod          K8s Services don’t solve it
Kubernetes + Istio             Perfect          5/5 (20% each)   Even distribution   Istio enables true LB
Client-side LB                 Failed           1/5 (100%)       Single pod          DNS limitation fatal
kubectl port-forward + Istio   None             1/5 (100%)       Single pod          Testing methodology matters
Advanced multi-connection      None             1/1 (100%)       Single endpoint     Complex ≠ effective

Detailed Test Scenario Analysis

Scenario 1: Baseline Tests

Local single server:     PASS - 50 requests → 1 server (100%)
Local multiple conn:     PASS - 1000 requests → 1 server (100%)

Insight: Confirms gRPC’s connection persistence behavior. Multiple connections to same endpoint don’t change distribution.

Scenario 2: Kubernetes Standard Deployment

K8s without Istio:      PASS - 50 requests → 1 pod (100%)
Expected behavior:      NO load balancing
Actual behavior:        NO load balancing

Insight: Standard Kubernetes deployment with 5 replicas provides zero request-level load balancing for gRPC services.

Scenario 3: Istio Service Mesh

K8s with Istio (port-forward):  BYPASS  - 50 requests → 1 pod (100%)
K8s with Istio (in-mesh):       SUCCESS - 50 requests → 5 pods (20% each)

Insight: Istio provides perfect load balancing when tested correctly. Port-forward testing gives false negatives.

Scenario 4: Client-Side Approaches

DNS round-robin:        FAIL    - "no children to pick from"
Multi-endpoint client:  PARTIAL - Works with manual endpoint management
Advanced connections:   FAIL    - Still single endpoint limitation

Insight: Client-side solutions are complex, fragile, and limited in Kubernetes environments.

Deep Technical Analysis

The DNS Problem (Root Cause)

Our testing revealed the fundamental architectural issue:

# What Kubernetes provides
nslookup grpc-echo-service
→ 10.105.177.23 (single ClusterIP)

# What gRPC client-side LB needs  
nslookup grpc-echo-service
→ 10.1.4.241, 10.1.4.242, 10.1.4.243, 10.1.4.244, 10.1.4.245 (multiple IPs)

Impact: This single vs. multiple IP difference makes client-side load balancing architecturally impossible with standard Kubernetes Services.

Connection Persistence Evidence

Our advanced client test with 1000 requests, 10 concurrent clients, and 5 connections:

Test Duration: 48ms
Requests/sec: 20,702
Servers Hit: 1 (100%)
Connection Reuse: Perfect (efficient but unbalanced)

Conclusion: Even sophisticated connection management can’t overcome the single-endpoint limitation.

Istio’s L7 Magic

Comparing the same test scenario:

# Without Istio
50 requests → grpc-echo-server-abc123 (100%)

# With Istio  
50 requests → 5 different pods (20% each)
Distribution variance: 0.00 (perfect)

Technical Detail: Istio’s Envoy sidecar performs request-level routing, creating independent routing decisions for each gRPC call.

Performance Impact Analysis

Based on our benchmark results:

Configuration    Req/s     Overhead   Load Balancing   Production Suitable
Local baseline   ~25,000   0%         None             Not scalable
K8s standard     ~22,000   12%        None             Unbalanced
K8s + Istio      ~20,000   20%        Perfect          Recommended
Client-side      ~23,000   8%         Complex          Maintenance burden

Insight: Istio’s 20% performance overhead is a reasonable trade-off for enabling proper load balancing and gaining a production-ready service mesh.


Production Recommendations

For Development Teams:

  1. Standard Kubernetes deployment of gRPC services will not load balance
  2. Istio is the proven solution for production gRPC load balancing
  3. Client-side approaches add complexity without solving the fundamental issue
  4. Testing methodology critically affects results (avoid port-forward for Istio tests)

For Architecture Decisions:

  1. Plan for Istio if deploying multiple gRPC services
  2. Accept the 20% performance cost for operational benefits
  3. Avoid client-side load balancing in Kubernetes environments
  4. Use proper testing practices to validate service mesh behavior

For Production Readiness:

  1. Istio + DestinationRules provide enterprise-grade gRPC load balancing
  2. Monitoring and observability come built-in with Istio
  3. Circuit breaking and retry policies integrate seamlessly
  4. Zero client-side complexity reduces maintenance burden

Primary Recommendation: Istio Service Mesh

Our testing proves Istio is the only solution that provides reliable gRPC load balancing in Kubernetes:

# Production-tested DestinationRule configuration
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: grpc-service-production
spec:
  host: grpc-service
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
        connectTimeout: 30s
      http:
        http2MaxRequests: 1000
        maxRequestsPerConnection: 10  # Tested: Ensures request distribution
    loadBalancer:
      simple: LEAST_REQUEST  # Better than ROUND_ROBIN for varying request costs
    outlierDetection:
      consecutiveErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50

Why this configuration works:

  • maxRequestsPerConnection: 10 – Forces connection rotation (tested in our scenario)
  • LEAST_REQUEST – Better performance than round-robin for real workloads
  • outlierDetection – Automatic failure handling (something client-side LB can’t provide)

Expected results based on our testing:

  • Perfect 20% distribution across 5 replicas
  • ~20% performance overhead (trade-off worth it)
  • Built-in observability and monitoring
  • Zero client-side complexity

Configuration Best Practices

1. Enable Istio Injection Properly

# Enable for entire namespace (recommended)
kubectl label namespace production istio-injection=enabled

# Or per-deployment (more control)
metadata:
  annotations:
    sidecar.istio.io/inject: "true"

2. Validate Load Balancing is Working

# WRONG: This will show false negatives
kubectl port-forward service/grpc-service 50051:50051

# CORRECT: Test from inside the mesh
kubectl run test-client --rm -it --restart=Never \
  --image=your-grpc-client \
  --annotations="sidecar.istio.io/inject=true" \
  -- ./client -target grpc-service:50051 -requests 100

3. Monitor Distribution Quality

# Check Envoy stats for load balancing
kubectl exec deployment/grpc-service -c istio-proxy -- \
  curl localhost:15000/stats | grep upstream_rq_

What NOT to Do (Based on Our Test Failures)

1. Don’t Rely on Standard Kubernetes Services

# This WILL NOT load balance gRPC traffic
apiVersion: v1
kind: Service
metadata:
  name: grpc-service
spec:
  ports:
  - port: 50051
  selector:
    app: grpc-server
# Result: 100% traffic to single pod (proven in our tests)

2. Don’t Use Client-Side Load Balancing

// This approach FAILS in Kubernetes (tested and failed)
conn, err := grpc.Dial(
    "dns:///grpc-service:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
// Error: "no children to pick from" (proven in our tests)

3. Don’t Implement Complex Connection Pooling

// This adds complexity without solving the core issue
type LoadBalancedClient struct {
    conns []grpc.ClientConnInterface
    next  int64
}
// Still results in 100% traffic to single endpoint (proven in our tests)

Alternative Solutions (If Istio Not Available)

If you absolutely cannot use Istio, here are the only viable alternatives (with significant caveats):

Option 1: External Load Balancer with HTTP/2 Support

# Use nginx/envoy/haproxy outside Kubernetes
apiVersion: v1
kind: Service
metadata:
  name: grpc-service-lb
spec:
  type: LoadBalancer
  ports:
  - port: 50051
    targetPort: 50051

Limitations: Requires external infrastructure, loss of Kubernetes-native benefits

Option 2: Headless Service + Custom Service Discovery

apiVersion: v1
kind: Service
metadata:
  name: grpc-service-headless
spec:
  clusterIP: None  # Headless service
  ports:
  - port: 50051
  selector:
    app: grpc-server

Limitations: Complex client implementation, manual health checking
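
Because a headless Service resolves to one A record per ready pod, gRPC's built-in dns resolver combined with round_robin can finally see multiple endpoints. A minimal client-side sketch, assuming the headless service above and the default namespace:

// The headless service resolves to every pod IP, so client-side
// round_robin finally has more than one address to pick from.
conn, err := grpc.Dial(
    "dns:///grpc-service-headless.default.svc.cluster.local:50051",
    grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()
// Caveat: endpoints are only re-resolved when connections break or DNS is
// re-queried, and there is no Envoy-style outlier detection or retry policy.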


Conclusion

After testing every possible gRPC load balancing configuration in Kubernetes, the evidence is clear and definitive:

  • Standard Kubernetes + gRPC = Zero load balancing (100% traffic to single pod)
  • The problem is architectural, not implementation
  • Client-side solutions fail due to DNS limitations (“no children to pick from”)
  • Complex workarounds add overhead without solving the core issue

Istio is the Proven Solution

The evidence overwhelmingly supports Istio as the production solution:

  • Perfect load balancing: 20% distribution across 5 pods (0.00 variance)
  • Reasonable overhead: 20% performance cost for complete solution
  • Production features: Circuit breaking, retries, observability included
  • Zero client complexity: Works transparently with existing gRPC clients

Critical Testing Insight

Our testing revealed a major pitfall that leads to incorrect conclusions:

  • kubectl port-forward bypasses Istio → false negative results
  • Most developers get wrong results when testing Istio + gRPC
  • Always test from inside the service mesh for accurate results

The full test suite and results are available at github.com/bhatti/grpc-lb-test.

August 15, 2025

Building Robust Error Handling with gRPC and REST APIs

Filed under: Computing,Web Services — admin @ 2:23 pm

Introduction

Error handling is often an afterthought in API development, yet it’s one of the most critical aspects of a good developer experience. For example, a cryptic error message like { "error": "An error occurred" } can lead to hours of frustrating debugging. In this guide, we will build a robust, production-grade error handling framework for a Go application that serves both gRPC and a REST/HTTP proxy, based on RFC 9457 (Problem Details for HTTP APIs), which obsoletes RFC 7807.

Tenets

Following are tenets of a great API error:

  1. Structured: machine-readable, not just a string.
  2. Actionable: explains to the developer why the error occurred and, if possible, how to fix it.
  3. Consistent: all errors, from validation to authentication to server faults, follow the same format.
  4. Secure: never leaks sensitive internal information like stack traces or database schemas.

Our North Star for HTTP errors will be the Problem Details for HTTP APIs (RFC 9457/7807):

{
  "type": "https://example.com/docs/errors/validation-failed",
  "title": "Validation Failed",
  "status": 400,
  "detail": "The request body failed validation.",
  "instance": "/v1/todos",
  "invalid_params": [
    {
      "field": "title",
      "reason": "must not be empty"
    }
  ]
}

We will adapt this model for gRPC by embedding a similar structure in the gRPC status details, creating a single source of truth for all errors.

API Design

Let’s start by defining our TODO API in Protocol Buffers:

syntax = "proto3";

package todo.v1;

import "google/api/annotations.proto";
import "google/api/field_behavior.proto";
import "google/api/resource.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/field_mask.proto";
import "buf/validate/validate.proto";

option go_package = "github.com/bhatti/todo-api-errors/api/proto/todo/v1;todo";

// TodoService provides task management operations
service TodoService {
  // CreateTask creates a new task
  rpc CreateTask(CreateTaskRequest) returns (Task) {
    option (google.api.http) = {
      post: "/v1/tasks"
      body: "*"
    };
  }

  // GetTask retrieves a specific task
  rpc GetTask(GetTaskRequest) returns (Task) {
    option (google.api.http) = {
      get: "/v1/{name=tasks/*}"
    };
  }

  // ListTasks retrieves all tasks
  rpc ListTasks(ListTasksRequest) returns (ListTasksResponse) {
    option (google.api.http) = {
      get: "/v1/tasks"
    };
  }

  // UpdateTask updates an existing task
  rpc UpdateTask(UpdateTaskRequest) returns (Task) {
    option (google.api.http) = {
      patch: "/v1/{task.name=tasks/*}"
      body: "task"
    };
  }

  // DeleteTask removes a task
  rpc DeleteTask(DeleteTaskRequest) returns (DeleteTaskResponse) {
    option (google.api.http) = {
      delete: "/v1/{name=tasks/*}"
    };
  }

  // BatchCreateTasks creates multiple tasks at once
  rpc BatchCreateTasks(BatchCreateTasksRequest) returns (BatchCreateTasksResponse) {
    option (google.api.http) = {
      post: "/v1/tasks:batchCreate"
      body: "*"
    };
  }
}

// Task represents a TODO item
message Task {
  option (google.api.resource) = {
    type: "todo.example.com/Task"
    pattern: "tasks/{task}"
    singular: "task"
    plural: "tasks"
  };

  // Resource name of the task
  string name = 1 [
    (google.api.field_behavior) = IDENTIFIER,
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // Task title
  string title = 2 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).string = {
      min_len: 1
      max_len: 200
    }
  ];

  // Task description
  string description = 3 [
    (google.api.field_behavior) = OPTIONAL,
    (buf.validate.field).string = {
      max_len: 1000
    }
  ];

  // Task status
  Status status = 4 [
    (google.api.field_behavior) = REQUIRED
  ];

  // Task priority
  Priority priority = 5 [
    (google.api.field_behavior) = OPTIONAL
  ];

  // Due date for the task
  google.protobuf.Timestamp due_date = 6 [
    (google.api.field_behavior) = OPTIONAL,
    (buf.validate.field).timestamp = {
      gt_now: true
    }
  ];

  // Task creation time
  google.protobuf.Timestamp create_time = 7 [
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // Task last update time
  google.protobuf.Timestamp update_time = 8 [
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // User who created the task
  string created_by = 9 [
    (google.api.field_behavior) = OUTPUT_ONLY
  ];

  // Tags associated with the task
  repeated string tags = 10 [
    (buf.validate.field).repeated = {
      max_items: 10
      items: {
        string: {
          pattern: "^[a-z0-9-]+$"
          max_len: 50
        }
      }
    }
  ];
}

// Task status enumeration
enum Status {
  STATUS_UNSPECIFIED = 0;
  STATUS_PENDING = 1;
  STATUS_IN_PROGRESS = 2;
  STATUS_COMPLETED = 3;
  STATUS_CANCELLED = 4;
}

// Task priority enumeration
enum Priority {
  PRIORITY_UNSPECIFIED = 0;
  PRIORITY_LOW = 1;
  PRIORITY_MEDIUM = 2;
  PRIORITY_HIGH = 3;
  PRIORITY_CRITICAL = 4;
}

// CreateTaskRequest message
message CreateTaskRequest {
  // Task to create
  Task task = 1 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];
}

// GetTaskRequest message
message GetTaskRequest {
  // Resource name of the task
  string name = 1 [
    (google.api.field_behavior) = REQUIRED,
    (google.api.resource_reference) = {
      type: "todo.example.com/Task"
    },
    (buf.validate.field).string = {
      pattern: "^tasks/[a-zA-Z0-9-]+$"
    }
  ];
}

// ListTasksRequest message
message ListTasksRequest {
  // Maximum number of tasks to return
  int32 page_size = 1 [
    (buf.validate.field).int32 = {
      gte: 0
      lte: 1000
    }
  ];

  // Page token for pagination
  string page_token = 2;

  // Filter expression
  string filter = 3;

  // Order by expression
  string order_by = 4;
}

// ListTasksResponse message
message ListTasksResponse {
  // List of tasks
  repeated Task tasks = 1;

  // Token for next page
  string next_page_token = 2;

  // Total number of tasks
  int32 total_size = 3;
}

// UpdateTaskRequest message
message UpdateTaskRequest {
  // Task to update
  Task task = 1 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];

  // Fields to update
  google.protobuf.FieldMask update_mask = 2 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).required = true
  ];
}

// DeleteTaskRequest message
message DeleteTaskRequest {
  // Resource name of the task
  string name = 1 [
    (google.api.field_behavior) = REQUIRED,
    (google.api.resource_reference) = {
      type: "todo.example.com/Task"
    }
  ];
}

// DeleteTaskResponse message
message DeleteTaskResponse {
  // Confirmation message
  string message = 1;
}

// BatchCreateTasksRequest message
message BatchCreateTasksRequest {
  // Tasks to create
  repeated CreateTaskRequest requests = 1 [
    (google.api.field_behavior) = REQUIRED,
    (buf.validate.field).repeated = {
      min_items: 1
      max_items: 100
    }
  ];
}

// BatchCreateTasksResponse message
message BatchCreateTasksResponse {
  // Created tasks
  repeated Task tasks = 1;
}
syntax = "proto3";

package errors.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

option go_package = "github.com/bhatti/todo-api-errors/api/proto/errors/v1;errors";

// ErrorDetail provides a structured, machine-readable error payload.
// It is designed to be embedded in the `details` field of a `google.rpc.Status` message.
message ErrorDetail {
  // A unique, application-specific error code.
  string code = 1;
  // A short, human-readable summary of the problem type.
  string title = 2;
  // A human-readable explanation specific to this occurrence of the problem.
  string detail = 3;
  // A list of validation errors, useful for INVALID_ARGUMENT responses.
  repeated FieldViolation field_violations = 4;
  // Optional trace ID for request correlation
  string trace_id = 5;
  // Optional timestamp when the error occurred
  google.protobuf.Timestamp timestamp = 6;
  // Optional instance path where the error occurred
  string instance = 7;
  // Optional extensions for additional error context
  map<string, google.protobuf.Any> extensions = 8;
}

// Describes a single validation failure.
message FieldViolation {
  // The path to the field that failed validation, e.g., "title".
  string field = 1;
  // A developer-facing description of the validation rule that failed.
  string description = 2;
  // Application-specific error code for this validation failure
  string code = 3;
}

// AppErrorCode defines a list of standardized, application-specific error codes.
enum AppErrorCode {
  APP_ERROR_CODE_UNSPECIFIED = 0;

  // Validation failures
  VALIDATION_FAILED = 1;
  REQUIRED_FIELD = 2;
  TOO_SHORT = 3;
  TOO_LONG = 4;
  INVALID_FORMAT = 5;
  MUST_BE_FUTURE = 6;
  INVALID_VALUE = 7;
  DUPLICATE_TAG = 8;
  INVALID_TAG_FORMAT = 9;
  OVERDUE_COMPLETION = 10;
  EMPTY_BATCH = 11;
  BATCH_TOO_LARGE = 12;
  DUPLICATE_TITLE = 13;

  // Resource errors
  RESOURCE_NOT_FOUND = 1001;
  RESOURCE_CONFLICT = 1002;

  // Authentication and authorization
  AUTHENTICATION_FAILED = 2001;
  PERMISSION_DENIED = 2002;

  // Rate limiting and service availability
  RATE_LIMIT_EXCEEDED = 3001;
  SERVICE_UNAVAILABLE = 3002;

  // Internal errors
  INTERNAL_ERROR = 9001;
}

Error Handling Implementation

Now let’s implement our error handling framework:

package errors

import (
	"fmt"

	errorspb "github.com/bhatti/todo-api-errors/api/proto/errors/v1"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

// AppError is our custom error type using protobuf definitions.
type AppError struct {
	GRPCCode        codes.Code
	AppCode         errorspb.AppErrorCode
	Title           string
	Detail          string
	FieldViolations []*errorspb.FieldViolation
	TraceID         string
	Instance        string
	Extensions      map[string]*anypb.Any
	CausedBy        error // For internal logging
}

func (e *AppError) Error() string {
	return fmt.Sprintf("gRPC Code: %s, App Code: %s, Title: %s, Detail: %s", e.GRPCCode, e.AppCode, e.Title, e.Detail)
}

// ToGRPCStatus converts our AppError into a gRPC status.Status.
func (e *AppError) ToGRPCStatus() *status.Status {
	st := status.New(e.GRPCCode, e.Title)

	errorDetail := &errorspb.ErrorDetail{
		Code:            e.AppCode.String(),
		Title:           e.Title,
		Detail:          e.Detail,
		FieldViolations: e.FieldViolations,
		TraceId:         e.TraceID,
		Timestamp:       timestamppb.Now(),
		Instance:        e.Instance,
		Extensions:      e.Extensions,
	}

	// For validation errors, we also attach the standard BadRequest detail
	// so that gRPC-Gateway and other standard tools can understand it.
	if e.GRPCCode == codes.InvalidArgument && len(e.FieldViolations) > 0 {
		br := &errdetails.BadRequest{}
		for _, fv := range e.FieldViolations {
			br.FieldViolations = append(br.FieldViolations, &errdetails.BadRequest_FieldViolation{
				Field:       fv.Field,
				Description: fv.Description,
			})
		}
		st, _ = st.WithDetails(br, errorDetail)
		return st
	}

	st, _ = st.WithDetails(errorDetail)
	return st
}

// Helper functions for creating common errors

func NewValidationFailed(violations []*errorspb.FieldViolation, traceID string) *AppError {
	return &AppError{
		GRPCCode:        codes.InvalidArgument,
		AppCode:         errorspb.AppErrorCode_VALIDATION_FAILED,
		Title:           "Validation Failed",
		Detail:          fmt.Sprintf("The request contains %d validation errors", len(violations)),
		FieldViolations: violations,
		TraceID:         traceID,
	}
}

func NewNotFound(resource string, id string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.NotFound,
		AppCode:  errorspb.AppErrorCode_RESOURCE_NOT_FOUND,
		Title:    "Resource Not Found",
		Detail:   fmt.Sprintf("%s with ID '%s' was not found.", resource, id),
		TraceID:  traceID,
	}
}

func NewConflict(resource, reason string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.AlreadyExists,
		AppCode:  errorspb.AppErrorCode_RESOURCE_CONFLICT,
		Title:    "Resource Conflict",
		Detail:   fmt.Sprintf("Conflict creating %s: %s", resource, reason),
		TraceID:  traceID,
	}
}

func NewInternal(message string, traceID string, causedBy error) *AppError {
	return &AppError{
		GRPCCode: codes.Internal,
		AppCode:  errorspb.AppErrorCode_INTERNAL_ERROR,
		Title:    "Internal Server Error",
		Detail:   message,
		TraceID:  traceID,
		CausedBy: causedBy,
	}
}

func NewPermissionDenied(resource, action string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.PermissionDenied,
		AppCode:  errorspb.AppErrorCode_PERMISSION_DENIED,
		Title:    "Permission Denied",
		Detail:   fmt.Sprintf("You don't have permission to %s %s", action, resource),
		TraceID:  traceID,
	}
}

func NewServiceUnavailable(message string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.Unavailable,
		AppCode:  errorspb.AppErrorCode_SERVICE_UNAVAILABLE,
		Title:    "Service Unavailable",
		Detail:   message,
		TraceID:  traceID,
	}
}

func NewRequiredField(field, message string, traceID string) *AppError {
	return &AppError{
		GRPCCode: codes.InvalidArgument,
		AppCode:  errorspb.AppErrorCode_VALIDATION_FAILED,
		Title:    "Validation Failed",
		Detail:   "The request contains validation errors",
		FieldViolations: []*errorspb.FieldViolation{
			{
				Field:       field,
				Code:        errorspb.AppErrorCode_REQUIRED_FIELD.String(),
				Description: message,
			},
		},
		TraceID: traceID,
	}
}
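
For completeness, here is a sketch of how a gRPC client could read these structured details back out of a failed call, using the standard status.FromError and Details() APIs (the surrounding function is illustrative, not part of the repository):

func createTask(ctx context.Context, client todopb.TodoServiceClient, req *todopb.CreateTaskRequest) (*todopb.Task, error) {
	task, err := client.CreateTask(ctx, req)
	if err == nil {
		return task, nil
	}

	st, ok := status.FromError(err)
	if !ok {
		return nil, err // not a gRPC status error
	}

	// Details() unmarshals any attached messages, including our ErrorDetail.
	for _, d := range st.Details() {
		if detail, ok := d.(*errorspb.ErrorDetail); ok {
			log.Printf("app error %s (%s), trace=%s", detail.Code, detail.Title, detail.TraceId)
			for _, fv := range detail.FieldViolations {
				log.Printf("  field %q: %s", fv.Field, fv.Description)
			}
		}
	}
	return nil, err
}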

Validation Framework

Let’s implement validation that returns all errors at once:

package validation

import (
	"errors"
	"fmt"
	"regexp"
	"strings"

	"buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go/buf/validate"
	"buf.build/go/protovalidate"
	errorspb "github.com/bhatti/todo-api-errors/api/proto/errors/v1"
	todopb "github.com/bhatti/todo-api-errors/api/proto/todo/v1"
	apperrors "github.com/bhatti/todo-api-errors/internal/errors"
	"google.golang.org/protobuf/proto"
)

var pv protovalidate.Validator

func init() {
	var err error
	pv, err = protovalidate.New()
	if err != nil {
		panic(fmt.Sprintf("failed to initialize protovalidator: %v", err))
	}
}

// ValidateRequest checks a proto message and returns an AppError with all violations.
func ValidateRequest(req proto.Message, traceID string) error {
	if err := pv.Validate(req); err != nil {
		var validationErrs *protovalidate.ValidationError
		if errors.As(err, &validationErrs) {
			var violations []*errorspb.FieldViolation
			for _, violation := range validationErrs.Violations {
				fieldPath := ""
				if violation.Proto.GetField() != nil {
					fieldPath = formatFieldPath(violation.Proto.GetField())
				}

				ruleId := violation.Proto.GetRuleId()
				message := violation.Proto.GetMessage()

				violations = append(violations, &errorspb.FieldViolation{
					Field:       fieldPath,
					Description: message,
					Code:        mapConstraintToCode(ruleId),
				})
			}
			return apperrors.NewValidationFailed(violations, traceID)
		}
		return apperrors.NewInternal("Validation failed", traceID, err)
	}
	return nil
}

// ValidateTask performs additional business logic validation
func ValidateTask(task *todopb.Task, traceID string) error {
	var violations []*errorspb.FieldViolation

	// Proto validation first
	if err := ValidateRequest(task, traceID); err != nil {
		if appErr, ok := err.(*apperrors.AppError); ok {
			violations = append(violations, appErr.FieldViolations...)
		}
	}

	// Additional business rules
	if task.Status == todopb.Status_STATUS_COMPLETED && task.DueDate != nil {
		if task.UpdateTime != nil && task.UpdateTime.AsTime().After(task.DueDate.AsTime()) {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       "due_date",
				Code:        errorspb.AppErrorCode_OVERDUE_COMPLETION.String(),
				Description: "Task was completed after the due date",
			})
		}
	}

	// Validate tags format
	for i, tag := range task.Tags {
		if !isValidTag(tag) {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       fmt.Sprintf("tags[%d]", i),
				Code:        errorspb.AppErrorCode_INVALID_TAG_FORMAT.String(),
				Description: fmt.Sprintf("Tag '%s' must be lowercase letters, numbers, and hyphens only", tag),
			})
		}
	}

	// Check for duplicate tags
	tagMap := make(map[string]bool)
	for i, tag := range task.Tags {
		if tagMap[tag] {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       fmt.Sprintf("tags[%d]", i),
				Code:        errorspb.AppErrorCode_DUPLICATE_TAG.String(),
				Description: fmt.Sprintf("Tag '%s' appears multiple times", tag),
			})
		}
		tagMap[tag] = true
	}

	if len(violations) > 0 {
		return apperrors.NewValidationFailed(violations, traceID)
	}

	return nil
}

// ValidateBatchCreateTasks validates batch operations
func ValidateBatchCreateTasks(req *todopb.BatchCreateTasksRequest, traceID string) error {
	var violations []*errorspb.FieldViolation

	// Check batch size
	if len(req.Requests) == 0 {
		violations = append(violations, &errorspb.FieldViolation{
			Field:       "requests",
			Code:        errorspb.AppErrorCode_EMPTY_BATCH.String(),
			Description: "Batch must contain at least one task",
		})
	}

	if len(req.Requests) > 100 {
		violations = append(violations, &errorspb.FieldViolation{
			Field:       "requests",
			Code:        errorspb.AppErrorCode_BATCH_TOO_LARGE.String(),
			Description: fmt.Sprintf("Batch size %d exceeds maximum of 100", len(req.Requests)),
		})
	}

	// Validate each task
	for i, createReq := range req.Requests {
		if createReq.Task == nil {
			violations = append(violations, &errorspb.FieldViolation{
				Field:       fmt.Sprintf("requests[%d].task", i),
				Code:        errorspb.AppErrorCode_REQUIRED_FIELD.String(),
				Description: "Task is required",
			})
			continue
		}

		// Validate task
		if err := ValidateTask(createReq.Task, traceID); err != nil {
			if appErr, ok := err.(*apperrors.AppError); ok {
				for _, violation := range appErr.FieldViolations {
					violation.Field = fmt.Sprintf("requests[%d].task.%s", i, violation.Field)
					violations = append(violations, violation)
				}
			}
		}
	}

	// Check for duplicate titles
	titleMap := make(map[string][]int)
	for i, createReq := range req.Requests {
		if createReq.Task != nil && createReq.Task.Title != "" {
			titleMap[createReq.Task.Title] = append(titleMap[createReq.Task.Title], i)
		}
	}

	for title, indices := range titleMap {
		if len(indices) > 1 {
			for _, idx := range indices {
				violations = append(violations, &errorspb.FieldViolation{
					Field:       fmt.Sprintf("requests[%d].task.title", idx),
					Code:        errorspb.AppErrorCode_DUPLICATE_TITLE.String(),
					Description: fmt.Sprintf("Title '%s' is used by multiple tasks in the batch", title),
				})
			}
		}
	}

	if len(violations) > 0 {
		return apperrors.NewValidationFailed(violations, traceID)
	}

	return nil
}

// Helper functions
func formatFieldPath(fieldPath *validate.FieldPath) string {
	if fieldPath == nil {
		return ""
	}

	// Build field path from elements
	var parts []string
	for _, element := range fieldPath.GetElements() {
		if element.GetFieldName() != "" {
			parts = append(parts, element.GetFieldName())
		} else if element.GetFieldNumber() != 0 {
			parts = append(parts, fmt.Sprintf("field_%d", element.GetFieldNumber()))
		}
	}

	return strings.Join(parts, ".")
}

func mapConstraintToCode(ruleId string) string {
	switch {
	case strings.Contains(ruleId, "required"):
		return errorspb.AppErrorCode_REQUIRED_FIELD.String()
	case strings.Contains(ruleId, "min_len"):
		return errorspb.AppErrorCode_TOO_SHORT.String()
	case strings.Contains(ruleId, "max_len"):
		return errorspb.AppErrorCode_TOO_LONG.String()
	case strings.Contains(ruleId, "pattern"):
		return errorspb.AppErrorCode_INVALID_FORMAT.String()
	case strings.Contains(ruleId, "gt_now"):
		return errorspb.AppErrorCode_MUST_BE_FUTURE.String()
	case ruleId == "":
		return errorspb.AppErrorCode_VALIDATION_FAILED.String()
	default:
		return errorspb.AppErrorCode_INVALID_VALUE.String()
	}
}

var validTagPattern = regexp.MustCompile(`^[a-z0-9-]+$`)

func isValidTag(tag string) bool {
	return len(tag) <= 50 && validTagPattern.MatchString(tag)
}

Error Handler Middleware

Now let’s create middleware to handle errors consistently:

package middleware

import (
	"context"
	"errors"
	"log"

	apperrors "github.com/bhatti/todo-api-errors/internal/errors"
	"google.golang.org/grpc"
	"google.golang.org/grpc/status"
)

// UnaryErrorInterceptor translates application errors into gRPC statuses.
func UnaryErrorInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	resp, err := handler(ctx, req)
	if err == nil {
		return resp, nil
	}

	var appErr *apperrors.AppError
	if errors.As(err, &appErr) {
		if appErr.CausedBy != nil {
			log.Printf("ERROR: %s, Original cause: %v", appErr.Title, appErr.CausedBy)
		}
		return nil, appErr.ToGRPCStatus().Err()
	}

	if _, ok := status.FromError(err); ok {
		return nil, err // Already a gRPC status
	}

	log.Printf("UNEXPECTED ERROR: %v", err)
	return nil, apperrors.NewInternal("An unexpected error occurred", "", err).ToGRPCStatus().Err()
}
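
Wiring the interceptor into the gRPC server is then a single option at construction time (a minimal sketch; todoService is the TodoService instance created via NewTodoService later in the article):

grpcServer := grpc.NewServer(
	grpc.UnaryInterceptor(middleware.UnaryErrorInterceptor),
)
todopb.RegisterTodoServiceServer(grpcServer, todoService)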

For the HTTP/REST gateway, we add a corresponding error handler in the same middleware package:

package middleware

import (
	"context"
	"encoding/json"
	"net/http"
	"runtime/debug"
	"time"

	errorspb "github.com/bhatti/todo-api-errors/api/proto/errors/v1"
	apperrors "github.com/bhatti/todo-api-errors/internal/errors"
	"github.com/google/uuid"
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/encoding/protojson"
)

// HTTPErrorHandler handles errors for HTTP endpoints
func HTTPErrorHandler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Add trace ID to context
		traceID := r.Header.Get("X-Trace-ID")
		if traceID == "" {
			traceID = uuid.New().String()
		}
		ctx := context.WithValue(r.Context(), "traceID", traceID)
		r = r.WithContext(ctx)

		// Create response wrapper to intercept errors
		wrapped := &responseWriter{
			ResponseWriter: w,
			request:        r,
			traceID:        traceID,
		}

		// Handle panics
		defer func() {
			if err := recover(); err != nil {
				handlePanic(wrapped, err)
			}
		}()

		// Process request
		next.ServeHTTP(wrapped, r)
	})
}

// responseWriter wraps http.ResponseWriter to intercept errors
type responseWriter struct {
	http.ResponseWriter
	request    *http.Request
	traceID    string
	statusCode int
	written    bool
}

func (w *responseWriter) WriteHeader(code int) {
	if !w.written {
		w.statusCode = code
		w.ResponseWriter.WriteHeader(code)
		w.written = true
	}
}

func (w *responseWriter) Write(b []byte) (int, error) {
	if !w.written {
		w.WriteHeader(http.StatusOK)
	}
	return w.ResponseWriter.Write(b)
}

// handlePanic converts panics to proper error responses
func handlePanic(w *responseWriter, recovered interface{}) {
	// Log stack trace
	debug.PrintStack()

	appErr := apperrors.NewInternal("An unexpected error occurred. Please try again later.", w.traceID, nil)
	writeErrorResponse(w, appErr)
}

// CustomHTTPError handles gRPC gateway error responses
func CustomHTTPError(ctx context.Context, mux *runtime.ServeMux,
	marshaler runtime.Marshaler, w http.ResponseWriter, r *http.Request, err error) {

	// Extract trace ID
	traceID := r.Header.Get("X-Trace-ID")
	if traceID == "" {
		if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
			traceID = span.SpanContext().TraceID().String()
		} else {
			traceID = uuid.New().String()
		}
	}

	// Convert gRPC error to HTTP response
	st, _ := status.FromError(err)

	// Check if we have our custom error detail in status details
	for _, detail := range st.Details() {
		if errorDetail, ok := detail.(*errorspb.ErrorDetail); ok {
			// Update the error detail with current request context
			errorDetail.TraceId = traceID
			errorDetail.Instance = r.URL.Path

			// Convert to JSON and write response
			w.Header().Set("Content-Type", "application/problem+json")
			w.WriteHeader(runtime.HTTPStatusFromCode(st.Code()))

			// Create a simplified JSON response that matches RFC 7807
			response := map[string]interface{}{
				"type":      getTypeForCode(errorDetail.Code),
				"title":     errorDetail.Title,
				"status":    runtime.HTTPStatusFromCode(st.Code()),
				"detail":    errorDetail.Detail,
				"instance":  errorDetail.Instance,
				"traceId":   errorDetail.TraceId,
				"timestamp": errorDetail.Timestamp,
			}

			// Add field violations if present
			if len(errorDetail.FieldViolations) > 0 {
				violations := make([]map[string]interface{}, len(errorDetail.FieldViolations))
				for i, fv := range errorDetail.FieldViolations {
					violations[i] = map[string]interface{}{
						"field":   fv.Field,
						"code":    fv.Code,
						"message": fv.Description,
					}
				}
				response["errors"] = violations
			}

			// Add extensions if present
			if len(errorDetail.Extensions) > 0 {
				extensions := make(map[string]interface{})
				for k, v := range errorDetail.Extensions {
					// Convert Any to JSON
					if jsonBytes, err := protojson.Marshal(v); err == nil {
						var jsonData interface{}
						if err := json.Unmarshal(jsonBytes, &jsonData); err == nil {
							extensions[k] = jsonData
						}
					}
				}
				if len(extensions) > 0 {
					response["extensions"] = extensions
				}
			}

			if err := json.NewEncoder(w).Encode(response); err != nil {
				http.Error(w, `{"error": "Failed to encode error response"}`, 500)
			}
			return
		}
	}

	// Fallback: create new error response
	fallbackErr := apperrors.NewInternal(st.Message(), traceID, nil)
	fallbackErr.GRPCCode = st.Code()
	writeAppErrorResponse(w, fallbackErr, r.URL.Path)
}

// Helper functions
func getTypeForCode(code string) string {
	switch code {
	case errorspb.AppErrorCode_VALIDATION_FAILED.String():
		return "https://api.example.com/errors/validation-failed"
	case errorspb.AppErrorCode_RESOURCE_NOT_FOUND.String():
		return "https://api.example.com/errors/resource-not-found"
	case errorspb.AppErrorCode_RESOURCE_CONFLICT.String():
		return "https://api.example.com/errors/resource-conflict"
	case errorspb.AppErrorCode_PERMISSION_DENIED.String():
		return "https://api.example.com/errors/permission-denied"
	case errorspb.AppErrorCode_INTERNAL_ERROR.String():
		return "https://api.example.com/errors/internal-error"
	case errorspb.AppErrorCode_SERVICE_UNAVAILABLE.String():
		return "https://api.example.com/errors/service-unavailable"
	default:
		return "https://api.example.com/errors/unknown"
	}
}

func writeErrorResponse(w http.ResponseWriter, err error) {
	if appErr, ok := err.(*apperrors.AppError); ok {
		writeAppErrorResponse(w, appErr, "")
	} else {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

func writeAppErrorResponse(w http.ResponseWriter, appErr *apperrors.AppError, instance string) {
	statusCode := runtime.HTTPStatusFromCode(appErr.GRPCCode)

	response := map[string]interface{}{
		"type":      getTypeForCode(appErr.AppCode.String()),
		"title":     appErr.Title,
		"status":    statusCode,
		"detail":    appErr.Detail,
		"traceId":   appErr.TraceID,
		"timestamp": time.Now(),
	}

	if instance != "" {
		response["instance"] = instance
	}

	if len(appErr.FieldViolations) > 0 {
		violations := make([]map[string]interface{}, len(appErr.FieldViolations))
		for i, fv := range appErr.FieldViolations {
			violations[i] = map[string]interface{}{
				"field":   fv.Field,
				"code":    fv.Code,
				"message": fv.Description,
			}
		}
		response["errors"] = violations
	}

	w.Header().Set("Content-Type", "application/problem+json")
	w.WriteHeader(statusCode)
	json.NewEncoder(w).Encode(response)
}

Service Implementation

Now let’s implement our TODO service with proper error handling:

package service

import (
	"context"
	"fmt"
	todopb "github.com/bhatti/todo-api-errors/api/proto/todo/v1"
	"github.com/bhatti/todo-api-errors/internal/errors"
	"github.com/bhatti/todo-api-errors/internal/repository"
	"github.com/bhatti/todo-api-errors/internal/validation"
	"github.com/google/uuid"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
	"google.golang.org/protobuf/types/known/timestamppb"
	"strings"
)

var tracer = otel.Tracer("todo-service")

// TodoService implements the TODO API
type TodoService struct {
	todopb.UnimplementedTodoServiceServer
	repo repository.TodoRepository
}

// NewTodoService creates a new TODO service
func NewTodoService(repo repository.TodoRepository) (*TodoService, error) {
	return &TodoService{
		repo: repo,
	}, nil
}

// CreateTask creates a new task
func (s *TodoService) CreateTask(ctx context.Context, req *todopb.CreateTaskRequest) (*todopb.Task, error) {
	ctx, span := tracer.Start(ctx, "CreateTask")
	defer span.End()

	// Get trace ID for error responses
	traceID := span.SpanContext().TraceID().String()

	// Validate request
	if req.Task == nil {
		return nil, errors.NewRequiredField("task", "Task object is required", traceID)
	}

	// Validate task fields using the new validation package
	if err := validation.ValidateTask(req.Task, traceID); err != nil {
		span.SetAttributes(attribute.String("validation.error", err.Error()))
		return nil, err
	}

	// Check for duplicate title
	existing, err := s.repo.GetTaskByTitle(ctx, req.Task.Title)
	if err != nil && !repository.IsNotFound(err) {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	if existing != nil {
		return nil, errors.NewConflict("task", "A task with this title already exists", traceID)
	}

	// Generate task ID
	taskID := uuid.New().String()
	task := &todopb.Task{
		Name:        fmt.Sprintf("tasks/%s", taskID),
		Title:       req.Task.Title,
		Description: req.Task.Description,
		Status:      req.Task.Status,
		Priority:    req.Task.Priority,
		DueDate:     req.Task.DueDate,
		Tags:        req.Task.Tags,
		CreateTime:  timestamppb.Now(),
		UpdateTime:  timestamppb.Now(),
		CreatedBy:   s.getUserFromContext(ctx),
	}

	// Set defaults
	if task.Status == todopb.Status_STATUS_UNSPECIFIED {
		task.Status = todopb.Status_STATUS_PENDING
	}
	if task.Priority == todopb.Priority_PRIORITY_UNSPECIFIED {
		task.Priority = todopb.Priority_PRIORITY_MEDIUM
	}

	// Save to repository
	if err := s.repo.CreateTask(ctx, task); err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	span.SetAttributes(
		attribute.String("task.id", taskID),
		attribute.String("task.title", task.Title),
	)

	return task, nil
}

// GetTask retrieves a specific task
func (s *TodoService) GetTask(ctx context.Context, req *todopb.GetTaskRequest) (*todopb.Task, error) {
	ctx, span := tracer.Start(ctx, "GetTask")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request using the new validation package
	if err := validation.ValidateRequest(req, traceID); err != nil {
		return nil, err
	}

	// Extract task ID
	parts := strings.Split(req.Name, "/")
	if len(parts) != 2 || parts[0] != "tasks" {
		return nil, errors.NewRequiredField("name", "Task name must be in format 'tasks/{id}'", traceID)
	}

	taskID := parts[1]
	span.SetAttributes(attribute.String("task.id", taskID))

	// Get from repository
	task, err := s.repo.GetTask(ctx, taskID)
	if err != nil {
		if repository.IsNotFound(err) {
			return nil, errors.NewNotFound("Task", taskID, traceID)
		}
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Check permissions
	if !s.canAccessTask(ctx, task) {
		return nil, errors.NewPermissionDenied("task", "read", traceID)
	}

	return task, nil
}

// ListTasks retrieves all tasks
func (s *TodoService) ListTasks(ctx context.Context, req *todopb.ListTasksRequest) (*todopb.ListTasksResponse, error) {
	ctx, span := tracer.Start(ctx, "ListTasks")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request using the new validation package
	if err := validation.ValidateRequest(req, traceID); err != nil {
		return nil, err
	}

	// Default page size
	pageSize := req.PageSize
	if pageSize == 0 {
		pageSize = 50
	}
	if pageSize > 1000 {
		pageSize = 1000
	}

	span.SetAttributes(
		attribute.Int("page.size", int(pageSize)),
		attribute.String("filter", req.Filter),
	)

	// Parse filter
	filter, err := s.parseFilter(req.Filter)
	if err != nil {
		return nil, errors.NewRequiredField("filter", fmt.Sprintf("Failed to parse filter: %v", err), traceID)
	}

	// Get tasks from repository
	tasks, nextPageToken, err := s.repo.ListTasks(ctx, repository.ListOptions{
		PageSize:  int(pageSize),
		PageToken: req.PageToken,
		Filter:    filter,
		OrderBy:   req.OrderBy,
		UserID:    s.getUserFromContext(ctx),
	})

	if err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Get total count
	totalSize, err := s.repo.CountTasks(ctx, filter, s.getUserFromContext(ctx))
	if err != nil {
		// Log but don't fail the request
		span.RecordError(err)
		totalSize = -1
	}

	return &todopb.ListTasksResponse{
		Tasks:         tasks,
		NextPageToken: nextPageToken,
		TotalSize:     int32(totalSize),
	}, nil
}

// UpdateTask updates an existing task
func (s *TodoService) UpdateTask(ctx context.Context, req *todopb.UpdateTaskRequest) (*todopb.Task, error) {
	ctx, span := tracer.Start(ctx, "UpdateTask")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request
	if req.Task == nil {
		return nil, errors.NewRequiredField("task", "Task object is required", traceID)
	}

	if req.UpdateMask == nil || len(req.UpdateMask.Paths) == 0 {
		return nil, errors.NewRequiredField("update_mask", "Update mask must specify which fields to update", traceID)
	}

	// Extract task ID
	parts := strings.Split(req.Task.Name, "/")
	if len(parts) != 2 || parts[0] != "tasks" {
		return nil, errors.NewRequiredField("task.name", "Invalid task name format", traceID)
	}

	taskID := parts[1]
	span.SetAttributes(attribute.String("task.id", taskID))

	// Get existing task
	existing, err := s.repo.GetTask(ctx, taskID)
	if err != nil {
		if repository.IsNotFound(err) {
			return nil, errors.NewNotFound("Task", taskID, traceID)
		}
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Check permissions
	if !s.canModifyTask(ctx, existing) {
		return nil, errors.NewPermissionDenied("task", "update", traceID)
	}

	// Apply updates based on field mask
	updated := s.applyFieldMask(existing, req.Task, req.UpdateMask)
	updated.UpdateTime = timestamppb.Now()

	// Validate updated task using the new validation package
	if err := validation.ValidateTask(updated, traceID); err != nil {
		return nil, err
	}

	// Save to repository
	if err := s.repo.UpdateTask(ctx, updated); err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	return updated, nil
}

// DeleteTask removes a task
func (s *TodoService) DeleteTask(ctx context.Context, req *todopb.DeleteTaskRequest) (*todopb.DeleteTaskResponse, error) {
	ctx, span := tracer.Start(ctx, "DeleteTask")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate request using the new validation package
	if err := validation.ValidateRequest(req, traceID); err != nil {
		return nil, err
	}

	// Extract task ID
	parts := strings.Split(req.Name, "/")
	if len(parts) != 2 || parts[0] != "tasks" {
		return nil, errors.NewRequiredField("name", "Invalid task name format", traceID)
	}

	taskID := parts[1]
	span.SetAttributes(attribute.String("task.id", taskID))

	// Get existing task to check permissions
	existing, err := s.repo.GetTask(ctx, taskID)
	if err != nil {
		if repository.IsNotFound(err) {
			return nil, errors.NewNotFound("Task", taskID, traceID)
		}
		return nil, s.handleRepositoryError(err, traceID)
	}

	// Check permissions
	if !s.canModifyTask(ctx, existing) {
		return nil, errors.NewPermissionDenied("task", "delete", traceID)
	}

	// Delete from repository
	if err := s.repo.DeleteTask(ctx, taskID); err != nil {
		span.RecordError(err)
		return nil, s.handleRepositoryError(err, traceID)
	}

	return &todopb.DeleteTaskResponse{
		Message: fmt.Sprintf("Task %s deleted successfully", req.Name),
	}, nil
}

// BatchCreateTasks creates multiple tasks at once
func (s *TodoService) BatchCreateTasks(ctx context.Context, req *todopb.BatchCreateTasksRequest) (*todopb.BatchCreateTasksResponse, error) {
	ctx, span := tracer.Start(ctx, "BatchCreateTasks")
	defer span.End()

	traceID := span.SpanContext().TraceID().String()

	// Validate batch request using the new validation package
	if err := validation.ValidateBatchCreateTasks(req, traceID); err != nil {
		span.SetAttributes(attribute.String("validation.error", err.Error()))
		return nil, err
	}

	// Process each task
	var created []*todopb.Task
	var batchErrors []string

	for i, createReq := range req.Requests {
		task, err := s.CreateTask(ctx, createReq)
		if err != nil {
			// Collect errors for batch response
			batchErrors = append(batchErrors, fmt.Sprintf("Task %d: %s", i, err.Error()))
			continue
		}
		created = append(created, task)
	}

	// If all tasks failed, return error
	if len(created) == 0 && len(batchErrors) > 0 {
		return nil, errors.NewInternal("All batch operations failed", traceID, nil)
	}

	// Return partial success
	response := &todopb.BatchCreateTasksResponse{
		Tasks: created,
	}

	// Add partial errors to response metadata if any
	if len(batchErrors) > 0 {
		span.SetAttributes(
			attribute.Int("batch.total", len(req.Requests)),
			attribute.Int("batch.success", len(created)),
			attribute.Int("batch.failed", len(batchErrors)),
		)
	}

	return response, nil
}

// Helper methods

func (s *TodoService) handleRepositoryError(err error, traceID string) error {
	if repository.IsConnectionError(err) {
		return errors.NewServiceUnavailable("Unable to connect to the database. Please try again later.", traceID)
	}

	// Internal details are preserved by wrapping err below; callers record the
	// error on their own request-scoped span (SpanFromContext on a background
	// context would only yield a no-op span).

	return errors.NewInternal("An unexpected error occurred while processing your request", traceID, err)
}

func (s *TodoService) getUserFromContext(ctx context.Context) string {
	// In a real implementation, this would extract user info from auth context
	if user, ok := ctx.Value("user").(string); ok {
		return user
	}
	return "anonymous"
}

func (s *TodoService) canAccessTask(ctx context.Context, task *todopb.Task) bool {
	// In a real implementation, check if user can access this task
	user := s.getUserFromContext(ctx)
	return user == task.CreatedBy || user == "admin"
}

func (s *TodoService) canModifyTask(ctx context.Context, task *todopb.Task) bool {
	// In a real implementation, check if user can modify this task
	user := s.getUserFromContext(ctx)
	return user == task.CreatedBy || user == "admin"
}

func (s *TodoService) parseFilter(filter string) (map[string]interface{}, error) {
	// Simple filter parser - in production, use a proper parser
	parsed := make(map[string]interface{})

	if filter == "" {
		return parsed, nil
	}

	// Example: "status=COMPLETED AND priority=HIGH"
	parts := strings.Split(filter, " AND ")
	for _, part := range parts {
		kv := strings.Split(strings.TrimSpace(part), "=")
		if len(kv) != 2 {
			return nil, fmt.Errorf("invalid filter expression: %s", part)
		}

		key := strings.TrimSpace(kv[0])
		value := strings.Trim(strings.TrimSpace(kv[1]), "'\"")

		// Validate filter keys
		switch key {
		case "status", "priority", "created_by":
			parsed[key] = value
		default:
			return nil, fmt.Errorf("unknown filter field: %s", key)
		}
	}

	return parsed, nil
}

func (s *TodoService) applyFieldMask(existing, update *todopb.Task, mask *fieldmaskpb.FieldMask) *todopb.Task {
	result := *existing

	for _, path := range mask.Paths {
		switch path {
		case "title":
			result.Title = update.Title
		case "description":
			result.Description = update.Description
		case "status":
			result.Status = update.Status
		case "priority":
			result.Priority = update.Priority
		case "due_date":
			result.DueDate = update.DueDate
		case "tags":
			result.Tags = update.Tags
		}
	}
	return &result
}
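
The filter parser above is deliberately simple, so a short test is a useful way to pin down what it accepts and rejects. Here's a minimal sketch; it sits in the same service package so it can reach the unexported method:

package service

import "testing"

func TestParseFilter(t *testing.T) {
    svc := &TodoService{}

    parsed, err := svc.parseFilter("status=COMPLETED AND priority='HIGH'")
    if err != nil {
        t.Fatalf("expected valid filter, got error: %v", err)
    }
    if parsed["status"] != "COMPLETED" || parsed["priority"] != "HIGH" {
        t.Errorf("unexpected parse result: %v", parsed)
    }

    // Unknown fields are rejected rather than silently ignored.
    if _, err := svc.parseFilter("owner=alice"); err == nil {
        t.Error("expected error for unknown filter field")
    }
}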

Server Implementation

Now let’s put it all together in our server:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	todopb "github.com/bhatti/todo-api-errors/api/proto/todo/v1"
	"github.com/bhatti/todo-api-errors/internal/middleware"
	"github.com/bhatti/todo-api-errors/internal/monitoring"
	"github.com/bhatti/todo-api-errors/internal/repository"
	"github.com/bhatti/todo-api-errors/internal/service"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/reflection"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/encoding/protojson"
)

func main() {
	// Initialize monitoring
	if err := monitoring.InitOpenTelemetryMetrics(); err != nil {
		log.Printf("Failed to initialize OpenTelemetry metrics: %v", err)
		// Continue without OpenTelemetry - Prometheus will still work
	}

	// Initialize repository
	repo := repository.NewInMemoryRepository()

	// Initialize service
	todoService, err := service.NewTodoService(repo)
	if err != nil {
		log.Fatalf("Failed to create service: %v", err)
	}

	// Start gRPC server
	grpcPort := ":50051"
	go func() {
		if err := startGRPCServer(grpcPort, todoService); err != nil {
			log.Fatalf("Failed to start gRPC server: %v", err)
		}
	}()

	// Start HTTP gateway
	httpPort := ":8080"
	go func() {
		if err := startHTTPGateway(httpPort, grpcPort); err != nil {
			log.Fatalf("Failed to start HTTP gateway: %v", err)
		}
	}()

	// Start metrics server
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		if err := http.ListenAndServe(":9090", nil); err != nil {
			log.Printf("Failed to start metrics server: %v", err)
		}
	}()

	log.Printf("TODO API server started")
	log.Printf("gRPC server listening on %s", grpcPort)
	log.Printf("HTTP gateway listening on %s", httpPort)
	log.Printf("Metrics available at :9090/metrics")

	// Wait for interrupt signal
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Println("Shutting down...")
}

func startGRPCServer(port string, todoService todopb.TodoServiceServer) error {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	// Create gRPC server with interceptors - now using the new UnaryErrorInterceptor
	opts := []grpc.ServerOption{
		grpc.ChainUnaryInterceptor(
			middleware.UnaryErrorInterceptor, // Using new protobuf-based error interceptor
			loggingInterceptor(),
			recoveryInterceptor(),
		),
	}

	server := grpc.NewServer(opts...)

	// Register service
	todopb.RegisterTodoServiceServer(server, todoService)

	// Register reflection for debugging
	reflection.Register(server)

	return server.Serve(lis)
}

func startHTTPGateway(httpPort, grpcPort string) error {
	ctx := context.Background()

	// Create gRPC connection
	conn, err := grpc.DialContext(
		ctx,
		"localhost"+grpcPort,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return fmt.Errorf("failed to dial gRPC server: %w", err)
	}

	// Create gateway mux with custom error handler
	mux := runtime.NewServeMux(
		runtime.WithErrorHandler(middleware.CustomHTTPError), // Using new protobuf-based error handler
		runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{
			MarshalOptions: protojson.MarshalOptions{
				UseProtoNames:   true,
				EmitUnpopulated: false,
			},
			UnmarshalOptions: protojson.UnmarshalOptions{
				DiscardUnknown: true,
			},
		}),
	)

	// Register service handler
	if err := todopb.RegisterTodoServiceHandler(ctx, mux, conn); err != nil {
		return fmt.Errorf("failed to register service handler: %w", err)
	}

	// Create HTTP server with middleware
	handler := middleware.HTTPErrorHandler( // Using new protobuf-based HTTP error handler
		corsMiddleware(
			authMiddleware(
				loggingHTTPMiddleware(mux),
			),
		),
	)

	server := &http.Server{
		Addr:         httpPort,
		Handler:      handler,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	return server.ListenAndServe()
}

// Middleware implementations

func loggingInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		start := time.Now()

		// Call handler
		resp, err := handler(ctx, req)

		// Log request
		duration := time.Since(start)
		statusCode := "OK"
		if err != nil {
			statusCode = status.Code(err).String()
		}

		log.Printf("gRPC: %s %s %s %v", info.FullMethod, statusCode, duration, err)

		return resp, err
	}
}

func recoveryInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		defer func() {
			if r := recover(); r != nil {
				log.Printf("Recovered from panic: %v", r)
				monitoring.RecordPanicRecovery(ctx)
				err = status.Error(codes.Internal, "Internal server error")
			}
		}()

		return handler(ctx, req)
	}
}

func loggingHTTPMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()

		// Wrap response writer to capture status
		wrapped := &statusResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}

		// Process request
		next.ServeHTTP(wrapped, r)

		// Log request
		duration := time.Since(start)
		log.Printf("HTTP: %s %s %d %v", r.Method, r.URL.Path, wrapped.statusCode, duration)
	})
}

func corsMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS, PATCH")
		w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Trace-ID")

		if r.Method == "OPTIONS" {
			w.WriteHeader(http.StatusOK)
			return
		}

		next.ServeHTTP(w, r)
	})
}

func authMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Simple auth for demo - in production use proper authentication
		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			authHeader = "Bearer anonymous"
		}

		// Extract user from token
		user := "anonymous"
		if len(authHeader) > 7 && authHeader[:7] == "Bearer " {
			user = authHeader[7:]
		}

		// Add user to context
		ctx := context.WithValue(r.Context(), "user", user)
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

type statusResponseWriter struct {
	http.ResponseWriter
	statusCode int
}

func (w *statusResponseWriter) WriteHeader(code int) {
	w.statusCode = code
	w.ResponseWriter.WriteHeader(code)
}

Example API Usage

Let’s see our error handling in action with some example requests:

Example 1: Validation Error with Multiple Issues

Request with multiple validation errors

curl -X POST http://localhost:8080/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"task": {
"title": "",
"description": "This description is wayyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy too long…",
"status": "INVALID_STATUS",
"tags": ["INVALID TAG", "tag-1", "tag-1"]
}
}'

Response

< HTTP/1.1 400 Bad Request
< Content-Type: application/problem+json
{
  "detail": "The request contains 5 validation errors",
  "errors": [
    {
      "code": "TOO_SHORT",
      "field": "title",
      "message": "value length must be at least 1 characters"
    },
    {
      "code": "TOO_LONG",
      "field": "description",
      "message": "value length must be at most 100 characters"
    },
    {
      "code": "INVALID_FORMAT",
      "field": "tags",
      "message": "value does not match regex pattern `^[a-z0-9-]+$`"
    },
    {
      "code": "INVALID_TAG_FORMAT",
      "field": "tags[0]",
      "message": "Tag 'INVALID TAG' must be lowercase letters, numbers, and hyphens only"
    },
    {
      "code": "DUPLICATE_TAG",
      "field": "tags[2]",
      "message": "Tag 'tag-1' appears multiple times"
    }
  ],
  "instance": "/v1/tasks",
  "status": 400,
  "timestamp": {
    "seconds": 1755288524,
    "nanos": 484865000
  },
  "title": "Validation Failed",
  "traceId": "eb4bfb3f-9397-4547-8618-ce9952a16067",
  "type": "https://api.example.com/errors/validation-failed"
}

Example 2: Not Found Error

Request for non-existent task

curl http://localhost:8080/v1/tasks/non-existent-id

Response

< HTTP/1.1 404 Not Found
< Content-Type: application/problem+json
{
  "detail": "Task with ID 'non-existent-id' was not found.",
  "instance": "/v1/tasks/non-existent-id",
  "status": 404,
  "timestamp": {
    "seconds": 1755288565,
    "nanos": 904607000
  },
  "title": "Resource Not Found",
  "traceId": "6ce00cd8-d0b7-47f1-b6f6-9fc1375c26a4",
  "type": "https://api.example.com/errors/resource-not-found"
}

Example 3: Conflict Error

curl -X POST http://localhost:8080/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"task": {
"title": "Existing Task Title"
}
}'

curl -X POST http://localhost:8080/v1/tasks \
-H "Content-Type: application/json" \
-d '{
"task": {
"title": "Existing Task Title"
}
}'

Response

< HTTP/1.1 409 Conflict
< Content-Type: application/problem+json
{
  "detail": "Conflict creating task: A task with this title already exists",
  "instance": "/v1/tasks",
  "status": 409,
  "timestamp": {
    "seconds": 1755288593,
    "nanos": 594458000
  },
  "title": "Resource Conflict",
  "traceId": "ed2e78d2-591d-492a-8d71-6b6843ce86f7",
  "type": "https://api.example.com/errors/resource-conflict"
}

Example 4: Service Unavailable (Transient Error)

When database is down

curl http://localhost:8080/v1/tasks

Response

HTTP/1.1 503 Service Unavailable
Content-Type: application/problem+json
Retry-After: 30
{
  "type": "https://api.example.com/errors/service-unavailable",
  "title": "Service Unavailable",
  "status": 503,
  "detail": "Database connection pool exhausted. Please try again later.",
  "instance": "/v1/tasks",
  "traceId": "db-pool-001",
  "timestamp": "2025-08-15T10:30:00Z",
  "extensions": {
    "retryable": true,
    "retryAfter": "2025-08-15T10:30:30Z",
    "maxRetries": 3,
    "backoffType": "exponential",
    "backoffMs": 1000,
    "errorCategory": "database"
  }
}
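
The extensions block above is only useful if clients act on it. As a rough client-side sketch (not part of the accompanying repository), a caller might honor the Retry-After header on 503 responses and give up after a bounded number of attempts:

package main

import (
    "log"
    "net/http"
    "strconv"
    "time"
)

// getWithRetry retries only on 503 responses, honoring the Retry-After hint.
func getWithRetry(url string, maxRetries int) (*http.Response, error) {
    for attempt := 0; ; attempt++ {
        resp, err := http.Get(url)
        if err != nil {
            return nil, err
        }
        // Return anything that is not a 503, or give up after maxRetries.
        if resp.StatusCode != http.StatusServiceUnavailable || attempt >= maxRetries {
            return resp, nil
        }
        // Honor the server's Retry-After hint (seconds); default to 1s.
        delay := time.Second
        if s := resp.Header.Get("Retry-After"); s != "" {
            if secs, perr := strconv.Atoi(s); perr == nil {
                delay = time.Duration(secs) * time.Second
            }
        }
        resp.Body.Close()
        time.Sleep(delay)
    }
}

func main() {
    resp, err := getWithRetry("http://localhost:8080/v1/tasks", 3)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()
    log.Println("final status:", resp.Status)
}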

Best Practices Summary

Our implementation demonstrates several key best practices:

1. Consistent Error Format

All errors follow RFC 9457 (Problem Details) format, providing:

  • Machine-readable type URIs
  • Human-readable titles and details
  • HTTP status codes
  • Request tracing
  • Extensible metadata

2. Comprehensive Validation

  • All validation errors are returned at once, not one by one
  • Clear field paths for nested objects
  • Descriptive error codes and messages
  • Support for batch operations with partial success

3. Security-Conscious Design

  • No sensitive information in error messages
  • Internal errors are logged but not exposed
  • Generic messages for authentication failures
  • Request IDs for support without exposing internals

4. Developer Experience

  • Clear, actionable error messages
  • Helpful suggestions for fixing issues
  • Consistent error codes across protocols
  • Rich metadata for debugging

5. Protocol Compatibility

  • Seamless translation between gRPC and HTTP
  • Proper status code mapping
  • Preservation of error details across protocols

6. Observability

  • Structured logging with trace IDs
  • Prometheus metrics for monitoring
  • OpenTelemetry integration
  • Error categorization for analysis
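
The monitoring package wired into the interceptors is not reproduced here; as a rough illustration of the error-categorization idea (the metric and function names below are placeholders, not the repository's actual API), a counter labeled by gRPC code and method is usually enough to spot which endpoints are failing and why:

package monitoring

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "google.golang.org/grpc/status"
)

// apiErrors partitions errors by gRPC status code and method.
var apiErrors = promauto.NewCounterVec(prometheus.CounterOpts{
    Name: "todo_api_errors_total",
    Help: "API errors partitioned by gRPC status code and method",
}, []string{"code", "method"})

// RecordError would be called from the logging interceptor whenever a
// handler returns an error.
func RecordError(method string, err error) {
    if err == nil {
        return
    }
    apiErrors.WithLabelValues(status.Code(err).String(), method).Inc()
}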

Conclusion

This comprehensive guide demonstrates how to build robust error handling for modern APIs. By treating errors as a first-class feature of our API, we’ve achieved several key benefits:

  • Consistency: All errors, regardless of their source, are presented to clients in a predictable format.
  • Clarity: Developers consuming our API get clear, actionable feedback, helping them debug and integrate faster.
  • Developer Ergonomics: Our internal service code is cleaner, as handlers focus on business logic while the middleware handles the boilerplate of error conversion.
  • Security: We have a clear separation between internal error details (for logging) and public error responses, preventing leaks.

Additional Resources

You can find the full source code for this example in this GitHub repository.

July 17, 2025

Zero-Downtime Services with Lifecycle Management on Kubernetes and Istio

Filed under: Computing,Web Services — admin @ 3:12 pm

Introduction

In the world of cloud-native applications, service lifecycle management is often an afterthought—until it causes a production outage. Whether you’re running gRPC or REST APIs on Kubernetes with Istio, proper lifecycle management is the difference between smooth deployments and 3 AM incident calls. Consider these scenarios:

  • Your service takes 45 seconds to warm up its cache, but Kubernetes kills it after 30 seconds of startup wait.
  • During deployments, clients receive connection errors as pods terminate abruptly.
  • A hiccup in a database or dependent service causes your entire service mesh to cascade fail.
  • Your service mesh sidecar shuts down before your application finishes terminating, dropping in-flight requests.
  • A critical service receives SIGKILL during transaction processing, leaving data in inconsistent states.
  • After a regional outage, services restart but data drift goes undetected for hours.
  • Your RTO target is 15 seconds, but services take 30 seconds just to start up properly.

These aren’t edge cases—they’re common problems that proper lifecycle management solves. More critically, unsafe shutdowns can cause data corruption, financial losses, and breach compliance requirements. This guide covers what you need to know about building services that start safely, shut down gracefully, and handle failures intelligently.

The Hidden Complexity of Service Lifecycles

Modern microservices don’t exist in isolation. A typical request might flow through:

Typical Request Flow.

Each layer adds complexity to startup and shutdown sequences. Without proper coordination, you’ll experience:

  • Startup race conditions: Application tries to make network calls before the sidecar proxy is ready
  • Shutdown race conditions: Sidecar terminates while the application is still processing requests
  • Premature traffic: Load balancer routes traffic before the application is truly ready
  • Dropped connections: Abrupt shutdowns leave clients hanging
  • Data corruption: In-flight transactions get interrupted, leaving databases in inconsistent states
  • Compliance violations: Financial services may face regulatory penalties for data integrity failures

Core Concepts: The Three Types of Health Checks

Kubernetes provides three distinct probe types, each serving a specific purpose:

1. Liveness Probe: “Is the process alive?”

  • Detects deadlocks and unrecoverable states
  • Should be fast and simple (e.g., HTTP GET /healthz)
  • Failure triggers container restart
  • Common mistake: Making this check too complex

2. Readiness Probe: “Can the service handle traffic?”

  • Validates all critical dependencies are available
  • Prevents routing traffic to pods that aren’t ready
  • Should perform “deep” checks of dependencies
  • Common mistake: Using the same check as liveness

3. Startup Probe: “Is the application still initializing?”

  • Provides grace period for slow-starting containers
  • Disables liveness/readiness probes until successful
  • Prevents restart loops during initialization
  • Common mistake: Not using it for slow-starting apps
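
The complete example later in this post exposes these checks over the gRPC health protocol, but the same split applies to plain HTTP services. A minimal sketch with separate /healthz and /readyz endpoints (handler paths and the readiness flag are illustrative):

package main

import (
    "net/http"
    "sync/atomic"
)

var ready atomic.Bool // flipped to true once caches are warm and dependencies verified

func main() {
    // Liveness: cheap, never checks dependencies.
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    // Readiness: deep check gated on initialization and dependency health.
    http.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
        if !ready.Load() {
            http.Error(w, "not ready", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    })
    ready.Store(true) // in a real service, set only after warm-up completes
    http.ListenAndServe(":8090", nil)
}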

The Hidden Dangers of Unsafe Shutdowns

While graceful shutdown is ideal, it’s not always possible. Kubernetes will send SIGKILL after the termination grace period, and infrastructure failures can terminate pods instantly. This creates serious risks:

Data Corruption Scenarios

Financial Transaction Example:

// DANGEROUS: Non-atomic operation
func (s *PaymentService) ProcessPayment(req *PaymentRequest) error {
    // Step 1: Debit source account
    if err := s.debitAccount(req.FromAccount, req.Amount); err != nil {
        return err
    }
    
    // DANGER: a SIGKILL here leaves money debited but not credited
    // Step 2: Credit destination account  
    if err := s.creditAccount(req.ToAccount, req.Amount); err != nil {
        // Money is lost! Source debited but destination not credited
        return err
    }
    
    // Step 3: Record transaction
    return s.recordTransaction(req)
}

E-commerce Inventory Example:

// DANGEROUS: Race condition during shutdown
func (s *InventoryService) ReserveItem(req *ReserveRequest) error {
    // Check availability
    if s.getStock(req.ItemID) < req.Quantity {
        return ErrInsufficientStock
    }
    
    // DANGER: a SIGKILL here can cause double-reservation
    // Another request might see the same stock level
    
    // Reserve the item
    return s.updateStock(req.ItemID, -req.Quantity)
}
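
One way to close that window is to collapse the check and the reservation into a single conditional update, so an abrupt kill can never land between the two steps. Here's a sketch, assuming the InventoryService holds a database/sql handle in s.db and a Postgres-style inventory table (both assumptions, not shown above):

// SAFER: check and reserve in one atomic statement
func (s *InventoryService) ReserveItemAtomic(ctx context.Context, req *ReserveRequest) error {
    // The WHERE clause re-checks stock inside the same statement, so the
    // database either applies the full reservation or nothing at all.
    res, err := s.db.ExecContext(ctx,
        `UPDATE inventory SET stock = stock - $1
           WHERE item_id = $2 AND stock >= $1`,
        req.Quantity, req.ItemID)
    if err != nil {
        return err
    }
    rows, err := res.RowsAffected()
    if err != nil {
        return err
    }
    if rows == 0 {
        return ErrInsufficientStock
    }
    return nil
}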

RTO/RPO Impact

Recovery Time Objective (RTO): How quickly can we restore service?

  • Poor lifecycle management increases startup time
  • Services may need manual intervention to reach consistent state
  • Cascading failures extend recovery time across the entire system

Recovery Point Objective (RPO): How much data can we afford to lose?

  • Unsafe shutdowns can corrupt recent transactions
  • Without idempotency, replay of messages may create duplicates
  • Data inconsistencies may not be detected until much later

The Anti-Entropy Solution

Since graceful shutdown isn’t always possible, production systems need reconciliation processes to detect and repair inconsistencies:

// Anti-entropy pattern for data consistency
type ReconciliationService struct {
    paymentDB    PaymentDatabase
    accountDB    AccountDatabase
    auditLog     AuditLogger
    alerting     AlertingService
}

func (r *ReconciliationService) ReconcilePayments(ctx context.Context) error {
    // Find payments without matching account entries
    orphanedPayments, err := r.paymentDB.FindOrphanedPayments(ctx)
    if err != nil {
        return err
    }
    
    for _, payment := range orphanedPayments {
        // Check if this was a partial transaction
        sourceDebit, _ := r.accountDB.GetTransaction(payment.FromAccount, payment.ID)
        destCredit, _ := r.accountDB.GetTransaction(payment.ToAccount, payment.ID)
        
        switch {
        case sourceDebit != nil && destCredit == nil:
            // Complete the transaction
            if err := r.creditAccount(payment.ToAccount, payment.Amount); err != nil {
                r.alerting.SendAlert("Failed to complete orphaned payment", payment.ID)
                continue
            }
            r.auditLog.RecordReconciliation("completed_payment", payment.ID)
            
        case sourceDebit == nil && destCredit != nil:
            // Reverse the credit
            if err := r.debitAccount(payment.ToAccount, payment.Amount); err != nil {
                r.alerting.SendAlert("Failed to reverse orphaned credit", payment.ID)
                continue
            }
            r.auditLog.RecordReconciliation("reversed_credit", payment.ID)
            
        default:
            // Both or neither exist - needs investigation
            r.alerting.SendAlert("Ambiguous payment state", payment.ID)
        }
    }
    
    return nil
}

// Run reconciliation periodically
func (r *ReconciliationService) Start(ctx context.Context) {
    ticker := time.NewTicker(5 * time.Minute)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.ReconcilePayments(ctx); err != nil {
                log.Printf("Reconciliation failed: %v", err)
            }
        }
    }
}

Building a Resilient Service: Complete Example

Let’s build a production-ready service that demonstrates all best practices. We’ll create two versions: one with anti-patterns (bad-service) and one with best practices (good-service).

Sequence diagram of a typical API with proper Kubernetes and Istio configuration.

The Application Code

//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative api/demo.proto

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "net"
    "net/http"
    "os"
    "os/signal"
    "sync/atomic"
    "syscall"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    health "google.golang.org/grpc/health/grpc_health_v1"
    "google.golang.org/grpc/status"
)

// Service represents our application with health state
type Service struct {
    isHealthy         atomic.Bool
    isShuttingDown    atomic.Bool
    activeRequests    atomic.Int64
    dependencyHealthy atomic.Bool
}

// HealthChecker implements the gRPC health checking protocol
type HealthChecker struct {
    svc *Service
}

func (h *HealthChecker) Check(ctx context.Context, req *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
    service := req.GetService()
    
    // Liveness: Simple check - is the process responsive?
    if service == "" || service == "liveness" {
        if h.svc.isShuttingDown.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        return &health.HealthCheckResponse{
            Status: health.HealthCheckResponse_SERVING,
        }, nil
    }
    
    // Readiness: Deep check - can we handle traffic?
    if service == "readiness" {
        // Check application health
        if !h.svc.isHealthy.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        
        // Check critical dependencies
        if !h.svc.dependencyHealthy.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        
        // Check if shutting down
        if h.svc.isShuttingDown.Load() {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        
        return &health.HealthCheckResponse{
            Status: health.HealthCheckResponse_SERVING,
        }, nil
    }
    
    // Synthetic readiness: Complex business logic check for monitoring
    if service == "synthetic-readiness" {
        // Simulate a complex health check that validates business logic
        // This would make actual API calls, database queries, etc.
        if !h.performSyntheticCheck(ctx) {
            return &health.HealthCheckResponse{
                Status: health.HealthCheckResponse_NOT_SERVING,
            }, nil
        }
        return &health.HealthCheckResponse{
            Status: health.HealthCheckResponse_SERVING,
        }, nil
    }
    
    return nil, status.Errorf(codes.NotFound, "unknown service: %s", service)
}

func (h *HealthChecker) performSyntheticCheck(ctx context.Context) bool {
    // In a real service, this would:
    // 1. Create a test transaction
    // 2. Query the database
    // 3. Call dependent services
    // 4. Validate the complete flow works
    return h.svc.isHealthy.Load() && h.svc.dependencyHealthy.Load()
}

func (h *HealthChecker) Watch(req *health.HealthCheckRequest, server health.Health_WatchServer) error {
    return status.Error(codes.Unimplemented, "watch not implemented")
}

// DemoServiceServer implements your business logic
type DemoServiceServer struct {
    UnimplementedDemoServiceServer
    svc *Service
}

func (s *DemoServiceServer) ProcessRequest(ctx context.Context, req *ProcessRequest) (*ProcessResponse, error) {
    s.svc.activeRequests.Add(1)
    defer s.svc.activeRequests.Add(-1)
    
    // Simulate processing
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case <-time.After(100 * time.Millisecond):
        return &ProcessResponse{
            Result: fmt.Sprintf("Processed: %s", req.GetData()),
        }, nil
    }
}

func main() {
    var (
        port         = flag.Int("port", 8080, "gRPC port")
        mgmtPort     = flag.Int("mgmt-port", 8090, "Management port")
        startupDelay = flag.Duration("startup-delay", 10*time.Second, "Startup delay")
    )
    flag.Parse()
    
    svc := &Service{}
    svc.dependencyHealthy.Store(true) // Assume healthy initially
    
    // Management endpoints for testing
    mux := http.NewServeMux()
    mux.HandleFunc("/toggle-health", func(w http.ResponseWriter, r *http.Request) {
        current := svc.dependencyHealthy.Load()
        svc.dependencyHealthy.Store(!current)
        fmt.Fprintf(w, "Dependency health toggled to: %v\n", !current)
    })
    mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintf(w, "active_requests %d\n", svc.activeRequests.Load())
        fmt.Fprintf(w, "is_healthy %v\n", svc.isHealthy.Load())
        fmt.Fprintf(w, "is_shutting_down %v\n", svc.isShuttingDown.Load())
    })
    
    mgmtServer := &http.Server{
        Addr:    fmt.Sprintf(":%d", *mgmtPort),
        Handler: mux,
    }
    
    // Start management server
    go func() {
        log.Printf("Management server listening on :%d", *mgmtPort)
        if err := mgmtServer.ListenAndServe(); err != http.ErrServerClosed {
            log.Fatalf("Management server failed: %v", err)
        }
    }()
    
    // Simulate slow startup
    log.Printf("Starting application (startup delay: %v)...", *startupDelay)
    time.Sleep(*startupDelay)
    svc.isHealthy.Store(true)
    log.Println("Application initialized and ready")
    
    // Setup gRPC server
    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    
    grpcServer := grpc.NewServer()
    RegisterDemoServiceServer(grpcServer, &DemoServiceServer{svc: svc})
    health.RegisterHealthServer(grpcServer, &HealthChecker{svc: svc})
    
    // Start gRPC server
    go func() {
        log.Printf("gRPC server listening on :%d", *port)
        if err := grpcServer.Serve(lis); err != nil {
            log.Fatalf("gRPC server failed: %v", err)
        }
    }()
    
    // Wait for shutdown signal
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    sig := <-sigCh
    
    log.Printf("Received signal: %v, starting graceful shutdown...", sig)
    
    // Graceful shutdown sequence
    svc.isShuttingDown.Store(true)
    svc.isHealthy.Store(false) // Fail readiness immediately
    
    // Stop accepting new requests
    grpcServer.GracefulStop()
    
    // Wait for active requests to complete
    timeout := time.After(30 * time.Second)
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-timeout:
            log.Println("Shutdown timeout reached, forcing exit")
            os.Exit(1)
        case <-ticker.C:
            active := svc.activeRequests.Load()
            if active == 0 {
                log.Println("All requests completed")
                goto shutdown
            }
            log.Printf("Waiting for %d active requests to complete...", active)
        }
    }
    
shutdown:
    // Cleanup
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    mgmtServer.Shutdown(ctx)
    
    log.Println("Graceful shutdown complete")
}

Kubernetes Manifests: Anti-Patterns vs Best Practices

Bad Service (Anti-Patterns)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: bad-service
  namespace: demo
spec:
  replicas: 2
  selector:
    matchLabels:
      app: bad-service
  template:
    metadata:
      labels:
        app: bad-service
      # MISSING: Critical Istio annotations!
    spec:
      # DEFAULT: Only 30s grace period
      containers:
      - name: app
        image: myregistry/demo-service:latest
        ports:
        - containerPort: 8080
          name: grpc
        - containerPort: 8090
          name: mgmt
        args: ["--startup-delay=45s"]  # Longer than default probe timeout!
        
        # ANTI-PATTERN: Identical liveness and readiness probes
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080"]
          initialDelaySeconds: 10
          periodSeconds: 10
          failureThreshold: 3  # Will fail after 40s total
          
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080"]  # Same as liveness!
          initialDelaySeconds: 10
          periodSeconds: 10
        
        # MISSING: No startup probe for slow initialization
        # MISSING: No preStop hook for graceful shutdown

Good Service (Best Practices)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: good-service
  namespace: demo
spec:
  replicas: 2
  selector:
    matchLabels:
      app: good-service
  template:
    metadata:
      labels:
        app: good-service
      annotations:
        # Critical for Istio/Envoy sidecar lifecycle management
        sidecar.istio.io/holdApplicationUntilProxyStarts: "true"
        proxy.istio.io/config: |
          proxyMetadata:
            EXIT_ON_ZERO_ACTIVE_CONNECTIONS: "true"
        sidecar.istio.io/proxyCPU: "100m"
        sidecar.istio.io/proxyMemory: "128Mi"
    spec:
      # Extended grace period: preStop (15s) + app shutdown (30s) + buffer (20s)
      terminationGracePeriodSeconds: 65
      
      containers:
      - name: app
        image: myregistry/demo-service:latest
        ports:
        - containerPort: 8080
          name: grpc
        - containerPort: 8090
          name: mgmt
        args: ["--startup-delay=45s"]
        
        # Resource management for predictable performance
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
        
        # Startup probe for slow initialization
        startupProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080", "-service=readiness"]
          initialDelaySeconds: 0
          periodSeconds: 5
          failureThreshold: 24  # 5s * 24 = 120s total startup time
          successThreshold: 1
        
        # Simple liveness check
        livenessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080", "-service=liveness"]
          initialDelaySeconds: 0  # Startup probe handles initialization
          periodSeconds: 10
          failureThreshold: 3
          timeoutSeconds: 5
        
        # Deep readiness check
        readinessProbe:
          exec:
            command: ["/bin/grpc_health_probe", "-addr=:8080", "-service=readiness"]
          initialDelaySeconds: 0
          periodSeconds: 5
          failureThreshold: 2
          successThreshold: 1
          timeoutSeconds: 5
        
        # Graceful shutdown coordination
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 15"]  # Allow LB to drain
        
        # Environment variables for cloud provider integration
        env:
        - name: CLOUD_PROVIDER
          value: "auto-detect"  # Works with GCP, AWS, Azure
        - name: ENABLE_PROFILING
          value: "true"

Istio Service Mesh: Beyond Basic Lifecycle Management

While proper health checks and graceful shutdown are foundational, Istio adds critical production-grade capabilities that dramatically improve fault tolerance:

Automatic Retries and Circuit Breaking

apiVersion: networking.istio.io/v1alpha3
kind: DestinationRule
metadata:
  name: payment-service
  namespace: demo
spec:
  host: payment-service.demo.svc.cluster.local
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        http1MaxPendingRequests: 100
        maxRequestsPerConnection: 2
    # Circuit breaking is expressed as outlier detection in a DestinationRule
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
---
# Retries are configured per route in a VirtualService
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: payment-service
  namespace: demo
spec:
  hosts:
  - payment-service.demo.svc.cluster.local
  http:
  - route:
    - destination:
        host: payment-service.demo.svc.cluster.local
    retries:
      attempts: 3
      perTryTimeout: 2s
      retryOn: 5xx,gateway-error,connect-failure,refused-stream
      retryRemoteLocalities: true

Key Benefits for Production Systems

  1. Automatic Request Retries: If a pod fails or becomes unavailable, Istio automatically retries requests to healthy instances
  2. Circuit Breaking: Prevents cascading failures by temporarily cutting off traffic to unhealthy services
  3. Load Balancing: Distributes traffic intelligently across healthy pods
  4. Mutual TLS: Secures service-to-service communication without code changes
  5. Observability: Provides detailed metrics, tracing, and logging for all inter-service communication
  6. Canary Deployments: Enables safe rollouts with automatic traffic shifting
  7. Rate Limiting: Protects services from being overwhelmed
  8. Timeout Management: Prevents hanging requests with configurable timeouts
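
Canary deployments and timeout management from the list above live in the same VirtualService as the retry policy. Extending the route from the previous example with a timeout and weighted subsets might look like the sketch below; the stable and canary subsets are an assumption and would be defined in the DestinationRule:

apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: payment-service
  namespace: demo
spec:
  hosts:
  - payment-service.demo.svc.cluster.local
  http:
  - timeout: 5s            # cap end-to-end latency, including retries
    retries:
      attempts: 3
      perTryTimeout: 2s
      retryOn: 5xx,connect-failure
    route:
    - destination:
        host: payment-service.demo.svc.cluster.local
        subset: stable
      weight: 90
    - destination:
        host: payment-service.demo.svc.cluster.local
        subset: canary
      weight: 10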

Termination Grace Period Calculation

The critical formula for calculating termination grace periods:

terminationGracePeriodSeconds = preStop delay + application shutdown timeout + buffer

Examples:
- Simple service: 10s + 20s + 5s = 35s
- Complex service: 15s + 45s + 5s = 65s
- Batch processor: 30s + 120s + 10s = 160s

Important: Services requiring more than 90-120 seconds to shut down should be re-architected using checkpoint-and-resume patterns.

Advanced Patterns for Production

1. Idempotency: Handling Duplicate Requests

Critical for production: When pods restart or network issues occur, clients may retry requests. Without idempotency, this can cause duplicate transactions, corrupted state, or financial losses. This is mandatory for all state-modifying operations.

package idempotency

import (
    "context"
    "crypto/sha256"
    "encoding/hex"
    "time"
    "sync"
    "errors"
)

var (
    ErrDuplicateRequest = errors.New("duplicate request detected")
    ErrProcessingInProgress = errors.New("request is currently being processed")
)

// IdempotencyStore tracks request execution with persistence
type IdempotencyStore struct {
    mu        sync.RWMutex
    records   map[string]*Record
    persister PersistenceLayer // Database or Redis for durability
}

type Record struct {
    Key         string
    Response    interface{}
    Error       error
    Status      ProcessingStatus
    ExpiresAt   time.Time
    CreatedAt   time.Time
    ProcessedAt *time.Time
}

type ProcessingStatus int

const (
    StatusPending ProcessingStatus = iota
    StatusProcessing
    StatusCompleted
    StatusFailed
)

// ProcessIdempotent ensures exactly-once processing semantics
func (s *IdempotencyStore) ProcessIdempotent(
    ctx context.Context,
    key string,
    ttl time.Duration,
    fn func() (interface{}, error),
) (interface{}, error) {
    // Check if we've seen this request before
    s.mu.RLock()
    record, exists := s.records[key]
    s.mu.RUnlock()
    
    if exists {
        switch record.Status {
        case StatusCompleted:
            if time.Now().Before(record.ExpiresAt) {
                return record.Response, record.Error
            }
        case StatusProcessing:
            return nil, ErrProcessingInProgress
        case StatusFailed:
            if time.Now().Before(record.ExpiresAt) {
                return record.Response, record.Error
            }
        }
    }
    
    // Mark as processing
    record = &Record{
        Key:       key,
        Status:    StatusProcessing,
        ExpiresAt: time.Now().Add(ttl),
        CreatedAt: time.Now(),
    }
    
    s.mu.Lock()
    s.records[key] = record
    s.mu.Unlock()
    
    // Persist the processing state
    if err := s.persister.Save(ctx, record); err != nil {
        return nil, err
    }
    
    // Execute the function
    response, err := fn()
    processedAt := time.Now()
    
    // Update record with result
    s.mu.Lock()
    record.Response = response
    record.Error = err
    record.ProcessedAt = &processedAt
    if err != nil {
        record.Status = StatusFailed
    } else {
        record.Status = StatusCompleted
    }
    s.mu.Unlock()
    
    // Persist the final state
    s.persister.Save(ctx, record)
    
    return response, err
}

// Example: Idempotent payment processing
func (s *PaymentService) ProcessPayment(ctx context.Context, req *PaymentRequest) (*PaymentResponse, error) {
    // Generate idempotency key from request
    key := generateIdempotencyKey(req)
    
    result, err := s.idempotencyStore.ProcessIdempotent(
        ctx,
        key,
        24*time.Hour, // Keep records for 24 hours
        func() (interface{}, error) {
            // Atomic transaction processing
            return s.processPaymentTransaction(ctx, req)
        },
    )
    
    if err != nil {
        return nil, err
    }
    return result.(*PaymentResponse), nil
}

// Atomic transaction processing
func (s *PaymentService) processPaymentTransaction(ctx context.Context, req *PaymentRequest) (*PaymentResponse, error) {
    // Use database transaction for atomicity
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()
    
    // Step 1: Validate accounts
    if err := s.validateAccounts(ctx, tx, req); err != nil {
        return nil, err
    }
    
    // Step 2: Process payment atomically
    paymentID, err := s.executePayment(ctx, tx, req)
    if err != nil {
        return nil, err
    }
    
    // Step 3: Commit transaction
    if err := tx.Commit(); err != nil {
        return nil, err
    }
    
    return &PaymentResponse{
        PaymentID: paymentID,
        Status:    "completed",
        Timestamp: time.Now(),
    }, nil
}
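
The generateIdempotencyKey helper referenced above is not shown; one plausible shape (field names are assumptions) prefers an explicit client-supplied key and otherwise hashes the identifying request fields, using the sha256/hex imports already in the snippet plus "fmt":

// Hypothetical key derivation; an IdempotencyKey field on PaymentRequest is assumed.
func generateIdempotencyKey(req *PaymentRequest) string {
    if req.IdempotencyKey != "" {
        return req.IdempotencyKey // client-supplied keys make retries explicit
    }
    sum := sha256.Sum256([]byte(fmt.Sprintf("%s|%s|%v", req.FromAccount, req.ToAccount, req.Amount)))
    return hex.EncodeToString(sum[:])
}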

2. Checkpoint and Resume: Long-Running Operations

For operations that may exceed the termination grace period, implement checkpointing:

package checkpoint

import (
    "context"
    "encoding/json"
    "log"
    "time"
)

type CheckpointStore interface {
    Save(ctx context.Context, id string, state interface{}) error
    Load(ctx context.Context, id string, state interface{}) error
    Delete(ctx context.Context, id string) error
}

type BatchProcessor struct {
    store          CheckpointStore
    checkpointFreq int
}

type BatchState struct {
    JobID      string    `json:"job_id"`
    TotalItems int       `json:"total_items"`
    Processed  int       `json:"processed"`
    LastItem   string    `json:"last_item"`
    StartedAt  time.Time `json:"started_at"`
}

func (p *BatchProcessor) ProcessBatch(ctx context.Context, jobID string, items []string) error {
    // Try to resume from checkpoint
    state := &BatchState{JobID: jobID}
    if err := p.store.Load(ctx, jobID, state); err == nil {
        log.Printf("Resuming job %s from item %d", jobID, state.Processed)
        items = items[state.Processed:]
    } else {
        // New job
        state = &BatchState{
            JobID:      jobID,
            TotalItems: len(items),
            Processed:  0,
            StartedAt:  time.Now(),
        }
    }
    
    // Process items with periodic checkpointing
    for i, item := range items {
        select {
        case <-ctx.Done():
            // Save progress before shutting down
            state.LastItem = item
            return p.store.Save(ctx, jobID, state)
        default:
            // Process item
            if err := p.processItem(ctx, item); err != nil {
                return err
            }
            
            state.Processed++
            state.LastItem = item
            
            // Checkpoint periodically
            if state.Processed%p.checkpointFreq == 0 {
                if err := p.store.Save(ctx, jobID, state); err != nil {
                    log.Printf("Failed to checkpoint: %v", err)
                }
            }
        }
    }
    
    // Job completed, remove checkpoint
    return p.store.Delete(ctx, jobID)
}
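
To exercise the BatchProcessor you need something that satisfies CheckpointStore. A throwaway in-memory version is enough for tests (a production deployment would back this with Redis or a database); this sketch additionally needs the "errors" and "sync" imports:

// memoryStore is a test-only CheckpointStore; not safe across process restarts.
type memoryStore struct {
    mu   sync.Mutex
    data map[string][]byte
}

func (m *memoryStore) Save(ctx context.Context, id string, state interface{}) error {
    b, err := json.Marshal(state)
    if err != nil {
        return err
    }
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.data == nil {
        m.data = make(map[string][]byte)
    }
    m.data[id] = b
    return nil
}

func (m *memoryStore) Load(ctx context.Context, id string, state interface{}) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    b, ok := m.data[id]
    if !ok {
        return errors.New("checkpoint not found")
    }
    return json.Unmarshal(b, state)
}

func (m *memoryStore) Delete(ctx context.Context, id string) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    delete(m.data, id)
    return nil
}

// Usage: p := &BatchProcessor{store: &memoryStore{}, checkpointFreq: 100}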

3. Circuit Breaker Pattern for Dependencies

Protect your service from cascading failures:

package circuitbreaker

import (
    "context"
    "errors"
    "log"
    "sync"
    "time"
)

// ErrCircuitOpen is returned while the breaker is open and calls fail fast.
var ErrCircuitOpen = errors.New("circuit breaker is open")

type State int

const (
    StateClosed State = iota
    StateOpen
    StateHalfOpen
)

type CircuitBreaker struct {
    mu              sync.RWMutex
    state           State
    failures        int
    successes       int
    lastFailureTime time.Time
    
    maxFailures      int
    resetTimeout     time.Duration
    halfOpenRequests int
}

func (cb *CircuitBreaker) Call(ctx context.Context, fn func() error) error {
    cb.mu.RLock()
    state := cb.state
    cb.mu.RUnlock()
    
    if state == StateOpen {
        // Check if we should transition to half-open
        cb.mu.Lock()
        if time.Since(cb.lastFailureTime) > cb.resetTimeout {
            cb.state = StateHalfOpen
            cb.successes = 0
            state = StateHalfOpen
        }
        cb.mu.Unlock()
    }
    
    if state == StateOpen {
        return ErrCircuitOpen
    }
    
    err := fn()
    
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    if err != nil {
        cb.failures++
        cb.lastFailureTime = time.Now()
        
        if cb.failures >= cb.maxFailures {
            cb.state = StateOpen
            log.Printf("Circuit breaker opened after %d failures", cb.failures)
        }
        return err
    }
    
    if state == StateHalfOpen {
        cb.successes++
        if cb.successes >= cb.halfOpenRequests {
            cb.state = StateClosed
            cb.failures = 0
            log.Println("Circuit breaker closed")
        }
    }
    
    return nil
}
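
To use the breaker from outside this package you need a constructor for the unexported thresholds, plus a thin wrapper around each dependency call. A small sketch in the same package; the constructor and the threshold values are additions for illustration, not part of the snippet above:

// NewCircuitBreaker fills in the unexported thresholds.
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration, halfOpenRequests int) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:      maxFailures,
        resetTimeout:     resetTimeout,
        halfOpenRequests: halfOpenRequests,
    }
}

// Example: guard a dependency ping so repeated failures trip the breaker and
// subsequent callers fail fast with ErrCircuitOpen instead of stacking up.
// Construct with, e.g., NewCircuitBreaker(5, 30*time.Second, 3).
func PingDependency(ctx context.Context, cb *CircuitBreaker, ping func(context.Context) error) error {
    return cb.Call(ctx, func() error { return ping(ctx) })
}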

Testing Your Implementation

Manual Testing Guide

Test 1: Startup Race Condition

Setup:

# Deploy both services
kubectl apply -f k8s/bad-service.yaml
kubectl apply -f k8s/good-service.yaml

# Watch pods in separate terminal
watch kubectl get pods -n demo

Test the bad service:

# Force restart
kubectl delete pod -l app=bad-service -n demo

# Observe: Pod will enter CrashLoopBackOff due to liveness probe
# killing it before 45s startup completes

Test the good service:

# Force restart
kubectl delete pod -l app=good-service -n demo

# Observe: Pod stays in 0/1 Ready state for ~45s, then becomes ready
# No restarts occur thanks to startup probe

Test 2: Data Consistency Under Failure

Setup:

# Deploy payment service with reconciliation enabled
kubectl apply -f k8s/payment-service.yaml

# Start payment traffic generator
kubectl run payment-generator --image=payment-client:latest \
  --restart=Never --rm -it -- \
  --target=payment-service.demo.svc.cluster.local:8080 \
  --rate=10 --duration=60s

Simulate SIGKILL during transactions:

# In another terminal, kill pods abruptly
while true; do
  kubectl delete pod -l app=payment-service -n demo --force --grace-period=0
  sleep 30
done

Verify reconciliation:

# Check for data inconsistencies
kubectl logs -l app=payment-service -n demo | grep "inconsistency"

# Monitor reconciliation metrics
kubectl port-forward svc/payment-service 8090:8090
curl http://localhost:8090/metrics | grep consistency

Test 3: RTO/RPO Validation

Disaster Recovery Simulation:

# Simulate regional failure
kubectl patch deployment payment-service -n demo \
  --patch '{"spec":{"replicas":0}}'

# Measure RTO - time to restore service
start_time=$(date +%s)
kubectl patch deployment payment-service -n demo \
  --patch '{"spec":{"replicas":3}}'

# Wait for all pods to be ready
kubectl wait --for=condition=ready pod -l app=payment-service -n demo --timeout=900s
end_time=$(date +%s)
rto=$((end_time - start_time))

echo "RTO: ${rto} seconds"
if [ $rto -le 900 ]; then
  echo "✓ RTO target met (15 minutes)"
else
  echo "✗ RTO target exceeded"
fi

Test 4: Istio Resilience Features

Automatic Retry Testing:

# Deploy with fault injection
kubectl apply -f istio/fault-injection.yaml

# Generate requests with chaos header
for i in {1..100}; do
  grpcurl -H "x-chaos-test: true" -plaintext \
    -d '{"amount": 100, "currency": "USD"}' \
    payment-service.demo.svc.cluster.local:8080 \
    PaymentService/ProcessPayment
done

# Check the Envoy sidecar stats for retry behavior
kubectl exec -n demo deploy/payment-service -c istio-proxy -- \
  pilot-agent request GET stats | grep retry

Monitoring and Observability

RTO/RPO Considerations

  • Recovery Time Objective (RTO): Target time to restore service after an outage.
  • Recovery Point Objective (RPO): Maximum acceptable data loss.

Your service lifecycle design directly impacts these critical business metrics:

package monitoring

import (
    "time"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // RTO-related metrics
    ServiceStartupTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name: "service_startup_duration_seconds",
        Help: "Time from pod start to service ready",
        Buckets: []float64{1, 5, 10, 30, 60, 120, 300, 600}, // Up to 10 minutes
    }, []string{"service", "version"})
    
    ServiceRecoveryTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
        Name: "service_recovery_duration_seconds", 
        Help: "Time to recover from failure state",
        Buckets: []float64{1, 5, 10, 30, 60, 300, 900}, // Up to 15 minutes
    }, []string{"service", "failure_type"})
    
    // RPO-related metrics
    LastCheckpointAge = promauto.NewGaugeVec(prometheus.GaugeOpts{
        Name: "last_checkpoint_age_seconds",
        Help: "Age of last successful checkpoint",
    }, []string{"service", "checkpoint_type"})
    
    DataConsistencyChecks = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "data_consistency_checks_total",
        Help: "Total number of consistency checks performed",
    }, []string{"service", "check_type", "status"})
    
    InconsistencyDetected = promauto.NewCounterVec(prometheus.CounterOpts{
        Name: "data_inconsistencies_detected_total",
        Help: "Total number of data inconsistencies detected",
    }, []string{"service", "inconsistency_type", "severity"})
)
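Defining collectors is only half the job; they need to be updated at the right lifecycle points. A minimal sketch of recording the startup and checkpoint metrics, which could live in the same monitoring package (the service name, version, and checkpoint-type label values are illustrative):

// RecordStartup observes how long the service took to become ready,
// feeding the RTO-oriented startup histogram above.
func RecordStartup(start time.Time) {
    ServiceStartupTime.
        WithLabelValues("payment-service", "v1.4.2").
        Observe(time.Since(start).Seconds())
}

// RecordCheckpointAge updates the RPO-oriented gauge after every successful
// checkpoint so dashboards show how much work would be lost right now.
func RecordCheckpointAge(lastCheckpoint time.Time) {
    LastCheckpointAge.
        WithLabelValues("payment-service", "batch").
        Set(time.Since(lastCheckpoint).Seconds())
}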

Grafana Dashboard

{
  "dashboard": {
    "title": "Service Lifecycle - Business Impact",
    "panels": [
      {
        "title": "RTO Compliance",
        "description": "Percentage of recoveries meeting RTO target (15 minutes)",
        "targets": [{
          "expr": "100 * (histogram_quantile(0.95, service_recovery_duration_seconds_bucket) <= 900)"
        }],
        "thresholds": [
          {"value": 95, "color": "green"},
          {"value": 90, "color": "yellow"},
          {"value": 0, "color": "red"}
        ]
      },
      {
        "title": "RPO Risk Assessment",
        "description": "Data at risk based on checkpoint age",
        "targets": [{
          "expr": "last_checkpoint_age_seconds / 60"
        }],
        "unit": "minutes"
      },
      {
        "title": "Data Consistency Status",
        "targets": [{
          "expr": "rate(data_inconsistencies_detected_total[5m])"
        }]
      }
    ]
  }
}

Production Readiness Checklist

Before deploying to production, ensure your service meets these criteria:

Application Layer

  • [ ] Implements separate liveness and readiness endpoints
  • [ ] Readiness checks validate all critical dependencies
  • [ ] Graceful shutdown drains in-flight requests
  • [ ] Idempotency for all state-modifying operations
  • [ ] Anti-entropy/reconciliation processes implemented
  • [ ] Circuit breakers for external dependencies
  • [ ] Checkpoint-and-resume for long-running operations
  • [ ] Structured logging with correlation IDs
  • [ ] Metrics for startup, shutdown, and health status

Kubernetes Configuration

  • [ ] Startup probe for slow-initializing services
  • [ ] Distinct liveness and readiness probes
  • [ ] Calculated terminationGracePeriodSeconds based on actual shutdown time
  • [ ] PreStop hooks for load balancer draining
  • [ ] Resource requests and limits defined
  • [ ] PodDisruptionBudget for availability
  • [ ] Anti-affinity rules for high availability

Service Mesh Integration

  • [ ] Istio sidecar lifecycle annotations (holdApplicationUntilProxyStarts)
  • [ ] Istio automatic retry policies configured
  • [ ] Circuit breaker configuration in DestinationRule
  • [ ] Distributed tracing enabled
  • [ ] mTLS for service-to-service communication

Data Integrity & Recovery

  • [ ] RTO/RPO metrics tracked and alerting configured
  • [ ] Reconciliation processes tested with Game Day exercises
  • [ ] Chaos engineering tests validate failure scenarios
  • [ ] Synthetic monitoring for end-to-end business flows
  • [ ] Backup and restore procedures documented and tested

Common Pitfalls and Solutions

1. My service keeps restarting during deployment:

Symptom: Pods enter CrashLoopBackOff during rollout

Common Causes:

  • Liveness probe starts before application is ready
  • Startup time exceeds probe timeout
  • Missing startup probe

Solution:

startupProbe:
  httpGet:
    path: /healthz
    port: 8080
  failureThreshold: 30  # 30 * 10s = 5 minutes
  periodSeconds: 10
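On the application side, the probe configuration above only helps if liveness and readiness answer different questions. A minimal sketch of distinct handlers, assuming a SQL database is the critical dependency (swap in whatever your readiness check actually needs to validate):

package health

import (
    "database/sql"
    "net/http"
)

// Liveness answers "is the process alive?" Keep it cheap and dependency-free,
// otherwise a slow dependency gets the pod killed.
func LivenessHandler() http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
}

// Readiness answers "can I serve traffic right now?" Validate critical
// dependencies so the pod is removed from endpoints while they are down.
func ReadinessHandler(db *sql.DB) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        if err := db.PingContext(r.Context()); err != nil {
            http.Error(w, "database unavailable", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    })
}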

2. Data corruption during pod restarts:

Symptom: Inconsistent database state after deployments

Common Causes:

  • Non-atomic operations
  • Missing idempotency
  • No reconciliation processes

Solution:

package payment

import (
    "context"
    "database/sql"
)

// Implement atomic operations with database transactions: either all
// writes commit together or none of them do.
func handlePayment(ctx context.Context, db *sql.DB, req PaymentRequest) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    // The deferred Rollback is a no-op once Commit succeeds.
    defer tx.Rollback()

    // All operations within the transaction
    if err := processPayment(tx, req); err != nil {
        return err // rolled back by the deferred Rollback
    }

    return tx.Commit()
}
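The transaction above addresses atomicity, but not the "missing idempotency" cause: a client retry after a crash can still process the same payment twice. A minimal idempotency-key sketch, reusing the same package and imports as the snippet above and assuming a Postgres-style idempotency_keys table with a unique idempotency_key column (the table, column, and ON CONFLICT syntax are illustrative):

// ProcessOnce executes the payment at most once per client-supplied idempotency key.
func ProcessOnce(ctx context.Context, db *sql.DB, key string, req PaymentRequest) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Claim the key in the same transaction as the side effect, so a crash
    // before Commit releases the claim together with the partial work.
    res, err := tx.ExecContext(ctx,
        "INSERT INTO idempotency_keys (idempotency_key) VALUES ($1) ON CONFLICT DO NOTHING", key)
    if err != nil {
        return err
    }
    if n, _ := res.RowsAffected(); n == 0 {
        // Key already claimed: this request was already handled; treat the retry as a no-op.
        return nil
    }

    if err := processPayment(tx, req); err != nil {
        return err
    }
    return tx.Commit()
}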

3. Service mesh sidecar issues:

Symptom: ECONNREFUSED errors on startup

Common Causes:

  • Application starts before sidecar is ready
  • Sidecar terminates before application

Solution:

annotations:
  sidecar.istio.io/holdApplicationUntilProxyStarts: "true"
  proxy.istio.io/config: |
    proxyMetadata:
      EXIT_ON_ZERO_ACTIVE_CONNECTIONS: "true"

Conclusion

Service lifecycle management is not just about preventing outages; it's about building systems that are predictable, observable, and resilient to the inevitable failures of distributed systems. Getting this right enables:

  • Zero-downtime deployments: Services gracefully handle rollouts without data loss.
  • Improved reliability: Proper health checks prevent cascading failures.
  • Better observability: Clear signals about service state and data consistency.
  • Faster recovery: Services self-heal from transient failures.
  • Data integrity: Idempotency and reconciliation prevent corruption.
  • Compliance readiness: Meet RTO/RPO requirements for disaster recovery.
  • Financial protection: Prevent duplicate transactions and data corruption that could cost millions.

The difference between a service that “works on my machine” and one that thrives in production lies in these details. Whether you’re running on GKE, EKS, or AKS, these patterns form the foundation of production-ready microservices.

Want to test these patterns yourself? The complete code examples and deployment manifests are available on GitHub.

July 22, 2024

Security Challenges in Microservice Architecture

Filed under: Security,Web Services — admin @ 3:58 pm

Abstract

Microservice architectures have gained wide adoption due to their ability to deliver scalability, agility, and resilience. However, the distributed nature of microservices also introduces new security challenges that must be addressed proactively. Security in distributed systems revolves around three fundamental principles: confidentiality, integrity, and availability (CIA). Maintaining the CIA triad is crucial for protecting sensitive data and ensuring system reliability in microservices.

  • Confidentiality ensures that sensitive information is accessible only to authorized users using encryption, access controls, and strong authentication mechanisms [NIST SP 800-53 Rev. 5].
  • Integrity guarantees that data remains accurate and unaltered during storage or transmission using cryptographic hash functions, digital signatures, and data validation processes [Software and Data Integrity Failures].
  • Availability ensures that systems and data are accessible to authorized users when needed. This involves implementing redundancy, failover mechanisms, and regular maintenance [ISO/IEC 27001:2022].

Below, we delve into these principles and the practices essential for building secure distributed systems. We then explore the potential security risks and failures associated with microservices and offer guidance on mitigating them.

Security Practices

The following key practices help establish a strong security posture:

  • Strong Identity Management: Implement robust identity and access management (IAM) systems to ensure that only authenticated and authorized users can access system resources. [AWS IAM Best Practices].
  • Fail Safe: Maintain confidentiality, integrity, and availability when an error condition is detected.
  • Defense in Depth: Employ multiple layers of security controls to protect data and systems. This includes network segmentation, firewalls, intrusion detection systems (IDS), and secure coding practices [Microsoft's Defense in Depth].
  • Least Privilege: Grant a person or process only the minimum level of access rights (privileges) necessary to complete an assigned operation.
  • Separation of Duties: This principle, also known as separation of privilege, requires multiple conditions to be met for task completion, ensuring no single entity has complete control, thereby enhancing security by distributing responsibilities.
  • Zero Trust Security: Trust no entity by default, regardless of its location; require verification from everyone trying to access resources [NIST Zero Trust Architecture].
  • Auditing and Monitoring: Implement comprehensive logging, monitoring, and auditing practices to detect and respond to security incidents [Center for Internet Security (CIS) Controls].
  • Protecting Data in Motion and at Rest: Use encryption to protect data during transmission (data in motion) and when stored (data at rest). [NIST’s Guide to Storage Encryption Technologies for End User Devices].
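As a concrete illustration of the last practice above, here is a minimal Go sketch of authenticated encryption at rest using AES-256-GCM (key management, key rotation, and where the sealed blob is stored are deliberately out of scope):

package vault

import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
    "io"
)

// Encrypt seals plaintext with AES-256-GCM; the random nonce is prepended
// to the ciphertext so Decrypt can recover it.
func Encrypt(key, plaintext []byte) ([]byte, error) {
    block, err := aes.NewCipher(key) // key must be 32 bytes for AES-256
    if err != nil {
        return nil, err
    }
    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }
    nonce := make([]byte, gcm.NonceSize())
    if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
        return nil, err
    }
    return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// Decrypt opens a blob produced by Encrypt and fails if it was tampered with,
// covering both confidentiality and integrity of data at rest.
func Decrypt(key, sealed []byte) ([]byte, error) {
    block, err := aes.NewCipher(key)
    if err != nil {
        return nil, err
    }
    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, err
    }
    if len(sealed) < gcm.NonceSize() {
        return nil, io.ErrUnexpectedEOF
    }
    nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
    return gcm.Open(nil, nonce, ciphertext, nil)
}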

Security Methodologies and Frameworks

The following practices ensure that security is integrated throughout the development lifecycle and that potential threats are systematically addressed.

  • DevSecOps: Integrate security practices into the DevOps process to shift security left, addressing issues early in the software development lifecycle.
  • Security by Design (SbD): Incorporate a security-by-design process to ensure robust and secure systems [OWASP Secure Product Design]. Its key principles include memory-safe programming languages; static and dynamic application security testing; defense-in-depth; single sign-on; secure logging; data classification; secure random number generators; limiting the scope of credentials and access; Address Space Layout Randomization (ASLR) and Kernel ASLR (KASLR); encrypting data at rest, optionally with customer-managed keys; encrypting data in transit; data isolation with multi-tenancy support; strong secrets management; the principles of Least Privilege and Separation of Duties; and the principle of Security-in-the-Open.
  • Threat Modeling Techniques: Threat modeling involves identifying potential threats to your system, understanding what can go wrong, and determining how to mitigate these risks. The following techniques can be used to identify and categorize potential security threats:
    • STRIDE (Spoofing, Tampering, Repudiation, Information Disclosure, Denial of Service, Elevation of Privilege) categorizes different types of threats.
    • PASTA (Process for Attack Simulation and Threat Analysis) a risk-centric threat modeling methodology [SEI Threat Modeling].
    • VAST (Visual, Agile, and Simple Threat) scales and integrates with Agile development processes.
  • CAPEC (Common Attack Pattern Enumeration and Classification): A comprehensive dictionary of known attack patterns, providing detailed information about common threats and mitigation techniques.
  • OCTAVE (Operationally Critical Threat, Asset, and Vulnerability Evaluation): A risk-based strategic assessment and planning technique for cybersecurity by the Carnegie Mellon University’s Software Engineering Institute.
  • OWASP Application Security Verification Standard (ASVS): A standard for designing, building, and testing secure applications.
  • OWASP Top Ten: Top Web Application Security Risks such as:
    • A01: Broken Access Control.
    • A02: Cryptographic Failures.
    • A03: Injection including Cross-site Scripting.
    • A04: Insecure Design.
    • A05: Security Misconfiguration.
    • A06: Vulnerable and Outdated Components.
    • A07: Identification and Authentication Failures.
    • A08: Software and Data Integrity Failures including Insecure Deserialization.
    • A09: Security Logging and Monitoring Failures including various failures impacting visibility and incident response.
    • A10: Server-Side Request Forgery (SSRF).
  • CWE TOP 25 Most Dangerous Software Errors:
    • CWE-787: Out-of-bounds Write
    • CWE-79: Improper Neutralization of Input During Web Page Generation (‘Cross-site Scripting’)
    • CWE-89: Improper Neutralization of Special Elements used in an SQL Command (‘SQL Injection’)
    • CWE-416: Use After Free
    • CWE-78: Improper Neutralization of Special Elements used in an OS Command (‘OS Command Injection’)
    • CWE-20: Improper Input Validation
    • CWE-125: Out-of-bounds Read
    • CWE-22: Improper Limitation of a Pathname to a Restricted Directory (‘Path Traversal’)
    • CWE-352: Cross-Site Request Forgery (CSRF)
    • CWE-434: Unrestricted Upload of File with Dangerous Type
    • CWE-862: Missing Authorization
    • CWE-476: NULL Pointer Dereference
    • CWE-287: Improper Authentication
    • CWE-190: Integer Overflow or Wraparound
    • CWE-502: Deserialization of Untrusted Data
    • CWE-77: Improper Neutralization of Special Elements used in a Command (‘Command Injection’)
    • CWE-119: Improper Restriction of Operations within the Bounds of a Memory Buffer
    • CWE-798: Use of Hard-coded Credentials
    • CWE-918: Server-Side Request Forgery (SSRF)
    • CWE-306: Missing Authentication for Critical Function
    • CWE-362: Concurrent Execution using Shared Resource with Improper Synchronization (‘Race Condition’)
    • CWE-269: Improper Privilege Management
    • CWE-94: Improper Control of Generation of Code (‘Code Injection’)
    • CWE-863: Incorrect Authorization
    • CWE-276: Incorrect Default Permissions
  • SAMM (Software Assurance Maturity Model): A framework for analyzing software security practices.
  • CERT (Computer Emergency Response Team) Coding Standards: Secure coding guidelines developed by the Software Engineering Institute (SEI) at Carnegie Mellon University.
  • SANS Secure Coding Practices: Provides secure coding resources.
  • NIST (National Institute of Standards and Technology) Cybersecurity Framework: A comprehensive cybersecurity framework that includes guidelines for secure software development and risk management.
  • ISO/IEC 27034 (Application Security): An international standard that provides guidance on information security for application services across their entire lifecycle, including design, development, testing, and maintenance.
  • BSIMM (Building Security In Maturity Model): A framework that helps organizations plan, execute, and measure their software security initiatives.
  • SAFECode (Software Assurance Forum for Excellence in Code): A non-profit organization dedicated to secure software development.
  • DREAD (Damage potential, Reproducibility, Exploitability, Affected users, and Discoverability): A model to rate and prioritize risks.

Security Incidents

Security incidents often result from inadequate security measures and oversight, highlighting the importance of rigorous security practices across various aspects of system management and software development.

  • Static Analysis of Source Code: The Heartbleed vulnerability in the OpenSSL cryptographic library allowed attackers to read sensitive memory contents and went undetected for over two years due to a lack of static analysis and code reviews.
  • Scope of Privileges and Credentials: In the Capital One Data Breach (2019), a former AWS employee exploited misconfigured web application firewall (WAF) credentials to gain unauthorized access to Capital One's AWS environment, leading to the theft of over 100 million customer records that cost Capital One $270m.
  • Random numbers: Lack of secure random number generation, weak encryption algorithms, and poor encryption configuration have caused numerous security breaches, such as security deficiencies in IoT devices due to bad random number generation, factory-default passwords in security cameras, and attacks on SSL with RC4.
  • Data Classification: Improperly classifying and handling data based on its sensitivity and criticality has been a source of security incidents like the Equifax Data Breach (2017), which exposed personal information of over 143 million consumers, and the McDonald's Data Leak (2017), which leaked personal information of about 2.2 million users.
  • Secure Logging: Failure to implement secure logging led to incidents like the Apache Log4j Vulnerability (2021) that affected numerous applications and systems. Similarly, the lack of logging made it difficult to detect and investigate the SolarWinds Supply Chain Attack (2020) that compromised numerous government agencies and companies.
  • Unauthorized Access to Production Data: Failing to implement appropriate controls and policies for governing production data use has led to significant data breaches.
  • Filesystem Security: Failing to properly configure filesystem security led to critical issues such as the Dirty Cow Vulnerability (2016), which enabled privilege escalation, and the Shellshock Vulnerability (2014), which allowed remote code execution.
  • Memory protection with ASLR and KASLR: Failing to implement Address Space Layout Randomization (ASLR) and Kernel Address Space Layout Randomization (KASLR) led to the Linux Kernel Flaw (CVE-2024-0646), which exposed systems to privilege escalation attacks.
  • Code Signing: Failing to properly sign code with cryptographic digital signatures led to incidents like the ASUS Live Update Hack (2019) that affected about 1 million users and the Stuxnet Worm (2010) that targeted programmable logic controllers (PLCs).
  • Data Integrity: Failure to implement data integrity verification with cryptographic hashing, digital signatures, or checksums can lead to incidents like:
    • Petya Ransomware Attack (2017): The Petya ransomware, specifically the "NotPetya" variant, employed advanced propagation methods, including leveraging legitimate Windows tools and exploiting known vulnerabilities like EternalBlue and EternalRomance.
    • Bangladesh Bank Cyber Heist (2016): Hackers compromised the bank’s systems and initiated fraudulent SWIFT transactions due to the lack of appropriate data integrity controls.
  • Data Privacy: Implementing controls to protect data privacy using data minimization, anonymization, encryption, access controls, and compliance with GDPR/CCPA regulations can prevent privacy-related incidents.
  • Customer Workloads in Multi-tenant environments: Failing to implement proper security controls and isolation mechanisms when executing customer-provided workloads in a multi-tenant environment can lead to incidents like:
    • Azure Functions Vulnerability: Researchers discovered a privilege escalation vulnerability in Azure Functions that could potentially permit an attacker to "plant a backdoor which would have run in every Function invocation".
    • Docker Container Escape Vulnerability (2019): A vulnerability in runC, reported by its maintainers, allowed a malicious Docker container to gain root-level access on the host.
  • Certificate Revocation Validation: Verify that the digital certificates used for authentication and encryption have not been revoked or compromised, using a certificate revocation list (CRL) or the Online Certificate Status Protocol (OCSP).
  • Encryption and Key Rotation/Lengths: Failures in encryption and key management have led to significant security breaches.
  • Secure Configuration: Failure to implement secure configuration and change management can lead to incidents like the AWS S3 Bucket Misconfigurations (2017), where sensitive data from various organizations was exposed due to misconfigured AWS S3 bucket permissions.
  • Secure communication protocols: Failure to implement secure communication protocols, such as TLS/SSL, to protect data in transit and mitigate man-in-the-middle attacks can lead to incidents like:
    • POODLE Attack (2014) exploited a vulnerability in the way SSL 3.0 handled padding, allowing attackers to decrypt encrypted connections.
    • FREAK Attack (2015) exploited a vulnerability in legacy export-grade encryption to allow attackers to perform man-in-the-middle attacks.
  • Secure Authentication: Failure to implement secure authentication mechanisms, such as multi-factor authentication (MFA) and strong password policies, can lead to unauthorized access.
  • Secure Backup and Disaster Recovery: Failure to implement secure procedures for data backup and recovery, including encryption, access controls, and offsite storage, can lead to incidents such as:
    • Code Spaces Data Loss (2014): Code Spaces was forced to shut down after a catastrophic data loss incident due to a lack of secure backup and recovery procedures.
    • Garmin Ransomware Attack (2020): Garmin was hit by a ransomware attack that disrupted its services and operations, highlighting the importance of secure data backup and recovery procedures.
  • Secure Caching: Implementing proper authentication, access controls, and encryption prevents data leaks or unauthorized access like:
    • Cloudflare Data Leak (2017): A vulnerability in Cloudflare’s cache servers resulted in sensitive data leaking across different customer websites, exposing sensitive information.
    • Memcached DDoS Attacks (2018): Misconfigured Memcached servers were exploited by attackers to launch massive distributed denial-of-service (DDoS) attacks.
  • Privilege Escalation (Least Privilege): Improper privilege management caused the Edward Snowden Data Leaks (2013), which allowed Snowden to copy and exfiltrate sensitive data from classified systems. In the Capital One Data Breach (2019), an overly permissive IAM policy granted broader access than necessary, violating the principle of least privilege. In addition, contingent authorization can be used to grant temporary or limited access to resources or systems based on specific conditions or events.
  • SPF, DKIM, DMARC: Implement email authentication such as SPF (Sender Policy Framework), DKIM (DomainKeys Identified Mail), and DMARC (Domain-based Message Authentication, Reporting, and Conformance), along with anti-spoofing mechanisms for all domains.
  • Multitenancy: Implement secure and isolated processing of service requests in a multi-tenant environment to prevent unauthorized access or data leakage between different tenants.
  • Identity Management in Mobile Applications: Insecure authentication, authorization, and user management mechanisms can lead to security incidents.
  • Secure Default Configuration: Systems and applications should be designed and configured to be secure by default to prevent such incidents.
  • Server-side Template Injection (SSTI): A vulnerability that occurs when user-supplied input is improperly interpreted as part of a server-side template engine, leading to the potential execution of arbitrary code.
    • SSTI in Apache Freemarker (2022): A SSTI vulnerability in the Apache Freemarker templating engine allowed remote code execution in various applications.
    • SSTI in Jinja2: Illustrates how Server-Side Template Injection (SSTI) payloads for Jinja2 can enable remote code execution.
  • Reverse Tabnabbing: A vulnerability where a page opened in a new tab (e.g., via a link without rel="noopener") uses its reference to the opening page to replace that page's contents with malicious content.
  • Regions and Partitions Isolation: Isolating the security and controls for each region and partition helps prevent security vulnerabilities such as:
    • AWS US-East-1 Outage (2017): An operational issue in AWS’s US-East-1 region caused widespread service disruptions, affecting numerous customers and services hosted in that region.
    • Google Cloud Engine Outage (2016): A software bug in Google’s central data center caused cascading failures and service disruptions across multiple regions.
  • External Dependencies: Regularly reviewing and assessing external (third-party) dependencies for potential security vulnerabilities can mitigate supply chain attacks and security breaches like:
    • Equifax Data Breach (2017): The breach was caused by the failure to patch a vulnerability in the Apache Struts open-source framework used by Equifax.
    • Log4Shell Vulnerability (2021): A critical vulnerability in the Log4j library, used for logging in Java applications, allowed attackers to execute arbitrary code on affected systems.
  • Circular Dependencies: Avoiding circular dependencies in software design can prevent incidents like:
    • Node.js Event-Stream Incident (2018): A malicious actor gained control of the popular “event-stream” package and injected malicious code.
    • Left-Pad Incident (2016): Although not a direct security breach, the removal of the "left-pad" npm package broke thousands of projects that depended on it directly or transitively.
    • Windows DLL Hijacking: Complex dependency management can lead to DLL hijacking, allowing attackers to execute malicious code.
  • Confused Deputy: The “Confused Deputy” problem, which occurs when a program inadvertently performs privileged operations on behalf of another entity, leading to security breaches:
    • Google Docs Phishing Attack (2017): Attackers exploited a feature in Google Docs to trick users into granting permission to a malicious app disguised as Google Docs.
    • Android Toast Overlay Attack (2017): A vulnerability in the Android operating system allowed malicious apps to display overlay Toast messages that could intercept user input or perform actions without user consent.
  • Validation Before Deserialization: Failure to validate deserialized data can lead to security vulnerabilities, such as code execution or data tampering attacks.
  • Generic Error Messages: Implement proper error handling and return generic error messages rather than exposing sensitive information or implementation details.
  • Monitoring: Failure to implement proper logging and monitoring mechanisms can make it difficult to detect and respond to security incidents.
    • Uber’s Data Breach (2016): Uber failed to properly monitor and respond to security alerts, resulting in a delayed discovery of the data breach that exposed data of 57 million users and drivers.
    • Target Data Breach (2013): Inadequate logging and monitoring allowed attackers to remain undetected in Target’s systems for several weeks, resulting in the theft of millions of credit card records.
  • Secure Web Design: Implement input validation, secure session management, cross-site scripting (XSS) prevention, cross-site request forgery (CSRF) protection, and industry best practices to prevent such incidents.

Summary

Microservice architectures offer scalability, agility, and resilience but also present unique security challenges. Addressing these challenges requires adhering to the principles of confidentiality, integrity, and availability (CIA). Key security practices include strong identity management, defense in depth, principle of least privilege, zero trust security, comprehensive auditing and monitoring, and protecting data in motion and at rest. Security methodologies and frameworks like DevSecOps, Security by Design (SbD), and threat modeling techniques (e.g., STRIDE, PASTA) ensure robust security integration throughout the development lifecycle. Real-world incidents highlight the consequences of inadequate security measures. Implementing secure communication protocols, authentication mechanisms, and data backup procedures are crucial. Overall, a proactive and comprehensive approach to security, incorporating established practices and frameworks, is vital for safeguarding microservice architectures and distributed systems.

January 1, 2023

Consumer-driven and Producer-generated Contract Testing for REST APIs

Filed under: REST,Testing,Web Services — admin @ 9:43 pm

Though the REST standard for remote APIs is fairly loose, you can document API shape and structure using standards such as OpenAPI and Swagger specifications. The documented API specification ensures that both the consumer/client and the producer/server abide by the specification and prevents unexpected behavior. The API provider may also define service-level objectives (SLOs) so that the API meets specified latency, security, availability, and other service-level indicators (SLIs). The API provider can use contract tests to validate API interactions based on the documented specifications. Contract testing includes both consumer and producer, where a consumer makes an API request and the producer produces the result. The contract tests ensure that both consumer requests and producer responses match the contract request and response definitions per the API specifications. These contract tests don't just validate the API schema; they validate interactions between consumer and producer, so they can also detect breaking or backward-incompatible changes, allowing consumers to continue using the APIs without surprises.

In order to demonstrate contract testing, we will use the api-mock-service library to generate mock/stub client requests and server responses based on Open API specifications or customized test contracts. These test contracts can be used by both consumers and producers for validating API contracts and can evolve as API specifications are updated.

Sample REST API Under Test

A sample eCommerce application will be used to demonstrate contract testing. The application uses various REST APIs to implement an online shopping experience. The primary purpose of this example is to show how different request structures can be passed to the REST APIs to generate a valid result or an error condition for contract testing. You can view the Open-API specifications for this sample app here.

Customer REST APIs

The customer APIs define operations to manage customers who shop online, e.g.:

Customer APIs

Product REST APIs

The product APIs define operations to manage products that can be shopped online, e.g.:

Product APIs

Payment REST APIs

The payment APIs define operations to charge credit card and pay for online shopping, e.g.:

Payment APIs

Order REST APIs

The order APIs define operations to purchase a product from the online store; they use the above APIs to validate customers, check product inventory, charge payments, and then store a record of orders, e.g.:

Order APIs

Generating Stub Server Responses based on Open-API Specifications

In this example, stub server responses will be generated by api-mock-service based on the Open-API specification ecommerce-api.json. Start the mock service first as follows:

docker pull plexobject/api-mock-service:latest
docker run -p 8000:8000 -p 9000:9000 -e HTTP_PORT=8000 -e PROXY_PORT=9000 \
	-e DATA_DIR=/tmp/mocks -e ASSET_DIR=/tmp/assets api-mock-service

And then uploading open-API specifications for ecommerce-api.json:

curl -H "Content-Type: application/yaml" --data-binary @ecommerce-api.json \
	http://localhost:8000/_oapi

It will generate test contracts with stub/mock responses for all APIs defined in the ecommerce-api.json Open API specification. For example, you can fetch a result from the customers REST API, e.g.:

curl http://localhost:8000/customers

to produce:

[
  {
    "address": {
      "city": "PpCJyfKUomUOdhtxr",
      "countryCode": "US",
      "id": "ede97f59-2ef2-48e5-913f-4bce0f152603",
      "streetAddress": "Se somnis cibo oculi, die flammam petimus?",
      "zipCode": "06826"
    },
    "creditCard": {
      "balance": {
        "amount": 53965,
        "currency": "CAD"
      },
      "cardNumber": "7345-4444-5461",
      "customerId": "WB97W4L2VQRRkH5L0OAZGk0MT957r7Z",
      "expiration": "25/0000",
      "id": "ae906a78-0aff-4d4e-ad80-b77877f0226c",
      "type": "VISA"
    },
    "email": "abigail.appetitum@dicant.net",
    "firstName": "sciam",
    "id": "21c82838-507a-4745-bc1b-40e6e476a1fb",
    "lastName": "inquit",
    "phone": "1-717-5555-3010"
  },
...  

The above response is randomly generated based on the types, formats, regex patterns, and min/max limits of the properties defined in the Open-API specification, and calling this API will automatically generate both valid and error responses, e.g., calling "curl http://localhost:8000/customers" again may return:

* Mark bundle as not supporting multiuse
< HTTP/1.1 500 Internal Server Error
< Content-Type:
< Vary: Origin
< X-Mock-Path: /customers
< X-Mock-Request-Count: 9
< X-Mock-Scenario: getCustomerByEmail-customers-500-8a93b6c60c492e730ea149d5d09e79d85701c01dbc017d178557ed1d2c1bad3d
< Date: Sun, 01 Jan 2023 20:41:17 GMT
< Content-Length: 67
<
* Connection #0 to host localhost left intact
{"logRef":"achieve_output_fresh","message":"buffalo_rescue_street"}

Consumer-driven Contract Testing

Upon uploading the Open-API specifications of microservices, the api-mock-service generates test contracts for each REST API and response statuses. You can then customize these test cases for consumer-driven contract testing.

For example, here is the default test contract generated for finding a customer by id with path “/customers/:id”:

method: GET
name: getCustomer-customers-200-61a298e
path: /customers/:id
description: ""
predicate: ""
request:
    match_query_params: {}
    match_headers: {}
    match_contents: '{}'
    path_params:
        id: \w+
    query_params: {}
    headers: {}
response:
    headers: 
      Content-Type:
        - application/json
    contents: '{"address":{"city":"{{RandStringMinMax 2 60}}","countryCode":"{{EnumString `US CA`}}","id":"{{UUID}}","streetAddress":"{{RandRegex `\\w+`}}","zipCode":"{{RandRegex `\\d{5}`}}"},"creditCard":{"balance":{"amount":{{RandNumMinMax 0 0}},"currency":"{{RandRegex `(USD|CAD|EUR|AUD)`}}"},"cardNumber":"{{RandRegex `\\d{4}-\\d{4}-\\d{4}`}}","customerId":"{{RandStringMinMax 30 36}}","expiration":"{{RandRegex `\\d{2}/\\d{4}`}}","id":"{{UUID}}","type":"{{EnumString `VISA MASTERCARD AMEX`}}"},"email":"{{RandRegex `.+@.+\\..+`}}","firstName":"{{RandRegex `\\w`}}","id":"{{UUID}}","lastName":"{{RandRegex `\\w`}}","phone":"{{RandRegex `1-\\d{3}-\\d{4}-\\d{4}`}}"}'
    contents_file: ""
    status_code: 200
wait_before_reply: 0s

The above template demonstrates the interaction between consumer and producer by defining properties such as:

  • method – of REST API such as GET/POST/PUT/DELETE
  • name – of the test case
  • path – of the REST API
  • description – of test
  • predicate – defines a condition which must be true to select this test contract
  • request section defines input properties for the REST API including:
    • match_query_params – to match query input parameters for selecting the test contract
    • match_headers – to match input headers for selecting the test contract
    • match_contents – defines regex for selecting input body
    • path_params – defines path variables and regex
    • query_params and headers – defines sample input parameters and headers
  • response section defines output properties for the REST API including:
    • headers – defines response headers
    • contents – defines body of response
    • contents_file – allows loading response from a file
    • status_code – defines HTTP response status
  • wait_before_reply – defines wait time before returning response

You can then invoke the test contract using:

curl http://localhost:8000/customers/1

which generates a test response from the mock/stub server provided by the api-mock-service library, e.g.:

{
  "address": {
    "city": "PanHQyfbHZVw",
    "countryCode": "US",
    "id": "ff5d0e98-daa5-49c8-bb79-f2d7274f2fb1",
    "streetAddress": "Sumus o proferens etiamne intuerer fugasti, nuntiantibus da?",
    "zipCode": "01364"
  },
  "creditCard": {
    "balance": {
      "amount": 80704,
      "currency": "USD"
    },
    "cardNumber": "3226-6666-2214",
    "customerId": "0VNf07XNWkLiIBhfmfCnrE1weTlkhmxn",
    "expiration": "24/5555",
    "id": "f9549ef3-a5eb-4df4-a8a9-85a30a6a49c6",
    "type": "VISA"
  },
  "email": "amanda.doleat@fructu.com",
  "firstName": "quaero",
  "id": "9aeee733-932d-4244-a6f8-f21d2883fd27",
  "lastName": "habeat",
  "phone": "1-052-5555-4733"
}

You can customize the above response contents using built-in template functions in the api-mock-service library or create additional test contracts for each distinct input parameter. For example, the following contract defines the interaction between consumer and producer to add a new customer:

method: POST
name: saveCustomer-customers-200-ddfceb2
path: /customers
description: ""
order: 0
group: Sample Ecommerce API
predicate: ""
request:
    match_query_params: {}
    match_headers: {}
    match_contents: '{"address.city":"(__string__\\w+)","address.countryCode":"(__string__(US|CA))","address.streetAddress":"(__string__\\w+)","address.zipCode":"(__string__\\d{5})","creditCard.balance.amount":"(__number__[+-]?((\\d{1,10}(\\.\\d{1,5})?)|(\\.\\d{1,10})))","creditCard.balance.currency":"(__string__(USD|CAD|EUR|AUD))","creditCard.cardNumber":"(__string__\\d{4}-\\d{4}-\\d{4})","creditCard.customerId":"(__string__\\w+)","creditCard.expiration":"(__string__\\d{2}/\\d{4})","creditCard.type":"(__string__(VISA|MASTERCARD|AMEX))","email":"(__string__.+@.+\\..+)","firstName":"(__string__\\w)","lastName":"(__string__\\w)","phone":"(__string__1-\\d{3}-\\d{4}-\\d{4})"}'
    path_params: {}
    query_params: {}
    headers:
        ContentsType: application/json
    contents: '{"address":{"city":"__string__\\w+","countryCode":"__string__(US|CA)","streetAddress":"__string__\\w+","zipCode":"__string__\\d{5}"},"creditCard":{"balance":{"amount":"__number__[+-]?((\\d{1,10}(\\.\\d{1,5})?)|(\\.\\d{1,10}))","currency":"__string__(USD|CAD|EUR|AUD)"},"cardNumber":"__string__\\d{4}-\\d{4}-\\d{4}","customerId":"__string__\\w+","expiration":"__string__\\d{2}/\\d{4}","type":"__string__(VISA|MASTERCARD|AMEX)"},"email":"__string__.+@.+\\..+","firstName":"__string__\\w","lastName":"__string__\\w","phone":"__string__1-\\d{3}-\\d{4}-\\d{4}"}'
    example_contents: |
        address:
            city: Ab fabrorum meminerim conterritus nota falsissime deum?
            countryCode: CA
            streetAddress: Mei nisi dum, ab amaremus antris?
            zipCode: "00128"
        creditCard:
            balance:
                amount: 3000.4861560368768
                currency: USD
            cardNumber: 7740-7777-6114
            customerId: Fudi eodem sed habitaret agam pro si?
            expiration: 85/2222
            type: AMEX
        email: larry.neglecta@audio.edu
        firstName: fatemur
        lastName: gaudeant
        phone: 1-543-8888-2641
response:
    headers: 
      Content-Type: 
        - application/json
    contents: '{"address":{"city":"{{RandStringMinMax 2 60}}","countryCode":"{{EnumString `US CA`}}","id":"{{UUID}}","streetAddress":"{{RandRegex `\\w+`}}","zipCode":"{{RandRegex `\\d{5}`}}"},"creditCard":{"balance":{"amount":{{RandNumMinMax 0 0}},"currency":"{{RandRegex `(USD|CAD|EUR|AUD)`}}"},"cardNumber":"{{RandRegex `\\d{4}-\\d{4}-\\d{4}`}}","customerId":"{{RandStringMinMax 30 36}}","expiration":"{{RandRegex `\\d{2}/\\d{4}`}}","id":"{{UUID}}","type":"{{EnumString `VISA MASTERCARD AMEX`}}"},"email":"{{RandRegex `.+@.+\\..+`}}","firstName":"{{RandRegex `\\w`}}","id":"{{UUID}}","lastName":"{{RandRegex `\\w`}}","phone":"{{RandRegex `1-\\d{3}-\\d{4}-\\d{4}`}}"}'
    contents_file: ""
    status_code: 200
wait_before_reply: 0s

The above template defines the interaction for adding a new customer, where the request section defines the format of the request and the matching criteria using the match_contents property. The response section includes the headers and contents that are generated by the stub/mock server for consumer-driven contract testing. You can then invoke the test contract using:

curl -X POST http://localhost:8000/customers -d '{"address":{"city":"rwjJS","countryCode":"US","id":"4a788c96-e532-4a97-9b8b-bcb298636bc1","streetAddress":"Cura diu me, miserere me?","zipCode":"24121"},"creditCard":{"balance":{"amount":57012,"currency":"USD"},"cardNumber":"5566-2222-8282","customerId":"tgzwgThaiZqc5eDwbKk23nwjZqkap7","expiration":"70/6666","id":"d966aafa-c28b-4078-9e87-f7e9d76dd848","type":"VISA"},"email":"andrew.recorder@ipsas.net","firstName":"quendam","id":"071396bb-f8db-489d-a8f7-bbcce952ecef","lastName":"formaeque","phone":"1-345-6666-0618"}'

Which will return a response such as:

{
  "address": {
    "city": "j77oUSSoB5lJCUtc4scxtm0vhilPRdLE7Nc8KzAunBa87OrMerCZI",
    "countryCode": "CA",
    "id": "9bb21030-29d0-44be-8f5a-25855e38c164",
    "streetAddress": "Qui superbam imago cernimus, sensarum nuntii tot da?",
    "zipCode": "08020"
  },
  "creditCard": {
    "balance": {
      "amount": 75666,
      "currency": "AUD"
    },
    "cardNumber": "1383-8888-5013",
    "customerId": "nNaUd15lf6lqkAEwKoguVTvBnPMBVDhdeO",
    "expiration": "73/5555",
    "id": "554efad7-17ab-49f9-967a-3e47381a4d34",
    "type": "AMEX"
  },
  "email": "deborah.vivit@desivero.gov",
  "firstName": "contexo",
  "id": "db70b737-ee1d-48ed-83da-c5a8773c7a5f",
  "lastName": "delectat",
  "phone": "1-013-7777-0054"
}

Note: The response will not match the request body because contract testing only validates interactions between consumer and producer without maintaining any server-side state. You can use other types of testing, such as integration/component/functional testing, for validating state-based behavior.

Producer-driven Generated Tests

The process of defining contracts to generate tests for validating producer REST APIs is similar to consumer-driven contracts. For example, you can upload Open-API specifications or user-defined contracts to the mock/stub server provided by api-mock-service.

For example, you can upload open-API specifications for ecommerce-api.json as follows:

curl -H "Content-Type: application/yaml" --data-binary @ecommerce-api.json \
	http://localhost:8000/_oapi

Upon uploading the specifications, the mock server will generate contracts for each REST API and response status. You can customize those contracts with additional validations or assertions, and then invoke the server-generated tests either for a single REST API or for multiple REST APIs belonging to a specific group. You can also define an order for executing tests in a group and optionally pass data from one invocation to the next invocation of a REST API.

For testing purposes, we will customize the customer REST APIs for adding a new customer and fetching a customer by its id, i.e.:

A contract for adding a new customer

method: POST
name: save-customer
path: /customers
group: customers
order: 0
request:
    headers:
        Content-Type: application/json
    contents: |
        address:
            city: {{RandCity}}
            countryCode: {{EnumString `US CA`}}
            id: {{UUID}}
            streetAddress: {{RandSentence 2 3}}
            zipCode: {{RandRegex `\d{5}`}}
        creditCard:
            balance:
                amount: {{RandNumMinMax 20 500}}
                currency: {{EnumString `USD CAD`}}
            cardNumber: {{RandRegex `\d{4}-\d{4}-\d{4}`}}
            customerId: {{UUID}}
            expiration: {{RandRegex `\d{2}/\d{4}`}}
            id: {{UUID}}
            type: {{EnumString `VISA MASTERCARD`}}
        email: {{RandEmail}}
        firstName: {{RandName}}
        id: {{UUID}}
        lastName: {{RandName}}
        phone: {{RandRegex `1-\d{3}-\d{3}-\d{4}`}}
response:
    match_headers: {}
    match_contents: '{"address.city":"(__string__\\w+)","address.countryCode":"(__string__(US|CA))","address.id":"(__string__\\w+)","address.streetAddress":"(__string__\\w+)","address.zipCode":"(__string__\\d{5}.?\\d{0,4})","creditCard.balance.amount":"(__number__[+-]?(([0-9]{1,10}(\\.[0-9]{1,5})?)|(\\.[0-9]{1,10})))","creditCard.balance.currency":"(__string__\\w+)","creditCard.cardNumber":"(__string__[\\d-]{10,20})","creditCard.customerId":"(__string__\\w+)","creditCard.expiration":"(__string__\\d{2}.\\d{4})","creditCard.id":"(__string__\\w+)","creditCard.type":"(__string__(VISA|MASTERCARD|AMEX))","email":"(__string__.+@.+\\..+)","firstName":"(__string__\\w+)","id":"(__string__\\w+)","lastName":"(__string__\\w+)","phone":"(__string__[\\-\\w\\d]{9,15})"}'
    pipe_properties:
      - id
      - email
    assertions:
      - VariableContains contents.email @
      - VariableContains contents.creditCard.type A
      - VariableContains headers.Content-Type application/json
      - VariableEQ status 200

The request section defines the contents property that builds the input request sent to the producer's REST API. The response section defines match_contents to match a regex for each response property. In addition, the response section defines assertions to compare the response contents, headers, or status against the expected output.

A contract for finding an existing customer

method: GET
name: get-customer
path: /customers/{{.id}}
description: ""
order: 1
group: customers
predicate: ""
request:
    path_params:
        id: \w+
    query_params: {}
    headers:
      Content-Type: application/json
    contents: ""
    example_contents: ""
response:
    headers: {}
    match_headers:
      Content-Type: application/json    
    match_contents: '{"address.city":"(__string__\\w+)","address.countryCode":"(__string__(US|CA))","address.streetAddress":"(__string__\\w+)","address.zipCode":"(__string__\\d{5})","creditCard.balance.amount":"(__number__[+-]?((\\d{1,10}(\\.\\d{1,5})?)|(\\.\\d{1,10})))","creditCard.balance.currency":"(__string__(USD|CAD|EUR|AUD))","creditCard.cardNumber":"(__string__\\d{4}-\\d{4}-\\d{4})","creditCard.customerId":"(__string__\\w+)","creditCard.expiration":"(__string__\\d{2}/\\d{4})","creditCard.type":"(__string__(VISA|MASTERCARD|AMEX))","email":"(__string__.+@.+\\..+)","firstName":"(__string__\\w)","lastName":"(__string__\\w)","phone":"(__string__1-\\d{3}-\\d{3}-\\d{4})"}'
    pipe_properties:
      - id
      - email
    assertions:
      - VariableContains contents.email @
      - VariableContains contents.creditCard.type A
      - VariableContains headers.Content-Type application/json
      - VariableEQ status 200

The above template defines similar properties to generate the request body and defines match_contents with assertions to match the expected output headers, body, and status. Based on the order of the tests, the generated test to add a new customer will be executed first, followed by the test to find a customer by id. As we are testing against real REST APIs, the path for finding a customer is defined as "/customers/{{.id}}", which populates the id from the output of the first test based on pipe_properties.

Uploading Contracts

Once you have the api-mock-service mock server running, you can upload contracts using:

curl -H "Content-Type: application/yaml" --data-binary @fixtures/get_customer.yaml \
	http://localhost:8000/_scenarios
curl -H "Content-Type: application/yaml" --data-binary @fixtures/save_customer.yaml \
	http://localhost:8000/_scenarios

Start your service before invoking the generated tests; e.g., we will use sample-openapi for testing and then invoke the generated tests using:

curl -X POST http://localhost:8000/_contracts/customers -d \
	'{"base_url": "http://localhost:8080", "execution_times": 5, "verbose": true}'

The above command will execute all tests in the customers group, invoking each REST API 5 times. After executing the APIs, it will generate a result as follows:

{
  "results": {
    "get-customer_0": {
      "email": "anna.intra@amicum.edu",
      "id": "fa7a06cd-1bf1-442e-b761-d1d074d24373"
    },
    "get-customer_1": {
      "email": "aaron.sequi@laetus.gov",
      "id": "c5128ac0-865c-4d91-bb0a-23940ac8a7cb"
    },
    "get-customer_2": {
      "email": "edward.infligi@evellere.com",
      "id": "a485739f-01d4-442e-9ddc-c2656ba48c63"
    },
    "get-customer_3": {
      "email": "gary.volebant@istae.com",
      "id": "ef0eacd0-75cc-484f-b9a4-7aebfe51d199"
    },
    "get-customer_4": {
      "email": "alexis.dicant@displiceo.net",
      "id": "da65b914-c34e-453b-8ee9-7f0df598ac13"
    },
    "save-customer_0": {
      "email": "anna.intra@amicum.edu",
      "id": "fa7a06cd-1bf1-442e-b761-d1d074d24373"
    },
    "save-customer_1": {
      "email": "aaron.sequi@laetus.gov",
      "id": "c5128ac0-865c-4d91-bb0a-23940ac8a7cb"
    },
    "save-customer_2": {
      "email": "edward.infligi@evellere.com",
      "id": "a485739f-01d4-442e-9ddc-c2656ba48c63"
    },
    "save-customer_3": {
      "email": "gary.volebant@istae.com",
      "id": "ef0eacd0-75cc-484f-b9a4-7aebfe51d199"
    },
    "save-customer_4": {
      "email": "alexis.dicant@displiceo.net",
      "id": "da65b914-c34e-453b-8ee9-7f0df598ac13"
    }
  },
  "errors": {},
  "metrics": {
    "getcustomer_counts": 5,
    "getcustomer_duration_seconds": 0.006,
    "savecustomer_counts": 5,
    "savecustomer_duration_seconds": 0.006
  },
  "succeeded": 10,
  "failed": 0
}

Though generated tests are executed against real services, it's recommended that the service implementation use test doubles or mock services for any dependent services, as contract testing is not meant to replace component or end-to-end tests, which provide better support for integration testing.
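For example, if the order service under test calls a downstream inventory service, that dependency can be replaced with an in-process stub. A minimal sketch using Go's net/http/httptest (the endpoint, payload, and INVENTORY_URL configuration knob are illustrative; an api-mock-service instance could also serve as the double):

package order_test

import (
    "net/http"
    "net/http/httptest"
    "testing"
)

// TestOrderWithStubbedInventory runs the order flow against a canned
// inventory response instead of the real downstream service.
func TestOrderWithStubbedInventory(t *testing.T) {
    inventory := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"productId":"p-123","inStock":true}`))
    }))
    defer inventory.Close()

    // Point the service under test at the stub, e.g. via configuration
    // such as INVENTORY_URL=inventory.URL, then exercise the order REST API as usual.
    _ = inventory.URL
}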

Recording Consumer/Producer interactions for Generating Stub Requests and Responses

Contract testing does not always depend on API specifications such as OpenAPI and Swagger; instead, you can record interactions between consumers and producers using the api-mock-service tool.

For example, if you have an existing REST API or a legacy service, such as the above sample API, you can record an interaction as follows:

export http_proxy="http://localhost:9000"
export https_proxy="http://localhost:9000"
curl -X POST -H "Content-Type: application/json" http://localhost:8080/customers -d \
	'{"address":{"city":"rwjJS","countryCode":"US","id":"4a788c96-e532-4a97-9b8b-bcb298636bc1","streetAddress":"Cura diu me, miserere me?","zipCode":"24121"},"creditCard":{"balance":{"amount":57012,"currency":"USD"},"cardNumber":"5566-2222-8282","customerId":"tgzwgThaiZqc5eDwbKk23nwjZqkap7","expiration":"70/6666","id":"d966aafa-c28b-4078-9e87-f7e9d76dd848","type":"VISA"},"email":"andrew.recorder@ipsas.net","firstName":"quendam","id":"071396bb-f8db-489d-a8f7-bbcce952ecef","lastName":"formaeque","phone":"1-345-6666-0618"}'

This will invoke the remote REST API, record contract interactions and then return server response:

{
  "id": "95d655e1-405e-4087-8a7d-56791eaf51cc",
  "firstName": "quendam",
  "lastName": "formaeque",
  "email": "andrew.recorder@ipsas.net",
  "phone": "1-345-6666-0618",
  "creditCard": {
    "id": "d966aafa-c28b-4078-9e87-f7e9d76dd848",
    "customerId": "tgzwgThaiZqc5eDwbKk23nwjZqkap7",
    "type": "VISA",
    "cardNumber": "5566-2222-8282",
    "expiration": "70/6666",
    "balance": {
      "amount": 57012,
      "currency": "USD"
    }
  },
  "address": {
    "id": "4a788c96-e532-4a97-9b8b-bcb298636bc1",
    "streetAddress": "Cura diu me, miserere me?",
    "city": "rwjJS",
    "zipCode": "24121",
    "countryCode": "US"
  }
}

The recorded contract can be used to generate the stub response; e.g., the following configuration defines the recorded contract:

method: POST
name: recorded-customers-200-55240a69747cac85a881a3ab1841b09c2c66d6a9a9ae41c99665177d3e3b5bb7
path: /customers
description: recorded at 2023-01-02 03:18:11.80293 +0000 UTC for http://localhost:8080/customers
order: 0
group: customers
predicate: ""
request:
    match_query_params: {}
    match_headers:
        Content-Type: application/json
    match_contents: '{"address.city":"(__string__\\w+)","address.countryCode":"(__string__\\w+)","address.id":"(.+)","address.streetAddress":"(__string__\\w+)","address.zipCode":"(__string__\\d{5,5})","creditCard.balance.amount":"(__number__[+-]?\\d{1,10})","creditCard.balance.currency":"(__string__\\w+)","creditCard.cardNumber":"(__string__\\d{4,4}[-]\\d{4,4}[-]\\d{4,4})","creditCard.customerId":"(.+)","creditCard.expiration":"(.+)","creditCard.id":"(.+)","creditCard.type":"(__string__\\w+)","email":"(__string__\\w+.?\\w+@\\w+.?\\w+)","firstName":"(__string__\\w+)","id":"(.+)","lastName":"(__string__\\w+)","phone":"(__string__\\d{1,1}[-]\\d{3,3}[-]\\d{4,4}[-]\\d{4,4})"}'
    path_params: {}
    query_params: {}
    headers:
        Accept: '*/*'
        Content-Length: "522"
        Content-Type: application/json
        User-Agent: curl/7.65.2
    contents: '{"address":{"city":"rwjJS","countryCode":"US","id":"4a788c96-e532-4a97-9b8b-bcb298636bc1","streetAddress":"Cura diu me, miserere me?","zipCode":"24121"},"creditCard":{"balance":{"amount":57012,"currency":"USD"},"cardNumber":"5566-2222-8282","customerId":"tgzwgThaiZqc5eDwbKk23nwjZqkap7","expiration":"70/6666","id":"d966aafa-c28b-4078-9e87-f7e9d76dd848","type":"VISA"},"email":"andrew.recorder@ipsas.net","firstName":"quendam","id":"071396bb-f8db-489d-a8f7-bbcce952ecef","lastName":"formaeque","phone":"1-345-6666-0618"}'
    example_contents: ""
response:
    headers:
        Content-Type:
            - application/json
        Date:
            - Mon, 02 Jan 2023 03:18:11 GMT
    contents: '{"id":"95d655e1-405e-4087-8a7d-56791eaf51cc","firstName":"quendam","lastName":"formaeque","email":"andrew.recorder@ipsas.net","phone":"1-345-6666-0618","creditCard":{"id":"d966aafa-c28b-4078-9e87-f7e9d76dd848","customerId":"tgzwgThaiZqc5eDwbKk23nwjZqkap7","type":"VISA","cardNumber":"5566-2222-8282","expiration":"70/6666","balance":{"amount":57012.00,"currency":"USD"}},"address":{"id":"4a788c96-e532-4a97-9b8b-bcb298636bc1","streetAddress":"Cura diu me, miserere me?","city":"rwjJS","zipCode":"24121","countryCode":"US"}}'
    contents_file: ""
    example_contents: ""
    status_code: 200
    match_headers: {}
    match_contents: '{"address.city":"(__string__\\w+)","address.countryCode":"(__string__\\w+)","address.id":"(.+)","address.streetAddress":"(__string__\\w+)","address.zipCode":"(__string__\\d{5,5})","creditCard.balance.amount":"(__number__[+-]?\\d{1,10})","creditCard.balance.currency":"(__string__\\w+)","creditCard.cardNumber":"(__string__\\d{4,4}[-]\\d{4,4}[-]\\d{4,4})","creditCard.customerId":"(.+)","creditCard.expiration":"(.+)","creditCard.id":"(.+)","creditCard.type":"(__string__\\w+)","email":"(__string__\\w+.?\\w+@\\w+.?\\w+)","firstName":"(__string__\\w+)","id":"(.+)","lastName":"(__string__\\w+)","phone":"(__string__\\d{1,1}[-]\\d{3,3}[-]\\d{4,4}[-]\\d{4,4})"}'
    pipe_properties: []
    assertions: []
wait_before_reply: 0s

You can then invoke consumer-driven contracts to generate stub responses, or run the generated tests against the producer implementation as described in the earlier section. Another benefit of capturing test contracts from a recorded session is that it accurately captures all URLs, parameters, and headers for both requests and responses, so contract testing can precisely validate against existing behavior.

Summary

Although unit testing, component testing, and end-to-end testing are common testing strategies used by most organizations, they don't provide adequate support for validating API specifications and the interactions between consumers/clients and producers/providers of the APIs. Contract testing ensures that consumers and producers do not deviate from the specifications, and it can be used to validate changes for backward compatibility as APIs evolve. It also decouples consumers and producers while the API is still in development, since both parties can write code against the agreed contracts and test them independently. A service owner can generate producer contracts using tools such as api-mock-service based on an Open API specification or user-defined constraints. Consumers can provide their consumer-driven contracts to the service providers to ensure that API changes don't break any consumers. These contracts can be stored in a source code repository or on a registry service so that contract tests can easily access and execute them as part of the build and deployment pipelines. The api-mock-service tool greatly assists in adding contract testing to your software development lifecycle and is freely available from https://github.com/bhatti/api-mock-service.

February 6, 2016

Building a Generic Data Service

Filed under: Web Services — admin @ 10:44 pm

As REST-based micro-services have become prevalent, I often find that web and mobile clients have to connect to many different services to gather data. You may have to call dozens of services to display data on a single screen or page. Also, you may only need a subset of the data from each service, but you still have to pay the bandwidth and parsing cost.

I created a new Java framework, PlexDataProviders, for aggregating and querying data from various underlying sources, which can be used to build a general-purpose data service. PlexDataProviders is a light-weight framework that abstracts access to various data providers such as databases, files, web services, etc., and allows aggregation of data across them.

The PlexDataProviders framework is divided into two components:

  • Data Provider – This component defines interfaces that are implemented to access data sources such as database or web services.
  • Query Engine – This component is used for querying and aggregating data.

The query engine can determine dependencies between providers, and it also allows you to use the output of one data provider as input to another. For example, let's assume:

  • data-provider A requires input-a1, input-a2 and produces output-a1, output-a2
  • data-provider B requires input-b1 and output-a1 and produces output-b1, output-b2

Then you can pass input-a1, input-a2, and input-b1 to the query engine and request the output-a1, output-a2, output-b1, and output-b2 data fields; the engine resolves that output-a1 from provider A must be produced first and feeds it to provider B.
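
To make the chaining concrete, here is a minimal sketch of how such a query might look from the caller's side. It reuses the DataRequest, DataResponse, QueryEngine, and DataProviderLocator types shown later in this post, but ProviderA, ProviderB, and the field names are hypothetical placeholders, so treat it as an illustration rather than the framework's exact API.

[codesyntax lang="java"]
// Hedged sketch: ProviderA, ProviderB, and the input-a1/output-b2 style field
// names are hypothetical placeholders; DataRequest, DataResponse, QueryEngine,
// and DataProviderLocator are the framework types used elsewhere in this post.
public DataResponse queryChainedFields() {
    DataProviderLocator locator = new DataProviderLocatorImpl();
    locator.register(new ProviderA()); // needs input-a1, input-a2; produces output-a1, output-a2
    locator.register(new ProviderB()); // needs input-b1, output-a1; produces output-b1, output-b2

    QueryEngine engine = new QueryEngineImpl(locator);

    Map<String, Object> properties = new HashMap<>();
    properties.put("input-a1", "value1");
    properties.put("input-a2", "value2");
    properties.put("input-b1", "value3");
    // Request fields from both providers; the engine figures out that
    // output-a1 from ProviderA must be produced before ProviderB can run.
    properties.put("responseFields", "output-a1,output-a2,output-b1,output-b2");

    return engine.query(DataRequest.from(properties));
}
[/codesyntax]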

Benefits

PlexDataProviders offers the following benefits:

  • It provides a unified way to search data and abstracts integration to underlying data sources.
  • It helps simplify client-side logic, as clients can use a single data service to query all data instead of multiple data services.
  • It also helps with managing endpoints, as clients need only a single endpoint instead of connecting to multiple web services.
  • As clients can specify exactly the data they need, it helps with payload size and network bandwidth.
  • Clients only need to create a single data parser, which keeps the JSON parsing logic simple.
  • As PlexDataProviders supports multi-threading, it also helps reduce the latency of data-fetch requests.
  • It supports partial failures, so that a failure in a single data provider doesn't affect other data providers and the data service can still return partial results.
  • It supports timeouts, so that clients receive whatever data completes within the given timeout interval.

Data Structure

Following are the primary data structures:

  • MetaField – This class defines meta information for each data field such as name, kind, type, etc.
  • MetaFieldType – This enum class defines the supported data types, i.e.
    • SCALAR_TEXT – simple text
    • SCALAR_INTEGER – integer numbers
    • SCALAR_DECIMAL – decimal numbers
    • SCALAR_DATE – dates
    • SCALAR_BOOLEAN – boolean
    • VECTOR_TEXT – array of text
    • VECTOR_INTEGER – array of integers
    • VECTOR_DECIMAL – array of decimals
    • VECTOR_DATE – array of dates
    • VECTOR_BOOLEAN – array of boolean
    • BINARY – binary data
    • ROWSET – nested data rowsets
  • Metadata – This class defines a set of MetaFields used in DataRow/DataRowSet
  • DataRow – This class abstracts a row of data fields
  • DataRowSet – This class abstracts a set of rows

PlexDataProviders also supports nested structures, where a data field in a DataRow can itself be an instance of DataRowSet.
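
As a rough illustration of these structures, the sketch below defines a Metadata with a nested ROWSET field and puts one DataRowSet inside another. The MetaField.create factory, getField lookup, and addValueAs* setters are my guesses at the API for illustration only; the actual method names in the library may differ.

[codesyntax lang="java"]
// Hedged sketch: MetaField.create(...), getField(...), and the addValueAs*(...)
// setters are hypothetical names used only to illustrate the data model.
Metadata lineItemMeta = Metadata.from(
        MetaField.create("sku", MetaFieldType.SCALAR_TEXT),
        MetaField.create("quantity", MetaFieldType.SCALAR_INTEGER));

Metadata orderMeta = Metadata.from(
        MetaField.create("orderId", MetaFieldType.SCALAR_TEXT),
        MetaField.create("lineItems", MetaFieldType.ROWSET)); // nested rowset field

DataRowSet lineItems = new DataRowSet(lineItemMeta);
lineItems.addValueAsText(lineItemMeta.getField("sku"), "ABC-123", 0);
lineItems.addValueAsInteger(lineItemMeta.getField("quantity"), 2, 0);

DataRowSet orders = new DataRowSet(orderMeta);
orders.addValueAsText(orderMeta.getField("orderId"), "ORD-1", 0);
// A data field in a DataRow can itself hold another DataRowSet.
orders.addValueAsRowSet(orderMeta.getField("lineItems"), lineItems, 0);
[/codesyntax]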

Adding a Data Provider

A data provider implements the following two interfaces:

[codesyntax lang="java"]
public interface DataProducer {
    void produce(DataRowSet requestFields, DataRowSet responseFields,
            QueryConfiguration config) throws DataProviderException;
}
[/codesyntax]

Note that QueryConfiguration defines additional parameters such as:

  • pagination parameters
  • ordering/grouping
  • filtering parameters
  • timeout parameters

The timeout parameter can be used to return all available data within a defined time; e.g., the query engine may invoke the underlying data providers on multiple threads, and if an underlying query takes too long, it returns whatever data is already available.
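
For illustration, a caller might configure these knobs roughly as follows; the QueryConfiguration constructor and setter names below are hypothetical placeholders rather than the framework's actual API.

[codesyntax lang="java"]
// Hedged sketch: the constructor and setter names are hypothetical placeholders.
QueryConfiguration config = new QueryConfiguration();
config.setTimeoutMillis(500);  // return whatever providers finish within 500 ms
config.setPageSize(25);        // pagination parameter
config.setPageNumber(0);
config.setOrderBy("symbol");   // ordering/grouping parameter
[/codesyntax]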

[codesyntax lang="java"]
public interface DataProvider extends DataProducer, Comparable<DataProvider> {
    String getName();

    int getRank();

    Metadata getMandatoryRequestMetadata();

    Metadata getOptionalRequestMetadata();

    Metadata getResponseMetadata();

    TaskGranularity getTaskGranularity();
}
[/codesyntax]

Each provider defines a name, a rank (or priority used when selecting the best matching provider), and sets of mandatory/optional input and output data fields. A data provider can also declare its granularity as coarse-grained or fine-grained, and the implementation may execute providers on different threads accordingly.

PlexDataProviders also provides interfaces for converting data from domain objects to DataRowSet. Here is an example of a provider implementation:

[codesyntax lang="java"]
public class SecuritiesBySymbolsProvider extends BaseProvider {
    private static Metadata parameterMeta = Metadata.from(SharedMeta.symbol);
    private static Metadata optionalMeta = Metadata.from();
    private static SecurityMarshaller marshaller = new SecurityMarshaller();

    public SecuritiesBySymbolsProvider() {
        super("SecuritiesBySymbolsProvider", parameterMeta, optionalMeta,
                marshaller.getMetadata());
    }

    @Override
    public void produce(DataRowSet parameter, DataRowSet response,
            QueryConfiguration config) throws DataProviderException {
        final String id = parameter.getValueAsText(SharedMeta.symbol, 0);
        Map<String, Object> criteria = new HashMap<>();
        criteria.put("symbol", id.toUpperCase());
        Collection<Security> securities = DaoLocator.securityDao.query(criteria);
        DataRowSet rowset = marshaller.marshal(securities);
        addRowSet(response, rowset, 0);
    }
}
[/codesyntax]

Typically, you will create a data provider for each kind of query that you want to support. Each data provider specifies the set of required and optional input data fields that are used to generate its output data fields.

Here is an example of marshalling data from Security domain objects to a DataRowSet:

[codesyntax lang="java"]
public DataRowSet marshal(Security security) {
    DataRowSet rowset = new DataRowSet(responseMeta);
    marshal(rowset, security, 0);
    return rowset;
}

public DataRowSet marshal(Collection<Security> securities) {
    DataRowSet rowset = new DataRowSet(responseMeta);
    for (Security security : securities) {
        marshal(rowset, security, rowset.size());
    }
    return rowset;
}
...
[/codesyntax]

PlexDataProviders provides a DataProviderLocator interface for registering and looking up providers, e.g.

[codesyntax lang="java"]
public interface DataProviderLocator {
    void register(DataProvider provider);

    Collection<DataProvider> locate(Metadata requestFields, Metadata responseFields);
...
}
[/codesyntax]
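
As a quick illustration, the query engine (or any other caller) can ask the locator for providers capable of satisfying a query. This sketch reuses the SecuritiesBySymbolsProvider and SharedMeta.symbol from the earlier example; the exact matching semantics of locate() are simplified here.

[codesyntax lang="java"]
// Sketch: register a provider, then look up providers that can turn a
// symbol into the security fields produced by SecurityMarshaller.
DataProviderLocator locator = new DataProviderLocatorImpl();
locator.register(new SecuritiesBySymbolsProvider());

Metadata requestFields = Metadata.from(SharedMeta.symbol);
Metadata responseFields = new SecurityMarshaller().getMetadata();

// Returns providers whose mandatory inputs are satisfied by requestFields
// and whose outputs cover (some of) responseFields.
Collection<DataProvider> matches = locator.locate(requestFields, responseFields);
// matches should now include SecuritiesBySymbolsProvider.
[/codesyntax]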

PlexDataProviders comes with a small application that provides data services by implementing various data providers. It uses the PlexService framework for defining the service, e.g.

[codesyntax lang="java"]
@WebService
@Path("/data")
public class DataServiceImpl implements DataService {
    private DataProviderLocator dataProviderLocator = new DataProviderLocatorImpl();
    private QueryEngine queryEngine = new QueryEngineImpl(dataProviderLocator);

    public DataServiceImpl() {
        dataProviderLocator.register(new AccountsByIdsProvider());
        dataProviderLocator.register(new AccountsByUseridProvider());
        dataProviderLocator.register(new CompaniesBySymbolsProvider());
        dataProviderLocator.register(new OrdersByAccountIdsProvider());
        dataProviderLocator.register(new PositionGroupsBySymbolsProvider());
        dataProviderLocator.register(new PositionsBySymbolsProvider());
        dataProviderLocator.register(new QuotesBySymbolsProvider());
        dataProviderLocator.register(new SecuritiesBySymbolsProvider());
        dataProviderLocator.register(new UsersByIdsProvider());
        dataProviderLocator.register(new WatchlistByUserProvider());
        dataProviderLocator.register(new SymbolsProvider());
        dataProviderLocator.register(new UsersProvider());
        dataProviderLocator.register(new SymbolSearchProvider());
    }

    @Override
    @GET
    public DataResponse query(Request webRequest) {
        final DataRequest dataRequest = DataRequest.from(webRequest.getProperties());
        return queryEngine.query(dataRequest);
    }
}
[/codesyntax]

As you can see, the data service simply builds a DataRequest from the input data fields and sends the response back to clients.

Here is an example client that passes a search-query data field and requests quote data fields along with company details:

public void testGetQuoteBySearch() throws Throwable {
    String jsonResp = TestWebUtils.httpGet("http://localhost:" + DEFAULT_PORT
                    + "/data?responseFields=exchange,symbol,quote.bidPrice,quote.askPrice,quote.sales,company.name&symbolQuery=AAPL");
    ...

Note that the above request uses three data providers: first, it uses SymbolSearchProvider to search for symbols matching the given query; it then uses the resulting symbol data field to request company and quote data fields from QuotesBySymbolsProvider and CompaniesBySymbolsProvider. The PlexDataProviders framework takes care of all dependency management between the providers.

Here is an example JSON response from the data service:

[codesyntax lang="javascript"]
{
    "queryResponse": {
        "fields": [
            [{
                "symbol": "AAPL_X"
            }, {
                "quote.sales": [
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 56
                    }, {
                        "timeOfSale.exchange": "DOW"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 69.49132317180353
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 54
                    }, {
                        "timeOfSale.exchange": "NYSE"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 16.677774132458076
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 99
                    }, {
                        "timeOfSale.exchange": "NASDAQ"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 42.17891320885568
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 49
                    }, {
                        "timeOfSale.exchange": "DOW"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 69.61680149649729
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 69
                    }, {
                        "timeOfSale.exchange": "NYSE"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 25.353316897552833
                    }]
                ]
            }, {
                "quote.askPrice": 54.99300665695502
            }, {
                "quote.bidPrice": 26.935682182171643
            }, {
                "exchange": "DOW"
            }, {
                "company.name": "AAPL - name"
            }],
            [{
                "symbol": "AAPL"
            }, {
                "exchange": "NASDAQ"
            }]
        ],
        "errorsByProviderName": {},
        "providers": ["QuotesBySymbolsProvider", "SymbolSearchProvider", "CompaniesBySymbolsProvider"]
    }
}
[/codesyntax] 

PlexDataProviders is available on GitHub and is licensed under the liberal MIT license. It also comes with a small sample application for demo purposes. Feel free to send me your suggestions.


December 23, 2007

Released ErlSDB 0.1

Filed under: Erlang,SimpleDB,Web Services — admin @ 7:09 pm

I started working on an Erlang library to access Amazon's SimpleDB web service, and I released an early version of the library this weekend. Here are some notes on its usage:

Installing

svn checkout http://erlsdb.googlecode.com/svn/trunk/ erlsdb-read-only

Building

make

Testing

Edit the Makefile and add your access key and secret key, then type make test

Usage

Take a look at test/erlsdb_test.erl to learn the usage; here is some sample code:

Starting Server

    erlsdb:start(type,
        [#sdb_state{
            access_key = "YourAccessKey",
            secret_key = "YourSecretKey",
            domain = "YourDomain"
        }])

Creating Domain

    erlsdb:create_domain()

Note that the server will use the domain that was passed during initialization.

Listing all Domains

    {ok, List, _} = erlsdb:list_domains()

Deleting Domain

    erlsdb:delete_domain()

Adding an item

    Attributes = lists:sort([
	["StreetAddress", "705 5th Ave"],
        ["City", "Seattle"],
        ["State", "WA"],
        ["Zip", "98101"]
	]),
    erlsdb:put_attributes("TccAddr", Attributes)

Retrieving an item

    {ok, UnsortedAttrs} = erlsdb:get_attributes("TccAddr")

Deleting an item

    erlsdb:delete_attributes("TccAddr")
