I came across an interesting post on the classic Byzantine Generals Problem on Mark Nelson’s blog. He showed a C++ implementation of Leslie Lamport’s algorithm. It seemed like a natural fit for Erlang, but instead of writing it directly in Erlang, I first translated the algorithm into Java and Ruby (with a slight redesign). Unfortunately, the original C++ source and my Java and Ruby versions are not truly distributed or concurrent. I just wanted to translate the original algorithm without changing a whole lot, whereas Erlang gives you both distribution and concurrency for free.
First, you can learn about the Byzantine Generals Problem from Mark Nelson’s post, from this brief summary, or from Wikipedia.
The algorithm is heavily based on message passing: the general sends a message value (0 or 1) to his lieutenants, and each lieutenant relays the value to all other lieutenants, including himself. After a specific number of rounds, a consensus is reached.
The original C++ code defines a Node structure to store the input/output values of each process, and a Traits class to store configuration and a method to come up with new values. Since some processes can be faulty (or traitors), the Traits class determines the value that a process will return. Most of the fun happens in the Process class, which performs all message communication. You can download the original C++ source code.
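To make the message flow concrete, here is a minimal sketch of a single relay round with four processes. It is not taken from any of the implementations discussed here: the choice of process 0 as a loyal general, process 2 as the traitor, and the helper names `relay` and `majority` are all assumptions made for illustration.

```ruby
# One relay round with 4 processes. Process 0 is a loyal general and
# process 2 is a traitorous lieutenant (both are illustrative assumptions).
TRAITORS = [2]
ORDER    = 0 # the value the loyal general sends

lieutenants = [1, 2, 3]

# Round 0: the general sends its order to every lieutenant.
direct = lieutenants.to_h { |l| [l, ORDER] }

# Round 1: each lieutenant relays what it received; a traitor flips the bit.
relay = lambda do |from|
  TRAITORS.include?(from) ? 1 - direct[from] : direct[from]
end

# Simple majority vote; anything short of a strict majority of 1s yields 0.
def majority(values)
  values.count(1) > values.size / 2 ? 1 : 0
end

# Every loyal lieutenant votes over its own direct value plus the relayed ones.
decisions = lieutenants.reject { |l| TRAITORS.include?(l) }.to_h do |l|
  heard = [direct[l]] + (lieutenants - [l]).map { |peer| relay.call(peer) }
  [l, majority(heard)]
end

p decisions
```

Both loyal lieutenants hear two 0s and one 1, so they agree on 0 despite the traitor. With more traitors, more relay rounds are needed, which is what the path tree in the full implementations tracks.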
I slightly changed the structure and got rid of static attributes and methods. In my design, the application consists of the following modules:
- Configuration – for storing configuration for simulation such as number of processes, rounds, source (general) process.
- Strategy – that determines the value that should be returned by the process.
- Repository – that stores hierarchical paths for messages that were communicated by the processes.
- Process/GeneralProcess – classes that send and receive messages.
- Engine – that drives the algorithm.
- Test – that actually invokes engine with different parameters and stores timings.
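As a rough illustration of what the Repository holds, the sketch below builds the tree of message paths for a tiny run. A path is the string of process ids a value has travelled through, starting at the general. The function name `build_paths` is hypothetical, and the sketch assumes single-digit process ids, as the listings do.

```ruby
# Build a map from each path to its child paths. A child extends its parent
# by one process id that has not yet appeared in the path.
def build_paths(source, num_procs, num_rounds)
  children = Hash.new { |h, k| h[k] = [] }
  walk = lambda do |path, used, rank|
    next if rank >= num_rounds # deepest rank reached, no more relays
    (0...num_procs).each do |id|
      next if used.include?(id)
      child = "#{path}#{id}"
      children[path] << child
      walk.call(child, used + [id], rank + 1)
    end
  end
  walk.call(source.to_s, [source], 0)
  children
end

tree = build_paths(0, 4, 2)
tree.each { |path, kids| puts "#{path} -> #{kids.join(' ')}" }
```

For general 0, four processes and two rounds, the root `"0"` has children `01`, `02`, `03`, and each of those has two children of its own (for example `012` and `013`).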
Here is the Java implementation (I am showing multiple Java classes here, but you can download the complete source code from here).
I kept most of the algorithm as it was, which turned out to be a real memory hog when you increase the number of processes. In fact, I could not run the Java application with more than 10 processes in 512MB of memory. Also, in my Java version I used the Interface Segregation Principle (old habit):
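A back-of-the-envelope count suggests where the memory goes. Reading the path generation in the listings, rank 0 has one path, rank 1 has n-1, rank 2 has (n-1)(n-2), and so on, and every lieutenant stores a node per path. The helper `path_count` below is a hypothetical name and a sketch of that arithmetic, not part of the original code.

```ruby
# Number of distinct message paths for num_procs processes and num_rounds
# relay rounds: 1 + (n-1) + (n-1)(n-2) + ... until ids run out.
def path_count(num_procs, num_rounds)
  total = 0
  at_rank = 1
  (0..num_rounds).each do |rank|
    break if at_rank.zero? # every process id is already in the path
    total += at_rank
    at_rank *= (num_procs - 1 - rank)
  end
  total
end

[4, 6, 8, 10].each do |n|
  puts "#{n} processes, #{n - 1} rounds: #{path_count(n, n - 1)} paths per process"
end
```

The growth is factorial in the number of processes when the round count is allowed to grow with it, which is consistent with the simulation becoming impractical around 10 processes.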
```java
// --- Value.java ---
public enum Value {
    ZERO,
    ONE,
    FAULTY,
    UNKNOWN
}

// --- ValueStrategy.java ---
// Decides which value a process reports, including faulty behaviour.
public interface ValueStrategy {
    public Node getSourceValue();

    public Value getValue(
        Value value,
        int source,
        int destination,
        String path);

    public Value getDefaultValue();

    public boolean isFaulty(int process);
}

// --- Broadcaster.java ---
public interface Broadcaster {
    public void broadcast(int round, int id, Node source, String path);
}

// --- PathRepository.java ---
import java.util.*;

public interface PathRepository {
    public void generateChildren();

    public List<String> getRankList(int rank, int source);

    public List<String> getPathChildren(String path);
}

// --- Configuration.java ---
public class Configuration {
    private int source;
    private int roundsOfMessages;
    private int numberOfProcesses;

    Configuration(int source, int roundsOfMessages, int numberOfProcesses) {
        this.source = source;
        this.roundsOfMessages = roundsOfMessages;
        this.numberOfProcesses = numberOfProcesses;
    }

    public int getSource() {
        return source;
    }

    public int getRoundsOfMessages() {
        return roundsOfMessages;
    }

    public int getNumberOfProcesses() {
        return numberOfProcesses;
    }
}

// --- DefaultEngine.java ---
import java.util.*;

public class DefaultEngine implements Engine, Broadcaster {
    static final boolean debug = System.getProperty("debug", "false").equals("true");
    private Configuration configuration;
    private PathRepository repository;
    private List<Process> processes;
    private ValueStrategy strategy;

    public DefaultEngine(int source, int roundsOfMessages, int numberOfProcesses) {
        this.configuration = new Configuration(source, roundsOfMessages, numberOfProcesses);
        this.strategy = new DefaultValueStrategy(configuration);
        this.repository = new DefaultPathRepository(configuration);
        this.processes = new ArrayList<Process>();
    }

    // Delivers the message for 'path' to every process except the source.
    public void broadcast(int round, int id, Node source, String path) {
        for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
            if (j != configuration.getSource()) {
                Value value = strategy.getValue(
                    source.input,
                    id,
                    j,
                    path);
                if (debug) {
                    String sourcePath = path.substring(0, path.length() - 1);
                    System.out.println("Sending for round " + round + " from process " + id + " to " + j + ": {" + value +
                        ", " + path + ", " + Value.UNKNOWN + "}, getting value from source " + sourcePath);
                }
                getProcesses().get(j).receiveMessage(path, new Node(value, Value.UNKNOWN));
            }
        }
    }

    public List<Process> getProcesses() {
        return processes;
    }

    public void run() {
        //
        // Starting at round 0 and working up to round M, call the
        // sendMessages() method of each process. It will send the appropriate
        // message to all other sibling processes.
        //
        for (int i = 0; i <= configuration.getRoundsOfMessages(); i++) {
            for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
                processes.get(j).sendMessages(i);
            }
        }

        //
        // All that is left is to print out the results. For non-faulty processes,
        // we call the decide() method to see what the process decision was.
        //
        for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
            if (processes.get(j).isSource()) System.out.print("Source ");
            System.out.print("Process " + j);
            if (!processes.get(j).isFaulty()) {
                System.out.print(" decides on value " +
                    processes.get(j).decide());
            } else {
                System.out.print(" is faulty");
            }
            System.out.println();
        }
    }

    // Creates one process per id (the source becomes the general) and builds the path tree.
    public void start() {
        for (int i = 0; i < configuration.getNumberOfProcesses(); i++) {
            Process process = i == configuration.getSource()
                ? new GeneralProcess(i, configuration, repository, this, strategy)
                : new Process(i, configuration, repository, this, strategy);
            processes.add(process);
        }
        repository.generateChildren();
    }
}

// --- DefaultPathRepository.java ---
import java.util.*;

public class DefaultPathRepository implements PathRepository {
    private Map<String, List<String>> children;
    private Map<Integer, Map<Integer, List<String>>> pathByRank;
    private Configuration configuration;
    static final boolean debug = System.getProperty("debug", "false").equals("true");

    public DefaultPathRepository(Configuration configuration) {
        this.configuration = configuration;
        this.children = new HashMap<String, List<String>>();
        this.pathByRank = new HashMap<Integer, Map<Integer, List<String>>>();
    }

    public Map<String, List<String>> getChildren() {
        return children;
    }

    // Returns the child paths of 'path', creating an empty list on first access.
    public List<String> getPathChildren(String path) {
        List<String> pathList = children.get(path);
        if (pathList == null) {
            pathList = new ArrayList<String>();
            children.put(path, pathList);
        }
        return pathList;
    }

    // Returns the paths of the given rank that end at process 'source'.
    public List<String> getRankList(int rank, int source) {
        Map<Integer, List<String>> pathMap = pathByRank.get(rank);
        if (pathMap == null) {
            pathMap = new HashMap<Integer, List<String>>();
            pathByRank.put(rank, pathMap);
        }
        List<String> pathList = pathMap.get(source);
        if (pathList == null) {
            pathList = new ArrayList<String>();
            pathMap.put(source, pathList);
        }
        return pathList;
    }

    public void generateChildren() {
        generateChildren(configuration.getSource(), new boolean[configuration.getNumberOfProcesses()], "", 0);
    }

    private void generateChildren(int source, boolean[] ids, String path, int rank) {
        ids[source] = true;
        path += toCharString(source);
        getRankList(rank, source).add(path);

        // Recurse into every process id that is not yet part of this path.
        if (rank < configuration.getRoundsOfMessages()) {
            for (int i = 0; i < ids.length; i++) {
                if (!ids[i]) {
                    boolean[] newIds = new boolean[ids.length];
                    System.arraycopy(ids, 0, newIds, 0, ids.length);
                    generateChildren(i, newIds, path, rank + 1);
                    getPathChildren(path).add(path + toCharString(i));
                }
            }
        }

        if (debug) {
            System.out.print("generateChildren(" + source + "," + rank + "," + path + "), children = ");
            List<String> list = getPathChildren(path);
            for (String s : list) {
                System.out.print(s + " ");
            }
            System.out.println();
        }
    }

    // Encodes a process id as a single character appended to the path.
    private char toChar(int n) {
        return (char) (n + '0');
    }

    private String toCharString(int n) {
        return String.valueOf(toChar(n));
    }
}

// --- DefaultValueStrategy.java ---
import java.util.*;

public class DefaultValueStrategy implements ValueStrategy {
    private Configuration configuration;
    private Node sourceValue;

    DefaultValueStrategy(Configuration configuration) {
        this.configuration = configuration;
        this.sourceValue = new Node(Value.ZERO, Value.UNKNOWN);
    }

    //
    // This method returns the true value of the source. The source may send
    // faulty values to other processes, but the Node returned by this method will be
    // in its root node.
    //
    // In this case, the General's node has an input value of 0, which makes that
    // the desired value. Of course, since the General is faulty, this doesn't really
    // matter.
    //
    public Node getSourceValue() {
        return sourceValue;
    }

    //
    // During messaging, getValue() is called to get the value returned by a given process
    // during a messaging round. 'value' is the input value that it should be sending to
    // the destination process (if it isn't faulty), source is the source process ID,
    // destination is the destination process ID, and path is the path being used for this
    // particular message.
    //
    // In this particular implementation, we have two faulty processes - the source
    // process, which returns a sort-of random value, and process ID 2, which returns
    // a ONE always, in contradiction of the General's desired value of 0.
    //
    public Value getValue(Value value, int source, int destination, String path) {
        if (configuration.getSource() == source) return (destination & 1) != 0 ? Value.ZERO : Value.ONE;
        else if (source == 2) return Value.ONE;
        return value;
    }

    //
    // When breaking a tie, getDefaultValue() is used to get the default value.
    //
    // This is an arbitrary decision, but it has to be consistent across all processes.
    // More importantly, the processes have to arrive at a correct decision regardless
    // of whether the default value is always 0 or always 1. In this case we've set it to
    // a value of 1.
    //
    public Value getDefaultValue() {
        return Value.ONE;
    }

    //
    // This method is used to identify faulty processes by ID
    //
    public boolean isFaulty(int process) {
        return process == configuration.getSource() || process == 2;
    }
}

// --- Engine.java ---
public interface Engine extends Runnable {
    public void start();
}

// --- GeneralProcess.java ---
public class GeneralProcess extends Process {
    public GeneralProcess(int id, Configuration configuration, PathRepository repository, Broadcaster broadcaster, ValueStrategy strategy) {
        super(id, configuration, repository, broadcaster, strategy);
        nodes.put("", strategy.getSourceValue());
    }

    @Override
    public Value decide() {
        //
        // The source process doesn't have to do all the work - since it's the decider,
        // it simply looks at its input value to pick the appropriate decision value.
        //
        return nodes.get("").input;
    }
}

// --- Node.java ---
public class Node {
    Value input;
    Value output;

    Node() {
        this(Value.FAULTY, Value.FAULTY);
    }

    Node(Value input, Value output) {
        this.input = input;
        this.output = output;
    }
}

// --- Process.java ---
import java.util.*;

public class Process {
    protected int id;
    protected Configuration configuration;
    protected ValueStrategy strategy;
    protected Map<String, Node> nodes;
    protected PathRepository repository;
    protected Broadcaster broadcaster;
    static final boolean debug = System.getProperty("debug", "false").equals("true");
    static int totalMessages;

    public Process(int id, Configuration configuration, PathRepository repository, Broadcaster broadcaster, ValueStrategy strategy) {
        this.id = id;
        this.strategy = strategy;
        this.repository = repository;
        this.broadcaster = broadcaster;
        this.configuration = configuration;
        this.nodes = new HashMap<String, Node>();
    }

    //
    // Receiving a message is pretty simple here, it means that some other process
    // calls this method on the current process with path and a node. All we do
    // is store the value, we'll use it in the next round of messaging.
    //
    public void receiveMessage(String path, Node node) {
        nodes.put(path, node);
        totalMessages++;
    }

    //
    // After constructing all messages, you need to call sendMessages() on each process,
    // once per round. This routine will send the appropriate messages for each round
    // to all the other processes.
    //
    // Deciding on what messages to send is pretty simple. If we look at the
    // repository's rank list, indexed by round and the id of this process, it gives
    // the entire set of target paths that this process needs to send messages to.
    // So there is an iteration loop through that list, and this process sends a message
    // to the correct target process for each path in the list.
    //
    public void sendMessages(int round) {
        List<String> pathList = repository.getRankList(round, id);
        for (String path : pathList) {
            String sourcePath = path.substring(0, path.length() - 1);
            Node source = nodes.get(sourcePath);
            if (source == null) throw new IllegalStateException("Failed to find source node for " + sourcePath);
            broadcaster.broadcast(round, id, source, path);
        }
    }

    //
    // After all messages have been sent, it's time to decide.
    //
    // This part of the algorithm follows the description in the article closely.
    // It has to work its way from the leaf values up to the root of the tree.
    // The first round consists of going to the leaf nodes, and copying the input
    // value to the output value.
    //
    // All subsequent rounds consist of getting the majority of the output values from
    // each node's children, and copying that to the node's output value.
    //
    // When we finally reach the root node, there is only one node with an output value,
    // and that represents this process's decision.
    //
    public Value decide() {
        //
        // Step 1 - set the leaf values
        //
        for (int i = 0; i < configuration.getNumberOfProcesses(); i++) {
            List<String> pathList = repository.getRankList(configuration.getRoundsOfMessages(), i);
            for (String path : pathList) {
                Node node = nodes.get(path);
                node.output = node.input;
            }
        }
        //
        // Step 2 - work up the tree
        //
        for (int round = configuration.getRoundsOfMessages() - 1; round >= 0; round--) {
            for (int i = 0; i < configuration.getNumberOfProcesses(); i++) {
                List<String> pathList = repository.getRankList(round, i);
                for (String path : pathList) {
                    Node node = nodes.get(path);
                    node.output = getMajority(path);
                }
            }
        }
        List<String> pathList = repository.getRankList(0, configuration.getSource());
        String topPath = pathList.get(0);
        return nodes.get(topPath).output;
    }

    //
    // This routine calculates the majority value for the children of a given
    // path. The logic is pretty simple, we increment the count for all possible
    // values over the children. If there is a clear-cut majority, we return that,
    // otherwise we return the default value defined by the strategy class.
    //
    public Value getMajority(String path) {
        Map<Value, Integer> counts = new HashMap<Value, Integer>();
        counts.put(Value.ONE, 0);
        counts.put(Value.ZERO, 0);
        counts.put(Value.UNKNOWN, 0);
        Collection<String> list = repository.getPathChildren(path);
        int n = 0;
        if (list == null) {
            if (debug) System.out.println("No child found for '" + path + "'");
        } else {
            n = list.size();
            for (String child : list) {
                Node node = nodes.get(child);
                if (node != null) {
                    counts.put(node.output, counts.get(node.output) + 1);
                } else if (debug) {
                    //System.out.println("Could not find node for count with path " + child);
                }
            }
        }
        if (counts.get(Value.ONE) > (n / 2)) return Value.ONE;
        else if (counts.get(Value.ZERO) > (n / 2)) return Value.ZERO;
        else if (counts.get(Value.ONE).intValue() == counts.get(Value.ZERO).intValue() &&
                 counts.get(Value.ONE) == (n / 2)) return strategy.getDefaultValue();
        return Value.UNKNOWN;
    }

    //
    // A utility routine that tells whether a given process is faulty
    //
    public boolean isFaulty() {
        return strategy.isFaulty(id);
    }

    //
    // Another somewhat handy utility routine
    //
    public boolean isSource() {
        return configuration.getSource() == id;
    }
}
```
Here is the Ruby version, which is pretty much the same as the Java version (except a bit smaller). Operator overloading and the power of its collections make the Ruby version terser than the Java one. Again, the application consists of classes for process, strategy, repository, engine, node, and config. The benchmarks are kicked off from ByzGeneralTest, which calls Engine, which in turn creates the Repository, Strategy, Generals and LProcesses (lieutenants).
```ruby
require 'benchmark'
require 'test/unit'
require 'test/unit/ui/console/testrunner'

DEBUG = false

class Configuration
  attr_reader :source, :num_rounds, :num_procs
  def initialize(source, num_rounds, num_procs)
    @source = source
    @num_rounds = num_rounds
    @num_procs = num_procs
  end
  def to_s
    "#{@source}|#{@num_rounds}|#{@num_procs}"
  end
end

class Node
  attr_accessor :input
  attr_accessor :output
  def initialize(input = Value::FAULTY, output = Value::FAULTY)
    @input = input
    @output = output
  end
  def to_s
    "#{@input}/#{@output}"
  end
end

class Value
  ZERO = '0'
  ONE = '1'
  FAULTY = 'X'
  UNKNOWN = '?'
end

class PathRepository
  attr_accessor :children
  attr_accessor :path_by_rank
  attr_accessor :config

  def initialize(config)
    @config = config
    # Default blocks create missing entries on first access.
    @children = Hash.new {|h,k| h[k] = []}
    @path_by_rank = Hash.new {|h,k| h[k] = Hash.new {|hh,kk| hh[kk] = []}}
  end

  def path_children(path)
    @children[path]
  end

  def rank_list(rank, source)
    @path_by_rank[rank][source]
  end

  def generate_children(source = @config.source, ids = new_ids, path = "", rank = 0)
    ids[source] = true
    path = "#{path}#{source}"
    @path_by_rank[rank][source].push path

    if rank < @config.num_rounds
      for i in 0...@config.num_procs
        unless ids[i]
          generate_children(i, new_ids(ids), path, rank + 1)
          @children[path].push("#{path}#{i}")
        end
      end
    end

    if (DEBUG)
      print("generate_children(#{source}, #{rank}, #{path}), children = ")
      @children[path].each do |child|
        print(child + " ")
      end
      puts("")
    end
  end

  private
  def new_ids(old_ids = nil)
    #old_ids.nil? ? Array.new(@config.num_procs) {|i| false} : Array.new(old_ids)
    old_ids.nil? ? Hash.new {|h,k| h[k] = false} : old_ids.dup
  end
end

class ValueStrategy
  attr_accessor :config
  attr_accessor :source_value

  def initialize(config)
    @config = config
    @source_value = Node.new(Value::ZERO, Value::UNKNOWN)
  end

  # The source and process 2 are faulty; everyone else relays the value unchanged.
  def create_value(value, source, destination, path)
    if @config.source == source
      (destination & 1) != 0 ? Value::ZERO : Value::ONE
    elsif source == 2
      Value::ONE
    else
      value
    end
  end

  def get_default
    return Value::ONE
  end

  def faulty?(process)
    process == @config.source || process == 2
  end
end

class LProcess
  @@total_messages = 0
  attr_reader :id
  attr_reader :config
  attr_reader :strategy
  attr_reader :repository
  attr_reader :broadcaster

  def initialize(id, config, repository, broadcaster, strategy)
    @id = id
    @config = config
    @repository = repository
    @strategy = strategy
    @broadcaster = broadcaster
    @nodes = Hash.new
  end

  def self.total_messages
    @@total_messages
  end
  def self.reset_total_messages
    @@total_messages = 0
  end

  def receive_message(path, node)
    @nodes[path] = node
    @@total_messages += 1
  end

  def send_messages(round)
    @repository.rank_list(round, @id).each do |path|
      source_path = path.slice(0, path.length-1)
      source = @nodes[source_path]
      raise "Source path #{source_path} not found" if source.nil?
      broadcaster.broadcast(round, @id, source, path)
    end
  end

  def decide
    #### Step 1 - set the leaf values
    for i in 0...@config.num_procs
      @repository.rank_list(@config.num_rounds, i).each do |path|
        node = @nodes[path]
        node.output = node.input
      end
    end

    ### Step 2 - work up the tree
    (@config.num_rounds - 1).step 0, -1 do |round|
      for i in 0...@config.num_procs
        @repository.rank_list(round, i).each do |path|
          @nodes[path].output = find_majority(path)
        end
      end
    end

    path_list = @repository.rank_list(0, @config.source)
    top_path = path_list[0]
    @nodes[top_path].output
  end

  def find_majority(path)
    counts = Hash.new(0)
    list = @repository.path_children(path)
    n = 0
    if list
      n = list.size
      list.each do |child|
        node = @nodes[child]
        counts[node.output] = counts[node.output] + 1 if node
      end
    end

    if (counts[Value::ONE] > ( n / 2 ) )
      Value::ONE
    elsif (counts[Value::ZERO] > ( n / 2 ) )
      Value::ZERO
    elsif (counts[Value::ONE] == counts[Value::ZERO] &&
           counts[Value::ONE] == (n / 2))
      @strategy.get_default
    else
      Value::UNKNOWN
    end
  end

  def faulty?
    @strategy.faulty?(id)
  end

  def source?
    @config.source == id
  end
end

class GeneralProcess < LProcess
  def initialize(id, config, repository, broadcaster, strategy)
    super(id, config, repository, broadcaster, strategy)
    @nodes[""] = @strategy.source_value
  end

  def decide
    @nodes[""].input
  end
end

class Engine
  attr_reader :config
  attr_reader :repository
  attr_reader :procs
  attr_reader :strategy

  def initialize(source, num_rounds, num_procs)
    @config = Configuration.new(source, num_rounds, num_procs)
    @strategy = ValueStrategy.new(config)
    @repository = PathRepository.new(config)
    @procs = []
  end

  def broadcast(round, id, source, path)
    for j in 0...@config.num_procs
      unless j == @config.source
        value = @strategy.create_value(source.input, id, j, path)
        if (DEBUG)
          source_path = path.slice(0, path.length-1)
          puts("Sending for round #{round} from process #{id} to #{j} : #{value}, #{path}, #{Value::UNKNOWN}, getting value from source #{source_path}")
        end
        @procs[j].receive_message(path, Node.new(value, Value::UNKNOWN))
      end
    end
  end

  def run
    # Rounds 0..num_rounds inclusive, matching the Java loop (i <= rounds),
    # so that the leaf-level paths are delivered before decide reads them.
    for i in 0..@config.num_rounds
      for j in 0...@config.num_procs
        @procs[j].send_messages(i)
      end
    end

    for j in 0...@config.num_procs
      print("Source ") if @procs[j].source?
      print("Process #{j}")
      unless @procs[j].faulty?
        print(" decides on value #{@procs[j].decide}")
      else
        print(" is faulty")
      end
      puts("")
    end
  end

  def start
    for i in 0...@config.num_procs
      process = i == @config.source ?
        GeneralProcess.new(i, @config, @repository, self, @strategy) :
        LProcess.new(i, @config, @repository, self, @strategy)
      @procs.push(process)
    end
    @repository.generate_children
  end
end

class ByzGeneralTest < Test::Unit::TestCase
  def setup
  end

  def run_engine(m, n, source)
    r = Benchmark.realtime() do
      puts("Starting|#{m}|#{n}|#{source}")
      engine = Engine.new(source, m, n)
      engine.start
      engine.run
    end
    puts("Finished|#{m}|#{n}|#{source}|#{LProcess.total_messages}|#{r*1000}")
    # The counter lives on LProcess, not on Ruby's built-in Process module.
    LProcess.reset_total_messages
  end

  def xtest_once
    n = 5
    m = 6
    source = 3
    run_engine(m, n, source)
  end

  def test_multiple
    5.step 50, 5 do |n|
      10.step 100, 10 do |m|
        source = n / 3
        run_engine(m, n, source)
      end
    end
  end
end
Test::Unit::UI::Console::TestRunner.run(ByzGeneralTest)
```
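Much of the terseness mentioned above comes from `Hash.new` with a default block or default value, which replaces the explicit "look up, check for null, create, store" code in the Java repository. The snippet below is a small standalone sketch of that idiom; the variable names and sample paths are made up for illustration.

```ruby
# Hash.new with a block builds missing entries on first access, so nested
# lookups such as path_by_rank[rank][source] never return nil.
path_by_rank = Hash.new { |h, rank| h[rank] = Hash.new { |hh, src| hh[src] = [] } }
path_by_rank[1][2] << "02"
path_by_rank[1][2] << "32"

# Hash.new(0) gives a counting hash: unseen keys read as 0.
counts = Hash.new(0)
%w[1 0 1 ?].each { |v| counts[v] += 1 }

p path_by_rank[1][2]
p counts
```

One design consequence worth knowing: with a default block, merely reading a missing key inserts it, so the hash grows on lookups as well as on writes.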
Finally, here is the Erlang source code (again, I am showing multiple modules here). I tried to keep the same structure as my Java and Ruby versions, so the application consists of process, strategy, repository, engine, node, and config modules. The benchmarks are kicked off from byz_general_test, which invokes the engine, and the engine creates processes for the repository, generals and lieutenants. Another distinction is the difference between active objects and regular functions. In this design, the repository and the processes are active objects that receive messages via Erlang’s message passing primitives. I also added a message counter just to keep track of the number of messages in the simulation (though it is not a requirement of the algorithm). I used the ets APIs in the repository to store the paths of messages.
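For readers who have not used Erlang, the "active object" idea can be approximated in Ruby with a thread that owns its state and is driven only by messages arriving in a mailbox. The sketch below mirrors the message counter under that assumption; the class name `MessageCounter` and its methods are hypothetical, and a Ruby `Queue` is only a loose stand-in for an Erlang process mailbox.

```ruby
# A thread that owns a counter and reacts to messages from its mailbox.
class MessageCounter
  def initialize
    @mailbox = Queue.new
    @thread = Thread.new { run_loop }
  end

  # Fire-and-forget messages, like `Pid ! Msg` in Erlang.
  def increment
    @mailbox << [:increment]
  end

  def reset
    @mailbox << [:reset]
  end

  # Request/reply: send a private reply queue and block until answered.
  def value
    reply = Queue.new
    @mailbox << [:value, reply]
    reply.pop
  end

  def stop
    @mailbox << [:stop]
    @thread.join
  end

  private

  def run_loop
    count = 0
    loop do
      tag, reply = @mailbox.pop
      case tag
      when :increment then count += 1
      when :reset     then count = 0
      when :value     then reply << count
      when :stop      then break
      end
    end
  end
end

counter = MessageCounter.new
2.times { counter.increment }
after_increments = counter.value
counter.reset
after_reset = counter.value
counter.stop
puts "after increments: #{after_increments}, after reset: #{after_reset}"
```

Because the mailbox is first-in, first-out, the `value` request is handled only after the two increments queued before it, which is the same ordering the Erlang counter test relies on.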
1: 2: 3: -module(config). 4: -compile(export_all). 5: 6: 10: -record(config, {source, num_procs, num_rounds}). 11: 12: new(S, N, M) -> 13: #config{source = S, num_procs = N, num_rounds = M}. 14: 15: get_source(Self) -> 16: Self#config.source. 17: get_num_procs(Self) -> 18: Self#config.num_procs. 19: get_num_rounds(Self) -> 20: Self#config.num_rounds. 21: 25: -module(counter). 26: -export([start/0, get_value/0, increment/0, reset/0, die/0, test_counter/0]). 27: -include("byz_general_test.hrl"). 28: 29: 33: 34: 35: start() -> 36: register(message_counter, spawn(fun() -> loop(0) end)). 37: 38: 39: get_value() -> 40: lib_util:rpc(message_counter, value). 41: 42: 43: increment() -> 44: message_counter ! {self(), increment}. 45: 46: reset() -> 47: message_counter ! {self(), reset}. 48: 49: 50: die() -> 51: message_counter ! {self(), die, "Exiting"}. 52: 53: 54: 55: loop(N) -> 56: receive 57: {_From, increment} -> 58: loop(N+1); 59: {_From, reset} -> 60: loop(0); 61: {From, value} -> 62: From ! {message_counter, N}, 63: loop(N); 64: {_From, die, Reason} -> 65: unregister(message_counter), 66: exit(Reason); 67: Any -> 68: io:format("Unexpected message ~p~n", [Any]) 69: end. 70: 71: 72: 76: test_counter() -> 77: start(), 78: increment(), 79: increment(), 80: 2 = get_value(), 81: reset(), 82: 0 = get_value(), 83: die(). 87: -module(engine). 88: -export([start/3, init/0, run/0, reset/0, test/0]). 89: 90: 94: 95: init() -> 96: counter:start(), 97: repository:start(). 98: 99: reset() -> 100: counter:reset(), 101: repository:reset(). 102: 103: start(Source, N, M) -> 104: Config = config:new(Source, N, M), 105: put(config, Config), 106: ProcIds = lists:seq(0, config:get_num_procs(Config)-1), 107: Pids = lists:map(fun(I) -> process:start(I, Config) end, ProcIds), 108: put(pids, Pids), 109: lists:foreach(fun(Pid) -> process:init(Pid, Pids) end, Pids), 110: repository:generate_children(Config), 111: Config. 
112: 113: run() -> 114: Config = get(config), 115: Ids = lists:seq(0, config:get_num_procs(Config)-1), 116: lists:foreach(fun(Id) -> send_message(Id) end, Ids), 117: lists:foreach(fun(Id) -> print_result(Id) end, Ids). 118: 122: get_pid(Id) -> 123: Pids = get(pids), 124: lists:nth(Id+1, Pids). 125: 126: send_message(Id) -> 127: Config = get(config), 128: RoundIds = lists:seq(0, config:get_num_rounds(Config)-1), 129: lists:foreach(fun(Round) -> send_message(Id, Round) end, RoundIds). 130: 131: send_message(Id, Round) -> 132: Pid = get_pid(Id), 133: process:send_messages(Pid, Round, Id). 134: 135: print_result(Id) -> 136: Config = get(config), 137: Source = config:get_source(Config), 138: case Source of 139: Id -> 140: io:format("Source "); 141: _ -> 142: true 143: end, 144: io:format("Process ~p ", [Id]), 145: 146: Pid = get_pid(Id), 147: Faulty = process:is_faulty(Pid), 148: if 149: Faulty -> 150: io:format(" is faulty~n"); 151: true -> 152: Decision = process:decide(Pid), 153: io:format(" decides on value ~p~n", [Decision]) 154: end. 155: 159: test() -> 160: init(), 161: Config = start(3, 5, 6), 162: run(), 163: Config. 164: -module(lib_util). 165: -compile(export_all). 166: 170: rpc(Pid, Request) -> 171: Pid ! {self(), Request}, 172: receive 173: {Pid, Response} -> 174: Response 175: end. 176: -module(node). 177: -include("byz_general_test.hrl"). 178: -compile(export_all). 179: 183: -record(node, {input, output}). 184: 185: new() -> 186: #node{input = ?FAULTY, output = ?FAULTY}. 187: new(I, O) -> 188: #node{input = I, output = O}. 189: 190: 191: set_input(Self, I) -> 192: Self#node{input = I}. 193: get_input(Self) -> 194: Self#node.input. 195: 196: set_output(Self, O) -> 197: Self#node{output = O}. 198: get_output(Self) -> 199: Self#node.output. 200: set_output_as_input(Self) -> 201: O = get_output(Self), 202: Self#node{output = O}. 203: 204: 205: 206: 207: 211: -module(process). 
212: -export([init/2, start/2, receive_message/3, send_messages/3, decide/1, find_majority/2, is_faulty/1, is_source/1, test_process/0]). 213: -include("byz_general_test.hrl"). 214: 215: 220: 221: % 222: %%% starts process loop 223: % 224: start(Id, Config) -> 225: spawn(fun() -> loop(Id, Config) end). 226: 227: receive_message(Pid, Path, Node) -> 228: %lib_util:rpc(Pid, {receive_message, Path, Node}). 229: Pid ! {self(), {receive_message, Path, Node}}. 230: 231: send_messages(Pid, Round, Id) -> 232: %lib_util:rpc(Pid, {send_messages, Round, Id}). 233: Pid ! {self(), {send_messages, Round, Id}}. 234: 235: init(Pid,Pids) -> 236: Pid ! {self(), init, Pids}. 237: 238: decide(Pid) -> 239: lib_util:rpc(Pid, decide). 240: 241: find_majority(Pid, Path) -> 242: lib_util:rpc(Pid, {find_majority, Path}). 243: 244: is_faulty(Pid) -> 245: lib_util:rpc(Pid, is_faulty). 246: 247: is_source(Pid) -> 248: lib_util:rpc(Pid, is_source). 249: 254: put(config, Config), 255: put(allPids, AllPids), 256: SourcePid = lists:nth(config:get_source(Config)+1, AllPids), 257: put(lieutenants, AllPids -- [SourcePid]), 258: Nodes = dict:new(), 259: put(nodes, Nodes), 260: Source = config:get_source(Config), 261: SourceValue = strategy:source_value(), 262: if 263: Source =:= Id -> 264: Nodes1 = dict:store("", SourceValue, Nodes), 265: put(nodes, Nodes1); 266: true -> 267: true 268: end. 269: 270: 271: 272: % 273: %%% stores message receievd in nodes dictionary 274: %%% we are also keeping track of total number of messages received in simulation. 275: % 276: receive_message(Path, Node) -> 277: Nodes = get(nodes), 278: Nodes1 = dict:store(Path, Node, Nodes), 279: put(nodes, Nodes1), 280: counter:increment(). 281: 282: get_node(Path) -> 283: Nodes = get(nodes), 284: Response = dict:find(Path, Nodes), 285: case Response of 286: {ok, Node} -> 287: Node; 288: _ -> 289: %io:format("Failed to find node for path ~p~n", [Path]), 290: node:new() 291: end. 

%
%%% sends the messages for all paths of the given round and process
%
psend_messages(Round, Id) ->
    L = repository:get_rank_list(Round, Id),
    lists:foreach(
        fun(Path) ->
            psend_messages(Round, Id, Path) end, L).

psend_messages(Round, Id, Path) ->
    Length = string:len(Path),
    SourcePath = if
        Length > 1 ->
            string:substr(Path, 1, Length-1);
        true ->
            ""
    end,
    Source = get_node(SourcePath),
    broadcast(Round, Id, Source, Path).

%
%%% broadcast message to all processes
%
broadcast(Round, Id, SourceNode, Path) ->
    Config = get(config),
    ProcIds = lists:seq(0, config:get_num_procs(Config)-1),
    IdsToProcess = ProcIds -- [config:get_source(Config)],
    lists:foreach(
        fun(Dest) ->
            broadcast(Round, Id, SourceNode, Path, Dest) end, IdsToProcess).

broadcast(_Round, Id, SourceNode, Path, Dest) ->
    Config = get(config),
    Value = strategy:create_value(Config, node:get_input(SourceNode), Id, Dest, Path),
    ProcPids = get(allPids),
    ProcPid = lists:nth(Dest+1, ProcPids),
    receive_message(ProcPid, Path, node:new(Value, ?UNKNOWN)).

decide() ->
    %%% Step 1 - set the leaf values
    Config = get(config),
    ProcIds = lists:seq(0, config:get_num_procs(Config)-1),
    lists:foreach(fun(X) -> reset_node(X) end, ProcIds),
    %%% Step 2 - work up the tree
    RoundIds = lists:reverse(lists:seq(0, config:get_num_rounds(Config)-1)),
    lists:foreach(
        fun(Round) ->
            set_node_value(Round) end, RoundIds),
    Source = config:get_source(Config),
    Result = repository:get_rank_list(0, Source),
    case Result of
        [] ->
            ?UNKNOWN;
        [TopPath|_] ->
            node:get_output(get_node(TopPath))
    end.

set_node_value(Round) ->
    Config = get(config),
    ProcIds = lists:seq(0, config:get_num_procs(Config)-1),
    lists:foreach(
        fun(Id) ->
            set_node_value(Round, Id) end, ProcIds).

set_node_value(Round, Id) ->
    L = repository:get_rank_list(Round, Id),
    lists:foreach(
        fun(Path) ->
            set_node_value(Round, Id, Path) end, L).

set_node_value(_Round, _Id, Path) ->
    Nodes = get(nodes),
    Node = get_node(Path),
    Value = find_majority(Path),
    Nodes1 = dict:store(Path, node:set_output(Node, Value), Nodes),
    put(nodes, Nodes1).

reset_node(I) ->
    Config = get(config),
    L = repository:get_rank_list(config:get_num_rounds(Config), I),
    lists:foreach(fun(Path) ->
            Node = get_node(Path),
            %% Re-read the dictionary on every iteration so that
            %% updates made for earlier paths are not overwritten.
            Nodes1 = dict:store(Path, node:set_output_as_input(Node), get(nodes)),
            put(nodes, Nodes1)
        end, L).

increment_count(Child) ->
    Counts = get(counts),
    Node = get_node(Child),
    Count = dict:fetch(node:get_output(Node), Counts) + 1,
    Counts1 = dict:store(node:get_output(Node), Count, Counts),
    put(counts, Counts1).

%% Counts the output values of the children of Path and returns the majority.
find_majority(Path) ->
    Counts = dict:new(),
    Counts1 = dict:store(?ONE, 0, Counts),
    Counts2 = dict:store(?ZERO, 0, Counts1),
    Counts3 = dict:store(?UNKNOWN, 0, Counts2),
    put(counts, Counts3),
    L = repository:get_children_path(Path),
    N = length(L),
    lists:foreach(fun(Child) ->
            increment_count(Child) end, L),
    Counts4 = get(counts),
    OneCount = dict:fetch(?ONE, Counts4),
    ZeroCount = dict:fetch(?ZERO, Counts4),

    if
        OneCount > ( N / 2 ) ->
            ?ONE;
        ZeroCount > ( N / 2 ) ->
            ?ZERO;
        OneCount == ZeroCount andalso OneCount == ( N / 2 ) ->
            strategy:get_default();
        true ->
            ?UNKNOWN
    end.

%% Main receive loop of a process; Id and Config stay fixed for its lifetime.
loop(Id, Config) ->
    receive
        {_From, init, AllPids} ->
            init(Id, Config, AllPids),
            loop(Id, Config);
        {_From, {receive_message, Path, Node}} ->
            receive_message(Path, Node),
            %From ! {self(), done},
            loop(Id, Config);
        {_From, {send_messages, Round, Id}} ->
            psend_messages(Round, Id),
            %From ! {self(), done},
            loop(Id, Config);
        {From, decide} ->
            From ! {self(), decide()},
            loop(Id, Config);
        {From, is_faulty} ->
            From ! {self(), strategy:is_faulty(Config, Id)},
            loop(Id, Config);
        {From, is_source} ->
            From ! {self(), config:get_source(Config) == Id},
            loop(Id, Config);
        {From, {find_majority, Path}} ->
            From ! {self(), find_majority(Path)},
            loop(Id, Config);
        Any ->
            io:format("Unexpected ~p~n", [Any]),
            loop(Id, Config)
    end.

test_process() ->
    counter:start(),
    repository:start(),
    Config = config:new(3, 5, 6),
    Pid = start(0, Config),
    init(Pid, [Pid, 1, 2, 3, 4, 5, 6]),
    Path = "mypath",
    Node = node:new(),
    receive_message(Pid, Path, Node),
    send_messages(Pid, 0, 0),
    ?UNKNOWN = decide(Pid),
    ?ONE = find_majority(Pid, Path),
    false = is_faulty(Pid),
    false = is_source(Pid).

%% repository.erl
-module(repository).
-export([start/0, reset/0, get_rank_list/2, get_children_path/1, generate_children/1, die/0, test_generate/0, test_path_ranks/0, test_children/0]).
-include("byz_general_test.hrl").

start() ->
    register(repository, spawn(fun() -> init_loop() end)).

init_loop() ->
    Children = ets:new(children, [set]),
    PathsByRank = ets:new(path_ranks, [set]),
    loop(Children, PathsByRank).

get_children_path(Path) ->
    lib_util:rpc(repository, {get_children_path, Path}).

get_rank_list(Rank, Source) ->
    lib_util:rpc(repository, {get_rank_list, Rank, Source}).

generate_children(Config) ->
    repository ! {self(), generate_children, Config}.

reset() ->
    repository ! {self(), reset}.

die() ->
    repository ! {self(), die, "Exiting"}.

%% Returns the list stored under Key in insertion order, or [] if absent.
table_lookup(Tab, Key) ->
    case ets:lookup(Tab, Key) of
        [] ->
            [];
        [{Key, Value}] ->
            lists:reverse(Value)
    end.

%% Prepends Value to the raw stored list; table_lookup/2 reverses it on read.
table_insert(Tab, Key, Value) ->
    NewValue = case ets:lookup(Tab, Key) of
        [] ->
            [Value];
        [{Key, L}] ->
            [Value|L]
    end,
    ets:insert(Tab, {Key, NewValue}).

set_children_path(Children, PathKey, PathValue) ->
    table_insert(Children, PathKey, PathValue).

get_children_path(Children, Path) ->
    table_lookup(Children, Path).

get_rank_list(PathsByRank, Rank, Source) ->
    table_lookup(PathsByRank, {Rank, Source}).

set_rank_list(PathsByRank, Rank, Source, Path) ->
    Key = {Rank, Source},
    table_insert(PathsByRank, Key, Path).

generate_children(Config, Children, PathsByRank) ->
    generate_children(
        Config, Children, PathsByRank,
        config:get_source(Config),
        [],
        "",
        0).

%% Recursively builds every path of the message tree, starting at the source.
%% Ids holds the process ids already on the path, so no id appears twice.
generate_children(Config, Children, PathsByRank, Source, Ids, Path, Rank) ->
    Ids1 = [Source|Ids],
    Path1 = Path ++ integer_to_list(Source),
    set_rank_list(PathsByRank, Rank, Source, Path1),
    Rounds = config:get_num_rounds(Config),
    if
        Rank < Rounds ->
            IdsToProcess = lists:seq(0, config:get_num_procs(Config)-1) -- Ids1,
            lists:foreach(
                fun(Source1) ->
                    generate_children(Config, Children, PathsByRank, Source1, Ids1, Path1, Rank+1),
                    set_children_path(Children, Path1, Path1 ++ integer_to_list(Source1))
                end, IdsToProcess);
        true ->
            true
    end,
    Children.

loop(Children, PathsByRank) ->
    receive
        {From, {get_rank_list, Rank, Source}} ->
            From ! {repository, get_rank_list(PathsByRank, Rank, Source)},
            loop(Children, PathsByRank);
        {From, {get_children_path, Path}} ->
            From ! {repository, get_children_path(Children, Path)},
            loop(Children, PathsByRank);
        {_From, reset} ->
            ets:delete_all_objects(Children),
            ets:delete_all_objects(PathsByRank),
            loop(Children, PathsByRank);
        {_From, generate_children, Config} ->
            generate_children(Config, Children, PathsByRank),
            loop(Children, PathsByRank);
        {_From, die, Reason} ->
            ets:delete(Children),
            ets:delete(PathsByRank),
            exit(Reason);
        Any ->
            io:format("Unexpected message~p~n", [Any]),
            %% Keep serving requests after an unknown message.
            loop(Children, PathsByRank)
    end.

benchmark_byz_general() ->
    engine:init(),
    benchmark_byz_general(5).

benchmark_byz_general(N) when N < 100 ->
    benchmark_byz_general(N, 10),
    benchmark_byz_general(N+10);
benchmark_byz_general(N) when N >= 100 ->
    true.

benchmark_byz_general(N, M) when M < 100 ->
    %% Reset the runtime and wall-clock counters before each run.
    statistics(runtime),
    statistics(wall_clock),
    Source = round(N / 3),
    io:format("Starting|~p|~p|~p~n", [M, N, Source]),
    engine:start(Source, N, M),
    engine:run(),
    {_, RT} = statistics(runtime),
    {_, WC} = statistics(wall_clock),
    io:format("Finished|~p|~p|~p|~p|~p|~p~n", [M, N, Source, counter:get_value(), RT, WC]),
    engine:reset(),
    benchmark_byz_general(N, M+10);
benchmark_byz_general(_N, M) when M >= 100 ->
    true.

I merged the results with the following script:
class Stats
  attr_accessor :processes
  attr_accessor :rounds
  attr_accessor :messages
  attr_accessor :java_time
  attr_accessor :erlang_time
  attr_accessor :cpp_time
  attr_accessor :ruby_time
  def initialize(procs, rounds, msgs)
    @processes = procs.to_i
    @rounds = rounds.to_i
    @messages = msgs.to_i
  end
  def key
    "#{@processes}/#{@rounds}"
  end
  def to_s
    "#{@processes},#{@rounds},#{@messages},#{@cpp_time},#{@ruby_time},#{@java_time},#{@erlang_time}"
  end
end

stats = {}

files = ["cpp.out", "ruby.out", "java.out", "erlang.out"]
count_setter = ["cpp_time=", "ruby_time=", "java_time=", "erlang_time="]
for i in 0...files.length
  File.readlines(files[i]).each do |line|
    if line =~ /Finish/
      t = line.split("|")
      stat = Stats.new(t[2], t[1], t[4])
      # Reuse the entry created from an earlier file so that the timings
      # of all four languages end up on the same row.
      stat = (stats[stat.key] ||= stat)
      time = t[5].to_i
      stat.send(count_setter[i], time)
    end
  end
end

puts "Processes,Rounds,Messages,CPP Time,Ruby Time,Java Time,Erlang Time"
stats.values.sort_by {|stat| stat.processes * stat.rounds}.each do |stat|
  puts stat
end

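To make the field positions used by the merge script easier to follow, here is a small sketch of how a single "Finished" line is split. The sample line and the helper name parse_finished are made up for illustration; only the field positions (t[1], t[2], t[4], t[5]) come from the script.

```ruby
# Splits one "Finished|..." benchmark line into the fields the merge script reads.
# The positions mirror the script: t[1] = rounds, t[2] = processes,
# t[4] = message count, t[5] = time.
def parse_finished(line)
  t = line.chomp.split("|")
  { rounds: t[1].to_i, processes: t[2].to_i, messages: t[4].to_i, time: t[5].to_i }
end

# Hypothetical sample line, not taken from a real benchmark run.
sample = "Finished|3|7|2|157|40|45"
p parse_finished(sample)
```

The third and last fields are not used by the merge script, so the sketch ignores them as well.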
And here are the results of the benchmark that I ran on my Dell notebook (note that I used the time function in C++, which only returns timings in seconds, so the C++ timings are not precise):

Conclusion:
This was an interesting problem that tested the limits of these languages. For example, I found that with more than 10 processes things got really nasty. The Java program gave up with an OutOfMemory error. The Erlang program crashed with a crash dump, though I would have used OTP’s supervisors if I were writing a more fault-tolerant application. The Ruby program became too slow, so I had to kill it.
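A rough way to see why memory runs out so quickly is to count the paths in the message tree. The sketch below assumes the tree is built the way generate_children does it, where each level appends any process id that is not already on the path. The function names and the choice of rounds = processes - 1 are my own, picked only to show the worst case.

```ruby
# Number of paths at a given rank: (n-1) * (n-2) * ... * (n-rank),
# because each level can pick any process id not already on the path.
def paths_at_rank(num_procs, rank)
  (1..rank).reduce(1) { |acc, i| acc * (num_procs - i) }
end

# Total number of paths for ranks 0..num_rounds.
def tree_size(num_procs, num_rounds)
  (0..num_rounds).sum { |rank| paths_at_rank(num_procs, rank) }
end

[5, 7, 10, 12].each do |n|
  rounds = n - 1 # hypothetical worst case: as many rounds as possible
  puts "#{n} processes, #{rounds} rounds: #{tree_size(n, rounds)} paths"
end
```

Since every process keeps its own dictionary of nodes keyed by path, this factorial growth is multiplied by the number of processes.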
Java turned out to be the performance leader in this case, and I was a bit surprised when Erlang’s response time became really high with 9 processes. As I mentioned earlier, the C++, Java and Ruby versions are not really concurrent, and their message passing is really just method invocation.
As far as mapping the algorithm to the language, I found that Erlang fits very nicely for distributed algorithms that involve a lot of communication. I left the C++, Java and Ruby versions very simple and didn’t try to implement truly independent processes and communication because that would have required a lot more effort.
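For comparison, here is a minimal sketch of what an independent process with a mailbox might look like in Ruby, using only Thread and Queue from the standard library. This is not the Ruby version used in the benchmark; the class name MailboxProcess and the :decide request are made up for illustration, and the rpc method only loosely imitates lib_util:rpc.

```ruby
# A tiny mailbox-based "process": a thread that owns a queue and
# answers requests one at a time, loosely like an Erlang receive loop.
class MailboxProcess
  def initialize(value)
    @mailbox = Queue.new
    @thread = Thread.new { run(value) }
  end

  # Synchronous request/response: send a request together with a private
  # reply queue, then block until the process answers.
  def rpc(request)
    reply = Queue.new
    @mailbox << [reply, request]
    reply.pop
  end

  # Asks the process to leave its loop and waits for the thread to finish.
  def stop
    @mailbox << :stop
    @thread.join
  end

  private

  def run(value)
    loop do
      message = @mailbox.pop
      break if message == :stop
      reply, request = message
      reply << (request == :decide ? value : :unknown_request)
    end
  end
end

general = MailboxProcess.new(1)
puts general.rpc(:decide)
general.stop
```

Even this small version needs explicit queues, threads and shutdown handling, which Erlang provides through spawn, ! and receive.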