I have been reading Joe Armstrong’s Erlang book recently, and one of the exercises is to compare the performance of processes and message passing by creating a ring of processes and sending messages around it. The exercise needs to be done in two languages, so I chose Java as the second language.
Here is the Java version:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Create N followers in a ring. Send a message round the ring M times so that a total of N * M messages get sent.
 * Time how long this takes for different values of N and M.
 * javac Ring.java
 * java -Xss48k -cp . Ring
 */
public class Ring {
    // A follower is a thread that takes a message from its queue and passes it to the next node.
    public class Follower extends Thread {
        final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        Follower nextFollower;
        int pid;
        int numberOfMessages;

        public Follower(int pid, int numberOfMessages) {
            this.pid = pid;
            this.numberOfMessages = numberOfMessages;
        }

        public void setNextFollower(Follower nextFollower) {
            this.nextFollower = nextFollower;
        }

        public void sendAsynchronous(Object message) {
            queue.add(message);
        }

        @Override
        public void run() {
            //System.out.println("Starting Pid " + pid + ", numberOfMessages " + numberOfMessages);
            for (int i = 0; i < numberOfMessages; i++) {
                try {
                    Object message = queue.take();
                    nextFollower.sendAsynchronous(message);
                } catch (InterruptedException e) {
                    // restore the interrupt flag and stop forwarding
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            //System.out.println("Ending Pid " + pid + ", numberOfMessages " + numberOfMessages);
        }
    }

    // The leader closes the ring; it is driven by the calling thread and is never started.
    public class Leader extends Follower {
        public Leader(int pid) {
            super(pid, 0);
        }

        @Override
        public void run() {
            // leader will not run asynchronously
        }

        public void sendReceiveSynchronous(Object message) {
            try {
                nextFollower.sendAsynchronous(message);
                queue.take(); // block until the message has travelled round the ring
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public Ring() {
    }

    public void runRing(int numberOfProcesses, int numberOfMessages) {
        Leader leader = new Leader(-1);
        Follower[] followers = buildFollowers(numberOfProcesses - 1, numberOfMessages, leader);
        startFollowers(followers);
        for (int i = 0; i < numberOfMessages; i++) {
            leader.sendReceiveSynchronous(Boolean.TRUE); // we are not taking message size into account
        }
    }

    public void benchmarkRing(int numberOfProcesses, int numberOfMessages) {
        System.out.println("Starting ring for " + numberOfProcesses + " threads and " + numberOfMessages + " messages");
        long start = System.currentTimeMillis();

        runRing(numberOfProcesses, numberOfMessages);

        long elapsed = System.currentTimeMillis() - start;
        System.out.println("Ring for " + numberOfProcesses + " threads and " + numberOfMessages + " messages took " + elapsed + " milliseconds.");
    }

    private void startFollowers(Follower[] followers) {
        for (int i = 0; i < followers.length; i++) {
            followers[i].setDaemon(true);
            followers[i].start();
        }
    }

    // Builds the followers and links them: leader -> followers[0] -> ... -> last follower -> leader.
    private Follower[] buildFollowers(int numberOfFollowers, int numberOfMessages, Leader leader) {
        Follower[] followers = new Follower[numberOfFollowers];
        for (int i = 0; i < followers.length; i++) {
            followers[i] = new Follower(i, numberOfMessages);
        }
        leader.setNextFollower(followers[0]);
        for (int i = 0; i < followers.length; i++) {
            if (i == followers.length - 1) {
                followers[i].setNextFollower(leader);
            } else {
                followers[i].setNextFollower(followers[i + 1]);
            }
        }
        return followers;
    }

    public static void main(String[] args) {
        for (int i = 100; i < 10000; i += 100) {
            for (int j = 100; j < 10000; j += 1000) {
                new Ring().benchmarkRing(i, j);
            }
        }
    }
}
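The `-Xss48k` flag in the header comment shrinks the stack of every thread in the JVM. As a rough sketch of an alternative (not what the benchmark above does), a stack size can also be requested per thread through the four-argument `Thread` constructor. The class name `SmallStackThreads` and the method `runThreads` are made up for this illustration, and the JVM treats the requested size only as a hint that it may round up or ignore.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the Ring benchmark.
class SmallStackThreads {
    // Starts `count` threads, each requesting `stackBytes` of stack,
    // waits for them, and returns how many actually ran.
    static int runThreads(int count, long stackBytes) {
        AtomicInteger finished = new AtomicInteger();
        Thread[] threads = new Thread[count];
        for (int i = 0; i < count; i++) {
            // Thread(ThreadGroup, Runnable, String, long stackSize): the size is a hint only.
            threads[i] = new Thread(null, () -> finished.incrementAndGet(), "worker-" + i, stackBytes);
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        int done = runThreads(1000, 48 * 1024);
        System.out.println(done + " threads completed");
    }
}
```

Either way, each follower is still a native thread with its own stack, so the per-thread cost stays far above that of an Erlang process.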
And here is the Erlang version:
-module(ring).
-compile(export_all).

% c(ring).
% ring:benchmark_ring(10, 4).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% start - spawns a process that runs receive_send_loop. It passes M, the number
% of messages, and the pid of the next process in the ring to the loop.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
start(M, NextPid) ->
    spawn(fun() -> receive_send_loop(M, NextPid) end).


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% start_ring starts N-1 processes, each running receive_send_loop; the calling
% process is the Nth member and closes the ring. Returns the pid to send to.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
start_ring(N, M) ->
    LastPid = self(),
    start_ring(N-1, M, LastPid).

start_ring(N, M, LastPid) when N > 0 ->
    Pid = start(M, LastPid),
    start_ring(N-1, M, Pid);
start_ring(N, _M, LastPid) when N == 0 ->
    LastPid.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% send_receive_message sends a message carrying D round the ring M times,
% waiting for each message to come back before sending the next one.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
send_receive_message(M, D, LastPid) when M > 0 ->
    Payload = case M of
                  1 -> {nomore, D};
                  _Else -> {ok, D}
              end,
    %io:format("Master ~p Sending message to ~p~n", [self(), LastPid]),
    LastPid ! Payload,

    %io:format("Master ~p Receiving message ~n", [self()]),
    receive
        {ok, Response} ->
            Response;
        {nomore, Response} ->
            Response
    end,
    %io:format("Master ~p Received message ~n", [self()]),
    send_receive_message(M-1, D, LastPid);

send_receive_message(M, _D, _LastPid) when M == 0 ->
    true.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% benchmark_ring builds a ring, sends the messages and reports the timings.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
benchmark_ring() ->
    benchmark_ring(100).

benchmark_ring(N) when N < 10000 ->
    benchmark_ring(N, 100),
    benchmark_ring(N+100);
benchmark_ring(N) when N >= 10000 ->
    true.

benchmark_ring(N, M) when M < 10000 ->
    io:format("Starting ring for ~w processes and ~w messages.~n", [N, M]),
    statistics(runtime),
    statistics(wall_clock),
    LastPid = start_ring(N, M),
    send_receive_message(M, 'message', LastPid),
    {_, RT} = statistics(runtime),
    {_, WC} = statistics(wall_clock),
    io:format("Ring for ~w processes and ~w messages took ~p (~p) milliseconds.~n", [N, M, RT, WC]),
    benchmark_ring(N, M+1000);
benchmark_ring(_N, M) when M >= 10000 ->
    true.


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% receive_send_loop forwards messages in a loop until it receives nomore, which
% it also forwards so that every process in the ring terminates.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
receive_send_loop(M, NextPid) ->
    receive
        {nomore, Any} ->
            %io:format("~p received ~p last message ~p nextPid ~p ~n", [self(), M, Any, NextPid]),
            NextPid ! {nomore, Any};
        {ok, Any} ->
            %io:format("~p received ~p message ~p nextPid ~p ~n", [self(), M, Any, NextPid]),
            NextPid ! {ok, Any},
            receive_send_loop(M-1, NextPid)
    end.
The verdict: Erlang is much more efficient than Java at process/thread creation and message passing. I had to explicitly reduce the thread stack size just to run the Java program, and even then there was no comparison. Here are the results of the benchmarks.
Here is the simple Ruby script that I used to merge the output statistics from both runs:
# One row of merged results for a given ring size and message count.
class Stats
  attr_accessor :processes, :messages, :java_time, :erlang_time

  def initialize(procs, msgs, jtime, etime)
    @processes = procs.to_i
    @messages = msgs.to_i
    @java_time = jtime.to_i if jtime
    @erlang_time = etime.to_i if etime
  end

  def key
    "#{@processes}/#{@messages}"
  end

  def to_s
    "#{@processes},#{@messages},#{@java_time},#{@erlang_time}"
  end
end

stats = {}

# Java lines look like: "Ring for N threads and M messages took T milliseconds."
File.foreach("javaring.out") do |line|
  if line =~ /Ring for/
    t = line.split(/ /)
    stat = Stats.new(t[2], t[5], t[8], nil)
    stats[stat.key] = stat
    #puts "#{stat} --- for line #{t.join(', ')}"
  end
end

# Erlang lines have the same layout; t[8] is the runtime figure printed before the parentheses.
File.foreach("erlout") do |line|
  if line =~ /Ring for/
    t = line.split(/ /)
    stat = Stats.new(t[2], t[5], nil, t[8])
    old = stats[stat.key]
    if old
      old.erlang_time = stat.erlang_time
    else
      stats[stat.key] = stat
    end
    #puts "#{stat} --- for line #{t.join(', ')}"
  end
end

# Print as CSV, ordered by total number of messages sent.
stats.values.sort_by { |stat| stat.processes * stat.messages }.each do |stat|
  puts stat
end
Final Thoughts
Erlang is great for writing highly concurrent applications. The benchmark shows that smart use of green threads can outperform native threads. The only thing I found a bit verbose about Erlang is that you have to write a switch-like receive statement to handle messages. I wish these processes were more object oriented, with message passing done by method invocation instead of explicitly as it is in Erlang. What I mean is, instead of writing
receive
    {label1, From, RealData} -> action;
    {label2, From, RealData} -> action;
    ...
end
It would be more like a module with functions named label1, label2, ... whose arguments accept From and RealData. Also, a lot of the time you have to send back a reply, which could be sent implicitly whenever the function returns a value. Back in the mid-90s I wrote a Java-based ORB with a ServiceFactory that took any POJO and converted it into a Service. I consider these processes small services, and it would be nice to have the same mechanism to convert a module into a service. Another thing I find hard about Erlang (besides immutable data) is its cryptic error messages. I am used to seeing the line number or a stack trace showing where the error occurred, but Erlang errors are very cryptic. As with learning any new language, you also have to learn the libraries, and Erlang has the huge OTP beast that I will have to tame. Nevertheless, I like Erlang so far, and it’s going to be a favorite language.
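To make the "message label as method name" wish concrete, here is a minimal Java sketch, assuming a mailbox thread that looks up a public method by label and argument count through reflection and sends the return value back as the reply. The names `MethodMailbox`, `send`, and `Counter` are invented for this illustration; it is not a reconstruction of the ORB's ServiceFactory and it skips overload resolution, shutdown, and error reporting beyond failing the reply.

```java
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical example service: a plain object with ordinary methods.
class Counter {
    private int total;
    public int add(int n) { total += n; return total; }
    public int get() { return total; }
}

// Hypothetical wrapper: turns any object into a "process" with a mailbox.
class MethodMailbox {
    private static final class Message {
        final String label;
        final Object[] args;
        final CompletableFuture<Object> reply = new CompletableFuture<>();
        Message(String label, Object[] args) { this.label = label; this.args = args; }
    }

    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
    private final Object target;

    MethodMailbox(Object target) {
        this.target = target;
        Thread worker = new Thread(this::loop);
        worker.setDaemon(true);
        worker.start();
    }

    // Enqueue a message; the label selects the method, the future carries the reply.
    CompletableFuture<Object> send(String label, Object... args) {
        Message m = new Message(label, args);
        queue.add(m);
        return m.reply;
    }

    // Single consumer thread, so the target object is only ever touched here.
    private void loop() {
        try {
            while (true) {
                Message m = queue.take();
                try {
                    Method method = findMethod(m.label, m.args.length);
                    m.reply.complete(method.invoke(target, m.args)); // implicit reply
                } catch (ReflectiveOperationException | RuntimeException e) {
                    m.reply.completeExceptionally(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private Method findMethod(String label, int arity) throws NoSuchMethodException {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(label) && m.getParameterCount() == arity) {
                return m;
            }
        }
        throw new NoSuchMethodException(label + "/" + arity);
    }

    public static void main(String[] args) {
        MethodMailbox counter = new MethodMailbox(new Counter());
        counter.send("add", 5);
        System.out.println(counter.send("add", 7).join());
    }
}
```

Because one thread drains the queue, the wrapped object needs no locking of its own, which is roughly the property an Erlang process gets from its mailbox.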