I came across an interesting post on the classical Byzantine Generals Problem on Mark Nelson’s blog. He showed a C++ implementation of Leslie Lamport’s algorithm. It seemed like a natural fit for Erlang, but instead of writing it directly in Erlang, I first ported the algorithm to Java and Ruby (with a slight redesign). Unfortunately, the original C++ source and my Java and Ruby versions are not truly distributed or concurrent. I just wanted to translate the original algorithm without changing a whole lot, whereas Erlang gives you both distribution and concurrency for free.
First, you can learn about the Byzantine Generals Problem from Mark Nelson’s post, from this brief summary, or from Wikipedia.
The algorithm relies heavily on message passing: the general sends a message value (0 or 1) to his lieutenants, and each lieutenant relays the value to all other lieutenants, including himself. After a specific number of rounds, a consensus is reached.
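To make the message flow concrete, here is a minimal Ruby sketch of the simplest case: one relay round with a loyal general and one traitorous lieutenant. It is a simplification of the description above (lieutenants do not relay to themselves), and the names `majority` and `om1` are made up for this illustration rather than taken from any of the implementations in this post.

```ruby
# Majority vote over a list of 0/1 values; ties fall back to a default
# that every process must agree on in advance.
def majority(values, default = 1)
  ones = values.count(1)
  zeros = values.count(0)
  return 1 if ones > zeros
  return 0 if zeros > ones
  default
end

# One relay round with a loyal general (illustrative only).
# order       - the value the general sends (0 or 1)
# lieutenants - list of lieutenant ids
# traitors    - ids of lieutenants that relay the opposite value
def om1(order, lieutenants, traitors)
  # Step 1: the general sends its order to every lieutenant.
  received = lieutenants.to_h { |id| [id, order] }

  # Step 2: every lieutenant relays what it received to the others.
  loyal = lieutenants.reject { |id| traitors.include?(id) }
  loyal.to_h do |me|
    relayed = (lieutenants - [me]).map do |peer|
      traitors.include?(peer) ? 1 - received[peer] : received[peer]
    end
    # Step 3: decide by majority over the direct and relayed values.
    [me, majority([received[me]] + relayed)]
  end
end

decisions = om1(1, [1, 2, 3], [2])
puts decisions.inspect
```

With three lieutenants and one traitor, both loyal lieutenants still see two honest copies of the order against one forged copy, so the majority vote recovers the general's value.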
The original C++ code defines a Node structure to store the input/output values of each process, and a Traits class to store the configuration and a method to come up with new values. Since some processes can be faulty (traitors), the Traits class determines the value that a process will return. Most of the fun happens in the Process class, which performs all message communication. You can download the original C++ source code.
I slightly changed the structure and got rid of static attributes and methods. In my design, the application consists of the following modules:
- Configuration – for storing configuration for simulation such as number of processes, rounds, source (general) process.
- Strategy – that determines the value that should be returned by the process.
- Repository – that stores hierarchical paths for messages that were communicated by the processes.
- Process/GeneralProcess – classes that send and receive messages.
- Engine – that drives the algorithm.
- Test – that actually invokes engine with different parameters and stores timings.
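As a rough illustration of what the Repository holds, the following Ruby sketch builds the hierarchical message paths grouped by rank (round). It assumes single-digit process ids so that each path element is one character; `each_path`, `build_paths` and `path_count` are hypothetical helpers written for this example, not part of the code listed below.

```ruby
# Recursively records every relay path, grouped by rank (round number).
# A path such as "021" means: general 0 -> process 2 -> process 1.
def each_path(path, used, rank, num_procs, num_rounds, by_rank)
  by_rank[rank] << path
  return by_rank if rank == num_rounds
  (0...num_procs).each do |id|
    next if used.include?(id) # a process never appears twice in a path
    each_path(path + id.to_s, used + [id], rank + 1,
              num_procs, num_rounds, by_rank)
  end
  by_rank
end

def build_paths(source, num_procs, num_rounds)
  by_rank = Hash.new { |hash, rank| hash[rank] = [] }
  each_path(source.to_s, [source], 0, num_procs, num_rounds, by_rank)
end

# Number of paths without building them: at rank r there are
# (n-1)(n-2)...(n-r) relay chains, so the total is a sum of
# falling factorials.
def path_count(num_procs, num_rounds)
  (0..num_rounds).sum do |rank|
    (0...rank).reduce(1) { |acc, i| acc * (num_procs - 1 - i) }
  end
end

paths = build_paths(0, 4, 2)
paths.each { |rank, list| puts "rank #{rank}: #{list.join(' ')}" }
puts "total paths: #{paths.values.sum(&:size)}"
```

Because every process keeps a node for each path, the tree size dominates memory use, and `path_count` shows how quickly it grows as processes and rounds are added.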
Here is the Java implementation (I am showing multiple Java classes here, but you can download the complete source code from here).
I kept most of the algorithm as it was, which turned out to be a real memory hog as the number of processes increases. In fact, I could not run the Java application for more than 10 processes with 512MB of memory. Also, in my Java version I used the Interface Segregation Principle (old habit):
// Value.java
public enum Value {
    ZERO,
    ONE,
    FAULTY,
    UNKNOWN
}

// ValueStrategy.java
public interface ValueStrategy {
    public Node getSourceValue();

    public Value getValue(
        Value value,
        int source,
        int destination,
        String path);

    public Value getDefaultValue();

    public boolean isFaulty(int process);
}

// Broadcaster.java
public interface Broadcaster {
    public void broadcast(int round, int id, Node source, String path);
}

// PathRepository.java
import java.util.*;

public interface PathRepository {
    public void generateChildren();

    public List<String> getRankList(int rank, int source);

    public List<String> getPathChildren(String path);
}

// Configuration.java
public class Configuration {
    private int source;
    private int roundsOfMessages;
    private int numberOfProcesses;

    Configuration(int source, int roundsOfMessages, int numberOfProcesses) {
        this.source = source;
        this.roundsOfMessages = roundsOfMessages;
        this.numberOfProcesses = numberOfProcesses;
    }

    public int getSource() {
        return source;
    }

    public int getRoundsOfMessages() {
        return roundsOfMessages;
    }

    public int getNumberOfProcesses() {
        return numberOfProcesses;
    }
}

// DefaultEngine.java
import java.util.*;

public class DefaultEngine implements Engine, Broadcaster {
    static final boolean debug = System.getProperty("debug", "false").equals("true");
    private Configuration configuration;
    private PathRepository repository;
    private List<Process> processes;
    private ValueStrategy strategy;

    public DefaultEngine(int source, int roundsOfMessages, int numberOfProcesses) {
        this.configuration = new Configuration(source, roundsOfMessages, numberOfProcesses);
        this.strategy = new DefaultValueStrategy(configuration);
        this.repository = new DefaultPathRepository(configuration);
        this.processes = new ArrayList<Process>();
    }

    // Delivers the value for the given path to every process except the source.
    public void broadcast(int round, int id, Node source, String path) {
        for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
            if (j != configuration.getSource()) {
                Value value = strategy.getValue(
                    source.input,
                    id,
                    j,
                    path);
                if (debug) {
                    String sourcePath = path.substring(0, path.length() - 1);
                    System.out.println("Sending for round " + round + " from process " + id + " to " + j + ": {" + value +
                        ", " + path + ", " + Value.UNKNOWN + "}, getting value from source " + sourcePath);
                }
                getProcesses().get(j).receiveMessage(path, new Node(value, Value.UNKNOWN));
            }
        }
    }

    public List<Process> getProcesses() {
        return processes;
    }

    public void run() {
        //
        // Starting at round 0 and working up to round M, call the
        // sendMessages() method of each process. It will send the appropriate
        // message to all other sibling processes.
        //
        for (int i = 0; i <= configuration.getRoundsOfMessages(); i++) {
            for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
                processes.get(j).sendMessages(i);
            }
        }

        //
        // All that is left is to print out the results. For non-faulty processes,
        // we call the decide() method to see what the process decision was.
        //
        for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
            if (processes.get(j).isSource()) System.out.print("Source ");
            System.out.print("Process " + j);
            if (!processes.get(j).isFaulty()) {
                System.out.print(" decides on value " +
                    processes.get(j).decide());
            } else {
                System.out.print(" is faulty");
            }
            System.out.println();
        }
    }

    // Creates the processes (one general, the rest lieutenants) and builds the path tree.
    public void start() {
        for (int i = 0; i < configuration.getNumberOfProcesses(); i++) {
            Process process = i == configuration.getSource()
                ? new GeneralProcess(i, configuration, repository, this, strategy)
                : new Process(i, configuration, repository, this, strategy);
            processes.add(process);
        }
        repository.generateChildren();
    }
}

// DefaultPathRepository.java
import java.util.*;

public class DefaultPathRepository implements PathRepository {
    private Map<String, List<String>> children;
    private Map<Integer, Map<Integer, List<String>>> pathByRank;
    private Configuration configuration;
    static final boolean debug = System.getProperty("debug", "false").equals("true");

    public DefaultPathRepository(Configuration configuration) {
        this.configuration = configuration;
        this.children = new HashMap<String, List<String>>();
        this.pathByRank = new HashMap<Integer, Map<Integer, List<String>>>();
    }

    public Map<String, List<String>> getChildren() {
        return children;
    }

    // Returns the child paths of a path, creating an empty list on first access.
    public List<String> getPathChildren(String path) {
        List<String> pathList = children.get(path);
        if (pathList == null) {
            pathList = new ArrayList<String>();
            children.put(path, pathList);
        }
        return pathList;
    }

    // Returns the paths of the given rank that end at the given process,
    // creating an empty list on first access.
    public List<String> getRankList(int rank, int source) {
        Map<Integer, List<String>> pathMap = pathByRank.get(rank);
        if (pathMap == null) {
            pathMap = new HashMap<Integer, List<String>>();
            pathByRank.put(rank, pathMap);
        }
        List<String> pathList = pathMap.get(source);
        if (pathList == null) {
            pathList = new ArrayList<String>();
            pathMap.put(source, pathList);
        }
        return pathList;
    }

    public void generateChildren() {
        generateChildren(configuration.getSource(), new boolean[configuration.getNumberOfProcesses()], "", 0);
    }

    private void generateChildren(int source, boolean[] ids, String path, int rank) {
        ids[source] = true;
        path += toCharString(source);
        getRankList(rank, source).add(path);

        if (rank < configuration.getRoundsOfMessages()) {
            for (int i = 0; i < ids.length; i++) {
                if (!ids[i]) {
                    boolean[] newIds = new boolean[ids.length];
                    System.arraycopy(ids, 0, newIds, 0, ids.length);
                    generateChildren(i, newIds, path, rank + 1);
                    getPathChildren(path).add(path + toCharString(i));
                }
            }
        }

        if (debug) {
            System.out.print("generateChildren(" + source + "," + rank + "," + path + "), children = ");
            List<String> list = getPathChildren(path);
            for (String s : list) {
                System.out.print(s + " ");
            }
            System.out.println();
        }
    }

    // Encodes a process id as a single character, so each path element is one character long.
    private char toChar(int n) {
        return (char) (n + '0');
    }

    private String toCharString(int n) {
        return String.valueOf(toChar(n));
    }
}

// DefaultValueStrategy.java
import java.util.*;

public class DefaultValueStrategy implements ValueStrategy {
    private Configuration configuration;
    private Node sourceValue;

    DefaultValueStrategy(Configuration configuration) {
        this.configuration = configuration;
        this.sourceValue = new Node(Value.ZERO, Value.UNKNOWN);
    }

    //
    // This method returns the true value of the source's value. The source may send
    // faulty values to other processes, but the Node returned by this method will be
    // in its root node.
    //
    // In this case, the General's node has an input value of 0, which makes that
    // the desired value. Of course, since the General is faulty, this doesn't really
    // matter.
    //
    public Node getSourceValue() {
        return sourceValue;
    }

    //
    // During messaging, getValue() is called to get the value returned by a given process
    // during a messaging round. 'value' is the input value that it should be sending to
    // the destination process (if it isn't faulty), source is the source process ID,
    // destination is the destination process ID, and path is the path being used for this
    // particular message.
    //
    // In this particular implementation, we have two faulty processes - the source
    // process, which returns a sort-of random value, and process ID 2, which returns
    // a ONE always, in contradiction of the General's desired value of 0.
    //
    public Value getValue(Value value, int source, int destination, String path) {
        if (configuration.getSource() == source) return (destination & 1) != 0 ? Value.ZERO : Value.ONE;
        else if (source == 2) return Value.ONE;
        return value;
    }

    //
    // When breaking a tie, getDefaultValue() is used to get the default value.
    //
    // This is an arbitrary decision, but it has to be consistent across all processes.
    // More importantly, the processes have to arrive at a correct decision regardless
    // of whether the default value is always 0 or always 1. In this case we've set it to
    // a value of 1.
    //
    public Value getDefaultValue() {
        return Value.ONE;
    }

    //
    // This method is used to identify faulty processes by ID.
    //
    public boolean isFaulty(int process) {
        return process == configuration.getSource() || process == 2;
    }
}

// Engine.java
public interface Engine extends Runnable {
    public void start();
}

// GeneralProcess.java
public class GeneralProcess extends Process {
    public GeneralProcess(int id, Configuration configuration, PathRepository repository, Broadcaster broadcaster, ValueStrategy strategy) {
        super(id, configuration, repository, broadcaster, strategy);
        nodes.put("", strategy.getSourceValue());
    }

    @Override
    public Value decide() {
        //
        // The source process doesn't have to do all the work - since it's the decider,
        // it simply looks at its input value to pick the appropriate decision value.
        //
        return nodes.get("").input;
    }
}

// Node.java
public class Node {
    Value input;
    Value output;

    Node() {
        this(Value.FAULTY, Value.FAULTY);
    }

    Node(Value input, Value output) {
        this.input = input;
        this.output = output;
    }
}

// Process.java
import java.util.*;

public class Process {
    protected int id;
    protected Configuration configuration;
    protected ValueStrategy strategy;
    protected Map<String, Node> nodes;
    protected PathRepository repository;
    protected Broadcaster broadcaster;
    static final boolean debug = System.getProperty("debug", "false").equals("true");
    static int totalMessages;

    public Process(int id, Configuration configuration, PathRepository repository, Broadcaster broadcaster, ValueStrategy strategy) {
        this.id = id;
        this.strategy = strategy;
        this.repository = repository;
        this.broadcaster = broadcaster;
        this.configuration = configuration;
        this.nodes = new HashMap<String, Node>();
    }

    //
    // Receiving a message is pretty simple here, it means that some other process
    // calls this method on the current process with a path and a node. All we do
    // is store the value, we'll use it in the next round of messaging.
    //
    public void receiveMessage(String path, Node node) {
        nodes.put(path, node);
        totalMessages++;
    }

    //
    // After constructing all processes, you need to call sendMessages() on each process,
    // once per round. This routine will send the appropriate messages for each round
    // to all the other processes via the broadcaster.
    //
    // Deciding on what messages to send is pretty simple. The repository's rank list,
    // indexed by round and the id of this process, gives the entire set of target
    // paths that this process needs to send messages to. So there is an iteration
    // loop through that list, and this process sends a message to the correct
    // target process for each path in the list.
    //
    public void sendMessages(int round) {
        List<String> pathList = repository.getRankList(round, id);
        for (String path : pathList) {
            String sourcePath = path.substring(0, path.length() - 1);
            Node source = nodes.get(sourcePath);
            if (source == null) throw new IllegalStateException("Failed to find source node for " + sourcePath);
            broadcaster.broadcast(round, id, source, path);
        }
    }

    //
    // After all messages have been sent, it's time to decide.
    //
    // This part of the algorithm follows the description in the article closely.
    // It has to work its way from the leaf values up to the root of the tree.
    // The first round consists of going to the leaf nodes, and copying the input
    // value to the output value.
    //
    // All subsequent rounds consist of getting the majority of the output values from
    // each node's children, and copying that to the node's output value.
    //
    // When we finally reach the root node, there is only one node with an output value,
    // and that represents this process's decision.
    //
    public Value decide() {
        // Step 1 - set the leaf values
        for (int i = 0; i < configuration.getNumberOfProcesses(); i++) {
            List<String> pathList = repository.getRankList(configuration.getRoundsOfMessages(), i);
            for (String path : pathList) {
                Node node = nodes.get(path);
                node.output = node.input;
            }
        }
        // Step 2 - work up the tree
        for (int round = configuration.getRoundsOfMessages() - 1; round >= 0; round--) {
            for (int i = 0; i < configuration.getNumberOfProcesses(); i++) {
                List<String> pathList = repository.getRankList(round, i);
                for (String path : pathList) {
                    Node node = nodes.get(path);
                    node.output = getMajority(path);
                }
            }
        }
        List<String> pathList = repository.getRankList(0, configuration.getSource());
        String topPath = pathList.get(0);
        return nodes.get(topPath).output;
    }

    //
    // This routine calculates the majority value for the children of a given
    // path. The logic is pretty simple, we increment the count for all possible
    // values over the children. If there is a clear-cut majority, we return that,
    // otherwise we return the default value defined by the strategy class.
    //
    public Value getMajority(String path) {
        Map<Value, Integer> counts = new HashMap<Value, Integer>();
        counts.put(Value.ONE, 0);
        counts.put(Value.ZERO, 0);
        counts.put(Value.UNKNOWN, 0);
        Collection<String> list = repository.getPathChildren(path);
        int n = 0;
        if (list == null) {
            if (debug) System.out.println("No child found for '" + path + "'");
        } else {
            n = list.size();
            for (String child : list) {
                Node node = nodes.get(child);
                if (node != null) {
                    counts.put(node.output, counts.get(node.output) + 1);
                }
            }
        }
        if (counts.get(Value.ONE) > (n / 2)) return Value.ONE;
        else if (counts.get(Value.ZERO) > (n / 2)) return Value.ZERO;
        else if (counts.get(Value.ONE).intValue() == counts.get(Value.ZERO).intValue() &&
            counts.get(Value.ONE) == (n / 2)) return strategy.getDefaultValue();
        return Value.UNKNOWN;
    }

    //
    // A utility routine that tells whether a given process is faulty
    //
    public boolean isFaulty() {
        return strategy.isFaulty(id);
    }

    //
    // Another somewhat handy utility routine
    //
    public boolean isSource() {
        return configuration.getSource() == id;
    }
}

Here is the Ruby version, which is pretty much the same as the Java version (except a bit smaller). Operator overloading and the power of its collections make Ruby terser than the Java version. Again, the application consists of classes for process, strategy, repository, engine, node, and config. The benchmarks are kicked off from ByzGeneralTest, which calls Engine, which in turn creates the PathRepository, ValueStrategy, GeneralProcess and LProcess (lieutenant) objects.
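One place where the collections help is the nested lookup tables. As a small, self-contained sketch (variable names chosen for this example), a `Hash` with a default block creates missing entries on first access, which replaces the explicit null checks in the Java repository:

```ruby
# A Hash whose default block stores an empty list for unseen keys.
children = Hash.new { |hash, path| hash[path] = [] }

# Two levels deep: rank -> process id -> list of paths.
path_by_rank = Hash.new do |by_rank, rank|
  by_rank[rank] = Hash.new { |by_source, source| by_source[source] = [] }
end

# No "if nil then create" boilerplate is needed before appending.
children["0"] << "01" << "02"
path_by_rank[1][2] << "02"

puts children["0"].inspect
puts path_by_rank[1][2].inspect
```

Note that merely reading an unseen key also stores an empty list for it, which is harmless here but worth remembering when counting keys.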
require 'benchmark'
require 'test/unit'
require 'test/unit/ui/console/testrunner'

DEBUG = false

class Configuration
  attr_reader :source, :num_rounds, :num_procs
  def initialize(source, num_rounds, num_procs)
    @source = source
    @num_rounds = num_rounds
    @num_procs = num_procs
  end

  def to_s
    "#{@source}|#{@num_rounds}|#{@num_procs}"
  end
end

class Node
  attr_accessor :input
  attr_accessor :output
  def initialize(input = Value::FAULTY, output = Value::FAULTY)
    @input = input
    @output = output
  end

  def to_s
    "#{@input}/#{@output}"
  end
end

class Value
  ZERO = '0'
  ONE = '1'
  FAULTY = 'X'
  UNKNOWN = '?'
end

class PathRepository
  attr_accessor :children
  attr_accessor :path_by_rank
  attr_accessor :config

  def initialize(config)
    @config = config
    @children = Hash.new {|h,k| h[k] = []}
    @path_by_rank = Hash.new {|h,k| h[k] = Hash.new {|hh,kk| hh[kk] = []}}
  end

  def path_children(path)
    @children[path]
  end

  def rank_list(rank, source)
    @path_by_rank[rank][source]
  end

  def generate_children(source = @config.source, ids = new_ids, path = "", rank = 0)
    ids[source] = true
    path = "#{path}#{source}"
    @path_by_rank[rank][source].push path

    if rank < @config.num_rounds
      for i in 0...@config.num_procs
        unless ids[i]
          generate_children(i, new_ids(ids), path, rank + 1)
          @children[path].push("#{path}#{i}")
        end
      end
    end

    if (DEBUG)
      print("generate_children(#{source}, #{rank}, #{path}, children = ")
      @children[path].each do |child|
        print(child + " ")
      end
      puts("")
    end
  end

  private
  def new_ids(old_ids = nil)
    #old_ids.nil? ? Array.new(@config.num_procs) {|i| false} : Array.new(old_ids)
    old_ids.nil? ? Hash.new {|h,k| h[k] = false} : old_ids.dup
  end
end

class ValueStrategy
  attr_accessor :config
  attr_accessor :source_value

  def initialize(config)
    @config = config
    @source_value = Node.new(Value::ZERO, Value::UNKNOWN)
  end

  def create_value(value, source, destination, path)
    if @config.source == source
      (destination & 1) != 0 ? Value::ZERO : Value::ONE
    elsif source == 2
      Value::ONE
    else
      value
    end
  end

  def get_default
    return Value::ONE
  end

  def faulty?(process)
    process == @config.source || process == 2
  end
end

class LProcess
  @@total_messages = 0
  attr_reader :id
  attr_reader :config
  attr_reader :strategy
  attr_reader :repository
  attr_reader :broadcaster

  def initialize(id, config, repository, broadcaster, strategy)
    @id = id
    @config = config
    @repository = repository
    @strategy = strategy
    @broadcaster = broadcaster
    @nodes = Hash.new
  end

  def self.total_messages
    @@total_messages
  end

  def self.reset_total_messages
    @@total_messages = 0
  end

  def receive_message(path, node)
    @nodes[path] = node
    @@total_messages += 1
  end

  def send_messages(round)
    @repository.rank_list(round, @id).each do |path|
      source_path = path.slice(0, path.length-1)
      source = @nodes[source_path]
      raise "Source path #{source_path} not found" if source.nil?
      broadcaster.broadcast(round, @id, source, path)
    end
  end

  def decide
    #### Step 1 - set the leaf values
    for i in 0...@config.num_procs
      @repository.rank_list(@config.num_rounds, i).each do |path|
        node = @nodes[path]
        node.output = node.input
      end
    end

    ### Step 2 - work up the tree
    (@config.num_rounds - 1).step 0, -1 do |round|
      for i in 0...@config.num_procs
        @repository.rank_list(round, i).each do |path|
          @nodes[path].output = find_majority(path)
        end
      end
    end

    path_list = @repository.rank_list(0, @config.source)
    top_path = path_list[0]
    @nodes[top_path].output
  end

  def find_majority(path)
    counts = Hash.new(0)
    list = @repository.path_children(path)
    n = 0
    if list
      n = list.size
      list.each do |child|
        node = @nodes[child]
        counts[node.output] = counts[node.output] + 1 if node
      end
    end

    if (counts[Value::ONE] > ( n / 2 ) )
      Value::ONE
    elsif (counts[Value::ZERO] > ( n / 2 ) )
      Value::ZERO
    elsif (counts[Value::ONE] == counts[Value::ZERO] &&
           counts[Value::ONE] == (n / 2))
      @strategy.get_default
    else
      Value::UNKNOWN
    end
  end

  def faulty?
    @strategy.faulty?(id)
  end

  def source?
    @config.source == id
  end
end

class GeneralProcess < LProcess
  def initialize(id, config, repository, broadcaster, strategy)
    super(id, config, repository, broadcaster, strategy)
    @nodes[""] = @strategy.source_value
  end

  def decide
    @nodes[""].input
  end
end

class Engine
  attr_reader :config
  attr_reader :repository
  attr_reader :procs
  attr_reader :strategy

  def initialize(source, num_rounds, num_procs)
    @config = Configuration.new(source, num_rounds, num_procs)
    @strategy = ValueStrategy.new(config)
    @repository = PathRepository.new(config)
    @procs = []
  end

  def broadcast(round, id, source, path)
    for j in 0...@config.num_procs
      unless j == @config.source
        value = @strategy.create_value(source.input, id, j, path)
        if (DEBUG)
          source_path = path.slice(0, path.length-1)
          puts("Sending for round #{round} from process #{id} to #{j} : #{value}, #{path}, #{Value::UNKNOWN}, getting value from source #{source_path}")
        end
        @procs[j].receive_message(path, Node.new(value, Value::UNKNOWN))
      end
    end
  end

  def run
    # Rounds run from 0 up to and including num_rounds (as in the Java
    # version), so that the leaf nodes read by decide are populated.
    for i in 0..@config.num_rounds
      for j in 0...@config.num_procs
        @procs[j].send_messages(i)
      end
    end

    for j in 0...@config.num_procs
      print("Source ") if @procs[j].source?
      print("Process #{j}")
      unless @procs[j].faulty?
        print(" decides on value #{@procs[j].decide}")
      else
        print(" is faulty")
      end
      puts("")
    end
  end

  def start
    for i in 0...@config.num_procs
      process = i == @config.source ?
        GeneralProcess.new(i, @config, @repository, self, @strategy) :
        LProcess.new(i, @config, @repository, self, @strategy)
      @procs.push(process)
    end
    @repository.generate_children
  end
end

class ByzGeneralTest < Test::Unit::TestCase
  def setup
  end

  def run_engine(m, n, source)
    r = Benchmark.realtime() do
      puts("Starting|#{m}|#{n}|#{source}")
      engine = Engine.new(source, m, n)
      engine.start
      engine.run
    end
    puts("Finished|#{m}|#{n}|#{source}|#{LProcess.total_messages}|#{r*1000}")
    # The counter lives on LProcess, not on Ruby's built-in Process module.
    LProcess.reset_total_messages
  end

  def xtest_once
    n = 5
    m = 6
    source = 3
    run_engine(m, n, source)
  end

  def test_multiple
    5.step 50, 5 do |n|
      10.step 100, 10 do |m|
        source = n / 3
        run_engine(m, n, source)
      end
    end
  end
end
Test::Unit::UI::Console::TestRunner.run(ByzGeneralTest)

Finally, here is the Erlang source code (again, I am showing multiple modules here). I tried to keep the same structure as my Java and Ruby versions, so the application consists of process, strategy, repository, engine, node, and config modules. The benchmarks are kicked off from byz_general_test, which invokes the engine, and the engine creates processes for the repository, the general and the lieutenants. One distinction is the difference between active objects and regular functions. In this design, the repository and the processes are active objects that receive messages via Erlang’s message-passing primitives. I also added a message counter just to keep track of the number of messages in the simulation (though it is not a requirement of the algorithm). I used the ets APIs in the repository to store the message paths.
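For readers coming from the Ruby version, the active-object idea can be approximated with a thread that owns its state and a queue that acts as its mailbox. The following is only an analogy under that assumption (Ruby threads are not Erlang processes), and the `CounterActor` class is invented for this sketch; it mirrors the increment, reset, value and die messages of the counter module listed below.

```ruby
# An "active object": the count is only ever touched by the actor's own
# thread, and callers interact with it by posting messages to a mailbox.
class CounterActor
  def initialize
    @mailbox = Queue.new
    @thread = Thread.new { run }
  end

  # Fire-and-forget messages, like `Pid ! Msg` in Erlang.
  def increment
    @mailbox << [:increment]
  end

  def reset
    @mailbox << [:reset]
  end

  # Request/reply: send a private reply queue and block until answered.
  def value
    reply = Queue.new
    @mailbox << [:value, reply]
    reply.pop
  end

  def stop
    @mailbox << [:stop]
    @thread.join
  end

  private

  # The receive loop: handle one message at a time, in arrival order.
  def run
    count = 0
    loop do
      message, reply = @mailbox.pop
      case message
      when :increment then count += 1
      when :reset then count = 0
      when :value then reply << count
      when :stop then break
      end
    end
  end
end

counter = CounterActor.new
2.times { counter.increment }
first_reading = counter.value
counter.reset
second_reading = counter.value
counter.stop
puts "#{first_reading} then #{second_reading}"
```

Because the mailbox is first-in, first-out, the value request is handled only after both increments, which is the same ordering guarantee the Erlang counter relies on between a single sender and receiver.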
%% config.erl
-module(config).
-compile(export_all).

-record(config, {source, num_procs, num_rounds}).

new(S, N, M) ->
    #config{source = S, num_procs = N, num_rounds = M}.

get_source(Self) ->
    Self#config.source.
get_num_procs(Self) ->
    Self#config.num_procs.
get_num_rounds(Self) ->
    Self#config.num_rounds.

%% counter.erl
-module(counter).
-export([start/0, get_value/0, increment/0, reset/0, die/0, test_counter/0]).
-include("byz_general_test.hrl").

start() ->
    register(message_counter, spawn(fun() -> loop(0) end)).

get_value() ->
    lib_util:rpc(message_counter, value).

increment() ->
    message_counter ! {self(), increment}.

reset() ->
    message_counter ! {self(), reset}.

die() ->
    message_counter ! {self(), die, "Exiting"}.

loop(N) ->
    receive
        {_From, increment} ->
            loop(N+1);
        {_From, reset} ->
            loop(0);
        {From, value} ->
            From ! {message_counter, N},
            loop(N);
        {_From, die, Reason} ->
            unregister(message_counter),
            exit(Reason);
        Any ->
            io:format("Unexpected message ~p~n", [Any]),
            %% keep serving requests instead of silently terminating
            loop(N)
    end.

test_counter() ->
    start(),
    increment(),
    increment(),
    2 = get_value(),
    reset(),
    0 = get_value(),
    die().

%% engine.erl
-module(engine).
-export([start/3, init/0, run/0, reset/0, test/0]).

init() ->
    counter:start(),
    repository:start().

reset() ->
    counter:reset(),
    repository:reset().

start(Source, N, M) ->
    Config = config:new(Source, N, M),
    put(config, Config),
    ProcIds = lists:seq(0, config:get_num_procs(Config)-1),
    Pids = lists:map(fun(I) -> process:start(I, Config) end, ProcIds),
    put(pids, Pids),
    lists:foreach(fun(Pid) -> process:init(Pid, Pids) end, Pids),
    repository:generate_children(Config),
    Config.

run() ->
    Config = get(config),
    Ids = lists:seq(0, config:get_num_procs(Config)-1),
    lists:foreach(fun(Id) -> send_message(Id) end, Ids),
    lists:foreach(fun(Id) -> print_result(Id) end, Ids).

get_pid(Id) ->
    Pids = get(pids),
    lists:nth(Id+1, Pids).

send_message(Id) ->
    Config = get(config),
    RoundIds = lists:seq(0, config:get_num_rounds(Config)-1),
    lists:foreach(fun(Round) -> send_message(Id, Round) end, RoundIds).

send_message(Id, Round) ->
    Pid = get_pid(Id),
    process:send_messages(Pid, Round, Id).

print_result(Id) ->
    Config = get(config),
    Source = config:get_source(Config),
    case Source of
        Id ->
            io:format("Source ");
        _ ->
            true
    end,
    io:format("Process ~p ", [Id]),

    Pid = get_pid(Id),
    Faulty = process:is_faulty(Pid),
    if
        Faulty ->
            io:format(" is faulty~n");
        true ->
            Decision = process:decide(Pid),
            io:format(" decides on value ~p~n", [Decision])
    end.

test() ->
    init(),
    Config = start(3, 5, 6),
    run(),
    Config.

%% lib_util.erl
-module(lib_util).
-compile(export_all).

%% Sends Request to Pid and blocks until Pid replies with {Pid, Response}.
rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            Response
    end.

%% node.erl
-module(node).
-include("byz_general_test.hrl").
-compile(export_all).

-record(node, {input, output}).

new() ->
    #node{input = ?FAULTY, output = ?FAULTY}.
new(I, O) ->
    #node{input = I, output = O}.

set_input(Self, I) ->
    Self#node{input = I}.
get_input(Self) ->
    Self#node.input.

set_output(Self, O) ->
    Self#node{output = O}.
get_output(Self) ->
    Self#node.output.
%% Copies the input value into the output, as done for leaf nodes in decide.
set_output_as_input(Self) ->
    I = get_input(Self),
    Self#node{output = I}.

211: -module(process).
212: -export([init/2, start/2, receive_message/3, send_messages/3, decide/1, find_majority/2, is_faulty/1, is_source/1, test_process/0]).
213: -include("byz_general_test.hrl").
214:
215:
220:
221: %
222: %%% starts process loop
223: %
224: start(Id, Config) ->
225: spawn(fun() -> loop(Id, Config) end).
226:
227: receive_message(Pid, Path, Node) ->
228: %lib_util:rpc(Pid, {receive_message, Path, Node}).
229: Pid ! {self(), {receive_message, Path, Node}}.
230:
231: send_messages(Pid, Round, Id) ->
232: %lib_util:rpc(Pid, {send_messages, Round, Id}).
233: Pid ! {self(), {send_messages, Round, Id}}.
234:
235: init(Pid,Pids) ->
236: Pid ! {self(), init, Pids}.
237:
238: decide(Pid) ->
239: lib_util:rpc(Pid, decide).
240:
241: find_majority(Pid, Path) ->
242: lib_util:rpc(Pid, {find_majority, Path}).
243:
244: is_faulty(Pid) ->
245: lib_util:rpc(Pid, is_faulty).
246:
247: is_source(Pid) ->
248: lib_util:rpc(Pid, is_source).
249:
init(Id, Config, AllPids) ->
    put(config, Config),
    put(allPids, AllPids),
    SourcePid = lists:nth(config:get_source(Config)+1, AllPids),
    put(lieutenants, AllPids -- [SourcePid]),
    Nodes = dict:new(),
    put(nodes, Nodes),
    Source = config:get_source(Config),
    SourceValue = strategy:source_value(),
    if
        Source =:= Id ->
            Nodes1 = dict:store("", SourceValue, Nodes),
            put(nodes, Nodes1);
        true ->
            true
    end.

%
%%% stores message received in nodes dictionary
%%% we are also keeping track of total number of messages received in simulation.
%
receive_message(Path, Node) ->
    Nodes = get(nodes),
    Nodes1 = dict:store(Path, Node, Nodes),
    put(nodes, Nodes1),
    counter:increment().

get_node(Path) ->
    Nodes = get(nodes),
    Response = dict:find(Path, Nodes),
    case Response of
        {ok, Node} ->
            Node;
        _ ->
            %io:format("Failed to find node for path ~p~n", [Path]),
            node:new()
    end.

%
%%% sends messages for every path ranked under the given round and process id
%
psend_messages(Round, Id) ->
    L = repository:get_rank_list(Round, Id),
    lists:foreach(
        fun(Path) ->
            psend_messages(Round, Id, Path) end, L).

psend_messages(Round, Id, Path) ->
    Length = string:len(Path),
    SourcePath = if
        Length > 1 ->
            string:substr(Path, 1, Length-1);
        true ->
            ""
    end,
    Source = get_node(SourcePath),
    broadcast(Round, Id, Source, Path).

%
%%% broadcast message to all processes
%
broadcast(Round, Id, SourceNode, Path) ->
    Config = get(config),
    ProcIds = lists:seq(0, config:get_num_procs(Config)-1),
    IdsToProcess = ProcIds -- [config:get_source(Config)],
    lists:foreach(
        fun(Dest) ->
            broadcast(Round, Id, SourceNode, Path, Dest) end, IdsToProcess).

broadcast(_Round, Id, SourceNode, Path, Dest) ->
    Config = get(config),
    Value = strategy:create_value(Config, node:get_input(SourceNode), Id, Dest, Path),
    ProcPids = get(allPids),
    ProcPid = lists:nth(Dest+1, ProcPids),
    receive_message(ProcPid, Path, node:new(Value, ?UNKNOWN)).

decide() ->
    %%% Step 1 - set the leaf values
    Config = get(config),
    ProcIds = lists:seq(0, config:get_num_procs(Config)-1),
    lists:foreach(fun(X) -> reset_node(X) end, ProcIds),
    %%% Step 2 - work up the tree
    RoundIds = lists:reverse(lists:seq(0, config:get_num_rounds(Config)-1)),
    lists:foreach(
        fun(Round) ->
            set_node_value(Round) end, RoundIds),
    Source = config:get_source(Config),
    Result = repository:get_rank_list(0, Source),
    case Result of
        [] ->
            ?UNKNOWN;
        [TopPath|_] ->
            node:get_output(get_node(TopPath))
    end.

set_node_value(Round) ->
    Config = get(config),
    ProcIds = lists:seq(0, config:get_num_procs(Config)-1),
    lists:foreach(
        fun(Id) ->
            set_node_value(Round, Id) end, ProcIds).

set_node_value(Round, Id) ->
    L = repository:get_rank_list(Round, Id),
    lists:foreach(
        fun(Path) ->
            set_node_value(Round, Id, Path) end, L).

set_node_value(_Round, _Id, Path) ->
    Nodes = get(nodes),
    Node = get_node(Path),
    Value = find_majority(Path),
    Nodes1 = dict:store(Path, node:set_output(Node, Value), Nodes),
    put(nodes, Nodes1).

reset_node(I) ->
    Config = get(config),
    L = repository:get_rank_list(config:get_num_rounds(Config), I),
    lists:foreach(fun(Path) ->
            %% Re-read the dictionary on every iteration; reusing one
            %% snapshot taken before the loop would keep only the last update.
            Nodes = get(nodes),
            Node = get_node(Path),
            Nodes1 = dict:store(Path, node:set_output_as_input(Node), Nodes),
            put(nodes, Nodes1)
        end, L).

increment_count(Child) ->
    Counts = get(counts),
    Node = get_node(Child),
    Count = dict:fetch(node:get_output(Node), Counts) + 1,
    Counts1 = dict:store(node:get_output(Node), Count, Counts),
    put(counts, Counts1).

find_majority(Path) ->
    Counts = dict:new(),
    Counts1 = dict:store(?ONE, 0, Counts),
    Counts2 = dict:store(?ZERO, 0, Counts1),
    Counts3 = dict:store(?UNKNOWN, 0, Counts2),
    put(counts, Counts3),
    L = repository:get_children_path(Path),
    N = length(L),
    lists:foreach(fun(Child) ->
        increment_count(Child) end, L),
    Counts4 = get(counts),
    OneCount = dict:fetch(?ONE, Counts4),
    ZeroCount = dict:fetch(?ZERO, Counts4),

    if
        OneCount > ( N / 2 ) ->
            ?ONE;
        ZeroCount > ( N / 2 ) ->
            ?ZERO;
        OneCount == ZeroCount andalso OneCount == ( N / 2 ) ->
            strategy:get_default();
        true ->
            ?UNKNOWN
    end.

loop(Id, Config) ->
    receive
        {_From, init, AllPids} ->
            init(Id, Config, AllPids),
            loop(Id, Config);
        {_From, {receive_message, Path, Node}} ->
            receive_message(Path, Node),
            %From ! {self(), done},
            loop(Id, Config);
        {_From, {send_messages, Round, Id}} ->
            psend_messages(Round, Id),
            %From ! {self(), done},
            loop(Id, Config);
        {From, decide} ->
            From ! {self(), decide()},
            loop(Id, Config);
        {From, is_faulty} ->
            From ! {self(), strategy:is_faulty(Config, Id)},
            loop(Id, Config);
        {From, is_source} ->
            From ! {self(), config:get_source(Config) == Id},
            loop(Id, Config);
        {From, {find_majority, Path}} ->
            From ! {self(), find_majority(Path)},
            loop(Id, Config);
        Any ->
            io:format("Unexpected ~p~n", [Any]),
            loop(Id, Config)
    end.

test_process() ->
    counter:start(),
    repository:start(),
    Config = config:new(3, 5, 6),
    Pid = start(0, Config),
    init(Pid, [Pid, 1, 2, 3, 4, 5, 6]),
    Path = "mypath",
    Node = node:new(),
    receive_message(Pid, Path, Node),
    send_messages(Pid, 0, 0),
    ?UNKNOWN = decide(Pid),
    ?ONE = find_majority(Pid, Path),
    false = is_faulty(Pid),
    false = is_source(Pid).

-module(repository).
-export([start/0, reset/0, get_rank_list/2, get_children_path/1, generate_children/1, die/0, test_generate/0, test_path_ranks/0, test_children/0]).
-include("byz_general_test.hrl").

start() ->
    register(repository, spawn(fun() -> init_loop() end)).

init_loop() ->
    Children = ets:new(children, [set]),
    PathsByRank = ets:new(path_ranks, [set]),
    loop(Children, PathsByRank).

get_children_path(Path) ->
    lib_util:rpc(repository, {get_children_path, Path}).

get_rank_list(Rank, Source) ->
    lib_util:rpc(repository, {get_rank_list, Rank, Source}).

generate_children(Config) ->
    repository ! {self(), generate_children, Config}.

reset() ->
    repository ! {self(), reset}.

die() ->
    repository ! {self(), die, "Exiting"}.

table_lookup(Tab, Key) ->
    case ets:lookup(Tab, Key) of
        [{Key, Value}] ->
            %% values are stored newest-first; return them in insertion order
            lists:reverse(Value);
        [] ->
            []
    end.

table_insert(Tab, Key, Value) ->
    %% Prepend to the raw stored list. Going through table_lookup/2 here
    %% would prepend to an already reversed list and scramble the order.
    NewValue = case ets:lookup(Tab, Key) of
        [{Key, L}] ->
            [Value|L];
        [] ->
            [Value]
    end,
    ets:insert(Tab, {Key, NewValue}).

set_children_path(Children, PathKey, PathValue) ->
    table_insert(Children, PathKey, PathValue).

get_children_path(Children, Path) ->
    table_lookup(Children, Path).

get_rank_list(PathsByRank, Rank, Source) ->
    table_lookup(PathsByRank, {Rank, Source}).

set_rank_list(PathsByRank, Rank, Source, Path) ->
    Key = {Rank, Source},
    table_insert(PathsByRank, Key, Path).

generate_children(Config, Children, PathsByRank) ->
    generate_children(
        Config, Children, PathsByRank,
        config:get_source(Config),
        [],
        "",
        0).

generate_children(Config, Children, PathsByRank, Source, Ids, Path, Rank) ->
    Ids1 = [Source|Ids],
    Path1 = Path ++ integer_to_list(Source),
    set_rank_list(PathsByRank, Rank, Source, Path1),
    %%%
    Rounds = config:get_num_rounds(Config),
    if
        Rank < Rounds ->
            IdsToProcess = lists:seq(0, config:get_num_procs(Config)-1) -- Ids1,
            lists:foreach(
                fun(Source1) ->
                    generate_children(Config, Children, PathsByRank, Source1, Ids1, Path1, Rank+1),
                    set_children_path(Children, Path1, Path1 ++ integer_to_list(Source1))
                end, IdsToProcess);
        true ->
            true
    end,
    Children.

loop(Children, PathsByRank) ->
    receive
        {From, {get_rank_list, Rank, Source}} ->
            From ! {repository, get_rank_list(PathsByRank, Rank, Source)},
            loop(Children, PathsByRank);
        {From, {get_children_path, Path}} ->
            From ! {repository, get_children_path(Children, Path)},
            loop(Children, PathsByRank);
        {_From, reset} ->
            ets:delete_all_objects(Children),
            ets:delete_all_objects(PathsByRank),
            loop(Children, PathsByRank);
        {_From, generate_children, Config} ->
            generate_children(Config, Children, PathsByRank),
            loop(Children, PathsByRank);
        {_From, die, Reason} ->
            ets:delete(Children),
            ets:delete(PathsByRank),
            exit(Reason);
        Any ->
            %% keep serving after an unexpected message instead of silently exiting
            io:format("Unexpected message ~p~n", [Any]),
            loop(Children, PathsByRank)
    end.

benchmark_byz_general() ->
    engine:init(),
    benchmark_byz_general(5).

benchmark_byz_general(N) when N < 100 ->
    benchmark_byz_general(N, 10),
    benchmark_byz_general(N+10);
benchmark_byz_general(N) when N >= 100 ->
    true.

benchmark_byz_general(N, M) when M < 100 ->
    statistics(runtime),
    statistics(wall_clock),
    Source = round(N / 3),
    io:format("Starting|~p|~p|~p~n", [M, N, Source]),
    engine:start(Source, N, M),
    engine:run(),
    {_, RT} = statistics(runtime),
    {_, WC} = statistics(wall_clock),
    io:format("Finished|~p|~p|~p|~p|~p|~p~n", [M, N, Source, counter:get_value(), RT, WC]),
    engine:reset(),
    benchmark_byz_general(N, M+10);
benchmark_byz_general(_N, M) when M >= 100 ->
    true.

I merged the results with the following script:
class Stats
  attr_accessor :processes
  attr_accessor :rounds
  attr_accessor :messages
  attr_accessor :java_time
  attr_accessor :erlang_time
  attr_accessor :cpp_time
  attr_accessor :ruby_time

  def initialize(procs, rounds, msgs)
    @processes = procs.to_i
    @rounds = rounds.to_i
    @messages = msgs.to_i
  end

  def key
    "#{@processes}/#{@rounds}"
  end

  def to_s
    "#{@processes},#{@rounds},#{@messages},#{@cpp_time},#{@ruby_time},#{@java_time},#{@erlang_time}"
  end
end

stats = {}

files = ["cpp.out", "ruby.out", "java.out", "erlang.out"]
count_setter = ["cpp_time=", "ruby_time=", "java_time=", "erlang_time="]
files.each_with_index do |file, i|
  # File.foreach closes the file when it is done
  File.foreach(file) do |line|
    next unless line =~ /Finish/
    t = line.chomp.split("|")
    fresh = Stats.new(t[2], t[1], t[4])
    # Reuse the row created by an earlier file so the timings of all four
    # languages land on the same entry instead of overwriting each other.
    stat = (stats[fresh.key] ||= fresh)
    stat.send(count_setter[i], t[5].to_i)
  end
end

puts "Processes,Rounds,Messages,CPP Time,Ruby Time,Java Time,Erlang Time"
stats.values.sort_by {|stat| stat.processes * stat.rounds}.each do |stat|
  puts stat
end

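To make the field positions used by the merge script easier to follow, here is a minimal sketch that splits a single `Finished|...` line into named fields. The helper name `parse_finished_line` and the sample line are made up for illustration; the field order follows the Erlang `io:format` call in the benchmark (rounds, processes, source, message count, runtime, wall clock), and I am assuming the other three programs print the same layout.

```ruby
# Hypothetical helper, not part of the original script: turn one
# "Finished|rounds|processes|source|messages|time|wall_clock" line into a hash.
def parse_finished_line(line)
  return nil unless line.start_with?("Finished")

  t = line.chomp.split("|")
  {
    rounds: t[1].to_i,
    processes: t[2].to_i,
    source: t[3].to_i,
    messages: t[4].to_i,
    time: t[5].to_i # the merge script only keeps this timing column
  }
end

# Made-up sample line, only to show the shape of the data.
sample = "Finished|10|5|2|1234|56|78\n"
p parse_finished_line(sample)
p parse_finished_line("Starting|10|5|2\n") # lines other than "Finished" are skipped
```

The merge script uses the same indices: `t[2]` for processes, `t[1]` for rounds, `t[4]` for messages and `t[5]` for the time.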
And here are the results of the benchmark that I ran on my Dell notebook. (Note: I used the time function in C++, which only returns whole seconds, so the C++ timings are not precise.)

Conclusion:
This was an interesting problem that tested the limits of these languages. For example, I found that with more than 10 processes things got really nasty. The Java program gave up with an OutOfMemory error. The Erlang program crashed and dumped, though I would have used OTP’s supervisors if I were writing a more fault-tolerant application. The Ruby program became too slow, so I had to kill it.
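One way to see why things degrade so quickly as processes are added is to count the paths the repository has to generate. The sketch below is a rough estimate, not a measurement: it assumes paths are built the way `generate_children` builds them, starting from the source and extending each path with every process id not already on it, down to the configured number of rounds. The function names `paths_at_rank` and `total_paths` are hypothetical.

```ruby
# Number of paths at a given rank: the source is fixed at rank 0, and each
# further rank picks one of the process ids not yet on the path, so the
# count is (n-1) * (n-2) * ... * (n-rank).
def paths_at_rank(procs, rank)
  (1..rank).reduce(1) { |acc, i| acc * (procs - i) }
end

# Total number of paths over ranks 0..rounds.
def total_paths(procs, rounds)
  (0..rounds).sum { |rank| paths_at_rank(procs, rank) }
end

[5, 7, 10, 15].each do |n|
  puts "#{n} processes, 5 rounds: #{total_paths(n, 5)} paths"
end
```

Since each process keeps its own dictionary of nodes keyed by path, and each path is sent to every lieutenant, both memory use and message volume grow roughly with this product.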
Java turned out to be the performance leader in this case, and I was a bit surprised when Erlang’s response time became really high with 9 processes. As I mentioned earlier, the C++, Java and Ruby versions are not really concurrent, and their message passing is really just method invocation.
As far as mapping the algorithm to the language goes, I found that Erlang fits very nicely for distributed algorithms that involve a lot of communication. I left the C++, Java and Ruby versions very simple and didn’t try to implement truly independent processes and communication, because that would have required a lot more effort.
Resources:
Source Code