Shahzad Bhatti Welcome to my ramblings and rants!

September 12, 2025

The Byzantine Generals Problem: A Modern Performance Analysis in Elixir, Erlang, and Rust

Filed under: Computing,Concurrency — admin @ 2:35 pm

Introduction

In 2007, I wrote about implementing Leslie Lamport’s Byzantine Generals Problem algorithm across several programming languages. At the time, this seemed like an interesting theoretical exercise in distributed computing. I didn’t realize that a year later, Satoshi Nakamoto would publish the Bitcoin whitepaper, introducing a decentralized, Sybil-resistant digital currency that addressed Byzantine fault tolerance at unprecedented scale.

Nearly two decades later, I’m returning to the Byzantine Generals Problem with the perspective that only hindsight provides. This updated post implements the algorithm in modern languages—Rust, Elixir, and contemporary Erlang.

The Byzantine Generals Problem: A Refresher

The Byzantine Generals Problem, formalized by Leslie Lamport, Robert Shostak, and Marshall Pease in their 1982 paper, addresses a fundamental challenge in distributed computing: how can distributed parties reach consensus when some parties may be unreliable or malicious? For example, imagine several divisions of the Byzantine army camped outside an enemy city, each commanded by a general. The generals must coordinate to either attack or retreat, but they can only communicate by messenger. The challenge: some generals might be traitors who will try to confuse the others by sending conflicting messages. For a solution to work, two conditions must be met:

  1. IC1: All loyal lieutenants obey the same order
  2. IC2: If the commanding general is loyal, then every loyal lieutenant obeys the order he sends
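The two conditions above can be written down as simple predicates over the decisions of the loyal lieutenants. This is a minimal sketch, not part of the implementations later in this post; the `Order` type and the function names are assumptions made for illustration.

```rust
/// Orders a general can issue or obey (illustrative names).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Order {
    Attack,
    Retreat,
}

/// IC1: every loyal lieutenant ended up with the same order.
/// `loyal_decisions` holds only the decisions of loyal lieutenants.
pub fn satisfies_ic1(loyal_decisions: &[Order]) -> bool {
    loyal_decisions.windows(2).all(|pair| pair[0] == pair[1])
}

/// IC2: if the commander is loyal, every loyal lieutenant obeys the order
/// it sent. A traitorous commander is passed as `None` and imposes no
/// constraint, so IC2 holds trivially in that case.
pub fn satisfies_ic2(loyal_commander_order: Option<Order>, loyal_decisions: &[Order]) -> bool {
    match loyal_commander_order {
        Some(order) => loyal_decisions.iter().all(|d| *d == order),
        None => true,
    }
}
```

Note that IC1 alone is enough when the commander is a traitor: the loyal lieutenants only have to agree with each other, not with anything the commander said.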

One of the most striking results is that, with oral (unsigned) messages, no solution exists with fewer than 3m + 1 generals to handle m traitors. With only three generals, no algorithm can handle even a single traitor.
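As a quick illustration of the 3m + 1 bound, the arithmetic can be captured in two small helpers. The function names are hypothetical and do not appear in the implementations below.

```rust
/// Largest number of traitors that `generals` participants can tolerate
/// with oral messages, derived from the requirement n >= 3m + 1.
pub fn max_traitors(generals: usize) -> usize {
    generals.saturating_sub(1) / 3
}

/// Smallest number of generals needed to tolerate `traitors` traitors.
pub fn min_generals(traitors: usize) -> usize {
    3 * traitors + 1
}
```

For example, `max_traitors(3)` is 0, which is the three-generals impossibility result, while `max_traitors(4)` is 1 and `min_generals(2)` is 7.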

Why This Matters

When I originally wrote about this problem in 2007, Bitcoin didn’t exist. Satoshi Nakamoto’s whitepaper was published in 2008, and the first Bitcoin block wasn’t mined until 2009. Bitcoin’s proof-of-work consensus mechanism essentially solves the Byzantine Generals Problem in a novel way:

  • Generals = Miners: Each miner is like a general trying to reach consensus
  • Orders = Transactions: The “order” is which transactions to include in the next block
  • Traitors = Malicious Miners: Some miners might try to double-spend or create invalid blocks
  • Solution = Longest Chain: The network accepts the longest valid chain as truth

Bitcoin’s brilliant insight was using computational work (proof-of-work) as a way to make it economically expensive to be a “traitor.” As long as honest miners control more than 50% of the computing power, the system remains secure.
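To make the "expensive to be a traitor" idea concrete, here is a toy proof-of-work sketch. It is an illustration under loud assumptions: it uses the standard library's `DefaultHasher`, which is not cryptographic (Bitcoin uses double SHA-256), and the block layout, function names, and difficulty encoding are all made up for this example.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy stand-in for a block hash. DefaultHasher is NOT cryptographic;
/// it is used here only so the example needs no external crates.
pub fn toy_hash(prev_hash: u64, payload: &str, nonce: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    prev_hash.hash(&mut hasher);
    payload.hash(&mut hasher);
    nonce.hash(&mut hasher);
    hasher.finish()
}

/// A hash qualifies when its top `difficulty_bits` bits are all zero.
pub fn meets_difficulty(hash: u64, difficulty_bits: u32) -> bool {
    hash.leading_zeros() >= difficulty_bits
}

/// Brute-force search for a qualifying nonce. This loop is the "work":
/// on average it takes about 2^difficulty_bits attempts, so keep
/// `difficulty_bits` small (well below 64) in this toy.
pub fn mine(prev_hash: u64, payload: &str, difficulty_bits: u32) -> (u64, u64) {
    let mut nonce = 0u64;
    loop {
        let hash = toy_hash(prev_hash, payload, nonce);
        if meets_difficulty(hash, difficulty_bits) {
            return (nonce, hash);
        }
        nonce += 1;
    }
}

/// Verification costs a single hash, no matter how long mining took.
pub fn verify(prev_hash: u64, payload: &str, nonce: u64, difficulty_bits: u32) -> bool {
    meets_difficulty(toy_hash(prev_hash, payload, nonce), difficulty_bits)
}
```

The asymmetry is the point: `mine` has to search, while `verify` is one hash. Rewriting history means redoing that search for every block after the one being changed, faster than the honest majority extends the chain.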

Modern Applications Beyond Blockchain

The Byzantine Generals Problem isn’t just about cryptocurrency. It’s fundamental to many critical systems:

  • Aircraft Control Systems: Multiple redundant computers must agree on flight controls
  • Satellite Networks: Space-based systems need fault tolerance against radiation-induced failures
  • Missile Defense: Critical decisions must be made reliably even with component failures
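  • Distributed Databases: Systems like Apache Cassandra and MongoDB replicate data using quorum and consensus protocols that tolerate crash faults, a weaker failure model than Byzantine faults
  • Container Orchestration: Kubernetes stores cluster state in etcd, which uses the Raft consensus algorithm; Raft tolerates crashed nodes but not Byzantine ones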
  • Central Bank Digital Currencies (CBDCs): Many countries are exploring blockchain-based national currencies
  • Cross-Border Payments: Systems like Ripple use Byzantine fault-tolerant consensus

Implementation: Modern Languages for a Classic Problem

Let’s implement the Byzantine Generals Problem in three modern languages: Rust, Elixir, and updated Erlang. Each brings different strengths to distributed computing.

Why These Languages?

  • Rust: Memory safety without garbage collection, excellent for systems programming
  • Elixir: Built on the Actor model, designed for fault-tolerant distributed systems
  • Erlang: The original Actor model language, battle-tested in telecom systems

Core Algorithm

We’ll implement the OM(m) algorithm (Oral Messages with m traitors) that works for 3m + 1 or more generals.
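Before the full implementations, it may help to see the recursive structure of OM(m) on its own. The following is a compact, single-threaded sketch and not the code used for the benchmarks; the `Order` type, the function names, and the traitor strategy (lie based on the receiver's ID parity) are assumptions chosen for illustration.

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Order {
    Attack,
    Retreat,
}

/// Value actually delivered from `sender` to `receiver`. Loyal senders
/// pass the value on; traitors use a made-up strategy for this sketch:
/// Attack to even-numbered receivers, Retreat to odd-numbered ones.
fn sent_value(sender: usize, receiver: usize, value: Order, traitors: &HashSet<usize>) -> Order {
    if traitors.contains(&sender) {
        if receiver % 2 == 0 { Order::Attack } else { Order::Retreat }
    } else {
        value
    }
}

/// Strict majority, with Retreat as the default on ties.
fn majority(values: &[Order]) -> Order {
    let attacks = values.iter().filter(|v| **v == Order::Attack).count();
    if 2 * attacks > values.len() { Order::Attack } else { Order::Retreat }
}

/// OM(m): returns the value each lieutenant settles on, in the same order
/// as `lieutenants`.
pub fn om(
    m: usize,
    commander: usize,
    value: Order,
    lieutenants: &[usize],
    traitors: &HashSet<usize>,
) -> Vec<Order> {
    // Step 1: the commander sends its value to every lieutenant.
    let received: Vec<Order> = lieutenants
        .iter()
        .map(|&l| sent_value(commander, l, value, traitors))
        .collect();
    if m == 0 {
        return received; // OM(0): each lieutenant uses what it received.
    }
    // Step 2: each lieutenant acts as commander in OM(m - 1) and relays
    // the value it received to all the other lieutenants.
    let relayed: Vec<(Vec<usize>, Vec<Order>)> = lieutenants
        .iter()
        .enumerate()
        .map(|(j, &lj)| {
            let others: Vec<usize> = lieutenants.iter().copied().filter(|&l| l != lj).collect();
            let decided = om(m - 1, lj, received[j], &others, traitors);
            (others, decided)
        })
        .collect();
    // Step 3: each lieutenant takes the majority of its own received value
    // and the values it obtained from every other lieutenant in step 2.
    lieutenants
        .iter()
        .enumerate()
        .map(|(i, &li)| {
            let mut votes = vec![received[i]];
            for (others, decided) in &relayed {
                if let Some(pos) = others.iter().position(|&l| l == li) {
                    votes.push(decided[pos]);
                }
            }
            majority(&votes)
        })
        .collect()
}
```

With four generals, a loyal commander 0 ordering Attack, and lieutenant 2 as the traitor, `om(1, 0, Order::Attack, &[1, 2, 3], &traitors)` leaves loyal lieutenants 1 and 3 on Attack. Dropping to three generals (`&[1, 2]`) with the same traitor leaves lieutenant 1 on Retreat, which violates IC2 and shows why n = 3 is not enough.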

Rust Implementation

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use tracing::{debug, info};

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Value {
    Zero,
    One,
    Retreat, // Default value
}

impl std::fmt::Display for Value {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Value::Zero => write!(f, "ZERO"),
            Value::One => write!(f, "ONE"),
            Value::Retreat => write!(f, "RETREAT"),
        }
    }
}

#[derive(Clone, Debug)]
pub struct Configuration {
    pub source: usize,
    pub num_rounds: usize,
    pub num_processes: usize,
}

pub struct ByzantineEngine {
    config: Configuration,
    processes: Vec<Arc<Mutex<Process>>>,
    message_count: Arc<Mutex<usize>>,
}

pub struct Process {
    id: usize,
    config: Configuration,
    values: HashMap<String, Value>,
    is_faulty: bool,
}

impl Process {
    pub fn new(id: usize, config: Configuration) -> Self {
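        // Hard-coded fault injection: the source and process 2 are always faulty.
        // With 4 processes and source != 2 this yields two traitors, which is
        // more than the single traitor that n = 4 can tolerate.
        let is_faulty = id == config.source || id == 2;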
        Process {
            id,
            config,
            values: HashMap::new(),
            is_faulty,
        }
    }

    pub fn receive_message(&mut self, path: String, value: Value) {
        debug!("Process {} received message: path={}, value={:?}", self.id, path, value);
        self.values.insert(path, value);
    }

    pub fn send_messages(&self, round: usize, processes: &[Arc<Mutex<Process>>], 
                        message_count: Arc<Mutex<usize>>) {
        if round == 0 && self.id == self.config.source {
            self.send_initial_messages(processes, message_count);
        } else if round > 0 {
            self.relay_messages(round, processes, message_count);
        }
    }

    fn send_initial_messages(&self, processes: &[Arc<Mutex<Process>>], 
                           message_count: Arc<Mutex<usize>>) {
        let base_value = Value::Zero;
        
        for (i, process) in processes.iter().enumerate() {
            if i != self.id {
                let value = if self.is_faulty {
                    // Faulty commander sends different values to different processes
                    if i % 2 == 0 { Value::Zero } else { Value::One }
                } else {
                    base_value.clone()
                };
                
                let value_for_log = value.clone(); // Clone for logging
                let mut proc = process.lock().unwrap();
                proc.receive_message(self.id.to_string(), value);
                *message_count.lock().unwrap() += 1;
                
                debug!("Commander {} sent {:?} to process {}", self.id, value_for_log, i);
            }
        }
    }

    fn relay_messages(&self, round: usize, processes: &[Arc<Mutex<Process>>], 
                     message_count: Arc<Mutex<usize>>) {
        let paths = self.get_paths_for_round(round - 1);
        
        for path in paths {
            if let Some(value) = self.values.get(&path) {
                let new_value = self.transform_value(value.clone());
                let new_path = format!("{}{}", path, self.id);
                
                for (i, process) in processes.iter().enumerate() {
                    if i != self.id && !self.path_contains_process(&new_path, i) {
                        let mut proc = process.lock().unwrap();
                        proc.receive_message(new_path.clone(), new_value.clone());
                        *message_count.lock().unwrap() += 1;
                        
                        debug!("Process {} relayed {:?} to process {} with path {}", 
                               self.id, new_value, i, new_path);
                    }
                }
            }
        }
    }

    fn transform_value(&self, value: Value) -> Value {
        if self.is_faulty && self.id == 2 {
            Value::One // Process 2 always sends One when faulty
        } else {
            value
        }
    }

    fn get_paths_for_round(&self, round: usize) -> Vec<String> {
        if round == 0 {
            vec![self.config.source.to_string()]
        } else {
            self.values.keys()
                .filter(|path| path.len() == round + 1)
                .cloned()
                .collect()
        }
    }

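    // Caveat: paths are concatenated decimal IDs, so this substring test (and
    // the length check in get_paths_for_round) is only unambiguous while all
    // process IDs are single digits, i.e. with fewer than 10 processes.
    fn path_contains_process(&self, path: &str, process_id: usize) -> bool {
        path.contains(&process_id.to_string())
    }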

    pub fn decide(&self) -> Value {
        if self.id == self.config.source {
            // Source process uses its own value
            return if self.is_faulty { Value::One } else { Value::Zero };
        }

        self.majority_vote()
    }

    fn majority_vote(&self) -> Value {
        let mut counts = HashMap::new();
        counts.insert(Value::Zero, 0);
        counts.insert(Value::One, 0);
        counts.insert(Value::Retreat, 0);

        // Count values from the final round paths
        let final_paths: Vec<_> = self.values.keys()
            .filter(|path| path.len() == self.config.num_rounds + 1)
            .collect();

        if final_paths.is_empty() {
            // Count all available values if no final round paths
            for value in self.values.values() {
                *counts.entry(value.clone()).or_insert(0) += 1;
            }
        } else {
            for path in final_paths {
                if let Some(value) = self.values.get(path) {
                    *counts.entry(value.clone()).or_insert(0) += 1;
                }
            }
        }

        debug!("Process {} vote counts: {:?}", self.id, counts);

        // Find majority
        let total_votes: usize = counts.values().sum();
        if total_votes == 0 {
            return Value::Retreat;
        }

        let majority_threshold = total_votes / 2;
        
        for (value, count) in counts {
            if count > majority_threshold {
                return value;
            }
        }

        Value::Retreat // Default if no majority
    }

    pub fn is_faulty(&self) -> bool {
        self.is_faulty
    }

    pub fn is_source(&self) -> bool {
        self.id == self.config.source
    }
}

impl ByzantineEngine {
    pub fn new(source: usize, num_rounds: usize, num_processes: usize) -> Self {
        let config = Configuration { source, num_rounds, num_processes };
        let processes: Vec<Arc<Mutex<Process>>> = (0..num_processes)
            .map(|id| Arc::new(Mutex::new(Process::new(id, config.clone()))))
            .collect();

        ByzantineEngine {
            config,
            processes,
            message_count: Arc::new(Mutex::new(0)),
        }
    }

    pub fn run(&self) -> (Duration, usize) {
        info!("Starting Byzantine Generals algorithm with {} processes, {} rounds", 
              self.config.num_processes, self.config.num_rounds);
        
        let start = Instant::now();
        
        for round in 0..self.config.num_rounds {
            debug!("Starting round {}", round);
            
            let handles: Vec<_> = self.processes.iter().enumerate().map(|(_id, process)| {
                let process = Arc::clone(process);
                let processes = self.processes.clone();
                let message_count = Arc::clone(&self.message_count);
                
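                thread::spawn(move || {
                    // Snapshot the sender and release its lock before delivering.
                    // Holding it while locking receivers can deadlock once every
                    // process relays at the same time (round >= 1).
                    let snapshot = {
                        let guard = process.lock().unwrap();
                        Process {
                            id: guard.id,
                            config: guard.config.clone(),
                            values: guard.values.clone(),
                            is_faulty: guard.is_faulty,
                        }
                    };
                    snapshot.send_messages(round, &processes, message_count);
                })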
            }).collect();

            for handle in handles {
                // join() blocks until the thread finishes (there is no timeout);
                // Err means the thread panicked.
                if handle.join().is_err() {
                    eprintln!("Warning: Thread failed in round {}", round);
                }
            }
            
            debug!("Completed round {}", round);
            // Small delay to ensure message ordering
            thread::sleep(Duration::from_millis(10));
        }

        let duration = start.elapsed();
        let messages = *self.message_count.lock().unwrap();
        
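        // as_millis() returns an integer, for which {:.2} is ignored; use
        // fractional milliseconds so the precision specifier takes effect.
        info!("Algorithm completed in {:.2}ms with {} messages", 
              duration.as_secs_f64() * 1000.0, messages);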
        
        self.print_results();
        
        (duration, messages)
    }

    fn print_results(&self) {
        println!("\nByzantine Generals Results:");
        println!("===========================");
        
        for (id, process) in self.processes.iter().enumerate() {
            let proc = process.lock().unwrap();
            if proc.is_source() {
                print!("Source ");
            }
            print!("Process {}", id);
            if proc.is_faulty() {
                println!(" is faulty");
            } else {
                println!(" decides on value {}", proc.decide());
            }
        }
        println!();
    }
}

pub fn benchmark_comprehensive(max_processes: usize) {
    let test_cases = generate_test_cases(max_processes);
    
    for (processes, rounds) in test_cases {
        if processes < 4 {
            continue; // Skip invalid cases
        }
        
        let source = processes / 3;
        let engine = ByzantineEngine::new(source, rounds, processes);
        
        let start_memory = get_memory_usage();
        let start = Instant::now();
        let (duration, messages) = engine.run();
        let _total_duration = start.elapsed();
        let end_memory = get_memory_usage();
        
        let memory_used = end_memory.saturating_sub(start_memory);
        
        // CSV columns: language, processes, rounds, messages, time (ms), memory (MB)
        println!("Rust,{},{},{},{:.2},{:.2}", 
                processes, rounds, messages, 
                duration.as_secs_f64() * 1000.0, memory_used as f64 / 1024.0 / 1024.0);
    }
}

fn generate_test_cases(max_processes: usize) -> Vec<(usize, usize)> {
    let mut cases = Vec::new();
    
    for n in (4..=max_processes).step_by(3) {
        for m in 1..=3 {
            if 3 * m + 1 <= n {
                cases.push((n, m));
            }
        }
    }
    
    cases
}

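fn get_memory_usage() -> usize {
    // Placeholder, not a real measurement: the value is derived from the PID
    // and is constant for the life of the process, so the before/after delta
    // in benchmark_comprehensive is always 0. Accurate numbers need
    // platform-specific code.
    std::process::id() as usize * 1024
}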

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_value_display() {
        assert_eq!(format!("{}", Value::Zero), "ZERO");
        assert_eq!(format!("{}", Value::One), "ONE");
        assert_eq!(format!("{}", Value::Retreat), "RETREAT");
    }

    #[test]
    fn test_process_creation() {
        let config = Configuration {
            source: 0,
            num_rounds: 2,
            num_processes: 4,
        };
        
        let process = Process::new(0, config.clone());
        assert!(process.is_source());
        assert!(process.is_faulty()); // Source is faulty in our test setup
        
        let process2 = Process::new(1, config);
        assert!(!process2.is_source());
        assert!(!process2.is_faulty());
    }

    #[test]
    fn test_engine_creation() {
        let engine = ByzantineEngine::new(0, 2, 4);
        assert_eq!(engine.config.source, 0);
        assert_eq!(engine.config.num_rounds, 2);
        assert_eq!(engine.config.num_processes, 4);
        assert_eq!(engine.processes.len(), 4);
    }

    #[test]
    fn test_minimum_byzantine_case() {
        let engine = ByzantineEngine::new(0, 1, 4);
        let (duration, messages) = engine.run();
        
        assert!(duration.as_nanos() > 0);
        assert!(messages > 0);
    }
}
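One limitation of the listing above is that relay paths are stored as concatenated decimal IDs, so `path.len()` and `path.contains(...)` become ambiguous once process IDs reach two digits (the string "102" "contains" process 1). A possible alternative, sketched here with hypothetical helper names that are not part of the code above, is to keep each path as a list of IDs.

```rust
use std::collections::HashMap;

/// A relay path as the ordered list of process IDs a message has visited.
/// (Hypothetical replacement for the String paths used above.)
pub type Path = Vec<usize>;

/// True if `process_id` already appears on the path. This compares whole
/// IDs, so ID 1 is never confused with ID 10 or 21.
pub fn path_contains_process(path: &[usize], process_id: usize) -> bool {
    path.contains(&process_id)
}

/// Paths stored for a given round: a path with r + 1 hops was created in
/// round r. The result is sorted so the iteration order is deterministic.
pub fn paths_for_round<V>(values: &HashMap<Path, V>, round: usize) -> Vec<Path> {
    let mut paths: Vec<Path> = values
        .keys()
        .filter(|path| path.len() == round + 1)
        .cloned()
        .collect();
    paths.sort();
    paths
}

/// New path obtained when `relay_id` forwards a message received on `path`.
pub fn extend_path(path: &[usize], relay_id: usize) -> Path {
    let mut next = path.to_vec();
    next.push(relay_id);
    next
}
```

With this representation the hop count is simply `path.len()`, independent of how many digits each ID has, which matters for benchmark runs with 10 or more processes.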

Elixir Implementation

defmodule ByzantineGenerals do
  @moduledoc """
  Byzantine Generals Problem implementation in Elixir
  Leverages the Actor model for natural distributed computing
  """

  require Logger

  defmodule Configuration do
    @moduledoc "Configuration for Byzantine Generals algorithm"
    defstruct [:source, :num_rounds, :num_processes]

    @type t :: %__MODULE__{
      source: non_neg_integer(),
      num_rounds: non_neg_integer(),
      num_processes: non_neg_integer()
    }
  end

  defmodule Process do
    @moduledoc "Individual process (general) in the Byzantine Generals algorithm"
    use GenServer
    require Logger

    defstruct [:id, :config, :values, :is_faulty, :message_count, :processes]

    @type value :: :zero | :one | :retreat
    @type path :: String.t()

    # Client API

    def start_link(%{id: id, config: config}) do
      GenServer.start_link(__MODULE__, %{id: id, config: config}, name: :"process_#{id}")
    end

    def receive_message(pid, path, value) do
      # Deliver asynchronously. A synchronous GenServer.call here can deadlock:
      # during relay rounds every process is busy inside its own handle_call
      # while it sends to peers that are doing the same.
      GenServer.cast(pid, {:receive_message, path, value})
    end

    def send_messages(pid, round, processes) do
      GenServer.call(pid, {:send_messages, round, processes}, 10_000)
    end

    def decide(pid) do
      GenServer.call(pid, :decide, 5_000)
    end

    def is_faulty?(pid) do
      GenServer.call(pid, :is_faulty, 1_000)
    end

    def is_source?(pid) do
      GenServer.call(pid, :is_source, 1_000)
    end

    def get_message_count(pid) do
      GenServer.call(pid, :get_message_count, 1_000)
    end

    def get_values(pid) do
      GenServer.call(pid, :get_values, 1_000)
    end

    # Server callbacks

    @impl true
    def init(%{id: id, config: config}) do
      is_faulty = id == config.source || id == 2

      state = %__MODULE__{
        id: id,
        config: config,
        values: %{},
        is_faulty: is_faulty,
        message_count: 0,
        processes: []
      }

      Logger.debug("Process #{id} initialized, faulty: #{is_faulty}")
      {:ok, state}
    end

    @impl true
    def handle_cast({:receive_message, path, value}, state) do
      Logger.debug("Process #{state.id} received message: path=#{path}, value=#{value}")

      new_values = Map.put(state.values, path, value)
      new_count = state.message_count + 1

      {:noreply, %{state | values: new_values, message_count: new_count}}
    end

    @impl true
    def handle_call({:send_messages, round, processes}, _from, state) do
      new_state = %{state | processes: processes}
      
      cond do
        round == 0 && state.id == state.config.source ->
          send_initial_messages(new_state)
        round > 0 ->
          relay_messages(new_state, round)
        true ->
          {:reply, :ok, new_state}
      end
    end

    @impl true
    def handle_call(:decide, _from, state) do
      decision = if state.id == state.config.source do
        # Source process uses its own value
        if state.is_faulty, do: :one, else: :zero
      else
        majority_vote(state)
      end
      
      {:reply, decision, state}
    end

    @impl true
    def handle_call(:is_faulty, _from, state) do
      {:reply, state.is_faulty, state}
    end

    @impl true
    def handle_call(:is_source, _from, state) do
      {:reply, state.id == state.config.source, state}
    end

    @impl true
    def handle_call(:get_message_count, _from, state) do
      {:reply, state.message_count, state}
    end

    @impl true
    def handle_call(:get_values, _from, state) do
      {:reply, state.values, state}
    end

    # Private functions

    defp send_initial_messages(state) do
      base_value = :zero
      
      Enum.each(state.processes, fn {id, pid} ->
        if id != state.id do
          value = if state.is_faulty do
            # Faulty commander sends different values
            if rem(id, 2) == 0, do: :zero, else: :one
          else
            base_value
          end
          
          receive_message(pid, Integer.to_string(state.id), value)
          Logger.debug("Commander #{state.id} sent #{value} to process #{id}")
        end
      end)
      
      {:reply, :ok, state}
    end

    defp relay_messages(state, round) do
      paths = get_paths_for_round(state, round - 1)
      
      Enum.each(paths, fn path ->
        case Map.get(state.values, path) do
          nil -> 
            :ok
          value ->
            new_value = transform_value(state, value)
            new_path = path <> Integer.to_string(state.id)
            
            Enum.each(state.processes, fn {id, pid} ->
              if id != state.id && !String.contains?(new_path, Integer.to_string(id)) do
                receive_message(pid, new_path, new_value)
                Logger.debug("Process #{state.id} relayed #{new_value} to #{id}, path: #{new_path}")
              end
            end)
        end
      end)
      
      {:reply, :ok, state}
    end

    defp transform_value(state, value) do
      if state.is_faulty && state.id == 2 do
        :one
      else
        value
      end
    end

    defp get_paths_for_round(state, round) do
      if round == 0 do
        [Integer.to_string(state.config.source)]
      else
        state.values
        |> Map.keys()
        |> Enum.filter(&(String.length(&1) == round + 1))
      end
    end

    defp majority_vote(state) do
      counts = Enum.reduce(state.values, %{zero: 0, one: 0, retreat: 0}, fn {_path, value}, acc ->
        Map.update!(acc, value, &(&1 + 1))
      end)
      
      Logger.debug("Process #{state.id} vote counts: #{inspect(counts)}")
      
      total_votes = Map.values(counts) |> Enum.sum()
      
      if total_votes == 0 do
        :retreat
      else
        majority_threshold = div(total_votes, 2)
        
        case Enum.find(counts, fn {_value, count} -> count > majority_threshold end) do
          {value, _count} -> value
          nil -> :retreat
        end
      end
    end
  end

  defmodule Engine do
    @moduledoc "Engine that orchestrates the Byzantine Generals algorithm"
    
    require Logger

    def run(source, num_rounds, num_processes, opts \\ []) do
      config = %Configuration{
        source: source,
        num_rounds: num_rounds,
        num_processes: num_processes
      }

      verbose = Keyword.get(opts, :verbose, true)
      
      if verbose do
        Logger.info("Starting Byzantine Generals: #{num_processes} processes, #{num_rounds} rounds, source: #{source}")
      end

      # Start processes
      processes = start_processes(config)
      
      start_time = :os.system_time(:millisecond)
      
      # Run algorithm rounds
      run_rounds(processes, num_rounds)
      
      end_time = :os.system_time(:millisecond)
      duration = end_time - start_time

      # Collect results
      {results, total_messages} = collect_results(processes, config, verbose)
      
      # Clean up
      cleanup_processes(processes)

      {duration, total_messages, results}
    end

    defp start_processes(config) do
      for id <- 0..(config.num_processes - 1) do
        {:ok, pid} = Process.start_link(%{id: id, config: config})
        {id, pid}
      end
    end

    defp run_rounds(processes, num_rounds, timeout \\ 30_000) do
      for round <- 0..(num_rounds - 1) do
        Logger.debug("Starting round #{round}")
        
        tasks = Enum.map(processes, fn {_id, pid} ->
          Task.async(fn -> 
            Process.send_messages(pid, round, processes)
          end)
        end)
        
        try do
          Task.await_many(tasks, timeout)
          # Small delay to ensure message ordering
          :timer.sleep(10)
        catch
          :exit, {:timeout, _} -> 
            Logger.error("Round #{round} timed out")
            throw(:timeout)
        end
      end
      :ok
    end

    defp collect_results(processes, _config, verbose) do
      total_messages = Enum.sum(Enum.map(processes, fn {_id, pid} ->
        Process.get_message_count(pid)
      end))

      results = Enum.map(processes, fn {id, pid} ->
        is_source = Process.is_source?(pid)
        is_faulty = Process.is_faulty?(pid)
        decision = if is_faulty, do: nil, else: Process.decide(pid)
        
        result = %{
          id: id,
          is_source: is_source,
          is_faulty: is_faulty,
          decision: decision
        }
        
        if verbose do
          print_process_result(result)
        end
        
        result
      end)

      {results, total_messages}
    end

    defp print_process_result(%{id: id, is_source: is_source, is_faulty: is_faulty, decision: decision}) do
      prefix = if is_source, do: "Source ", else: ""
      
      if is_faulty do
        IO.puts("#{prefix}Process #{id} is faulty")
      else
        IO.puts("#{prefix}Process #{id} decides on value #{decision}")
      end
    end

    defp cleanup_processes(processes) do
      Enum.each(processes, fn {_id, pid} -> 
        GenServer.stop(pid, :normal, 1000)
      end)
    end

    def benchmark(max_processes, opts \\ []) do
      verbose = Keyword.get(opts, :verbose, true)
      
      if verbose do
        IO.puts("Elixir Byzantine Generals Benchmark")
        IO.puts("===================================")
        IO.puts("Language,Processes,Rounds,Messages,Time(ms)")
      end
      
      test_cases = generate_test_cases(max_processes)
      
      results = Enum.map(test_cases, fn {processes, rounds} ->
        source = div(processes, 3)
        {time, messages, _results} = run(source, rounds, processes, verbose: false)
        
        result = %{
          language: "Elixir",
          processes: processes,
          rounds: rounds,
          messages: messages,
          time_ms: time
        }
        
        if verbose do
          IO.puts("Elixir,#{processes},#{rounds},#{messages},#{time}")
        end
        
        result
      end)
      
      results
    end

    defp generate_test_cases(max_processes) do
      for n <- 4..max_processes, rem(n - 1, 3) == 0 do
        for m <- 1..3, 3 * m + 1 <= n do
          {n, m}
        end
      end
      |> List.flatten()
    end
  end

  # Main module functions

  def run(source, num_rounds, num_processes, opts \\ []) do
    Engine.run(source, num_rounds, num_processes, opts)
  end

  def benchmark(max_processes \\ 20, opts \\ []) do
    Engine.benchmark(max_processes, opts)
  end

  def quick_test do
    IO.puts("Running quick test with 4 processes, 1 round...")
    {time, messages, results} = run(0, 1, 4)
    
    IO.puts("\nTest Results:")
    IO.puts("Time: #{time}ms")
    IO.puts("Messages: #{messages}")
    IO.puts("Processes reporting: #{length(results)}")
    IO.puts("Test completed successfully")
    
    :ok
  end
end

defmodule ByzantineGenerals.Application do
  @moduledoc false
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Add supervised processes here if needed
    ]

    opts = [strategy: :one_for_one, name: ByzantineGenerals.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Erlang Implementation

-module(byzantine_generals).
-export([run/3, benchmark/1, quick_test/0, start/0, stop/0]).
-include_lib("kernel/include/logger.hrl").

-record(config, {source, num_rounds, num_processes}).
-record(process_state, {id, config, values, is_faulty, message_count, processes}).

%% Public API

%% Start the application
start() ->
    application:start(byzantine_generals).

%% Stop the application  
stop() ->
    application:stop(byzantine_generals).

%% Run the Byzantine Generals algorithm
run(Source, NumRounds, NumProcesses) ->
    ?LOG_INFO("Starting Byzantine Generals: ~p processes, ~p rounds, source: ~p", 
              [NumProcesses, NumRounds, Source]),
    
    Config = #config{source = Source, num_rounds = NumRounds, num_processes = NumProcesses},
    
    % Validate configuration
    case validate_config(Config) of
        ok -> 
            run_algorithm(Config);
        {error, Reason} ->
            ?LOG_ERROR("Invalid configuration: ~p", [Reason]),
            {error, Reason}
    end.

%% Run benchmark with different configurations
benchmark(MaxProcesses) ->
    ?LOG_INFO("Running Erlang Byzantine Generals Benchmark up to ~p processes", [MaxProcesses]),
    
    io:format("Erlang Byzantine Generals Benchmark~n"),
    io:format("===================================~n"),
    io:format("Language,Processes,Rounds,Messages,Time(ms)~n"),
    
    TestCases = generate_test_cases(MaxProcesses),
    
    Results = lists:map(fun({Processes, Rounds}) ->
        Source = Processes div 3,
        case run(Source, Rounds, Processes) of
            {ok, Time, Messages, _ProcessResults} ->
                io:format("Erlang,~p,~p,~p,~p~n", [Processes, Rounds, Messages, Time]),
                #{language => erlang, processes => Processes, rounds => Rounds, 
                  messages => Messages, time_ms => Time};
            {error, _Reason} ->
                #{error => true, processes => Processes, rounds => Rounds}
        end
    end, TestCases),
    
    Results.

%% Quick test function
quick_test() ->
    io:format("Running quick test with 4 processes, 1 round...~n"),
    case run(0, 1, 4) of
        {ok, Time, Messages, _Results} ->
            io:format("~nTest Results:~n"),
            io:format("Time: ~pms~n", [Time]),
            io:format("Messages: ~p~n", [Messages]),
            io:format("Test completed successfully~n"),
            ok;
        {error, Reason} ->
            io:format("Test failed: ~p~n", [Reason]),
            error
    end.

%% Internal functions

validate_config(#config{source = Source, num_rounds = NumRounds, num_processes = NumProcesses}) ->
    if
        NumProcesses < 4 ->
            {error, "Need at least 4 processes for Byzantine Generals Problem"};
        Source >= NumProcesses ->
            {error, "Source must be less than number of processes"};
        NumRounds < 1 ->
            {error, "Need at least 1 round"};
        true ->
            ok
    end.

run_algorithm(Config) ->
    % Start message counter
    CounterPid = spawn_link(fun() -> counter_loop(0) end),
    register(message_counter, CounterPid),
    
    StartTime = erlang:system_time(millisecond),
    
    try
        % Start processes
        ProcessPids = start_processes(Config),
        
        % Initialize processes with neighbor information
        initialize_processes(ProcessPids, Config),
        
        % Run algorithm rounds
        run_rounds(ProcessPids, Config),
        
        % Wait for completion
        timer:sleep(100),
        
        EndTime = erlang:system_time(millisecond),
        Duration = EndTime - StartTime,
        
        % Collect results
        {TotalMessages, ProcessResults} = collect_results(ProcessPids, Config),
        
        % Cleanup
        cleanup_processes(ProcessPids),
        unregister(message_counter),
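        % exit(Pid, normal) is ignored by a process that is not trapping exits,
        % so unlink the counter and kill it explicitly to avoid leaking it.
        unlink(CounterPid),
        exit(CounterPid, kill),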
        
        {ok, Duration, TotalMessages, ProcessResults}
        
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR("Algorithm failed: ~p:~p~n~p", [Class, Reason, Stacktrace]),
            % Cleanup on error
            catch unregister(message_counter),
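            % Unlink first: killing a linked process would propagate the exit
            % signal to this (non-trapping) caller as well.
            catch unlink(CounterPid),
            catch exit(CounterPid, kill),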
            {error, {Class, Reason}}
    end.

start_processes(Config) ->
    NumProcesses = Config#config.num_processes,
    lists:map(fun(Id) -> 
        Pid = spawn_link(fun() -> process_loop(Id, Config) end),
        {Id, Pid}
    end, lists:seq(0, NumProcesses - 1)).

initialize_processes(ProcessPids, Config) ->
    lists:foreach(fun({_Id, Pid}) -> 
        Pid ! {init, ProcessPids, Config}
    end, ProcessPids).

run_rounds(ProcessPids, Config) ->
    NumRounds = Config#config.num_rounds,
    lists:foreach(fun(Round) ->
        ?LOG_DEBUG("Starting round ~p", [Round]),
        
        % Send messages for this round
        lists:foreach(fun({_Id, Pid}) ->
            Pid ! {send_messages, Round, self()}
        end, ProcessPids),
        
        % Wait for all processes to complete the round
        lists:foreach(fun({_Id, _Pid}) ->
            receive
                {round_complete, Round} -> ok
            after 5000 ->
                ?LOG_WARNING("Timeout waiting for round ~p completion", [Round])
            end
        end, ProcessPids),
        
        % Small delay between rounds
        timer:sleep(10)
    end, lists:seq(0, NumRounds - 1)).

collect_results(ProcessPids, Config) ->
    % Get total message count
    TotalMessages = get_message_count(),
    
    % Get process results
    ProcessResults = lists:map(fun({Id, Pid}) ->
        Pid ! {get_result, self()},
        receive
            {result, Id, Result} -> 
                print_process_result(Id, Result, Config#config.source),
                Result
        after 2000 ->
            ?LOG_WARNING("Timeout getting result from process ~p", [Id]),
            #{id => Id, error => timeout}
        end
    end, ProcessPids),
    
    {TotalMessages, ProcessResults}.

print_process_result(Id, Result, Source) ->
    Prefix = case Id of
        Source -> "Source ";
        _ -> ""
    end,
    
    case maps:get(is_faulty, Result, false) of
        true ->
            io:format("~sProcess ~p is faulty~n", [Prefix, Id]);
        false ->
            Decision = maps:get(decision, Result, retreat),
            io:format("~sProcess ~p decides on value ~p~n", [Prefix, Id, Decision])
    end.

cleanup_processes(ProcessPids) ->
    lists:foreach(fun({_Id, Pid}) -> 
        Pid ! stop,
        % Don't wait for exit - let them clean up
        ok
    end, ProcessPids).

generate_test_cases(MaxProcesses) ->
    lists:flatten([
        [{N, M} || M <- lists:seq(1, 3), 3 * M + 1 =< N]
        || N <- lists:seq(4, MaxProcesses, 3)
    ]).

%% Process implementation

process_loop(Id, Config) ->
    % The source and process 2 are hard-coded as the faulty processes
    IsFaulty = (Id =:= Config#config.source) orelse (Id =:= 2),
    State = #process_state{
        id = Id, 
        config = Config, 
        values = #{}, 
        is_faulty = IsFaulty,
        message_count = 0,
        processes = []
    },
    ?LOG_DEBUG("Process ~p initialized, faulty: ~p", [Id, IsFaulty]),
    process_loop(State).

process_loop(State) ->
    receive
        {init, ProcessPids, Config} ->
            NewState = State#process_state{processes = ProcessPids, config = Config},
            process_loop(NewState);
            
        {receive_message, Path, Value} ->
            NewValues = maps:put(Path, Value, State#process_state.values),
            NewState = State#process_state{
                values = NewValues,
                message_count = State#process_state.message_count + 1
            },
            increment_message_count(),
            ?LOG_DEBUG("Process ~p received message: path=~s, value=~p", 
                      [State#process_state.id, Path, Value]),
            process_loop(NewState);
            
        {send_messages, Round, From} ->
            NewState = handle_send_messages(State, Round),
            From ! {round_complete, Round},
            process_loop(NewState);
            
        {get_result, From} ->
            Result = create_result(State),
            From ! {result, State#process_state.id, Result},
            process_loop(State);
            
        stop ->
            ?LOG_DEBUG("Process ~p stopping", [State#process_state.id]),
            ok;
            
        Other ->
            ?LOG_WARNING("Process ~p received unexpected message: ~p", 
                        [State#process_state.id, Other]),
            process_loop(State)
    end.

handle_send_messages(State, Round) ->
    Id = State#process_state.id,
    Config = State#process_state.config,
    
    if 
        Round =:= 0 andalso Id =:= Config#config.source ->
            send_initial_messages(State);
        Round > 0 ->
            relay_messages(State, Round);
        true ->
            State
    end.

send_initial_messages(State) ->
    BaseValue = zero,
    ProcessPids = State#process_state.processes,
    
    lists:foreach(fun({Id, Pid}) ->
        if Id =/= State#process_state.id ->
            Value = case State#process_state.is_faulty of
                true -> 
                    % Faulty commander sends different values
                    case Id rem 2 of
                        0 -> zero;
                        1 -> one
                    end;
                false -> 
                    BaseValue
            end,
            
            Pid ! {receive_message, integer_to_list(State#process_state.id), Value},
            ?LOG_DEBUG("Commander ~p sent ~p to process ~p", 
                      [State#process_state.id, Value, Id]);
        true -> 
            ok
        end
    end, ProcessPids),
    
    State.

relay_messages(State, Round) ->
    Paths = get_paths_for_round(State, Round - 1),
    ProcessPids = State#process_state.processes,
    
    lists:foreach(fun(Path) ->
        case maps:get(Path, State#process_state.values, undefined) of
            undefined -> 
                ok;
            Value ->
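                NewValue = transform_value(State, Value),
                % Paths are concatenated decimal ids, so the membership check below
                % (and length/1 in get_paths_for_round) assumes single-digit ids
                NewPath = Path ++ integer_to_list(State#process_state.id),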
                
                lists:foreach(fun({Id, Pid}) ->
                    IdStr = integer_to_list(Id),
                    case Id =/= State#process_state.id of
                        true ->
                            case string:str(NewPath, IdStr) of
                                0 -> % IdStr not found in NewPath
                                    Pid ! {receive_message, NewPath, NewValue},
                                    ?LOG_DEBUG("Process ~p relayed ~p to ~p, path: ~s", 
                                              [State#process_state.id, NewValue, Id, NewPath]);
                                _ -> % IdStr found in NewPath, skip
                                    ok
                            end;
                        false -> % Same process, skip
                            ok
                    end
                end, ProcessPids)
        end
    end, Paths),
    
    State.

transform_value(State, Value) ->
    if State#process_state.is_faulty andalso State#process_state.id =:= 2 ->
        one;
    true ->
        Value
    end.

get_paths_for_round(State, Round) ->
    if Round =:= 0 ->
        [integer_to_list((State#process_state.config)#config.source)];
    true ->
        maps:fold(fun(Path, _Value, Acc) ->
            case length(Path) of
                Len when Len =:= Round + 1 -> [Path | Acc];
                _ -> Acc
            end
        end, [], State#process_state.values)
    end.

create_result(State) ->
    Decision = if State#process_state.id =:= (State#process_state.config)#config.source ->
        % Source process uses its own value
        case State#process_state.is_faulty of
            true -> one;
            false -> zero
        end;
    true ->
        majority_vote(State)
    end,
    
    #{
        id => State#process_state.id,
        is_source => State#process_state.id =:= (State#process_state.config)#config.source,
        is_faulty => State#process_state.is_faulty,
        decision => Decision,
        message_count => State#process_state.message_count
    }.

majority_vote(State) ->
    Values = maps:values(State#process_state.values),
    Counts = lists:foldl(fun(Value, Acc) ->
        maps:update_with(Value, fun(Count) -> Count + 1 end, 1, Acc)
    end, #{zero => 0, one => 0, retreat => 0}, Values),
    
    ?LOG_DEBUG("Process ~p vote counts: ~p", [State#process_state.id, Counts]),
    
    TotalVotes = maps:fold(fun(_Value, Count, Sum) -> Sum + Count end, 0, Counts),
    
    if TotalVotes =:= 0 ->
        retreat;
    true ->
        MajorityThreshold = TotalVotes div 2,
        % Return the value held by a strict majority, defaulting to retreat
        maps:fold(fun(Value, Count, Acc) ->
            if Count > MajorityThreshold -> Value;
            true -> Acc
            end
        end, retreat, Counts)
    end.

%% Message counter implementation

counter_loop(Count) ->
    receive
        increment ->
            counter_loop(Count + 1);
        {get_count, From} ->
            From ! {count, Count},
            counter_loop(Count);
        reset ->
            counter_loop(0);
        stop ->
            ok;
        _ ->
            counter_loop(Count)
    end.

increment_message_count() ->
    case whereis(message_counter) of
        undefined -> ok;
        Pid -> Pid ! increment
    end.

get_message_count() ->
    case whereis(message_counter) of
        undefined -> 0;
        Pid ->
            Pid ! {get_count, self()},
            receive
                {count, Count} -> Count
            after 1000 -> 0
            end
    end.
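
One caveat about the listing above: paths are built by concatenating decimal ids into a string, so string:str/2 and length/1 only behave as intended while every id is a single digit. With 12 processes, for example, the path "0" ++ "11" becomes "011", and the substring check then treats process 1 as already visited. A minimal sketch of an alternative, written in Rust to match the other implementation in this post, keeps the path as a list of ids. The names relay_targets and paths_of_length are hypothetical and are not taken from the repository.

```rust
use std::collections::HashMap;

/// A relay path is the ordered list of process ids a value has passed through.
/// Storing ids as numbers avoids the ambiguity of concatenated digit strings.
type Path = Vec<usize>;

/// Ids that should receive a relayed message: every process not already on the path.
/// (Hypothetical helper; mirrors the membership check in relay_messages.)
fn relay_targets(path: &[usize], num_processes: usize) -> Vec<usize> {
    (0..num_processes).filter(|id| !path.contains(id)).collect()
}

/// Paths received so far whose length equals `len`, mirroring get_paths_for_round.
fn paths_of_length(values: &HashMap<Path, u8>, len: usize) -> Vec<Path> {
    let mut paths: Vec<Path> = values.keys().filter(|p| p.len() == len).cloned().collect();
    paths.sort(); // HashMap iteration order is unspecified, so sort for stable output
    paths
}
```

With this representation, relay_targets(&[0, 11], 13) still includes process 1, because membership is checked per id rather than per character.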

Performance Analysis and Benchmarking

To properly benchmark these implementations, we need to consider several factors:

Metrics to Measure

  1. Execution Time: How long does the algorithm take?
  2. Message Count: How many messages are exchanged?
  3. Memory Usage: Peak memory consumption
  4. Scalability: How performance changes as the number of generals grows
  5. CPU Utilization: How efficiently each implementation uses system resources
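
As a rough illustration of the first two metrics, the sketch below times a closure with std::time::Instant and counts messages with a shared atomic counter. MessageCounter, timed, and simulate_round are hypothetical names for this example, and the one-message-per-lieutenant round is a simplification. Memory and CPU measurements are left out because they need platform-specific tooling.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Shared message counter (hypothetical helper, similar in spirit to the
/// Erlang counter process above).
#[derive(Clone, Default)]
struct MessageCounter(Arc<AtomicUsize>);

impl MessageCounter {
    fn increment(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn count(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

/// Run `work` once and return its result together with the elapsed wall-clock time.
fn timed<T>(work: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let result = work();
    (result, start.elapsed())
}

/// Simulate one round in which a commander sends one message to each lieutenant,
/// with every receiver running on its own thread.
fn simulate_round(num_processes: usize, counter: &MessageCounter) {
    let handles: Vec<_> = (1..num_processes)
        .map(|_| {
            let counter = counter.clone();
            thread::spawn(move || counter.increment())
        })
        .collect();
    for handle in handles {
        handle.join().expect("receiver thread panicked");
    }
}
```

Calling timed(|| simulate_round(10, &counter)) returns the elapsed Duration, after which counter.count() reports the number of messages delivered in this simplified model.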

Modern Benchmarking Approach

// Example comprehensive benchmark (outline; the helper functions are not shown here)
pub struct BenchmarkResults {
    pub language: String,
    pub num_processes: usize,
    pub num_rounds: usize,
    pub execution_time_ms: f64,
    pub messages_sent: usize,
    pub memory_peak_mb: f64,
    pub cpu_utilization: f64,
}

pub fn comprehensive_benchmark() {
    let test_cases = vec![
        (4, 1),   // Minimum viable case
        (7, 2),   // Small scale
        (10, 3),  // Medium scale
        (16, 5),  // Larger scale
    ];

    for (processes, rounds) in test_cases {
        // Rust benchmark
        let rust_result = benchmark_rust_detailed(processes, rounds);
        
        // Elixir benchmark (would call via Port)
        let elixir_result = benchmark_elixir_detailed(processes, rounds);
        
        // Erlang benchmark (would call via Port)
        let erlang_result = benchmark_erlang_detailed(processes, rounds);
        
        compare_results(vec![rust_result, elixir_result, erlang_result]);
    }
}
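
The compare_results call above is not defined in the snippet. One possible shape for it, offered only as a sketch, ranks the runs by execution time and reports each one relative to the fastest. The trimmed-down RunResult struct and the rank_by_time name are assumptions for this example, not code from the repository.

```rust
/// Trimmed-down stand-in for BenchmarkResults (hypothetical, for illustration only).
#[derive(Debug, Clone, PartialEq)]
struct RunResult {
    language: String,
    execution_time_ms: f64,
}

/// Sort runs from fastest to slowest and pair each with its slowdown factor
/// relative to the fastest run (1.0 means "fastest").
fn rank_by_time(mut runs: Vec<RunResult>) -> Vec<(String, f64)> {
    runs.sort_by(|a, b| a.execution_time_ms.total_cmp(&b.execution_time_ms));
    let fastest = match runs.first() {
        Some(run) => run.execution_time_ms,
        None => return Vec::new(), // nothing to compare
    };
    runs.into_iter()
        .map(|run| (run.language, run.execution_time_ms / fastest))
        .collect()
}
```

Reporting ratios rather than raw differences keeps the comparison readable when one runtime has a large fixed startup cost.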

Real-World Implications

The performance characteristics matter significantly in different contexts:

Blockchain Applications

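  • Latency-Critical: Rust’s lack of garbage-collection pauses can matter for latency-sensitive workloads such as high-frequency trading
  • Node Count: Elixir/Erlang’s lightweight processes make it practical to model large numbers of nodes in a blockchain network
  • Fault Tolerance: Actor-model languages isolate failures in individual processes, which helps when nodes crash or become unreachable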

IoT and Edge Computing

  • Resource Constraints: Rust’s low memory footprint is crucial
  • Device Coordination: Byzantine fault tolerance becomes critical for autonomous systems

Financial Systems

  • Regulatory Requirements: Provable consensus algorithms are increasingly required
  • High Availability: Erlang’s fault tolerance model aligns with financial system needs

Future Directions

Looking ahead, several trends will likely shape how we think about Byzantine fault tolerance:

  • Quantum Computing: Post-quantum cryptography will change how we implement Byzantine fault-tolerant signatures and may require new consensus mechanisms.
  • Climate Considerations: Energy-efficient consensus mechanisms (like Proof of Stake) are becoming increasingly important as environmental concerns grow.
  • Regulatory Frameworks: Government regulations around cryptocurrencies and distributed systems may influence which Byzantine fault-tolerant algorithms are acceptable in different contexts.
  • Edge and IoT: As computing moves to the edge, Byzantine fault tolerance becomes crucial for coordinating potentially millions of small, unreliable devices.

Performance Analysis

To compare the implementations, I measured complete wall-clock execution time including language runtime startup and algorithm execution across different process counts (10 to 2000 processes) with 1 round each. Each configuration was tested 3 times to ensure consistency. These benchmarks focus on demonstrating algorithmic correctness and relative performance characteristics rather than highly optimized production implementations.

All source code is available at https://github.com/bhatti/byz-sample for those interested in running or improving these implementations.

Results Summary

Complete Execution Time (Wall-Clock) – Updated Results:

  1. Elixir: 535ms average (range: 455-762ms)
  2. Rust: 577ms average (range: 521-667ms)
  3. Erlang: 1460ms average (range: 1401-1629ms)

Detailed Performance Breakdown

Configuration     Elixir (ms)   Rust (ms)   Erlang (ms)   Messages
10 processes      471           533         1407          8
50 processes      476           545         1406          47
100 processes     528           587         1420          91
200 processes     482           550         1425          199
1000 processes    568           591         1497          998
2000 processes    687           661         1610          1999
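
The averages in the Results Summary can be cross-checked against the table rows. The sketch below hard-codes the six per-configuration timings from the table and computes the mean for each language. The constant names and mean_ms are hypothetical helpers for this check.

```rust
/// Per-configuration wall-clock times (ms) copied from the table above,
/// ordered by process count: 10, 50, 100, 200, 1000, 2000.
const ELIXIR_MS: [f64; 6] = [471.0, 476.0, 528.0, 482.0, 568.0, 687.0];
const RUST_MS: [f64; 6] = [533.0, 545.0, 587.0, 550.0, 591.0, 661.0];
const ERLANG_MS: [f64; 6] = [1407.0, 1406.0, 1420.0, 1425.0, 1497.0, 1610.0];

/// Arithmetic mean of a slice of timings; returns None for an empty slice.
fn mean_ms(samples: &[f64]) -> Option<f64> {
    if samples.is_empty() {
        return None;
    }
    Some(samples.iter().sum::<f64>() / samples.len() as f64)
}
```

The means come out at roughly 535.3, 577.8, and 1460.8 ms, which match the summary figures once truncated to whole milliseconds.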

Key Findings

  • Elixir maintained consistent performance across different process counts, showing good scalability characteristics
  • Rust delivered predictable performance, with the narrowest range across runs (521-667ms)
  • Erlang showed significantly higher execution times but maintained reliability across all test configurations
  • Message counts remained consistent across languages for equivalent configurations, which suggests the three implementations follow the same algorithm

The results show that as process count increases from 10 to 2000:

  • Elixir scales relatively well, with execution time increasing by ~46% (471ms to 687ms)
  • Rust shows similar scaling characteristics, with a ~24% increase (533ms to 661ms)
  • Erlang grows the least in relative terms, ~14% (1407ms to 1610ms), but starts from a much higher baseline
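
The growth figures follow directly from the first and last table rows. A small sketch of that arithmetic, with percent_increase and growth_table as hypothetical helper names:

```rust
/// Percentage increase from `from` to `to`, e.g. 100.0 -> 150.0 gives 50.0.
fn percent_increase(from: f64, to: f64) -> f64 {
    (to - from) / from * 100.0
}

/// Wall-clock times (ms) at 10 and 2000 processes, copied from the table above.
const ENDPOINTS_MS: [(&str, f64, f64); 3] = [
    ("Elixir", 471.0, 687.0),
    ("Rust", 533.0, 661.0),
    ("Erlang", 1407.0, 1610.0),
];

/// Growth per language, rounded to one decimal place.
fn growth_table() -> Vec<(&'static str, f64)> {
    ENDPOINTS_MS
        .iter()
        .map(|&(lang, first, last)| {
            let rounded = (percent_increase(first, last) * 10.0).round() / 10.0;
            (lang, rounded)
        })
        .collect()
}
```

This gives about 45.9% for Elixir, 24.0% for Rust, and 14.4% for Erlang.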

Note: These benchmarks measure wall-clock time including runtime startup overhead. The performance differences may be influenced by implementation patterns (GenServer vs raw message passing) and language-specific optimizations rather than fundamental runtime capabilities.

Try It Yourself

The complete implementation is available at https://github.com/bhatti/byz-sample. To run the benchmarks:

# Clone and run benchmarks
git clone https://github.com/bhatti/byz-sample
cd byz-sample
make benchmark

Disclaimer: The above implementation of the Byzantine Generals Problem serves as a case study for evaluating distributed computing approaches across different programming paradigms rather than as a benchmark of specific implementations in these languages.

Conclusion

The Byzantine Generals Problem exemplifies how fundamental computer science research can unexpectedly become the foundation for revolutionary technology. What began as an abstract theoretical exercise in 1982 became the backbone of Bitcoin in 2008 and continues to be crucial for modern distributed systems. My 2007 exploration of this problem was motivated by curiosity about distributed computing and language performance. Today, understanding Byzantine fault tolerance is essential for anyone working with blockchain technology, distributed databases, or fault-tolerant systems.

Try the implementations yourself: https://github.com/bhatti/byz-sample

