Shahzad Bhatti Welcome to my ramblings and rants!

September 19, 2007

Erlang solution for finding the 51 billionth letter when the integers from 1 to 999,999,999 are written as words.

Filed under: Computing — bhatti @ 6:54 pm

I came across an interesting problem from ITA Software: “If the integers from 1 to 999,999,999 are written as words, sorted alphabetically, and concatenated, what is the 51 billionth letter?” For those who don’t know, ITA Software writes software to search airline flights, and I used it when working for Orbitz. Their software is still the fastest I know, and I like this problem that they posted for recruiting. Since I have been learning Erlang lately, I gave it a try in Erlang. Here is the source code:

  1 -module(ita_puzzle).
  2 -compile(export_all).
  3
  4
  5 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  6 %%% Puzzle from ITASoftware http://www.itasoftware.com/careers/puzzles07.html
  7 %%% Word Numbers
  8 %%% "If the integers from 1 to 999,999,999 are written as words, sorted 
  9 %%%  alphabetically, and concatenated, what is the 51 billionth letter?" 
 10 %%% To be precise: if the integers from 1 to 999,999,999 are expressed in 
 11 %%% words (omitting spaces, 'and', and punctuation[1]), and sorted 
 12 %%% alphabetically so that the first six integers are
 13 %%%
 14 %%%    * eight
 15 %%%    * eighteen
 16 %%%    * eighteenmillion
 17 %%%    * eighteenmillioneight
 18 %%%    * eighteenmillioneighteen
 19 %%%    * eighteenmillioneighteenthousand
 20 %%%
 21 %%%    and the last is
 22 %%%
 23 %%%    * twothousandtwohundredtwo
 24 %%%
 25 %%% then reading top to bottom, left to right, the 28th letter completes 
 26 %%% the spelling of the integer "eighteenmillion".
 27 %%%
 28 %%% The 51 billionth letter also completes the spelling of an integer. 
 29 %%% Which one, and what is the sum of all the integers to that point?
 30 %%% [1] For example, 911,610,034 is written 
 31 %%% "ninehundredelevenmillionsixhundredtenthousandthirtyfour"; 
 32 %%% 500,000,000 is written "fivehundredmillion"; 1,709 is written 
 33 %%% "onethousandsevenhundrednine".
 34 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 35
 36
 37
 38 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 39 % test function
 40 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 41 test() ->  % checks number_to_words/1 against the three spellings quoted in the puzzle text, then starts run/0
 42     "ninehundredelevenmillionsixhundredtenthousandthirtyfour" = number_to_words(911610034),  % a wrong spelling fails the match (badmatch)
 43     "fivehundredmillion" = number_to_words(500000000),
 44     "onethousandsevenhundrednine" = number_to_words(1709),
 45     run().  % runs the whole computation for 999999999 numbers
 46
 47 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 48 % Run creates 1 through 999999999, sorts them and prints the
 49 % number at 51000000000th (51-billion) letter and sum of all
 50 % numbers until that number.
 51 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 52 run() ->  % fills the dets table, walks it letter by letter in alphabetical order, then prints the elapsed time
 53
 54     statistics(runtime),  % first call marks the start; the second call below reports the time since this one
 55     statistics(wall_clock),
 56
 57     open(),
 58     convert_numbers(999999999),  % NOTE(review): dets files are limited to 2 GB, so a table for every number likely cannot fit - confirm
 59     lists:foldl(fun run/2, {0, 51000000000, 0}, "abcdefghijklmnopqrstuvwxyz"),  % accumulator is {LettersSoFar, TargetLetter, SumSoFar}; one lookup per initial letter
 60     close(),
 61
 62     {_, RT} = statistics(runtime),  % second element is the time since the previous call
 63     {_, WC} = statistics(wall_clock),
 64     io:format("Completed in ~p (~p) milliseconds.~n", [RT, WC]).
 65
 66 run(Letter, {_I, _Max, _Sum} = Acc) ->  % processes every stored word whose first letter is Letter
 67     Bucket = dets:lookup(wordfile, Letter),  % entries are {FirstLetter, Word, Num}
 68     lists:foldl(fun print/2, Acc, lists:sort(Bucket)).  % first elements are equal, so the sort orders by Word
 69
 70
 71
 72
 73 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 74 % Rules for creating number suffixes
 75 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 76 number_suffix(N, M) when N >= 1+303, M == 0 -> {"", 303};  % number_suffix(Digits, Value) -> {Word, Cutoff}: Word is the scale or tens word to emit, Cutoff the number of trailing digits the caller splits off
 77 number_suffix(N, M) when N >= 1+100, M == 0 -> {"", 100};  % Value == 0 (all-zero digit string): same split, but no word
 78 number_suffix(N, M) when N >= 1+63, M == 0 -> {"", 63};
 79 number_suffix(N, M) when N >= 1+60, M == 0 -> {"", 60};
 80 number_suffix(N, M) when N >= 1+57, M == 0 -> {"", 57};
 81 number_suffix(N, M) when N >= 1+54, M == 0 -> {"", 54};
 82 number_suffix(N, M) when N >= 1+51, M == 0 -> {"", 51};
 83 number_suffix(N, M) when N >= 1+48, M == 0 -> {"", 48};
 84 number_suffix(N, M) when N >= 1+45, M == 0 -> {"", 45};
 85 number_suffix(N, M) when N >= 1+42, M == 0 -> {"", 42};
 86 number_suffix(N, M) when N >= 1+39, M == 0 -> {"", 39};
 87 number_suffix(N, M) when N >= 1+36, M == 0 -> {"", 36};
 88 number_suffix(N, M) when N >= 1+33, M == 0 -> {"", 33};
 89 number_suffix(N, M) when N >= 1+30, M == 0 -> {"", 30};
 90 number_suffix(N, M) when N >= 1+27, M == 0 -> {"", 27};
 91 number_suffix(N, M) when N >= 1+24, M == 0 -> {"", 24};
 92 number_suffix(N, M) when N >= 1+21, M == 0 -> {"", 21};
 93 number_suffix(N, M) when N >= 1+18, M == 0 -> {"", 18};
 94 number_suffix(N, M) when N >= 1+15, M == 0 -> {"", 15};
 95 number_suffix(N, M) when N >= 1+12, M == 0 -> {"", 12};
 96 number_suffix(N, M) when N >= 1+9, M == 0 -> {"", 9};
 97 number_suffix(N, M) when N >= 1+6, M == 0 -> {"", 6};
 98 number_suffix(N, M) when N >= 1+3, M == 0 -> {"", 3};
 99 %%% Value is non-zero from here on: the digit count N selects the scale word.
100 number_suffix(N, _) when N >= 1+303 -> {"centillion", 303};
101 number_suffix(N, _) when N >= 1+100 -> {"googol", 100};
102 number_suffix(N, _) when N >= 1+63 -> {"vigintillion", 63};
103 number_suffix(N, _) when N >= 1+60 -> {"novemdecillion", 60};
104 number_suffix(N, _) when N >= 1+57 -> {"octodecillion", 57};
105 number_suffix(N, _) when N >= 1+54 -> {"septendecillion", 54};
106 number_suffix(N, _) when N >= 1+51 -> {"sexdecillion", 51};
107 number_suffix(N, _) when N >= 1+48 -> {"quindecillion", 48};
108 number_suffix(N, _) when N >= 1+45 -> {"quattuordecillion", 45};
109 number_suffix(N, _) when N >= 1+42 -> {"tredecillion", 42};
110 number_suffix(N, _) when N >= 1+39 -> {"duodecillion", 39};
111 number_suffix(N, _) when N >= 1+36 -> {"undecillion", 36};
112 number_suffix(N, _) when N >= 1+33 -> {"decillion", 33};
113 number_suffix(N, _) when N >= 1+30 -> {"nonillion", 30};
114 number_suffix(N, _) when N >= 1+27 -> {"octillion", 27};
115 number_suffix(N, _) when N >= 1+24 -> {"septillion", 24};
116 number_suffix(N, _) when N >= 1+21 -> {"sextillion", 21};
117 number_suffix(N, _) when N >= 1+18 -> {"quintillion", 18};
118 number_suffix(N, _) when N >= 1+15 -> {"quadrillion", 15};
119 number_suffix(N, _) when N >= 1+12 -> {"trillion", 12};
120 number_suffix(N, _) when N >= 1+9 -> {"billion", 9};
121 number_suffix(N, _) when N >= 1+6 -> {"million", 6};
122 number_suffix(N, _) when N >= 1+3 -> {"thousand", 3};
123 number_suffix(_, M) when M >= 100 -> {"hundred", 2};  % only reached with three digits: the leading digit counts hundreds
124 number_suffix(N, _) when N > 2 -> {"", 2};  % three digits but value below 100 (leading zero): no word, just peel off the first digit
125 number_suffix(_, M) when M >= 90 -> {"ninety", 1};
126 number_suffix(_, M) when M >= 80 -> {"eighty", 1};
127 number_suffix(_, M) when M >= 70 -> {"seventy", 1};
128 number_suffix(_, M) when M >= 60 -> {"sixty", 1};
129 number_suffix(_, M) when M >= 50 -> {"fifty", 1};
130 number_suffix(_, M) when M >= 40 -> {"forty", 1};  % spelled "forty"; the old "fourty" changed both sort order and letter counts
131 number_suffix(_, M) when M >= 30 -> {"thirty", 1};
132 number_suffix(_, M) when M >= 20 -> {"twenty", 1};
133 number_suffix(_, _) -> {undefined, 0}.  % value below 20: the caller spells it directly with get_word/1
134
135
136 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
137 % Rules for converting numbers to words.
138 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
139 get_word("90") -> "ninety";  % get_word(Digits): spelling of a digit string for 1..19 and the round tens; "" for anything else
140 get_word("80") -> "eighty";
141 get_word("70") -> "seventy";
142 get_word("60") -> "sixty";
143 get_word("50") -> "fifty";
144 get_word("40") -> "forty";  % spelled "forty", not "fourty"
145 get_word("30") -> "thirty";
146 get_word("20") -> "twenty";
147 get_word("19") -> "nineteen";
148 get_word("18") -> "eighteen";
149 get_word("17") -> "seventeen";
150 get_word("16") -> "sixteen";
151 get_word("15") -> "fifteen";
152 get_word("14") -> "fourteen";
153 get_word("13") -> "thirteen";
154 get_word("12") -> "twelve";
155 get_word("11") -> "eleven";
156 get_word("10") -> "ten";
157 get_word("9") -> "nine";
158 get_word("8") -> "eight";
159 get_word("7") -> "seven";
160 get_word("6") -> "six";
161 get_word("5") -> "five";
162 get_word("4") -> "four";
163 get_word("3") -> "three";
164 get_word("2") -> "two";
165 get_word("1") -> "one";
166 get_word([$0|T]) -> get_word(T);  % ignore a leading zero, e.g. "09" is spelled as "9"
167 get_word(_N) -> "".  % empty string, all zeros, or any unlisted input contributes no letters
168
169 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
170 % Opening Dets file
171 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
172 open() ->  % recreates the dets table `wordfile` backed by ./ita_puzzles.dat; exits with eDetsOpen if it cannot be opened
173     File = "./ita_puzzles.dat",
174     file:delete(File),  % start from an empty table; the result is ignored
175     case dets:open_file(wordfile, [{type, bag}, {file, [File]}]) of  % bag: many entries can share one key
176         {ok, wordfile} ->
177             wordfile;
178         {error,_Reason} ->  % NOTE(review): the failure reason is discarded and never printed
179             io:format("cannot open word file ~p~n", [File]),
180             exit(eDetsOpen)
181     end.
182
183 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
184 % Closing Dets file
185 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
186 close() -> dets:close(wordfile).  % closes the dets table named wordfile
187
188
189 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
190 % Converting numbers from 1 to some range and store it in dets table.
191 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
192 convert_numbers(0) ->  % stop below 1 so that 1 ("one") is stored too; the old base case of 1 skipped it
193     true;
194 convert_numbers(Num) when is_integer(Num), Num > 0 ->  % counts down from Num; the guard rejects input that could never reach 0
195     Word = number_to_words(Num),
196     dets:insert(wordfile, {hd(Word), Word, Num}),  % keyed by first letter so a whole letter bucket can be looked up at once
197     convert_numbers(Num-1).
198
199 print(_Entry, {Max, Max, _Sum} = Done) ->  % letter count already sits exactly on the target: leave the state alone
200     Done;
201 print({_H, Word, Num}, {I, Max, Sum}) ->  % fold step over {FirstLetter, Word, Num}; state is {LettersSoFar, Target, SumSoFar}
202     %io:format("Number ~p ~p~n", [Num, Word]),
203     I1 = I + length(Word),
204     Sum1 = Sum + Num,
205     case I < Max andalso I1 >= Max of  % does this word contain the target letter?
206         true ->
207             Letter = lists:nth(Max-I, Word),
208             io:format("Found ~pth letter ~p, Word ~p Num ~p, sum ~p~n", [Max, Letter, Word, Num, Sum1]);
209         false ->
210             true
211     end,
212     {I1, Max, Sum1}.
213
214
215 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
216 % Converting a number to word
217 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
218 number_to_words(Num) ->  % integer -> spelling without spaces, e.g. 1709 -> "onethousandsevenhundrednine"
219     lists:flatten(number_to_words(integer_to_list(Num), [])).  % the helpers build nested lists of strings; flatten joins them
220
221 number_to_words([], Out) ->  % number_to_words(Digits, Out): words for the digit string Digits, followed by Out
222     Out;
223
224 number_to_words(L, Out) ->
225     Len = length(L),
226     {Suffix, Cutoff} = number_suffix(Len, list_to_integer(L)),  % rule is chosen from the digit count and the numeric value
227     number_to_words(Suffix, Cutoff, Len, L, Out).
228
229 number_to_words(undefined, _Cutoff, _Len, L, Out) ->  % value below 20: spell the digits directly
230     [get_word(L)|Out];
231
232 number_to_words(Suffix, _Cutoff, Len, [_H|T], Out) when Len =< 2 ->  % 20..99: Suffix is the tens word, T the units digit
233     Suffix ++ get_word(T) ++ Out;
234
235 number_to_words(Suffix, Cutoff, Len, L, Out) ->  % split into a leading group (followed by Suffix) and the last Cutoff digits
236     {L1, L2} = lists:split(Len-Cutoff, L),
237     case list_to_integer(L1) of
238         0 -> number_to_words(L2, Out);  % all-zero group: no words and no scale word (1000001 is "onemillionone")
239         _ -> number_to_words(L1, [Suffix]) ++ number_to_words(L2, Out)
240     end.

This problem consists of two sub-problems. The first, which is relatively easy, is to convert numbers into words; the harder one is sorting and iterating over the results, and it is going to take a long time to finish all that. I will have to run the program overnight and will post my solution once it is finished. By the way, I would have loved to see a foldl function in dets that takes an order_by function; unfortunately, I had to create a hack that fetches chunks of data and then sorts them in memory. Any suggestions to improve this code would be welcome.

September 15, 2007

PROLOG roots in ERLANG

Filed under: Computing — bhatti @ 3:27 pm

I have been trying Erlang lately, and since Erlang has a lot of its roots in PROLOG and its first VM (JAM) was written in PROLOG, I thought I would look at some similarities in syntax. So I picked up “Prolog Through Examples” by Kononenko and Lavrac, which has been collecting dust on my bookshelf since 1989. Here are some similarities I found:

  • atoms or symbols start with lower case.
  • variables start with Uppercase or underscore (anonymous variables).
  • statements end with period.
  • structures are similar to records.
  • lists are declared same, with [1, 2, 3] syntax. Also, [Head|Tail] syntax is same.
  • strings are represented as array of ASCII codes.
  • arithmetic operators are same “> < >= =< =:= =\= div mod”
  • a lot of list functions are similar to BIFs in ERLANG such as member, conc (called append in ERLANG), insert, delete.
  • both can read terms from a file using consult function.

Despite all these similarities, the two languages are fundamentally different: PROLOG was written for expert systems, and its interpreter takes in facts and rules, deduces inferences, and includes sophisticated backtracking control (e.g. cut). ERLANG, by contrast, is a functional language with strong support for concurrent and distributed applications.

September 12, 2007

Erlang solutions to Problem 1-25 of “L-99: Ninety-Nine Lisp Problems”

Filed under: Computing — admin @ 1:57 pm

L-99: Ninety-Nine Lisp Problems provides good exercises for learning a new language, so here is my attempt at Erlang solutions to problems 1 through 25:

  1 -module(lisp99).
  2 -compile(export_all).
  3
  4 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
  5 % My solutions to L-99: Ninety-Nine Lisp Problems from 
  6 % http://www.ic.unicamp.br/~meidanis/courses/mc336/2006s2/funcional/L-99_Ninety-Nine_Lisp_Problems.html
  7 % Based on a Prolog problem list by werner.hett@hti.bfh.ch
  8 % Note: I am using BIFs for a lot of solutions because learning language library APIs is just as 
  9 % important as learning language constructs.
 10 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 11
 12 %%%%%%%%%%%%%%%%%%%%%%%%%
 13 %%% P01 (*) Find the last box of a list.
 14 %%%%%%%%%%%%%%%%%%%%%%%%%
 15 last(L) ->  % returns the last element still wrapped in a list, e.g. "abcd" -> "d"
 16     lists:nthtail(length(L)-1, L).  % on [] this asks for nthtail(-1, ...) and fails
 17
 18 %%%%%%%%%%%%%%%%%%%%%%%%%
 19 %%% P02 (*) Find the last but one box of a list.
 20 %%%%%%%%%%%%%%%%%%%%%%%%%
 21 last_but(L) ->  % returns the last two elements as a list, e.g. "abcd" -> "cd"
 22     lists:nthtail(length(L)-2, L).  % fails for lists with fewer than two elements
 23
 24 %%%%%%%%%%%%%%%%%%%%%%%%%
 25 %%% P03 (*) Find the K'th element of a list.
 26 %%%%%%%%%%%%%%%%%%%%%%%%%
 27 kth(L, K) ->  % returns the K'th element of L; K counts from 1
 28     lists:nth(K, L).
 29
 30 %%%%%%%%%%%%%%%%%%%%%%%%%
 31 %%% P04 (*) Find the number of elements of a list.
 32 %%%%%%%%%%%%%%%%%%%%%%%%%
 33 len(L) ->  % number of elements in L
 34     length(L).
 35
 36 %%%%%%%%%%%%%%%%%%%%%%%%%
 37 %%% P05 (*) Reverse a list.
 38 %%%%%%%%%%%%%%%%%%%%%%%%%
 39 rev(L) ->  % L with its elements in reverse order
 40     lists:reverse(L).
 41
 42 %%%%%%%%%%%%%%%%%%%%%%%%%
 43 %%% P06 (*) Find out whether a list is a palindrome.
 44 %%%%%%%%%%%%%%%%%%%%%%%%%
 45 palindrome(L) ->  % true when L reads the same forwards and backwards
 46     L == lists:reverse(L).
 47
 48
 49 %%%%%%%%%%%%%%%%%%%%%%%%%
 50 %%% P07 (**) Flatten a nested list structure.
 51 %%%%%%%%%%%%%%%%%%%%%%%%%
 52 flatten(L) ->  % removes all nesting, e.g. [a, [b, [c, d], e]] -> [a, b, c, d, e]
 53     lists:flatten(L).
 54
 55 %%%%%%%%%%%%%%%%%%%%%%%%%
 56 %%% P08 (**) Eliminate consecutive duplicates of list elements.
 57 %%%%%%%%%%%%%%%%%%%%%%%%%
 58 compress(L) ->  % collapses runs of equal neighbours, e.g. "aaaabccaadeeee" -> "abcade"
 59   lists:reverse(lists:foldl(fun(X, Acc) -> compress(X, Acc) end, [], L)).
 60 compress(X, [X|_]=Acc) ->  % same as the previous element: accumulator stays as it is
 61   Acc;
 62 compress(X, Acc) ->  % different element: push it
 63   [X|Acc].
 64
 65
 66 %%%%%%%%%%%%%%%%%%%%%%%%%
 67 %%% P09 (**) Pack consecutive duplicates of list elements into sublists.
 68 %%%%%%%%%%%%%%%%%%%%%%%%%
 69 pack(L) ->  % groups runs of equal neighbours into sublists, e.g. "aab" -> ["aa","b"]
 70   lists:reverse(lists:foldl(fun(X, Acc) -> pack(X, Acc) end, [], L)).
 71 pack(X, [[X|_]=Run|Rest]) ->  % X continues the current run
 72   [[X|Run]|Rest];
 73 pack(X, Acc) ->  % X starts a new run
 74   [[X]|Acc].
 75
 76 %%%%%%%%%%%%%%%%%%%%%%%%%
 77 %%% P10 (*) Run-length encoding of a list.
 78 %%%%%%%%%%%%%%%%%%%%%%%%%
 79 encode(L) ->  % run-length encodes L as a list of [Count, Element] pairs
 80   lists:reverse(lists:foldl(fun(X, Acc) -> encode(X, Acc) end, [], L)).
 81 encode(X, []) ->  % first element opens the first run
 82   [[1, X]];
 83 encode(X, [[Count,X]|Rest]) ->  % same element as the current run: bump its count
 84   [[Count+1,X]|Rest];
 85 encode(X, [[_Count,_Other]|_Rest]=Acc) ->  % different element: open a new run
 86   [[1,X]|Acc].
 87
 88 %%%%%%%%%%%%%%%%%%%%%%%%%
 89 %%% P11 (*) Modified run-length encoding.
 90 %%%%%%%%%%%%%%%%%%%%%%%%%
 91 encode_modified(L) ->  % like encode/1, but an element that is not repeated is kept bare instead of becoming [1, X]
 92   lists:reverse(lists:foldl(fun encode_modified/2, [], L)).
 93 encode_modified(H, []) ->
 94   [H];
 95 encode_modified(H, [H|T]) ->  % second occurrence in a row: the bare element becomes the pair [2, H]
 96   [[2,H]|T];
 97 encode_modified(H, [[N,H]|T]) ->  % further occurrences: bump the count of the current pair
 98   [[N+1,H]|T];
 99 encode_modified(H, [[_N,_X]|_T]=L) ->  % previous item is a pair for another element: start a new bare element
100   [H|L];
101 encode_modified(H, [_X|_T]=L) ->  % previous item is a different bare element (this clause also covers the one above)
102   [H|L].
103
104 %%%%%%%%%%%%%%%%%%%%%%%%%
105 %%% P12 (**) Decode a run-length encoded list.
106 %%%%%%%%%%%%%%%%%%%%%%%%%
107 decode(L) ->  % expands a run-length encoded list back into the plain list
108   lists:reverse(lists:foldl(fun(Item, Acc) -> decode(Item, Acc) end, [], L)).
109 decode([Count,X], Acc) ->  % [Count, X] pair: Count copies of X
110   lists:duplicate(Count, X) ++ Acc;
111 decode(X, Acc) ->  % bare element: a run of length one
112   [X|Acc].
113
114 %%%%%%%%%%%%%%%%%%%%%%%%%
115 %%% P13 (**) Run-length encoding of a list (direct solution).
116 %%%%%%%%%%%%%%%%%%%%%%%%%
117 direct_encode(L) ->  % same result as encode_modified/1; this is a plain delegation
118     encode_modified(L). %% it already creates RLE by counting
119
120 %%%%%%%%%%%%%%%%%%%%%%%%%
121 %%% P14 (*) Duplicate the elements of a list.
122 %%%%%%%%%%%%%%%%%%%%%%%%%
123 dupli(L) ->  % doubles every element, e.g. "abccd" -> "aabbccccdd"
124     lists:reverse(lists:foldl(fun(X, Acc) -> [X,X|Acc] end, [], L)).
125 dupli(X, Acc) ->  % fold step: pushes X twice onto Acc
126     [X,X|Acc].
127
128 %%%%%%%%%%%%%%%%%%%%%%%%%
129 %%% P15 (**) Replicate the elements of a list a given number of times.
130 %%%%%%%%%%%%%%%%%%%%%%%%%
131 repli(L, N) ->  % repeats every element N times, e.g. repli("abccd", 3) -> "aaabbbccccccddd"
132     {N, Reversed} = lists:foldl(fun(X, State) -> do_repli(X, State) end, {N,[]}, L),
133     lists:reverse(Reversed).
134 do_repli(X, {Count,Acc}) ->  % fold step: puts Count copies of X in front of Acc and carries Count along
135     {Count, lists:duplicate(Count, X) ++ Acc}.
136
137 %%%%%%%%%%%%%%%%%%%%%%%%%
138 %%% P16 (**) Drop every N'th element from a list.
139 %%%%%%%%%%%%%%%%%%%%%%%%%
140 drop(L, N) ->  % removes every N'th element of L, e.g. drop("abcdefghik", 3) -> "abdeghk"
141     {_I, N, L1} = lists:foldl(fun do_drop/2, {1,N,[]}, L),  % state is {PositionInCycle, N, KeptElementsReversed}
142     lists:reverse(L1).
143 do_drop(H, {I,N,L}) ->
144     Rem = I rem N,
145     if
146         Rem == 0 ->  % H is the N'th element of the current cycle: skip it and restart the count
147             {1,N,L};
148         true ->
149             {I+1,N,[H|L]}
150     end.
151
152 %%%%%%%%%%%%%%%%%%%%%%%%%
153 %%% P17 (*) Split a list into two parts; the length of the first part is given.
154 %%%%%%%%%%%%%%%%%%%%%%%%%
155 split(L,N) ->  % returns {FirstN, Rest}, e.g. split("abcdefghik", 3) -> {"abc","defghik"}
156     lists:split(N,L).
157
158 %%%%%%%%%%%%%%%%%%%%%%%%%
159 %%% P18 (**) Extract a slice from a list.
160 %%%%%%%%%%%%%%%%%%%%%%%%%
161 slice(L, Start, End) ->  % elements Start..End inclusive, counting from 1, e.g. slice("abcdefghik", 3, 7) -> "cdefg"
162     lists:sublist(L, Start, End-Start+1).  % sublist takes a start position and a length
163
164
165 %%%%%%%%%%%%%%%%%%%%%%%%%
166 %%% P19 (**) Rotate a list N places to the left.
167 %%%%%%%%%%%%%%%%%%%%%%%%%
168 rotate([], _N) ->  % nothing to rotate; also avoids `rem 0` below
169     [];
170 rotate(L, N) ->  % rotates L by N places to the left; a negative N rotates to the right
171     Len = length(L),
172     % Fold N into 0..Len-1 so that negative and oversized counts both work.
173     Shift = ((N rem Len) + Len) rem Len,
174     {Front, Back} = lists:split(Shift, L),
175     % The first Shift elements move to the end.
176     Back ++ Front.
177
178 do_rotate(I, Len, In, Out) ->  % element-by-element circular walk over In; kept because export_all exposes it
179     I1 = (I rem Len)+1,       % In Erlang offset of list starts from 1
180     Out1 = [lists:nth(I1, In)|Out],
181     if
182         length(Out1) == Len ->
183             Out1;
184         true ->
185             do_rotate(I1, Len, In, Out1)
186     end.
187
188 %%%%%%%%%%%%%%%%%%%%%%%%%
189 %%% P20 (*) Remove the K'th element from a list.
190 %%%%%%%%%%%%%%%%%%%%%%%%%
191 remove_at(L, N) ->  % removes the element at position N (counting from 1)
192     {Front, [_Removed|Back]} = lists:split(N-1, L),  % drop by position; lists:delete/2 removed the first EQUAL element instead
193     Front ++ Back.
194
195
196 %%%%%%%%%%%%%%%%%%%%%%%%%
197 %%% P21 (*) Insert an element at a given position into a list.
198 %%%%%%%%%%%%%%%%%%%%%%%%%
199 insert_at(Elem, L, N) ->  % inserts Elem so that it becomes the N'th element (counting from 1)
200     {L1,L2} = lists:split(N-1,L),         % there got to be better way
201     L1 ++ [Elem] ++ L2.  % Elem is inserted as one element, even if it is itself a list
202
203 %%%%%%%%%%%%%%%%%%%%%%%%%
204 %%% P22 (*) Create a list containing all integers within a given range.
205 %%%%%%%%%%%%%%%%%%%%%%%%%
206 range(Start, End) ->  % all integers Start..End inclusive, e.g. range(4, 9) -> [4,5,6,7,8,9]
207     lists:seq(Start, End).
208
209 %%%%%%%%%%%%%%%%%%%%%%%%%
210 %%% P23 (**) Extract a given number of randomly selected elements from a list.
211 %%%%%%%%%%%%%%%%%%%%%%%%%
212 rnd_select(L, N) ->  % draws N randomly chosen elements of L without reusing a position
213     rnd_select(N, L, []).
214 % rnd_select(N, In, Out): moves random elements of In onto Out until Out
215 % holds N elements or In is used up, then returns Out.
216 rnd_select(_N, [], Out) ->  % no candidates left (empty input, or N larger than the list)
217     Out;
218 rnd_select(N, _In, Out) when length(Out) >= N ->  % enough elements drawn; also covers N =< 0
219     Out;
220 rnd_select(N, In, Out) ->
221     I = random:uniform(length(In)),  % random position in 1..length(In)
222     Out1 = [lists:nth(I, In) | Out],
223     rnd_select(N, remove_at(In, I), Out1).
224
225
226 %%%%%%%%%%%%%%%%%%%%%%%%%
227 %%% P24 (*) Lotto: Draw N different random numbers from the set 1..M.
228 %%%%%%%%%%%%%%%%%%%%%%%%%
229 lotto_select(Count, Max) ->  % draws Count different random numbers from 1..Max
230     L = range(1, Max),
231     rnd_select(L, Count).
232
233 %%%%%%%%%%%%%%%%%%%%%%%%%
234 %%% P25 (*) Generate a random permutation of the elements of a list.
235 %%%%%%%%%%%%%%%%%%%%%%%%%
236 rnd_permu(L) ->  % random permutation: draws as many elements as L contains
237     rnd_select(L, length(L)).
238
239
240 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
241 % Tests
242 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
243 test_last() ->  % the test_* functions assert nothing; they return the value for inspection in the shell
244     last("abcd").  % -> "d"
245
246 test_last_but() ->
247     last_but("abcd").  % -> "cd"
248
249 test_kth() ->
250     [kth("abcd", 3)].  % -> "c" (wrapped in a list so it prints as a string)
251
252 test_len() ->
253     len("abcd").  % -> 4
254
255 test_rev() ->
256     rev("abcd").  % -> "dcba"
257
258 test_flatten() ->
259     flatten([$a, [$b, [c, d], e]]).  % -> [97,98,c,d,e]
260
261 test_compress() ->
262     compress("aaaabccaadeeee").  % -> "abcade"
263
264 test_pack() ->
265     pack("aaaabccaadeeee").  % -> ["aaaa","b","cc","aa","d","eeee"]
266
267 test_encode() ->
268     encode("aaaabccaadeeee").  % -> [[4,97],[1,98],[2,99],[2,97],[1,100],[4,101]]
269
270 test_encode_modified() ->
271     encode_modified([1,1,1,1,2,3,3,1,1,4,5,5,5,5]).  % -> [[4,1],2,[2,3],[2,1],4,[4,5]]
272
273 test_decode() ->
274     decode(encode_modified([1,1,1,1,2,3,3,1,1,4,5,5,5,5])).  % -> the original list again
275
276 test_dupli() ->
277     dupli("abccd").  % -> "aabbccccdd"
278
279 test_repli() ->
280     repli("abccd",3).  % -> "aaabbbccccccddd"
281
282 test_drop() ->
283     drop("abcdefghik",3).  % -> "abdeghk"
284
285 test_split() ->
286     split("abcdefghik",3).  % -> {"abc","defghik"}
287
288 test_slice() ->
289     slice("abcdefghik",3,7).  % -> "cdefg"
290
291 test_rotate1() ->
292     rotate("abcdefgh",3).  % -> "defghabc"
293
294 test_rotate2() ->
295     rotate("abcdefgh",-2).  % -> "ghabcdef"
296
297 test_remove_at() ->
298     remove_at("abcd", 2).  % -> "acd"
299
300 test_insert_at() ->
301     insert_at("alfa", "abcd", 2).  % -> [97,"alfa",98,99,100]: the string goes in as one nested element
302
303 test_range() ->
304     range(4,9).  % -> [4,5,6,7,8,9]
305
306 test_rnd_select() ->
307     rnd_select("abcdefgh", 3).  % random: three of the eight letters
308
309 test_lotto_select() ->
310     lotto_select(6,49).  % random: six different numbers from 1..49
311
312 test_rnd_permu() ->
313     rnd_permu("abcdef").  % random: the six letters in some order
314
315

September 10, 2007

Rosetta solution in Erlang

Filed under: Computing — bhatti @ 9:17 pm

I found a number of solutions of Rosetta problem at Yet Another Rosetta Code Problem (Perl, Ruby, Python, Haskell), but didn’t see any solution in Erlang, so here it is:

-module(rosetta).
-compile(export_all).

test() ->  % example run; returns ["Z","Y","EEEEE","A","BB","CCC","DDDD","F"]
  rosetta("ZYEEEEEABBCCCDDDDF").

%% rosetta(L): groups runs of equal neighbouring elements of L into sublists,
%% keeping the original order of the runs.
rosetta(L) ->
  Packed = lists:foldl(fun(Ch, Acc) -> rosetta(Ch, Acc) end, [], L),
  lists:reverse(Packed).
%% rosetta(Ch, Acc): fold step; Acc holds the runs seen so far, newest first.
rosetta(Ch, [[Ch|_]=Run|Rest]) ->  % Ch continues the newest run
  [[Ch|Run]|Rest];
rosetta(Ch, Acc) ->                % Ch starts a new run
  [[Ch]|Acc].

September 5, 2007

Performance Testing with C++, Java, Ruby and Erlang

Filed under: Computing — admin @ 4:22 pm

I came across an interesting post on the classical distributed Byzantine Generals Problem on Mark Nelson’s blog. He showed a C++ implementation of Leslie Lamport’s algorithm. It seemed like a natural fit for Erlang, but instead of writing it directly in Erlang, I first wrote the algorithm in Java and Ruby (with a slight redesign). Unfortunately, the original C++ source and my Java and Ruby versions are not truly distributed or concurrent. I just wanted to translate the original algorithm without changing a whole lot, but Erlang gives you both distribution and concurrency for free. You can learn about the Byzantine Generals Problem from Mark Nelson’s post, from this brief summary, or from Wikipedia. The algorithm is heavily based on message communication, where a general sends a message value (0 or 1) to his lieutenants and each lieutenant sends the value to all other lieutenants, including himself. After a specific number of rounds, a consensus is reached. The original C++ code defines a Node structure to store the input/output values of each process, a Traits class to store configuration, and a method to come up with new values. Since some processes can be faulty (or traitors), the Traits class determines the value that a process will return. Most of the fun happens in the Process class, which performs all message communication. You can download the original C++ source code.

I slightly changed the structure and got rid of static attributes and methods. In my design, the application consists of following modules:

  • Configuration – for storing configuration for simulation such as number of processes, rounds, source (general) process.
  • Strategy – that determines the value that should be returned by the process.
  • Repository – that stores hierarchical paths for messages that were communicated by the processes.
  • Process – the Process and GeneralProcess classes, which send and receive messages.
  • Engine – drives the algorithm.
  • Test – that actually invokes engine with different parameters and stores timings.

Here is the Java implementation. (I am showing multiple Java classes here, but you can download the complete source code from here.) I kept most of the algorithm as it was, which turned out to be a real memory hog when you increase the number of processes. In fact, I could not run the Java application for more than 10 processes with 512MB of memory. Here is the source code for the Java version:

/**
 * Values that a message can carry or that a {@code Node} can hold.
 */
public enum Value {
    // The two values a general can send (0 or 1).
    ZERO,
    ONE,
    // NOTE(review): not referenced in the visible code; presumably marks a value from a faulty process - confirm.
    FAULTY,
    // Passed as the second Node constructor argument wherever the visible code creates a Node.
    UNKNOWN
}
/**
 * Supplies the values used by the simulation: the source's own value, the value
 * one process sends to another, and the default value.
 */
public interface ValueStrategy {

    /** Returns the node holding the source's value. */
    Node getSourceValue();

    /**
     * Returns the value that process {@code source} sends to process {@code destination}.
     *
     * @param value       the input value the sender would pass on
     * @param source      id of the sending process
     * @param destination id of the receiving process
     * @param path        path used for this message
     * @return the value to deliver
     */
    Value getValue(Value value, int source, int destination, String path);

    /** Returns the default value. */
    Value getDefaultValue();

    /**
     * Returns whether the given process is faulty.
     *
     * @param process id of the process to check
     */
    boolean isFaulty(int process);
}
/**
 * Delivers the message of one process for one round to other processes.
 */
public interface Broadcaster {

    /**
     * Sends a message on behalf of a process.
     *
     * @param round  the messaging round
     * @param id     id of the sending process
     * @param source node whose value is being sent
     * @param path   path identifying this message
     */
    void broadcast(int round, int id, Node source, String path);
}
/**
 * Stores the message paths of the simulation, indexed by rank and source
 * process, together with the child paths of each path.
 */
public interface PathRepository {

    /** Builds all paths and their children. */
    void generateChildren();

    /**
     * Returns the paths recorded for the given rank and source process.
     *
     * @param rank   rank of the paths
     * @param source id of the process the paths were recorded for
     */
    List<String> getRankList(int rank, int source);

    /**
     * Returns the child paths of the given path.
     *
     * @param path the parent path
     */
    List<String> getPathChildren(String path);
}
/**
 * Immutable settings for one simulation run: the id of the source (general)
 * process, the number of message rounds, and the number of processes.
 */
public class Configuration {
    // Set once in the constructor and never reassigned, hence final.
    private final int source;
    private final int roundsOfMessages;
    private final int numberOfProcesses;

    /**
     * @param source            id of the source (general) process
     * @param roundsOfMessages  number of message rounds
     * @param numberOfProcesses total number of processes
     */
    Configuration(int source, int roundsOfMessages, int numberOfProcesses) {
        this.source = source;
        this.roundsOfMessages = roundsOfMessages;
        this.numberOfProcesses = numberOfProcesses;
    }

    /** Returns the id of the source (general) process. */
    public int getSource() {
        return source;
    }

    /** Returns the number of message rounds. */
    public int getRoundsOfMessages() {
        return roundsOfMessages;
    }

    /** Returns the total number of processes. */
    public int getNumberOfProcesses() {
        return numberOfProcesses;
    }
}
/**
 * Drives the simulation: creates the processes, runs the message rounds and
 * prints each process's decision. It also acts as the {@link Broadcaster}
 * through which processes deliver their messages.
 *
 * <p>{@link #start()} must be called before {@link #run()}: it fills the
 * process list that {@code run()} and {@code broadcast()} index into.
 */
public class DefaultEngine implements Engine, Broadcaster {
    // Debug output is enabled with -Ddebug=true.
    static final boolean debug = System.getProperty("debug", "false").equals("true");
    private final Configuration configuration;
    private final PathRepository repository;
    private final List<Process> processes;
    private final ValueStrategy strategy;

    /**
     * @param source            id of the source (general) process
     * @param roundsOfMessages  number of message rounds
     * @param numberOfProcesses total number of processes
     */
    public DefaultEngine(int source, int roundsOfMessages, int numberOfProcesses) {
        this.configuration = new Configuration(source, roundsOfMessages, numberOfProcesses);
        this.strategy = new DefaultValueStrategy(configuration);
        this.repository = new DefaultPathRepository(configuration);
        this.processes = new ArrayList<Process>();
    }

    /**
     * Sends the value of {@code source} from process {@code id} to every
     * process except the configured source process. The value actually
     * delivered is decided by the strategy.
     */
    public void broadcast(int round, int id, Node source, String path) {
        for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
            if (j == configuration.getSource()) {
                continue;
            }
            Value value = strategy.getValue(
                    source.input,
                    id,
                    j,
                    path);
            if (debug) {
                // The parent path is the path without its last character; an
                // empty path has no parent and must not be passed to substring.
                String sourcePath = path.length() == 0 ? path : path.substring(0, path.length() - 1);
                System.out.println("Sending for round " + round + " from process " + id + " to " + j + ": {" + value +
                        ", " + path + ", " + Value.UNKNOWN + "}, getting value from source " + sourcePath);
            }
            getProcesses().get(j).receiveMessage(path, new Node(value, Value.UNKNOWN));
        }
    }

    /** Returns the live list of processes created by {@link #start()}. */
    public List<Process> getProcesses() {
        return processes;
    }

    /** Runs rounds 0..M of message sending, then prints every process's result. */
    public void run() {
        //
        // Starting at round 0 and working up to round M, call the
        // sendMessages() method of each process. It will send the appropriate
        // message to all other sibling processes.
        //
        for (int i = 0; i <= configuration.getRoundsOfMessages(); i++) {
            for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
                processes.get(j).sendMessages(i);
            }
        }

        //
        // All that is left is to print out the results. For non-faulty processes,
        // we call the decide() method to see what the process decision was.
        //
        for (int j = 0; j < configuration.getNumberOfProcesses(); j++) {
            Process process = processes.get(j);
            if (process.isSource()) System.out.print("Source ");
            System.out.print("Process " + j);
            if (!process.isFaulty()) {
                System.out.print(" decides on value " +
                        process.decide());
            } else {
                System.out.print(" is faulty");
            }
            System.out.println();
        }
    }

    /**
     * Creates one process per configured id (a GeneralProcess for the source
     * id, a plain Process otherwise) and then generates the message paths.
     */
    public void start() {
        for (int i = 0; i < configuration.getNumberOfProcesses(); i++) {
            Process process = i == configuration.getSource() ? new GeneralProcess(i, configuration, repository, this, strategy) : new Process(i, configuration, repository, this, strategy);
            processes.add(process);
        }
        repository.generateChildren();
    }
}
/**
 * In-memory {@link PathRepository}. A path is a string with one character per
 * process id ({@code '0' + id}); paths are recorded per rank and per process,
 * and each path's child paths are kept in a separate map.
 */
public class DefaultPathRepository implements PathRepository {
    // Debug output is enabled with -Ddebug=true.
    static final boolean debug = System.getProperty("debug", "false").equals("true");
    private Map<String, List<String>> children;
    private Map<Integer, Map<Integer, List<String>>> pathByRank;
    private Configuration configuration;

    public DefaultPathRepository(Configuration configuration) {
        this.configuration = configuration;
        this.children = new HashMap<String, List<String>>();
        this.pathByRank = new HashMap<Integer, Map<Integer, List<String>>>();
    }

    /** Returns the live map from each path to its child paths. */
    public Map<String, List<String>> getChildren() {
        return children;
    }

    /** Returns the child list of {@code path}, creating and storing an empty one on first use. */
    public List<String> getPathChildren(String path) {
        List<String> existing = children.get(path);
        if (existing != null) {
            return existing;
        }
        List<String> created = new ArrayList<String>();
        children.put(path, created);
        return created;
    }

    /** Returns the path list for {@code rank} and {@code source}, creating missing levels on first use. */
    public List<String> getRankList(int rank, int source) {
        Map<Integer, List<String>> bySource = pathByRank.get(rank);
        if (bySource == null) {
            bySource = new HashMap<Integer, List<String>>();
            pathByRank.put(rank, bySource);
        }
        List<String> paths = bySource.get(source);
        if (paths == null) {
            paths = new ArrayList<String>();
            bySource.put(source, paths);
        }
        return paths;
    }

    /** Generates every path starting at the configured source process. */
    public void generateChildren() {
        boolean[] unused = new boolean[configuration.getNumberOfProcesses()];
        generateChildren(configuration.getSource(), unused, "", 0);
    }

    /**
     * Extends {@code path} with {@code source}, records it under {@code rank},
     * and recurses into every process id not yet on the path until the
     * configured number of rounds is reached.
     */
    private void generateChildren(int source, boolean[] ids, String path, int rank) {
        ids[source] = true;
        final String current = path + toCharString(source);
        getRankList(rank, source).add(current);

        if (rank < configuration.getRoundsOfMessages()) {
            for (int next = 0; next < ids.length; next++) {
                if (ids[next]) {
                    continue;
                }
                // Each branch gets its own copy of the "already on the path" flags.
                generateChildren(next, ids.clone(), current, rank + 1);
                getPathChildren(current).add(current + toCharString(next));
            }
        }

        if (debug) {
            System.out.print("generateChildren(" + source + "," + rank + "," + current + "), children = ");
            for (String child : getPathChildren(current)) {
                System.out.print(child + " ");
            }
            System.out.println();
        }
    }

    /** Encodes a process id as the single character {@code '0' + n}. */
    private String toCharString(int n) {
        return String.valueOf((char) (n + '0'));
    }
}
/**
 * Value strategy with two hard-wired faulty processes: the configured source
 * process and the process with id 2.
 */
public class DefaultValueStrategy implements ValueStrategy {
    private final Configuration configuration;
    private final Node sourceValue;

    /**
     * Creates the strategy; the source's own node starts with input
     * {@code Value.ZERO} and output {@code Value.UNKNOWN}.
     */
    DefaultValueStrategy(Configuration configuration) {
        this.configuration = configuration;
        this.sourceValue = new Node(Value.ZERO, Value.UNKNOWN);
    }

    /**
     * Returns the node holding the source's own value. Its input is 0, which
     * is therefore the value the source nominally wants, even though the
     * source may send different values to the other processes.
     */
    public Node getSourceValue() {
        return sourceValue;
    }

    /**
     * Computes the value that {@code source} actually sends to
     * {@code destination} during a messaging round.
     *
     * <ul>
     *   <li>The configured source process sends ZERO to odd-numbered
     *       destinations and ONE to even-numbered ones.</li>
     *   <li>Process 2 always sends ONE.</li>
     *   <li>Every other process forwards {@code value} unchanged.</li>
     * </ul>
     *
     * @param value       the value an honest process would forward
     * @param source      id of the sending process
     * @param destination id of the receiving process
     * @param path        path of the message; not used by this strategy
     */
    public Value getValue(Value value, int source, int destination, String path) {
        if (configuration.getSource() == source) {
            boolean oddDestination = (destination & 1) != 0;
            return oddDestination ? Value.ZERO : Value.ONE;
        }
        if (source == 2) {
            return Value.ONE;
        }
        return value;
    }

    /**
     * Returns the tie-breaking value, here always ONE. The choice is
     * arbitrary, but it must be the same for every process.
     */
    public Value getDefaultValue() {
        return Value.ONE;
    }

    /**
     * Tells whether the process with the given id is one of the faulty ones
     * (the configured source, or process 2).
     */
    public boolean isFaulty(int process) {
        int general = configuration.getSource();
        return process == general || process == 2;
    }
}
/**
 * A runnable simulation engine that additionally exposes a {@code start()} step.
 */
public interface Engine extends Runnable {
    /** Starts the engine. */
    void start();
}
/**
 * The source ("general") process. It owns the root node of the tree, stored
 * under the empty path, and decides directly from that node.
 */
public class GeneralProcess extends Process {
    public GeneralProcess(int id, Configuration configuration, PathRepository repository, Broadcaster broadcaster, ValueStrategy strategy) {
        super(id, configuration, repository, broadcaster, strategy);
        // Seed the root (empty path) with the strategy's source value.
        Node root = strategy.getSourceValue();
        nodes.put("", root);
    }

    /**
     * The source does not need to evaluate the tree: its decision is simply
     * the input value of its own root node.
     */
    @Override
    public Value decide() {
        Node root = nodes.get("");
        return root.input;
    }
}

/**
 * One node of a process's message tree: the value received ({@code input})
 * and the value computed during the decision phase ({@code output}).
 */
public class Node {
    Value input;
    Value output;

    /** Creates a node with explicit input and output values. */
    Node(Value input, Value output) {
        this.input = input;
        this.output = output;
    }

    /** Creates a node whose input and output are both {@code Value.FAULTY}. */
    Node() {
        this.input = Value.FAULTY;
        this.output = Value.FAULTY;
    }
}
/**
 * A process taking part in the simulation. It stores the nodes it receives,
 * keyed by message path, forwards messages for each round through the
 * {@link Broadcaster}, and finally derives a decision from the stored tree.
 */
public class Process {
    protected int id;
    protected Configuration configuration;
    protected ValueStrategy strategy;
    protected Map<String, Node> nodes;
    protected PathRepository repository;
    protected Broadcaster broadcaster;
    static final boolean debug = System.getProperty("debug", "false").equals("true");

    // Number of messages received by all Process instances; bookkeeping only.
    static int totalMessages;

    public Process(int id, Configuration configuration, PathRepository repository, Broadcaster broadcaster, ValueStrategy strategy) {
        this.id = id;
        this.strategy = strategy;
        this.repository = repository;
        this.broadcaster = broadcaster;
        this.configuration = configuration;
        this.nodes = new HashMap<String, Node>();
    }

    /**
     * Receives a message: another process calls this method with a path and a
     * node. The node is simply stored under the path so it can be used in the
     * next round of messaging, and the global message counter is incremented.
     */
    public void receiveMessage(String path, Node node) {
        nodes.put(path, node);
        totalMessages++;
    }

    /**
     * Sends this process's messages for one round; call once per round on
     * every process.
     *
     * The repository's rank list for (round, id) holds every path this process
     * has to send in that round. For each such path the node stored under the
     * parent path (the path minus its last character) is handed to the
     * broadcaster.
     *
     * @throws IllegalStateException if no node is stored for a parent path
     */
    public void sendMessages(int round) {
        for (String path : repository.getRankList(round, id)) {
            String sourcePath = path.substring(0, path.length() - 1);
            Node source = nodes.get(sourcePath);
            if (source == null) {
                throw new IllegalStateException("Failed to find source node for " + sourcePath);
            }
            broadcaster.broadcast(round, id, source, path);
        }
    }

    /**
     * Computes this process's decision after all messages have been sent.
     *
     * The tree is evaluated from the leaves up to the root: leaf nodes copy
     * their input to their output, and every node above takes the majority of
     * its children's outputs. The output of the root path is the decision.
     */
    public Value decide() {
        int processes = configuration.getNumberOfProcesses();
        int rounds = configuration.getRoundsOfMessages();

        // Step 1 - set the leaf values
        for (int i = 0; i < processes; i++) {
            for (String path : repository.getRankList(rounds, i)) {
                Node node = nodes.get(path);
                node.output = node.input;
            }
        }

        // Step 2 - work up the tree
        for (int round = rounds - 1; round >= 0; round--) {
            for (int i = 0; i < processes; i++) {
                for (String path : repository.getRankList(round, i)) {
                    Node node = nodes.get(path);
                    node.output = getMajority(path);
                }
            }
        }

        // The single rank-0 path of the source is the root of the tree.
        List<String> rootPaths = repository.getRankList(0, configuration.getSource());
        String topPath = rootPaths.get(0);
        return nodes.get(topPath).output;
    }

    /**
     * Calculates the majority output among the children of {@code path}.
     *
     * Children without a stored node are ignored when counting, but still
     * count towards the number of children {@code n}. ONE or ZERO wins when it
     * occurs more than {@code n / 2} times; an exact ONE/ZERO tie at
     * {@code n / 2} each yields the strategy's default value; anything else
     * is UNKNOWN.
     *
     * Outputs other than ONE and ZERO (for example FAULTY) are tallied without
     * failing; they only matter in that they are neither ONE nor ZERO.
     */
    public Value getMajority(String path) {
        Map<Value, Integer> counts = new HashMap<Value, Integer>();
        Collection<String> children = repository.getPathChildren(path);
        int n = 0;
        if (children == null) {
            if (debug) System.out.println("No child found for '" + path + "'");
        } else {
            n = children.size();
            for (String child : children) {
                Node node = nodes.get(child);
                if (node != null) {
                    // Tolerate values that have no entry yet instead of unboxing null.
                    Integer seen = counts.get(node.output);
                    counts.put(node.output, seen == null ? 1 : seen + 1);
                }
            }
        }

        int ones = countOf(counts, Value.ONE);
        int zeros = countOf(counts, Value.ZERO);
        int half = n / 2;
        if (ones > half) return Value.ONE;
        if (zeros > half) return Value.ZERO;
        if (ones == zeros && ones == half) return strategy.getDefaultValue();
        return Value.UNKNOWN;
    }

    // Returns the tally for a value, treating an absent entry as zero.
    private static int countOf(Map<Value, Integer> counts, Value value) {
        Integer count = counts.get(value);
        return count == null ? 0 : count;
    }

    /**
     * Tells whether the strategy considers this process faulty.
     */
    public boolean isFaulty() {
        return strategy.isFaulty(id);
    }

    /**
     * Tells whether this process is the configured source process.
     */
    public boolean isSource() {
        return configuration.getSource() == id;
    }
}

 Here is the Ruby version, which is pretty much the same as the Java version (except a bit smaller). Operator overloading and the power of its collections make Ruby terser than the Java version. Again, the application consists of classes for process, strategy, repository, engine, node, and config. The benchmarks are kicked off from ByzGeneralTest, which calls Engine, which in turn creates the Repository, Strategy, General and LProcesses (lieutenants).

# Read-only simulation settings: the id of the source process, the number of
# message rounds and the number of processes.
class Configuration
	attr_reader :source, :num_rounds, :num_procs

	def initialize(source, num_rounds, num_procs)
		@source, @num_rounds, @num_procs = source, num_rounds, num_procs
	end

	# Renders the settings as "source|num_rounds|num_procs".
	def to_s
		format("%s|%s|%s", @source, @num_rounds, @num_procs)
	end
end
# One node of a process's message tree: the received value (input) and the
# value computed while deciding (output). Both default to Value::FAULTY.
class Node
	attr_accessor :input, :output

	def initialize(input = Value::FAULTY, output = Value::FAULTY)
		@input, @output = input, output
	end

	# Renders the node as "input/output".
	def to_s
		format("%s/%s", @input, @output)
	end
end
# The symbolic values a node can carry, each encoded as a one-character string.
class Value
	ZERO    = "0"
	ONE     = "1"
	FAULTY  = "X"
	UNKNOWN = "?"
end
# Stores the message paths of the simulation: the children of every path and,
# per rank and source process, the list of paths at that rank.
class PathRepository
	attr_accessor :children, :path_by_rank, :config

	def initialize(config)
		@config = config
		# path => list of child paths (created on first access)
		@children = Hash.new { |hash, key| hash[key] = [] }
		# rank => source => list of paths (both levels created on first access)
		@path_by_rank = Hash.new do |ranks, rank|
			ranks[rank] = Hash.new { |sources, src| sources[src] = [] }
		end
	end

	# Returns the list of child paths registered for +path+.
	def path_children(path)
		@children[path]
	end

	# Returns the list of paths registered for +rank+ and +source+.
	def rank_list(rank, source)
		@path_by_rank[rank][source]
	end

	# Recursively generates all paths. Appends +source+ to +path+, records the
	# result under +rank+/+source+ and, while +rank+ is below the configured
	# number of rounds, recurses into every process id not yet marked in +ids+,
	# registering each extended path as a child of the current one.
	def generate_children(source = @config.source, ids = new_ids, path = "", rank = 0)
		ids[source] = true
		path = "#{path}#{source}"
		rank_list(rank, source) << path

		if rank < @config.num_rounds
			(0...@config.num_procs).each do |candidate|
				next if ids[candidate]
				# each branch gets its own copy of the marks
				generate_children(candidate, new_ids(ids), path, rank + 1)
				path_children(path) << "#{path}#{candidate}"
			end
		end

		if DEBUG
			print("generate_children(#{source}, #{rank}, #{path}, children = ")
			path_children(path).each { |child| print(child + " ") }
			puts("")
		end
	end

	private

	# Returns a fresh "visited" table (unknown ids read as false), or a copy of
	# +old_ids+ when one is given.
	def new_ids(old_ids = nil)
		return old_ids.dup unless old_ids.nil?
		Hash.new { |hash, key| hash[key] = false }
	end
end
# Value strategy with two hard-wired faulty processes: the configured source
# and the process with id 2.
class ValueStrategy
	attr_accessor :config, :source_value

	def initialize(config)
		@config = config
		# the source's own node: input ZERO, output not yet known
		@source_value = Node.new(Value::ZERO, Value::UNKNOWN)
	end

	# Returns the value +source+ actually sends to +destination+: the configured
	# source sends ZERO to odd destinations and ONE to even ones, process 2
	# always sends ONE, and everyone else forwards +value+ unchanged.
	def create_value(value, source, destination, path)
		if @config.source == source
			odd_destination = (destination & 1) != 0
			return odd_destination ? Value::ZERO : Value::ONE
		end
		return Value::ONE if source == 2
		value
	end

	# The tie-breaking value; always ONE.
	def get_default
		Value::ONE
	end

	# True for the configured source and for process 2.
	def faulty?(process)
		general = @config.source
		process == general || process == 2
	end
end
# A lieutenant process. It stores the nodes it receives, keyed by message
# path, forwards messages for each round through the broadcaster, and finally
# derives a decision from the stored tree.
class LProcess
	# number of messages received by all processes; bookkeeping only
	@@total_messages = 0

	attr_reader :id, :config, :strategy, :repository, :broadcaster

	def initialize(id, config, repository, broadcaster, strategy)
		@id = id
		@config = config
		@repository = repository
		@strategy = strategy
		@broadcaster = broadcaster
		@nodes = {}
	end

	def self.total_messages
		@@total_messages
	end

	def self.reset_total_messages
		@@total_messages = 0
	end

	# Stores +node+ under +path+ and bumps the global message counter.
	def receive_message(path, node)
		@nodes[path] = node
		@@total_messages += 1
	end

	# Sends this process's messages for +round+: for every path in the rank
	# list, the node stored under the parent path (the path minus its last
	# character) is handed to the broadcaster. Raises if that node is missing.
	def send_messages(round)
		@repository.rank_list(round, @id).each do |path|
			parent_path = path[0, path.length - 1]
			parent = @nodes[parent_path]
			raise "Source path #{parent_path} not found" if parent.nil?
			broadcaster.broadcast(round, @id, parent, path)
		end
	end

	# Evaluates the tree bottom-up and returns the output of the root path.
	def decide
		# Step 1 - leaves copy their input to their output
		(0...@config.num_procs).each do |proc_id|
			@repository.rank_list(@config.num_rounds, proc_id).each do |path|
				leaf = @nodes[path]
				leaf.output = leaf.input
			end
		end

		# Step 2 - every node above takes the majority of its children
		(@config.num_rounds - 1).downto(0) do |round|
			(0...@config.num_procs).each do |proc_id|
				@repository.rank_list(round, proc_id).each do |path|
					@nodes[path].output = find_majority(path)
				end
			end
		end

		root_path = @repository.rank_list(0, @config.source).first
		@nodes[root_path].output
	end

	# Returns the majority output among the children of +path+. ONE or ZERO
	# wins with more than n/2 occurrences, an exact tie at n/2 each yields the
	# strategy default, anything else is UNKNOWN. Children without a stored
	# node are not counted but still contribute to n.
	def find_majority(path)
		tally = Hash.new(0)
		kids = @repository.path_children(path)
		total = 0
		if kids
			total = kids.size
			kids.each do |child|
				node = @nodes[child]
				tally[node.output] += 1 if node
			end
		end

		half = total / 2
		ones = tally[Value::ONE]
		zeros = tally[Value::ZERO]
		return Value::ONE if ones > half
		return Value::ZERO if zeros > half
		return @strategy.get_default if ones == zeros && ones == half
		Value::UNKNOWN
	end

	# True when the strategy considers this process faulty.
	def faulty?
		@strategy.faulty?(id)
	end

	# True when this process is the configured source.
	def source?
		@config.source == id
	end
end
# The source ("general") process. It owns the root node, stored under the
# empty path, and decides directly from that node's input.
class GeneralProcess < LProcess
	def initialize(id, config, repository, broadcaster, strategy)
		super
		# seed the root (empty path) with the strategy's source value
		@nodes[""] = @strategy.source_value
	end

	def decide
		root = @nodes[""]
		root.input
	end
end
# Drives one simulation: builds configuration, strategy, repository and the
# processes, delivers broadcasts, and prints every process's decision.
class Engine
	attr_reader :config, :repository, :procs, :strategy

	def initialize(source, num_rounds, num_procs)
		@config = Configuration.new(source, num_rounds, num_procs)
		@strategy = ValueStrategy.new(@config)
		@repository = PathRepository.new(@config)
		@procs = []
	end

	# Delivers the message for +path+ from process +id+ to every process except
	# the configured source; the strategy decides which value is really sent.
	def broadcast(round, id, source, path)
		(0...@config.num_procs).each do |dest|
			next if dest == @config.source
			value = @strategy.create_value(source.input, id, dest, path)
			if DEBUG
				source_path = path.slice(0, path.length-1)
				puts("Sending for round #{round} from process #{id} to #{dest} : #{value}, #{path}, #{Value::UNKNOWN}, getting value from source #{source_path}")
			end
			@procs[dest].receive_message(path, Node.new(value, Value::UNKNOWN))
		end
	end

	# Runs all rounds (round by round, every process in turn), then prints one
	# line per process with either its decision or the fact that it is faulty.
	def run
		(0...@config.num_rounds).each do |round|
			(0...@config.num_procs).each do |j|
				@procs[j].send_messages(round)
			end
		end
		(0...@config.num_procs).each do |j|
			current = @procs[j]
			print("Source ") if current.source?
			print("Process #{j}")
			if current.faulty?
				print(" is faulty")
			else
				print("decides on value #{current.decide}")
			end
			puts("")
		end
	end

	# Creates one process per id (a GeneralProcess for the source, an LProcess
	# otherwise) and then generates the path tree.
	def start
		(0...@config.num_procs).each do |i|
			klass = (i == @config.source) ? GeneralProcess : LProcess
			@procs.push(klass.new(i, @config, @repository, self, @strategy))
		end
		@repository.generate_children
	end
end
# Benchmark driver for the simulation, written as a Test::Unit test case.
class ByzGeneralTest < Test::Unit::TestCase
	def setup
	end

	# Runs one simulation with +m+ rounds, +n+ processes and the given +source+
	# id, printing a "Starting|..." line before and a "Finished|..." line
	# (including the message count and the elapsed time in milliseconds) after.
	def run_engine(m, n, source)
		r = Benchmark.realtime() do
			puts("Starting|#{m}|#{n}|#{source}")
			engine = Engine.new(source, m, n)
			engine.start
			engine.run
		end
		puts("Finished|#{m}|#{n}|#{source}|#{LProcess.total_messages}|#{r*1000}")
		# The counter lives on LProcess; Ruby's built-in Process module has no
		# reset_total_messages and would raise NoMethodError here.
		LProcess.reset_total_messages
	end

	# Single run; disabled by the "x" prefix so Test::Unit does not pick it up.
	def xtest_once
		n = 5
		m = 6
		source = 3
		run_engine(m, n, source)
	end

	# Sweeps n = 5, 10, ..., 50 processes and m = 10, 20, ..., 100 rounds.
	def test_multiple
		5.step 50, 5 do |n|
			10.step 100, 10 do |m|
				source = n / 3
				run_engine(m, n, source)
			end
		end
	end
end
Test::Unit::UI::Console::TestRunner.run(ByzGeneralTest)

Finally, here is the Erlang source code (again, I am showing multiple modules here). I tried to keep the same structure as my Java and Ruby versions, so the application consists of process, strategy, repository, engine, node, and config modules. The benchmarks are kicked off from byz_general_test, which invokes the engine, and the engine creates processes for the repository, the general and the lieutenants. Another distinction is the difference between active objects and regular functions. In this design, the repository and the processes are active objects that receive messages via Erlang's message communication primitives. Also, I added a message counter just to keep track of the number of messages in the simulation (though it is not a requirement of the algorithm). I used the ets APIs in the repository to store the message paths.

-module(config).
-compile(export_all).

%%% Simulation settings: id of the source process, number of processes and
%%% number of message rounds.
-record(config, {source, num_procs, num_rounds}).

%%% Builds a config for source S, N processes and M rounds.
new(S, N, M) ->
  #config{source = S, num_procs = N, num_rounds = M}.

%%% Accessors for the individual settings.
get_source(Self) ->
  Self#config.source.
get_num_procs(Self) ->
  Self#config.num_procs.
get_num_rounds(Self) ->
  Self#config.num_rounds.
-module(counter).
-export([start/0, get_value/0, increment/0, reset/0, die/0, test_counter/0]).
-include("byz_general_test.hrl").

%%% A registered process (message_counter) that keeps a running count of the
%%% messages exchanged in the simulation.

%%% Spawns the counter at 0 and registers it as message_counter.
start() ->
  register(message_counter, spawn(fun() -> loop(0) end)).


%%% Synchronously reads the current count.
get_value() ->
  lib_util:rpc(message_counter, value).


%%% Asynchronously adds one to the count.
increment() ->
  message_counter ! {self(), increment}.

%%% Asynchronously sets the count back to 0.
reset() ->
  message_counter ! {self(), reset}.


%%% Asks the counter to unregister itself and exit.
die() ->
  message_counter ! {self(), die, "Exiting"}.



%%% Server loop; N is the current count.
loop(N) ->
  receive
    {_From, increment} ->
      loop(N + 1);
    {_From, reset} ->
      loop(0);
    {From, value} ->
      From ! {message_counter, N},
      loop(N);
    {_From, die, Reason} ->
      unregister(message_counter),
      exit(Reason);
    Any ->
      io:format("Unexpected message ~p~n", [Any]),
      %% keep serving: a stray message must not stop the counter
      loop(N)
  end.



test_counter() ->
  start(),
  increment(),
  increment(),
  2 = get_value(),
  reset(),
  0 = get_value(),
  die().
-module(engine).
-export([start/3, init/0, run/0, reset/0, test/0]).

%%% Drives one simulation. The configuration and the list of process pids are
%%% kept in the calling process's dictionary under the keys config and pids.

%%% Starts the message counter and the path repository.
init() ->
  counter:start(),
  repository:start().

%%% Resets the message counter and clears the repository tables.
reset() ->
  counter:reset(),
  repository:reset().

%%% Creates the configuration, spawns one process per id (0..N-1), tells each
%%% process about all pids and asks the repository to generate the paths.
start(Source, N, M) ->
  Config = config:new(Source, N, M),
  put(config, Config),
  ProcIds = lists:seq(0, config:get_num_procs(Config) - 1),
  Pids = lists:map(fun(I) -> process:start(I, Config) end, ProcIds),
  put(pids, Pids),
  lists:foreach(fun(Pid) -> process:init(Pid, Pids) end, Pids),
  repository:generate_children(Config),
  Config.

%%% Triggers every round on every process, then prints the results.
%%% The rounds are issued round by round (all processes for round 0, then all
%%% for round 1, ...), the same order the Ruby Engine#run uses. Issuing all
%%% rounds of one process first would queue a process's later rounds before
%%% the other processes were even asked to send the earlier ones.
%%% NOTE: delivery is still asynchronous, so this ordering alone does not
%%% guarantee that a round's messages have arrived before the next round runs.
run() ->
  Config = get(config),
  Ids = lists:seq(0, config:get_num_procs(Config) - 1),
  Rounds = lists:seq(0, config:get_num_rounds(Config) - 1),
  lists:foreach(
    fun(Round) ->
      lists:foreach(fun(Id) -> send_message(Id, Round) end, Ids)
    end, Rounds),
  lists:foreach(fun(Id) -> print_result(Id) end, Ids).

%%% Looks up the pid of the process with the given (0-based) id.
get_pid(Id) ->
  Pids = get(pids),
  lists:nth(Id + 1, Pids).

%%% Asks process Id to send its messages for Round.
send_message(Id, Round) ->
  Pid = get_pid(Id),
  process:send_messages(Pid, Round, Id).

%%% Prints one line for process Id: its decision, or that it is faulty.
print_result(Id) ->
  Config = get(config),
  Source = config:get_source(Config),
  case Source of
    Id ->
      io:format("Source ");
    _ ->
      true
  end,
  io:format("Process ~p ", [Id]),

  Pid = get_pid(Id),
  Faulty = process:is_faulty(Pid),
  if
    Faulty ->
      io:format(" is faulty~n");
    true ->
      Decision = process:decide(Pid),
      io:format(" decides on value ~p~n", [Decision])
  end.

test() ->
  init(),
  Config = start(3, 5, 6),
  run(),
  Config.
-module(lib_util).
-compile(export_all).

%%% Minimal synchronous call: sends {self(), Request} to Pid and blocks until a
%%% reply of the form {Pid, Response} arrives, then returns Response.
rpc(Pid, Request) ->
  Caller = self(),
  Pid ! {Caller, Request},
  receive
    {Pid, Reply} -> Reply
  end.
-module(node).
-include("byz_general_test.hrl").
-compile(export_all).

%%% One node of a process's message tree: the received value (input) and the
%%% value computed while deciding (output).
-record(node, {input, output}).

%%% A node whose input and output are both ?FAULTY.
new() ->
  #node{input = ?FAULTY, output = ?FAULTY}.
%%% A node with explicit input I and output O.
new(I, O) ->
  #node{input = I, output = O}.


set_input(Self, I) ->
  Self#node{input = I}.
get_input(Self) ->
  Self#node.input.

set_output(Self, O) ->
  Self#node{output = O}.
get_output(Self) ->
  Self#node.output.

%%% Returns the node with its output set to its input (the leaf step of the
%%% decision phase). Copying the output onto itself would be a no-op.
set_output_as_input(Self) ->
  I = get_input(Self),
  Self#node{output = I}.
-module(process).
-export([init/2, start/2, receive_message/3, send_messages/3, decide/1, find_majority/2, is_faulty/1, is_source/1, test_process/0]).
-include("byz_general_test.hrl").

%%% A simulation process. Each one is an Erlang process running loop/2; its
%%% state (config, allPids, lieutenants, nodes, counts) lives in its own
%%% process dictionary.

%%% ------------------------------------------------------------------
%%% Client API - called from other processes
%%% ------------------------------------------------------------------

%
%%% starts process loop
%
start(Id, Config) ->
  spawn(fun() -> loop(Id, Config) end).

%%% Asynchronous: hands Node for Path to Pid.
receive_message(Pid, Path, Node) ->
  Pid ! {self(), {receive_message, Path, Node}}.

%%% Asynchronous: asks Pid (whose id is Id) to send its messages for Round.
send_messages(Pid, Round, Id) ->
  Pid ! {self(), {send_messages, Round, Id}}.

%%% Asynchronous: tells Pid about the pids of all processes.
init(Pid, Pids) ->
  Pid ! {self(), init, Pids}.

decide(Pid) ->
  lib_util:rpc(Pid, decide).

find_majority(Pid, Path) ->
  lib_util:rpc(Pid, {find_majority, Path}).

is_faulty(Pid) ->
  lib_util:rpc(Pid, is_faulty).

is_source(Pid) ->
  lib_util:rpc(Pid, is_source).

%%% ------------------------------------------------------------------
%%% Server side - runs inside the spawned process
%%% ------------------------------------------------------------------

%
%%% initialises the process dictionary; the source process additionally gets
%%% the strategy's source value stored under the empty path
%
init(Id, Config, AllPids) ->
  put(config, Config),
  put(allPids, AllPids),
  SourcePid = lists:nth(config:get_source(Config) + 1, AllPids),
  put(lieutenants, AllPids -- [SourcePid]),
  Nodes = dict:new(),
  put(nodes, Nodes),
  Source = config:get_source(Config),
  SourceValue = strategy:source_value(),
  if
    Source =:= Id ->
      Nodes1 = dict:store("", SourceValue, Nodes),
      put(nodes, Nodes1);
    true ->
      true
  end.


%
%%% stores message received in nodes dictionary
%%% we are also keeping track of total number of messages received in simulation.
%
receive_message(Path, Node) ->
  Nodes = get(nodes),
  Nodes1 = dict:store(Path, Node, Nodes),
  put(nodes, Nodes1),
  counter:increment().

%%% Returns the node stored for Path, or a fresh (faulty) node when none is stored.
get_node(Path) ->
  Nodes = get(nodes),
  Response = dict:find(Path, Nodes),
  case Response of
    {ok, Node} ->
      Node;
    _ ->
      node:new()
  end.

%
%%% sends the messages of this process for one round: one broadcast per path
%%% in the repository's rank list for (Round, Id)
%
psend_messages(Round, Id) ->
  L = repository:get_rank_list(Round, Id),
  lists:foreach(
    fun(Path) ->
      psend_messages(Round, Id, Path) end, L).

%%% The node to forward is the one stored under the parent path (Path minus
%%% its last character; the empty path for one-character paths).
psend_messages(Round, Id, Path) ->
  Length = string:len(Path),
  SourcePath = if
                 Length > 1 ->
                   string:substr(Path, 1, Length - 1);
                 true ->
                   ""
               end,
  Source = get_node(SourcePath),
  broadcast(Round, Id, Source, Path).


%
%%% broadcast message to all processes except the source
%
broadcast(Round, Id, SourceNode, Path) ->
  Config = get(config),
  ProcIds = lists:seq(0, config:get_num_procs(Config) - 1),
  IdsToProcess = ProcIds -- [config:get_source(Config)],
  lists:foreach(
    fun(Dest) ->
      broadcast(Round, Id, SourceNode, Path, Dest) end, IdsToProcess).

broadcast(_Round, Id, SourceNode, Path, Dest) ->
  Config = get(config),
  Value = strategy:create_value(Config, node:get_input(SourceNode), Id, Dest, Path),
  ProcPids = get(allPids),
  ProcPid = lists:nth(Dest + 1, ProcPids),
  receive_message(ProcPid, Path, node:new(Value, ?UNKNOWN)).


%%% Evaluates the tree bottom-up and returns the output of the root path, or
%%% ?UNKNOWN when the repository has no root path.
decide() ->
  %%% Step 1 - set the leaf values
  Config = get(config),
  ProcIds = lists:seq(0, config:get_num_procs(Config) - 1),
  lists:foreach(fun(X) -> reset_node(X) end, ProcIds),
  %%% Step 2 - work up the tree
  RoundIds = lists:reverse(lists:seq(0, config:get_num_rounds(Config) - 1)),
  lists:foreach(
    fun(Round) ->
      set_node_value(Round) end, RoundIds),
  Source = config:get_source(Config),
  Result = repository:get_rank_list(0, Source),
  case Result of
    [] ->
      ?UNKNOWN;
    [TopPath | _] ->
      node:get_output(get_node(TopPath))
  end.


set_node_value(Round) ->
  Config = get(config),
  ProcIds = lists:seq(0, config:get_num_procs(Config) - 1),
  lists:foreach(
    fun(Id) ->
      set_node_value(Round, Id) end, ProcIds).

set_node_value(Round, Id) ->
  L = repository:get_rank_list(Round, Id),
  lists:foreach(
    fun(Path) ->
      set_node_value(Round, Id, Path) end, L).

%%% Sets the output of the node at Path to the majority of its children.
set_node_value(_Round, _Id, Path) ->
  Nodes = get(nodes),
  Node = get_node(Path),
  Value = find_majority(Path),
  Nodes1 = dict:store(Path, node:set_output(Node, Value), Nodes),
  put(nodes, Nodes1).

%%% Leaf step: for every leaf path of process I, copy the node's input to its
%%% output. The dictionary is re-read on every iteration so each update builds
%%% on the previous one instead of on a stale snapshot.
reset_node(I) ->
  Config = get(config),
  L = repository:get_rank_list(config:get_num_rounds(Config), I),
  lists:foreach(fun(Path) ->
    Node = get_node(Path),
    Leaf = node:set_output(Node, node:get_input(Node)),
    put(nodes, dict:store(Path, Leaf, get(nodes)))
                end, L).


%%% Adds one to the tally of Child's output value. Values without an entry yet
%%% (e.g. ?FAULTY from a node that was never received) start at one.
increment_count(Child) ->
  Output = node:get_output(get_node(Child)),
  put(counts, dict:update_counter(Output, 1, get(counts))).

%%% Returns the majority output among the children of Path. ?ONE or ?ZERO wins
%%% with more than half of the N children; an exact tie at N div 2 each yields
%%% the strategy default; anything else is ?UNKNOWN. Integer division is used
%%% for the tie test so the result agrees with the Java and Ruby versions.
find_majority(Path) ->
  Counts = dict:new(),
  Counts1 = dict:store(?ONE, 0, Counts),
  Counts2 = dict:store(?ZERO, 0, Counts1),
  Counts3 = dict:store(?UNKNOWN, 0, Counts2),
  put(counts, Counts3),
  L = repository:get_children_path(Path),
  N = length(L),
  lists:foreach(fun(Child) ->
    increment_count(Child) end, L),
  Counts4 = get(counts),
  OneCount = dict:fetch(?ONE, Counts4),
  ZeroCount = dict:fetch(?ZERO, Counts4),
  Half = N div 2,

  if
    OneCount > Half ->
      ?ONE;
    ZeroCount > Half ->
      ?ZERO;
    OneCount == ZeroCount andalso OneCount == Half ->
      strategy:get_default();
    true ->
      ?UNKNOWN
  end.


%%% Server loop. receive_message and send_messages are fire-and-forget; the
%%% remaining requests reply to the caller with {self(), Result}.
loop(Id, Config) ->
  receive
    {_From, init, AllPids} ->
      init(Id, Config, AllPids),
      loop(Id, Config);
    {_From, {receive_message, Path, Node}} ->
      receive_message(Path, Node),
      loop(Id, Config);
    {_From, {send_messages, Round, Id}} ->
      psend_messages(Round, Id),
      loop(Id, Config);
    {From, decide} ->
      From ! {self(), decide()},
      loop(Id, Config);
    {From, is_faulty} ->
      From ! {self(), strategy:is_faulty(Config, Id)},
      loop(Id, Config);
    {From, is_source} ->
      From ! {self(), config:get_source(Config) == Id},
      loop(Id, Config);
    {From, {find_majority, Path}} ->
      From ! {self(), find_majority(Path)},
      loop(Id, Config);
    Any ->
      io:format("Unexpected ~p~n", [Any]),
      loop(Id, Config)
  end.


test_process() ->
  counter:start(),
  repository:start(),
  Config = config:new(3, 5, 6),
  Pid = start(0, Config),
  init(Pid, [Pid, 1, 2, 3, 4, 5, 6]),
  Path = "mypath",
  Node = node:new(),
  receive_message(Pid, Path, Node),
  send_messages(Pid, 0, 0),
  ?UNKNOWN = decide(Pid),
  ?ONE = find_majority(Pid, Path),
  false = is_faulty(Pid),
  false = is_source(Pid).
-module(repository).
%% NOTE(review): test_generate/0, test_path_ranks/0 and test_children/0 were
%% listed here but are not defined in this module, which prevents it from
%% compiling; re-add them to the export list together with their definitions.
-export([start/0, reset/0, get_rank_list/2, get_children_path/1, generate_children/1, die/0]).
-include("byz_general_test.hrl").

%%% A registered process (repository) that owns two ets tables: the children
%%% of every path, and the paths per {Rank, Source}.

%%% ------------------------------------------------------------------
%%% Client API
%%% ------------------------------------------------------------------

start() ->
  register(repository, spawn(fun() -> init_loop() end)).


init_loop() ->
  Children = ets:new(children, [set]),
  PathsByRank = ets:new(path_ranks, [set]),
  loop(Children, PathsByRank).

%%% Synchronous: the child paths of Path, in insertion order.
get_children_path(Path) ->
  lib_util:rpc(repository, {get_children_path, Path}).

%%% Synchronous: the paths registered for Rank and Source, in insertion order.
get_rank_list(Rank, Source) ->
  lib_util:rpc(repository, {get_rank_list, Rank, Source}).

%%% Asynchronous: generate all paths for Config.
generate_children(Config) ->
  repository ! {self(), generate_children, Config}.

%%% Asynchronous: empty both tables.
reset() ->
  repository ! {self(), reset}.

%%% Asynchronous: delete both tables and stop the repository.
die() ->
  repository ! {self(), die, "Exiting"}.


%%% ------------------------------------------------------------------
%%% Table helpers. Each key maps to a list kept newest-first, so inserting
%%% is a cons and a lookup reverses once to return insertion order.
%%% ------------------------------------------------------------------

%%% The list stored under Key exactly as kept in the table (newest first).
stored_values(Tab, Key) ->
  case ets:lookup(Tab, Key) of
    [{Key, Values}] ->
      Values;
    [] ->
      []
  end.

table_lookup(Tab, Key) ->
  lists:reverse(stored_values(Tab, Key)).

table_insert(Tab, Key, Value) ->
  ets:insert(Tab, {Key, [Value | stored_values(Tab, Key)]}).



set_children_path(Children, PathKey, PathValue) ->
  table_insert(Children, PathKey, PathValue).

get_children_path(Children, Path) ->
  table_lookup(Children, Path).

get_rank_list(PathsByRank, Rank, Source) ->
  table_lookup(PathsByRank, {Rank, Source}).


set_rank_list(PathsByRank, Rank, Source, Path) ->
  Key = {Rank, Source},
  table_insert(PathsByRank, Key, Path).


%%% Generates the whole path tree, starting at the configured source with an
%%% empty path, rank 0 and no process on the path yet.
generate_children(Config, Children, PathsByRank) ->
  generate_children(
    Config, Children, PathsByRank,
    config:get_source(Config),
    [],
    "",
    0).

%%% Appends Source to Path, records the result under {Rank, Source} and, while
%%% Rank is below the number of rounds, recurses into every process id that is
%%% not yet on the path, registering each extended path as a child.
generate_children(Config, Children, PathsByRank, Source, Ids, Path, Rank) ->
  Ids1 = [Source | Ids],
  Path1 = Path ++ integer_to_list(Source),
  set_rank_list(PathsByRank, Rank, Source, Path1),
  %%%
  Rounds = config:get_num_rounds(Config),
  if
    Rank < Rounds ->
      IdsToProcess = lists:seq(0, config:get_num_procs(Config) - 1) -- Ids1,
      lists:foreach(
        fun(Source1) -> generate_children(Config, Children, PathsByRank, Source1, Ids1, Path1, Rank + 1),
          set_children_path(Children, Path1, Path1 ++ integer_to_list(Source1))
        end, IdsToProcess);
    true ->
      true
  end,
  Children.


%%% Server loop; replies to synchronous requests with {repository, Result}.
loop(Children, PathsByRank) ->
  receive
    {From, {get_rank_list, Rank, Source}} ->
      From ! {repository, get_rank_list(PathsByRank, Rank, Source)},
      loop(Children, PathsByRank);
    {From, {get_children_path, Path}} ->
      From ! {repository, get_children_path(Children, Path)},
      loop(Children, PathsByRank);
    {_From, reset} ->
      ets:delete_all_objects(Children),
      ets:delete_all_objects(PathsByRank),
      loop(Children, PathsByRank);
    {_From, generate_children, Config} ->
      generate_children(Config, Children, PathsByRank),
      loop(Children, PathsByRank);
    {_From, die, Reason} ->
      ets:delete(Children),
      ets:delete(PathsByRank),
      exit(Reason);
    Any ->
      io:format("Unexpected message~p~n", [Any]),
      %% keep serving: a stray message must not stop the repository
      loop(Children, PathsByRank)
  end.




%%% Entry point of the benchmark: initialises the engine and starts the sweep
%%% over the number of processes at 5.
benchmark_byz_general() ->
  engine:init(),
  FirstN = 5,
  benchmark_byz_general(FirstN).

%%% Outer sweep over the number of processes N: runs the inner sweep for N,
%%% then continues with N + 10 until N reaches 100.
benchmark_byz_general(N) when N >= 100 ->
  true;
benchmark_byz_general(N) ->
  benchmark_byz_general(N, 10),
  benchmark_byz_general(N + 10).

%%% Inner sweep over the number of rounds M for a fixed number of processes N.
%%% Each step runs one simulation with source round(N / 3), prints a
%%% "Starting|..." and a "Finished|..." line (message count, runtime and
%%% wall-clock time since the step began), resets the engine and continues
%%% with M + 10 until M reaches 100.
benchmark_byz_general(_N, M) when M >= 100 ->
  true;
benchmark_byz_general(N, M) ->
  %% these two calls only mark the starting point for the deltas read below
  statistics(runtime),
  statistics(wall_clock),
  Source = round(N / 3),
  io:format("Starting|~p|~p|~p~n", [M, N, Source]),
  engine:start(Source, N, M),
  engine:run(),
  {_, Runtime} = statistics(runtime),
  {_, WallClock} = statistics(wall_clock),
  Messages = counter:get_value(),
  io:format("Finished|~p|~p|~p|~p|~p|~p~n", [M, N, Source, Messages, Runtime, WallClock]),
  engine:reset(),
  benchmark_byz_general(N, M + 10).

I merged the results with the following script:

# One row of the merged benchmark table: a (processes, rounds) combination,
# its message count, and one timing slot per language implementation.
class Stats
  attr_accessor :processes, :rounds, :messages
  attr_accessor :java_time, :erlang_time, :cpp_time, :ruby_time

  # All three arguments are converted with to_i; timings start out unset.
  def initialize(procs, rounds, msgs)
    @processes = procs.to_i
    @rounds = rounds.to_i
    @messages = msgs.to_i
  end

  # Identifies the run by process count and round count, e.g. "5/10".
  def key
    [@processes, @rounds].map { |part| part.to_s }.join("/")
  end

  # CSV row in the order: processes, rounds, messages, C++, Ruby, Java, Erlang.
  # Unset timings render as empty fields.
  def to_s
    columns = [@processes, @rounds, @messages,
               @cpp_time, @ruby_time, @java_time, @erlang_time]
    columns.map { |column| column.to_s }.join(",")
  end
end
 
  # Merges the "Finished|rounds|processes|source|messages|runtime|wallclock"
  # lines of the four benchmark outputs into one CSV table, one row per
  # (processes, rounds) combination with a timing column per language.
  stats = {}

  files = ["cpp.out", "ruby.out", "java.out", "erlang.out"]
  count_setter = ["cpp_time=", "ruby_time=", "java_time=", "erlang_time="]
  files.zip(count_setter).each do |file, setter|
      # File.foreach closes the file when the block finishes.
      File.foreach(file) do |line|
          next unless line =~ /Finish/
          t = line.split("|")
          fresh = Stats.new(t[2], t[1], t[4])
          # Reuse the row created by an earlier file so its timings are kept;
          # assigning a new Stats here would drop every previous column.
          stat = (stats[fresh.key] ||= fresh)
          stat.send(setter, t[5].to_i)
      end
  end


  puts "Processes,Rounds,Messages,CPP Time,Ruby Time,Java Time,Erlang Time"
  stats.values.sort_by {|stat| stat.processes * stat.rounds}.each do |stat|
    puts stat
  end
 

And here are the results of benchmark that I ran on my Dell notebook (Note I used time function in C++ which only returns timing in seconds, so C++ timings are actually not precise.):

Conclusion:

This was an interesting problem that tested the limits of these languages. For example, I found that when using more than 10 processes things got really nasty. The Java program gave up with OutOfMemory. The Erlang program crashed with a dump, though I would have used OTP’s supervisors if I were writing a more fault tolerant application. The Ruby program became too slow, so I had to kill it. Java turned out to be the performance leader in this case and I was a bit surprised when Erlang’s response time became really high with 9 processes. As I mentioned earlier, the C++, Java and Ruby versions are not really concurrent and their message passing is really method invocation. As far as mapping the algorithm to the language, I found that Erlang fits very nicely for distributed algorithms that involve a lot of communication. I left the C++, Java and Ruby versions very simple and didn’t try to implement truly independent processes and communication because that would have required a lot more effort.

Resources:

Source Code

August 30, 2007

Design/Code Smells

Filed under: Computing — admin @ 10:59 am

I have been redesigning a J2EE system recently that was written about a year ago, and I have done this countless times in my career of over fifteen years. I found quite a number of code smells that show up every time. I find that badly designed procedural code is easier to fix than badly designed object oriented code. Often the systems are written in object oriented languages and use state of the art technologies such as Hibernate/Spring, so you would think that they also follow good principles of object oriented or domain driven design, but you will be surprised. First, let me summarize characteristics of good systems that I learned at school and over the years:

  • loosely coupled or separation of concerns – this is by far the most important ingredient of good software.
  • good abstraction that focus on areas of domain that are focus of the system
  • separation of concerns – good use layers. Here by layer, I mean logical layer not physical layer (tier), which is also source of confusion to many.
  • single responsibility principle – high encapsulated modules that have high cohesion within and low coupling across

Also, when you are developing large systems, it is also important to consider physical separation of modules and interdependencies, which means system with:

  • low cyclomatic complexity
  • Acyclic Dependency Principle – No circular dependencies
  • Dependency injection – This greatly helps coupling and facilitates testing.
  • Multiple codebases – Though, it doesn’t matter for small systems, but when dealing with hundreds of thousands of lines of code, it is important to separate code into multiple codebases along the line of business functionality.

And now the actual code smells that I have found time and again while maintaining, extending or rewriting systems:

Procedural code in disguise of Object Oriented code

As the saying goes, old habits are hard to break. I often find systems that are written in object oriented languages, but in procedural style. The smells I often find are:

  • anemic model (thanks to the EJB legacy), where domain classes are just like C structs and all business logic is in services. Often I find that service managers have methods that take domain class as an argument and do some business logic and return. Such business logic is better off in the domain object itself.
  • static methods and attributes – this is also big code smell often from developers that started with C like languages.

Tight Coupling

This would be the winner of code smell that is found most often. For example, in my current system I found that domain classes were more like structs, and services where all the fun happened. The services did all database queries (Hibernate) and business logic. Worse, the application had severe performance issues so for every service there was a sister class for caching the service methods and often those caching classes also had database access methods. Clearly, they violated coupling and separation of concerns. I am trying to divide all those responsibilities into domain classes, Dao/Repositories, and Services. In this case, most of the business logic will go to the domain classes, all database access will go to Dao/Repositories and high level business or orchestration logic will go to the services. In this design, cross cutting concerns like transactions, caching are defined at service layer in declarative manner. For example, I added some caching annotations to signify the methods that should be cached. Another violation of separation of concerns was coupling of user view in the service layer and a lot of service managers took Locale as argument and looked up resource file. All that logic will be in the view layer.

Factories and Singletons

The GoF made Factories and Singleton a must have pattern for all cool object oriented systems, but in reality they make maintenance and testing hard. I found plenty of usage of these patterns in my current system and I am replacing them with dependency injection.

Too much Code

Though, for a large system you do end up with a lot of code, but I have seen even smaller projects with a lot of code. Also, some languages are more verbose than others. For example, often I find that code I write in Ruby takes less than half number of lines of code.

Distinction between Classes and Objects

I have found this smell in a number of projects where many of the classes were just classes with different values or slightly different data. For example, I found over 400 classes in my current project that I would consider objects with a little bit of difference. Though, for complicated systems where meta information is slightly different, you may use adaptive object model pattern, but that is not for the faint of heart.

Over Engineering

This is another reason for large code base when the system adds some technical requirements when they are not really needed. For example, in my current system we don’t have internationalization requirement, but we have sporadic use of localization. It may not be too bad, except all that localization is inconsistent, which would have to be changed if i18n becomes real requirement. Meanwhile, you will have to maintain all localization code. In this regard, I would follow agile principle of YAGNI.

Cross Cutting Concerns/Separation of Policies

The cross cutting concerns such as caching, security, transactions should be handled centrally in a declarative manner. Though, AOP is not for faint of heart, but EJB 3.0 and Spring provide nice support for that. Also, with annotations in Java, you can define your own policies. For example, caching was one thing that I found embedded in every piece of code (in addition to separate sister classes for services) in my system and it was really hard to track how things work.

Strings as glorified DSL

This is another code smell that I have seen where strings store some complicated data structure that you have to parse. For example, prior to JDK 1.4, developers had to parse line numbers of source code from stack trace, because it would return String. I found countless examples of this in my current system where strings stored complicated data structures or in other cases where strings were used when actual typing such as Long, Date would have been better.

Pareto principle (80/20 Rule)

I believe the system should be design based on the majority of use cases. Unfortunately, I have seen some systems where the whole design was sacrificed to fit some corner cases. This made the whole system very convoluted. The special cases should be handled specially and not at the cost of entire system. Again, I found an example of this in my current system where primary keys were defined with convoluted abstraction just because a couple of classes could not support surrogate keys for primary ids.

Conclusion

These days, I find most of the projects are driven by quick wins. I find that in most of the development shops, developers are facing impossible deadlines where they must ship the system. In these circumstances, they do the quick and dirty job to ship the product, but soon it becomes a maintenance hell. I have found that since the dot bomb era, the working environment in most shops has degraded for developers and offshoring has worsened this situation. Many managers consider developers “code monkeys” that can be easily replaced. Unfortunately, I don’t see things improving anytime soon.

PS: Further reading:

August 29, 2007

Polymorphism in Erlang

Filed under: Computing — admin @ 4:48 pm

Though Joe Armstrong denies that Erlang is an object oriented language, according to Ralph Johnson concurrent Erlang is object oriented. Erlang is based on the Actor model, where each object has an associated mailbox that receives messages. In this model, the message passing takes literal form rather than method invocation. Erlang also has strong pattern matching support, so you can achieve polymorphism by pattern matching. This kind of polymorphism is similar to duck typing rather than the inheritance based polymorphism of object oriented languages. One of the hardest things in an object oriented language is to know when something can be a subclass of another class because you have to apply the IS-A test or the Liskov substitution principle. Most people also consider inheritance bad due to encapsulation leakage, so duck typing will be easier. Since all state is immutable in Erlang, you can’t really keep state as in a traditional object oriented language, but you can create a new state with every mutation and attach it to the process. For example, here is a simple example of polymorphism in Java:

  1 import junit.framework.Test;
  2 import junit.framework.TestCase;
  3 import junit.framework.TestSuite;
  4
  5 import java.util.*;
  6
  7
  8 public class PolymorTest extends TestCase {
  9     Shape[] shapes;
 10
 11     protected void setUp() {
 12         shapes = new Shape[] {
 13                 new Circle(3.5), new Rectangle(20, 50), new Square(10)
 14             };
 15     }
 16
 17     protected void tearDown() {
 18     }
 19
 20     public void testArea() {
 21         assertEquals(38.484, shapes[0].area(), 0.001);
 22         assertEquals(1000.0, shapes[1].area(), 0.001);
 23         assertEquals(100.0, shapes[2].area(), 0.001);
 24     }
 25
 26     public void testPerimeter() {
 27         assertEquals(21.991, shapes[0].perimeter(), 0.001);
 28         assertEquals(140.0, shapes[1].perimeter(), 0.001);
 29         assertEquals(40.0, shapes[2].perimeter(), 0.001);
 30     }
 31
 32     public static Test suite() {
 33         TestSuite suite = new TestSuite();
 34         suite.addTestSuite(PolymorTest.class);
 35
 36         return suite;
 37     }
 38
 39     public static void main(String[] args) {
 40         junit.textui.TestRunner.run(suite());
 41     }
 42
 43     interface Shape {
 44         public double area();
 45
 46         public double perimeter();
 47     }
 48
 49     class Rectangle implements Shape {
 50         double height;
 51         double width;
 52
 53         Rectangle(double height, double width) {
 54             this.height = height;
 55             this.width = width;
 56         }
 57
 58         public double area() {
 59             return height * width;
 60         }
 61
 62         public double perimeter() {
 63             return (2 * height) + (2 * width);
 64         }
 65
 66         public void setWidth(double width) {
 67             this.width = width;
 68         }
 69
 70         public double getWidth() {
 71             return this.width;
 72         }
 73
 74         public double getHeight() {
 75             return this.height;
 76         }
 77
 78         public void setHeight(double height) {
 79             this.height = height;
 80         }
 81     }
 82
 83     class Square extends Rectangle {
 84         Square(double side) {
 85             super(side, side);
 86         }
 87     }
 88
 89     class Circle implements Shape {
 90         double radius;
 91
 92         Circle(double radius) {
 93             this.radius = radius;
 94         }
 95
 96         public double getRadius() {
 97             return this.radius;
 98         }
 99
100         public void setRadius(double radius) {
101             this.radius = radius;
102         }
103
104         public double area() {
105             return Math.PI * radius * radius;
106         }
107
108         public double perimeter() {
109             return Math.PI * diameter();
110         }
111
112         public double diameter() {
113             return 2 * radius;
114         }
115     }
116 }
117

And here is Ruby equivalent, which uses duck typing:

 1 require 'test/unit'
 2 require 'test/unit/ui/console/testrunner'
 3
 4
 5 class Rectangle
 6   attr_accessor :height, :width
 7   def initialize(height, width)
 8         @height = height
 9         @width = width
10   end
11   def area()
12       height*width
13   end
14   def perimeter()
15       2*height + 2*width
16   end
17 end
18
19
20 class Square
21   attr_accessor :side
22   def initialize(side)
23         @side = side
24     end
25     def area()
26       side*side
27     end
28     def perimeter()
29       4 * side
30     end
31 end
32
33
34 class Circle
35     attr_accessor :radius
36    def initialize(radius)
37         @radius = radius
38     end
39     def area()
40      Math::PI*radius*radius
41     end
42     def perimeter()
43     Math::PI*diameter()
44       end
45      def diameter()
46       2*radius
47     end
48 end
49
50 class PolymorTest < Test::Unit::TestCase
51     def setup
52         @shapes = [Circle.new(3.5), Rectangle.new(20, 50), Square.new(10)]
53     end
54
55     def test_area
56         assert_in_delta(38.484, @shapes[0].area(), 0.001, "didn't match area0 #{@shapes[0].area()}")
57         assert_in_delta(1000.0, @shapes[1].area(), 0.001, "didn't match area1 #{@shapes[1].area()}")
58         assert_in_delta(100.0, @shapes[2].area(), 0.001, "didn't match area2 #{@shapes[2].area()}")
59     end
60     def test_perimeter
61         assert_in_delta(21.991, @shapes[0].perimeter(), 0.001, "didn't match perim0 #{@shapes[0].perimeter()}")
62         assert_in_delta(140.0, @shapes[1].perimeter(), 0.001, "didn't match perim1 #{@shapes[1].perimeter()}")
63         assert_in_delta(40.0, @shapes[2].perimeter(), 0.001, "didn't match perim2 #{@shapes[2].perimeter()}")
64     end
65 end
66
67
68 Test::Unit::UI::Console::TestRunner.run(PolymorTest)
69

And here is Erlang equivalent (obviously it’s a lot more verbose):

  1 -module(rectangle).
  2 -compile(export_all).
  3
  4
  5 -record(rectangle, {height, width}).
  6
  7 new_rectangle(H, W) ->
  8     #rectangle{height = H, width = W}.
  9
 10 start(H, W) ->
 11     Self = new_rectangle(H, W),
 12     {Self, spawn(fun() -> loop(Self) end)}.
 13
 14 rpc(Pid, Request) ->
 15         Pid ! {self(), Request},
 16         receive
 17             {Pid, ok, Response} ->
 18                 Response
 19         end.
 20
 21
 22 area(Self) ->
 23   Self#rectangle.height * Self#rectangle.width.
 24
 25 perimeter(Self) ->
 26   2 * Self#rectangle.height + 2 * Self#rectangle.width.
 27
 28 set_width(Self, W) ->
 29     Self#rectangle{width = W}.
 30
 31 get_width(Self) ->
 32     Self#rectangle.width.
 33
 34 set_height(Self, H) ->
 35     Self#rectangle{height = H}.
 36
 37 get_height(Self) ->
 38     Self#rectangle.height.
 39
 40 loop(Self) ->
 41     receive
 42         {From, area} ->
 43             From ! {self(), ok, area(Self)},
 44             loop(Self);
 45
 46         {From, perimeter} ->
 47             From ! {self(), ok, perimeter(Self)},
 48             loop(Self);
 49
 50         {From, get_width} ->
 51             From ! {self(), ok, get_width(Self)},
 52             loop(Self);
 53
 54         {From, get_height} ->
 55             From ! {self(), ok, get_height(Self)},
 56             loop(Self);
 57
 58         {set_width, W} ->
 59             loop(set_width(Self, W));
 60
 61         {set_height, H} ->
 62             loop(set_height(Self, H))
 63     end.
 64
 65
 66
 67
 68
 69
 70
 71 -module(circle).
 72 -compile(export_all).
 73 -define(PI, 3.14159).
 74
 75 -record(circle, {radius}).
 76
 77 new_circle(R) ->
 78     #circle{radius = R}.
 79
 80 start(R) ->
 81     Self = new_circle(R),
 82     {Self, spawn(fun() -> loop(Self) end)}.
 83
 84 rpc(Pid, Request) ->
 85         Pid ! {self(), Request},
 86         receive
 87             {Pid, ok, Response} ->
 88                 Response
 89         end.
 90
 91
 92 area(Self) ->
 93   ?PI * Self#circle.radius * Self#circle.radius.
 94
 95 perimeter(Self) ->
 96   ?PI * diameter(Self).
 97
 98 set_radius(Self, R) ->
 99     Self#circle{radius = R}.
100
101 get_radius(Self) ->
102     Self#circle.radius.
103 diameter(Self) ->
104     Self#circle.radius * 2.
105
106
107 loop(Self) ->
108     receive
109         {From, area} ->
110             From ! {self(), ok, area(Self)},
111             loop(Self);
112
113         {From, perimeter} ->
114             From ! {self(), ok, perimeter(Self)},
115             loop(Self);
116
117         {From, get_radius} ->
118             From ! {self(), ok, get_radius(Self)},
119             loop(Self);
120
121         {set_radius, R} ->
122             loop(set_radius(Self, R))
123     end.
124
125
126
127
128
129 -module(square).
130 -compile(export_all).
131
132
133 -record(square, {side}).
134
135 new_square(S) ->
136     #square{side = S}.
137
138 start(S) ->
139     Self = new_square(S),
140     {Self, spawn(fun() -> loop(Self) end)}.
141
142 rpc(Pid, Request) ->
143         Pid ! {self(), Request},
144         receive
145             {Pid, ok, Response} ->
146                 Response
147         end.
148
149
150 area(Self) ->
151   Self#square.side * Self#square.side.
152
153 perimeter(Self) ->
154   4 * Self#square.side.
155
156 set_side(Self, S) ->
157     Self#square{side = S}.
158
159 get_side(Self) ->
160     Self#square.side.
161
162 loop(Self) ->
163     receive
164         {From, area} ->
165             From ! {self(), ok, area(Self)},
166             loop(Self);
167
168         {From, perimeter} ->
169             From ! {self(), ok, perimeter(Self)},
170             loop(Self);
171
172         {From, get_side} ->
173             From ! {self(), ok, get_side(Self)},
174             loop(Self);
175
176         {set_side, S} ->
177             loop(set_side(Self, S))
178     end.
179
180
181
182
183 -module(poly_test).
184 -compile(export_all).
185
186 create() ->
187    [circle:start(3.5), rectangle:start(20, 50), square:start(10)].
188
189 area(ShapeNPid) ->
190     {_Shape, Pid} = ShapeNPid,
191     Pid ! {self(), area},
192     receive
193         {Pid, ok, Response} ->
194             Response
195     end.
196
197 perimeter(ShapeNPid) ->
198     {_Shape, Pid} = ShapeNPid,
199     Pid ! {self(), perimeter},
200     receive
201         {Pid, ok, Response} ->
202             Response
203     end.
204
205 get_area(ShapesNPids) ->
206    lists:map(fun(ShapeNPid) -> area(ShapeNPid) end, ShapesNPids).
207 get_perimeter(ShapesNPids) ->
208    lists:map(fun(ShapeNPid) -> perimeter(ShapeNPid) end, ShapesNPids).
209
210 test_area() ->
211     D = create(),
212     %[CircleArea, RectangleArea, SquareArea] = [38.4845,1000,100],
213     [CircleArea, RectangleArea, SquareArea] = get_area(D).
214
215 test_perimeter() ->
216     D = create(),
217     %[CirclePerimeter, RectanglePerimeter, SquarePerimeter] = [21.9911,140,40],
218     [CirclePerimeter, RectanglePerimeter, SquarePerimeter] = get_perimeter(D).
219

Lessons Learned:

Though, Erlang supports object oriented features in its concurrent model, but it is a lot more verbose. For example, in above example Ruby was roughly 60 lines of code and Java was twice as long, whereas Erlang was about four times long.

August 20, 2007

Benchmarking Java Vs Erlang

Filed under: Computing — admin @ 3:07 pm

I have been reading Joe Armstrong’s Erlang book recently and one of the assignments is to compare the performance of processes and message communication by creating a ring of processes and sending messages. The assignment needs to be done in two languages, so I chose Java as the other language.

Here is the Java version:

  1 import java.util.*;
  2 import java.util.concurrent.*;
  3
  4 /**
  5  * Create N followers in a ring. Send a message round the ring M times so that a total of N * M messages get sent.
  6  * Time how long this takes for different value of N and M.
  7  * javac Ring.java
  8  * java -Xss48k -cp . Ring
  9  */
 10 public class Ring {
 11     //
 12     public class Follower extends Thread {
 13         final BlockingQueue queue = new LinkedBlockingQueue();
 14         Follower nextFollower;
 15         int pid;
 16         int numberOfMessages;
 17
 18         public Follower(int pid, int numberOfMessages) {
 19             this.pid = pid;
 20             this.numberOfMessages = numberOfMessages;
 21         }
 22
 23         public void setNextFollower(Follower nextFollower) {
 24             this.nextFollower = nextFollower;
 25         }
 26
 27
 28         public void sendAsynchronous(Object message) {
 29             queue.add(message);
 30         }
 31
 32         public void run() {
 33             //System.out.println("Starting Pid " + pid + ", numberOfMessages " + numberOfMessages);
 34             for (int i=0; i<numberOfMessages; i++) {
 35                 try {
 36                     Object message = queue.take();
 37                     nextFollower.sendAsynchronous(message);
 38                 } catch (InterruptedException e) {
 39                     Thread.currentThread().interrupted();
 40                 }
 41             }
 42             //System.out.println("Ending Pid " + pid + ", numberOfMessages " + numberOfMessages);
 43         }
 44     }
 45
 46     public class Leader extends Follower {
 47         public Leader(int pid) {
 48             super(pid, 0);
 49         }
 50
 51
 52         public void run() {
 53             // leader will not run asynchronously
 54         }
 55
 56         public void sendReceiveSynchronous(Object message) {
 57             try {
 58                 nextFollower.sendAsynchronous(message);
 59                 message = queue.take();
 60             } catch (InterruptedException e) {
 61                 Thread.currentThread().interrupted();
 62             }
 63         }
 64     }
 65
 66
 67     public Ring() {
 68     }
 69
 70
 71
 72     public void runRing(int numberOfProcesses, int numberOfMessages) {
 73         Leader leader = new Leader(-1);
 74         Follower[] followers = buildFollowers(numberOfProcesses-1, numberOfMessages, leader);
 75         startFollowers(followers);
 76         for (int i=0; i<numberOfMessages; i++) {
 77             leader.sendReceiveSynchronous(Boolean.TRUE); // we are not taking message size into account
 78         }
 79     }
 80
 81
 82     public void benchmarkRing(int numberOfProcesses, int numberOfMessages) {
 83         System.out.println("Starting ring for " + numberOfProcesses + " threads and " + numberOfMessages + " messages");
 84         long start = System.currentTimeMillis();
 85         //
 86         runRing(numberOfProcesses, numberOfMessages);
 87
 88         long elapsed = System.currentTimeMillis() - start;
 89         System.out.println("Ring for " + numberOfProcesses + " threads and " + numberOfMessages + " messages took " + elapsed + " milliseconds.");
 90     }
 91
 92     private void startFollowers(Follower[] followers) {
 93         for (int i=0; i<followers.length; i++) {
 94             followers[i].setDaemon(true);
 95             followers[i].start();
 96         }
 97     }
 98
 99
100     private Follower[] buildFollowers(int numberOfFollowers, int numberOfMessages, Leader leader) {
101         Follower[] followers = new Follower[numberOfFollowers];
102         for (int i=0; i<followers.length; i++) {
103             followers[i] = new Follower(i, numberOfMessages);
104         }
105         leader.setNextFollower(followers[0]);
106         for (int i=0; i<followers.length; i++) {
107             if (i == followers.length-1) {
108                 followers[i].setNextFollower(leader);
109             } else {
110                 followers[i].setNextFollower(followers[i+1]);
111             }
112         }
113         return followers;
114     }
115
116
117     //
118     public static void main(String[] args) {
119         for (int i=100; i<10000; i+=100) {
120             for (int j=100; j<10000; j+=1000) {
121                 new Ring().benchmarkRing(i, j);
122             }
123         }
124     }
125 }
126

And here is the Erlang version:

 1 -module(ring).
 2 -compile(export_all).
 3
 4 % c(ring).
 5 % Pid = ring:start(2).
 6 % Pid ! {self(), 0}.
 7 % Pid ! {self(), 2}.
 8 % ring:benchmark_ring(10, 4).
 9
10
11 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
12 % start - spawns a process and runs receive_send_loop function. It passes M - # of messages to the
13 % loop.
14 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
15 start(M, NextPid) ->
16     spawn(fun() -> receive_send_loop(M, NextPid) end).
17
18
19 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
20 % ring stars N processes, each will run receive_send_loop.
21 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
22 start_ring(N, M) ->
23     LastPid = self(),
24     start_ring(N-1, M, LastPid).
25
26 start_ring(N, M, LastPid) when N > 0 ->
27     Pid = start(M, LastPid),
28     start_ring(N-1, M, Pid);
29 start_ring(N, _M, LastPid) when N == 0 ->
30     LastPid.
31
32 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
33 % send_receive_message sends a message with D, M number of times to all processes inside in list L
34 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
35 send_receive_message(M, D, LastPid) when M > 0 ->
36     Payload = case M of
37       1 -> {nomore, D};
38       _Else -> {ok, D}
39     end,
40     %io:format("Master ~p Sending message to ~p~n", [self(), LastPid]),
41     LastPid ! Payload,
42
43     %io:format("Master ~p Reciving message ~n", [self()]),
44     receive
45             {ok, Response} ->
46                 Response
47     end,
48     %io:format("Master ~p Received message ~n", [self()]),
49     send_receive_message(M-1, D, LastPid);
50
51 send_receive_message(M, _D, _LastPid) when M == 0 ->
52     true.
53
54 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
55 % benchmark_ring invokes ring function and calculates timings.
56 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
57 benchmark_ring() ->
58     benchmark_ring(100).
59
60 benchmark_ring(N) when N < 10000 ->
61     benchmark_ring(N, 100),
62     benchmark_ring(N+100);
63 benchmark_ring(N) when N >= 10000 ->
64     true.
65
66 benchmark_ring(N, M) when M < 10000 ->
67     io:format("Starting ring for ~w processes and ~w messages.~n", [N, M]),
68     statistics(runtime),
69     statistics(wall_clock),
70     LastPid = start_ring(N, M),
71     send_receive_message(M, 'message', LastPid),
72     {_, RT} = statistics(runtime),
73     {_, WC} = statistics(wall_clock),
74     io:format("Ring for ~w processes and ~w messages took ~p (~p) milliseconds.~n", [N, M, RT, WC]),
75     benchmark_ring(N, M+1000);
76 benchmark_ring(_N, M) when M >= 10000 ->
77     true.
78
79
80
81 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
82 % receive_send_loop receives messages in a loop until it receives nomore.
83 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
84 receive_send_loop(M, NextPid) ->
85     receive
86         {nomore, Any} ->
87             %io:format("~p received ~p last message ~p nextPid ~p ~n", [self(), M, NextPid, Any]),
88             NextPid ! {ok, Any};
89         {ok, Any} ->
90             %io:format("~p received ~p message ~p nextPid ~p ~n", [self(), M, NextPid, Any]),
91             NextPid ! {ok, Any},
92             receive_send_loop(M-1, NextPid)
93      end.
94
95

The verdict is that Erlang is much more efficient than Java in process/thread creation and message communication. I actually had to explicitly reduce the stack size to run the Java program, and even then there is no comparison. Here are the results of the benchmarks.

Here is simple ruby script that I used to merge output statistics from both runs:

 1 class Stats
 2   attr_accessor :processes
 3   attr_accessor :messages
 4   attr_accessor :java_time
 5   attr_accessor :erlang_time
 6   def initialize(procs, msgs, jtime, etime)
 7     @processes = procs.to_i
 8     @messages = msgs.to_i
 9     @java_time = jtime.to_i if jtime
10     @erlang_time = etime.to_i if etime
11   end
12   def key
13     "#{@processes}/#{@messages}"
14   end
15
16   def to_s
17     "#{@processes},#{@messages},#{@java_time},#{@erlang_time}"
18   end
19 end
20
21 stats = {}
22 File.open("javaring.out", "r").readlines.each do |line|
23   if line =~ /Ring for/
24     t = line.split(/ /)
25     stat = Stats.new(t[2], t[5], t[8], nil)
26     stats[stat.key] = stat
27     #puts "#{stat} --- for line #{t.join(', ')}" 
28   end
29 end
30
31 File.open("erlout", "r").readlines.each do |line|
32   if line =~ /Ring for/
33     t = line.split(/ /)
34     stat = Stats.new(t[2], t[5], nil, t[8])
35     old = stats[stat.key]
36     if old
37       old.erlang_time = stat.erlang_time
38     else
39       stats[stat.key] = stat
40     end
41     #puts "#{stat} --- for line #{t.join(', ')}" 
42   end
43 end
44 stats.values.sort_by {|stat| stat.processes * stat.messages}.each do |stat|
45   puts stat
46 end
47
48

Final Thoughts

Erlang is great for writing highly concurrent applications. It shows that smart use of green threads can outperform native threads. The only thing I found a bit verbose about Erlang is that you have to write a switch statement to receive messages. I wish these processes be more object oriented where message passing is done by method invocation instead of explicitly as it’s done in Erlang. What I mean is, instead of writing

  receive
     {label1, From, RealData} ->
          action;
     {label2, From, RealData} ->
          action;

it would be more like a module with functions named label1, label2, ... that accept From and RealData as arguments. Also, a lot of the time you have to send back a message, which could be sent implicitly if the function returns anything. Back in the mid 90s I wrote a Java based ORB that had a ServiceFactory that took any POJO object and converted it into a Service. I consider these processes to be small services, and it would be nice to have the same mechanism to convert a module into a service. Another thing I find hard about Erlang (besides immutable data) is its cryptic error messages. I am used to seeing the line number or stack trace where the error occurred, but Erlang errors are very cryptic. As with learning any new language, you also have to learn all the libraries, and Erlang has the huge OTP beast that I will have to tame. Nevertheless, I like Erlang so far and it’s going to be a favorite language.

August 7, 2007

Ten Commandments for Scalable Architecture

Filed under: Computing — admin @ 9:45 pm

Scalability generally means a system can support more users when adding more resources. It can be vertical meaning adding more memory, CPUs or horizontal meaning more computers. Following are commandments that I have learned to make scalable systems such as large traffic sites, travel sites and e-commerce sites:

1. Divide and conquer – Design a loosely coupled, shared-nothing architecture by breaking a large system into a set of smaller shared services. The services should be stateless so that they can be deployed on any number of machines. You can use the Theory of Constraints to find bottlenecks, because these will limit how many servers you can use or how many requests you can serve.

2. Use messaging oriented middleware (ESB) to communicate with the services. This avoids temporal dependencies that are inherent in RPC style services.

3. Resource management – Manage HTTP sessions and remove them for static content; close all resources, such as database connections, after use. Avoid expensive resource management protocols such as two-phase commit, which can be a scalability killer. Instead use optimistic locking or compensating transactions.

4. Replicate data – For write-intensive systems use a master-master scheme to replicate the database, and for read-intensive systems use a master-slave configuration. Use queues for all database writes so that replication doesn’t block incoming requests, and make sure writes go through your cache system so that reads know about them. For user-facing requests use a high-availability design and for background processes use a high-consistency design (CAP).

5. Partition data (Sharding) – Use multiple databases to partition the data. Use some naming service to find the data. You can also denormalize data to partition it.

6. Avoid single point of failure – Identify any kind of single point of failures in hardware, software, network, power supply.

7. Bring processing closer to the data – Instead of transmitting large amounts of data over the network, bring the processing closer to the data. Maybe some kind of mobile agent technology can help here to automatically move processing units.

8. Design for service failures and crashes – Write your services as idempotent so that retries can be done safely. Have a decentralized logging and monitoring. However, have a centralized way to monitor or gather logs for services. Keep an eye on performance. Invest in tools for monitoring and managing services.

9. Dynamic Resources – Design service frameworks so that resources can be removed or added automatically and clients can automatically discover them. You will need some kind of smart load balancer here. Again, all this should be connected to a centralized monitoring system.

10. Smart Caching – Cache expensive operations and contents as much as possible. Precompute your contents to reduce load on the application server. Unfortunately, caching on the same server as application server can reduce memory for application server, so I suggest doing caching on separate servers (caching farm). You can add version to cache and set time out so that you don’t have to invalidate cache. Also, use multi-level caching and store contents on  disks so if the machine crashes it can restart quickly and cache warmup is quick.

Annotation based Caching in Java

Filed under: Computing — admin @ 3:33 pm

I have probably had to write a cache to store the results of expensive operations or queries dozens of times over the last ten years of Java. And this week, I had to do it again. There are a lot of libraries such as JCS, Terracotta or Tangosol, but I decided to write a simple implementation myself.

First, I defined an annotation that will be used to mark any class or method as cacheable:

import java.lang.reflect.*;
import java.lang.annotation.*;

/**
 * Marks a type or a method as cacheable. Put it on a class to allow caching of
 * all of its methods, on individual methods, or on both.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Cacheable {
    /**
     * Switch that lets caching be turned off for particular methods.
     *
     * @return false to disable caching for the annotated element
     */
    boolean cache() default true;

    /**
     * Largest number of items kept in the cache; once exceeded, old items are
     * dropped (LRU). Meant to be set on the class only, not on methods.
     *
     * @return maximum number of cached items
     */
    int maxCapacity() default 1000;

    /**
     * @return true if the [ if (in cache) load ] block is guarded by a lock so
     * that two threads do not execute the same load
     */
    boolean synchronizeAccess() default false;

    /**
     * Age, in seconds, after which CacheMap treats a cached entry as expired and
     * reloads it; CacheMap skips the expiry check when the value is not positive.
     *
     * @return entry timeout in seconds
     */
    int timeoutInSecs() default 300;

    /**
     * @return true if the cache is repopulated asynchronously once an entry has
     * expired
     */
    boolean canReloadAsynchronously() default false;
}
I then defined a simple map class to store cache:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

/**
 * CacheMap - provides lightweight caching based on LRU size and timeout
 * and asynchronous reloads.
 *
 * <p>Values are stored together with the time they were cached. The plain
 * {@link Map} methods read and write entries without any loading behavior;
 * {@link #get(Object, Cacheable, CacheLoader)} additionally loads missing or
 * expired entries through a {@link CacheLoader}.
 *
 * <p>The backing map is wrapped with {@link Collections#synchronizedMap}, so
 * single operations are thread-safe. {@link #values()} and {@link #entrySet()}
 * return snapshots taken under the map's lock; {@link #keySet()} returns the
 * live synchronized view, which callers must synchronize on while iterating.
 */

public class CacheMap<K, V> implements Map<K, V> {
    private final static int MAX_THREADS = 10; // reload threads shared by every CacheMap in the VM
    private final static int MAX_ITEMS = 10000; // upper bound on the capacity of one CacheMap

    // NOTE(review): the pool is never shut down and uses the default (non-daemon)
    // thread factory, so once an asynchronous reload has run its idle workers
    // keep the VM alive.
    private final static ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREADS);

    /**
     * Access-ordered LinkedHashMap that evicts its eldest (least recently
     * used) entry whenever it grows beyond {@code maxSize}.
     */
    static class FixedSizeLruLinkedHashMap<K, V> extends LinkedHashMap<K, V> {
        private final int maxSize;

        public FixedSizeLruLinkedHashMap(int initialCapacity, float loadFactor, int maxSize) {
            super(initialCapacity, loadFactor, true); // true = access order, i.e. LRU
            this.maxSize = maxSize;
        }

        @Override
        protected boolean removeEldestEntry(java.util.Map.Entry<K, V> eldest) {
            return size() > maxSize;
        }

    }

    private final Cacheable classCacheable;
    // key -> (time the value was cached, value)
    private final Map<K, Pair<Date, V>> map;
    // One lock per key that has been requested with synchronizeAccess; guarded by
    // synchronized (locks). Entries are never removed, so this grows with the
    // number of distinct keys.
    private final Map<Object, ReentrantLock> locks = new HashMap<Object, ReentrantLock>();

    /**
     * Creates a cache sized from the class-level annotation.
     *
     * @param cacheable class-level settings; may be null, in which case (or when
     *                  maxCapacity is not within 1..MAX_ITEMS-1) MAX_ITEMS is used
     */
    public CacheMap(Cacheable cacheable) {
        this.classCacheable = cacheable;
        int maxCapacity = cacheable != null && cacheable.maxCapacity() > 0
                && cacheable.maxCapacity() < MAX_ITEMS ? cacheable.maxCapacity() : MAX_ITEMS;
        this.map = Collections.synchronizedMap(
                new FixedSizeLruLinkedHashMap<K, Pair<Date, V>>(
                        maxCapacity / 10, 0.75f, maxCapacity));
    }

    @Override
    public int size() {
        return map.size();
    }

    @Override
    public boolean isEmpty() {
        return map.isEmpty();
    }

    @Override
    public boolean containsKey(Object key) {
        return map.containsKey(key);
    }

    /**
     * Checks the cached values themselves. The backing map stores
     * (timestamp, value) pairs, so delegating to its containsValue would
     * compare the argument against the pairs and never match.
     */
    @Override
    public boolean containsValue(Object value) {
        synchronized (map) { // iteration over a synchronizedMap view needs the map's lock
            for (Pair<Date, V> item : map.values()) {
                if (Objects.equals(value, item.getSecond())) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Stores the value stamped with the current time.
     *
     * @return the previously cached value, or null if there was none
     */
    @Override
    public V put(K key, V value) {
        Pair<Date, V> old = map.put(key, new Pair<>(new Date(), value));
        return old != null ? old.second : null;
    }

    @Override
    public V remove(Object key) {
        Pair<Date, V> old = map.remove(key);
        return old != null ? old.second : null;
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
            put(e.getKey(), e.getValue());
        }
    }

    @Override
    public void clear() {
        map.clear();
    }

    /**
     * @return the live key view of the synchronized backing map; synchronize on
     * the returned set's owner is not possible from outside, so copy it before
     * iterating if other threads may modify the cache
     */
    @Override
    public Set<K> keySet() {
        return map.keySet();
    }

    /**
     * @return a snapshot list of the cached values
     */
    @Override
    public Collection<V> values() {
        List<V> list = new ArrayList<>();
        synchronized (map) { // see containsValue
            for (Pair<Date, V> e : map.values()) {
                list.add(e.getSecond());
            }
        }
        return list;
    }

    /**
     * @return a snapshot of the cached entries; the entries are immutable and
     * {@code setValue} throws UnsupportedOperationException instead of silently
     * doing nothing
     */
    @Override
    public Set<Entry<K, V>> entrySet() {
        Set<Entry<K, V>> set = new HashSet<>();
        synchronized (map) { // see containsValue
            for (Map.Entry<K, Pair<Date, V>> e : map.entrySet()) {
                set.add(new AbstractMap.SimpleImmutableEntry<K, V>(
                        e.getKey(), e.getValue().getSecond()));
            }
        }
        return set;
    }

    /**
     * This method is simple get without any loading behavior
     *
     * @param key to fetch
     * @return the cached value, or null when the key is not cached
     */
    @Override
    public V get(Object key) {
        Pair<Date, V> item = this.map.get(key);
        if (item == null) {
            return null;
        }
        return item.getSecond();
    }

    /**
     * This method is simple get with loading behavior: a missing entry is
     * loaded synchronously; an expired entry is reloaded either synchronously
     * or, when the annotation allows it, in the background while the stale
     * value is returned.
     *
     * @param key       to fetch
     * @param cacheable settings (timeout, locking, async reload) for this call
     * @param loader    used to produce the value when it must be (re)loaded
     * @return the cached or freshly loaded value
     */
    public V get(K key, Cacheable cacheable, CacheLoader<K, V> loader) {
        // Acquire the per-key lock before the try block so that finally only
        // unlocks a lock that is really held.
        final ReentrantLock lock = cacheable.synchronizeAccess() ? lock(key) : null;
        try {
            // Read the entry only after the lock is held: a thread that waited
            // for another thread's load must see the value that load stored
            // instead of loading again.
            final Pair<Date, V> item = this.map.get(key);
            if (item == null) {
                // load initial value
                return reloadSynchronously(key, loader);
            }
            if (isExpired(item, cacheable)) {
                // the element has expired, reload it
                if (cacheable.canReloadAsynchronously()) {
                    reloadAsynchronously(key, loader);
                    return item.getSecond(); // serve the stale value meanwhile
                }
                return reloadSynchronously(key, loader);
            }
            return item.getSecond();
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    // true when a positive timeout is configured and the entry is older than it
    private static boolean isExpired(Pair<Date, ?> item, Cacheable cacheable) {
        return cacheable.timeoutInSecs() > 0
                && System.currentTimeMillis() - item.getFirst().getTime()
                        > cacheable.timeoutInSecs() * 1000L;
    }

    /**
     * Returns the lock dedicated to the key, already acquired by the calling
     * thread. The lock is registered in {@code locks} so that every thread
     * asking for the same key contends on the same lock.
     */
    private ReentrantLock lock(Object key) {
        ReentrantLock lock;
        synchronized (locks) {
            lock = locks.get(key);
            if (lock == null) {
                lock = new ReentrantLock();
                locks.put(key, lock);
            }
        }
        // Block outside the synchronized section so that waiting for one key
        // does not stall requests for other keys.
        lock.lock();
        return lock;
    }

    // loads the value in the calling thread and caches it
    private V reloadSynchronously(final K key, final CacheLoader<K, V> loader) {
        V value = loader.loadCache(key);
        put(key, value);
        return value;
    }

    // schedules a load on the shared pool; the result replaces the cached entry
    private void reloadAsynchronously(final K key, final CacheLoader<K, V> loader) {
        executorService.submit(new Callable<V>() {
            @Override
            public V call() {
                return reloadSynchronously(key, loader);
            }
        });
    }
}
A couple of interfaces to load or search key/values:

import java.lang.reflect.*;
/**
 * Strategy for turning a method invocation into a cache key.
 *
 * @param <K> type of the produced key
 */
public interface CacheKeyable<K> {
    /**
     * Builds the cache key for one invocation.
     *
     * @param method the invoked method
     * @param args   the invocation arguments
     * @return key under which the invocation's result is cached
     */
    K toKey(Method method, Object[] args);
}
/**
 * Callback used by the cache to obtain the value for a key that is missing
 * or has expired.
 *
 * @param <K> type of the cache key
 * @param <V> type of the cached value
 */
public interface CacheLoader<K, V> {
    /**
     * Loads the value for the given key by calling the underlying method.
     *
     * @param key key of the cache entry
     * @return value for the key
     */
    V loadCache(K key);
}
Following is a simple class that converts a method and its arguments into a unique key (I suppose we could use an MD5 or SHA-1 digest here instead):

/**
 * Default key strategy: builds a String key from the method name, its
 * parameter types and the invocation arguments.
 *
 * <p>Value-like arguments (numbers, text, characters, booleans, enums, dates)
 * contribute their value; any other object contributes its identity hash
 * code, so such arguments only hit the cache when the very same instance is
 * passed again.
 */
public class SimpleKeyable implements CacheKeyable<String> {
    /**
     * @param method invoked method; its name and parameter types start the key
     * @param args   invocation arguments, may be null or contain nulls
     * @return key of the form {@code name(type,type).arg.arg}
     */
    @Override
    public String toKey(Method method, Object[] args) {
        StringBuilder sb = new StringBuilder(method.getName());
        // The parameter types keep overloads apart: without them foo(int 1) and
        // foo(long 1) would both map to "foo.1" and share a cache entry.
        sb.append('(');
        Class<?>[] types = method.getParameterTypes();
        for (int i = 0; i < types.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(types[i].getName());
        }
        sb.append(')');
        for (int i = 0; args != null && i < args.length; i++) {
            sb.append(".");
            appendArg(sb, args[i]);
        }
        return sb.toString();
    }

    // appends the key fragment for a single argument
    private void appendArg(StringBuilder sb, Object arg) {
        if (arg == null) {
            sb.append("null"); // previously a NullPointerException
        } else if (arg instanceof CharSequence) {
            // Length prefix so that text containing the "." separator cannot be
            // confused with two separate arguments, and "null" differs from null.
            CharSequence text = (CharSequence) arg;
            sb.append(text.length()).append(':').append(text);
        } else if (arg instanceof Date) {
            // Date.toString() only has second resolution; use the millisecond value.
            sb.append(((Date) arg).getTime());
        } else if (isPrimitive(arg)) {
            sb.append(arg.toString());
        } else {
            sb.append(System.identityHashCode(arg));
        }
    }

    // checks if object is a boxed primitive, enum, text or date, i.e. a value whose
    // content identifies it. (getClass().isPrimitive() is never true for an object
    // reference, so that check was dropped.)
    private boolean isPrimitive(Object arg) {
        return arg != null
                && (arg.getClass().isEnum() ||
                arg instanceof Number ||
                arg instanceof CharSequence ||
                arg instanceof Character ||
                arg instanceof Boolean ||
                arg instanceof Date);
    }
}

A utility class for storing pair of objects:

/**
 * Immutable holder for two values that belong together.
 *
 * @param <FIRST>  type of the first value
 * @param <SECOND> type of the second value
 */
class Pair<FIRST, SECOND> {
    /** First component; also readable through {@link #getFirst()}. */
    final FIRST first;
    /** Second component; also readable through {@link #getSecond()}. */
    final SECOND second;

    /**
     * @param head  value stored as {@code first}
     * @param tail  value stored as {@code second}
     */
    Pair(FIRST head, SECOND tail) {
        first = head;
        second = tail;
    }

    /** @return the second component */
    public SECOND getSecond() {
        return this.second;
    }

    /** @return the first component */
    public FIRST getFirst() {
        return this.first;
    }
}

Following code adds support for parsing cache annotation:

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * Dynamic-proxy invocation handler that caches the results of the delegate's
 * methods according to their {@link Cacheable} annotations.
 *
 * <p>A method-level annotation on the delegate's class takes precedence over
 * the class-level one; {@code cache = false} disables caching for a method.
 * Exceptions thrown by the delegate are passed on to the caller as they were
 * thrown rather than wrapped in reflection exceptions.
 *
 * @param <K> cache key type
 * @param <V> cached value type
 */
public class CacheProxy<K, V> implements InvocationHandler {
    private final Object delegate;
    private final CacheMap<K, V> cache;
    private final Cacheable classcacheable; // class-level annotation of the delegate, may be null
    private final CacheKeyable<String> keyable; // fallback when the delegate builds no keys itself

    /** Loader that produces a value by invoking the captured method on the delegate. */
    class CacheLoaderImpl implements CacheLoader<K, V> {
        private final Method method;
        private final Object[] args;

        CacheLoaderImpl(Method method, Object[] args) {
            this.method = method;
            this.args = args;
        }

        /**
         * @param key ignored; the method and arguments captured at construction are used
         * @return the delegate method's result
         * @throws RuntimeException the delegate's own unchecked exception, or a
         *         RuntimeException wrapping its checked exception
         */
        @Override
        @SuppressWarnings("unchecked") // the proxied method's return type is only known at runtime
        public V loadCache(Object key) {
            try {
                return (V) invokeDelegate(method, args);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Constructs cache proxy
     *
     * @param aDelegate object whose method results are cached; its class-level
     *                  Cacheable annotation, if any, configures the cache
     */
    public CacheProxy(Object aDelegate) {
        this.delegate = aDelegate;
        this.classcacheable = getCacheable(aDelegate.getClass().getAnnotations());
        this.cache = new CacheMap<>(classcacheable);
        this.keyable = new SimpleKeyable();
    }

    /**
     * creates proxy object for cache
     *
     * @param delegate object that performs the real work
     * @param iface    interface implemented by the delegate and by the returned proxy
     * @return proxy implementing {@code iface}
     */
    public static Object proxy(Object delegate, Class<?> iface) {
        return Proxy.newProxyInstance(iface.getClassLoader(),
                new Class<?>[]{iface}, new CacheProxy<Object, Object>(delegate));
    }

    /**
     * Serves the call from the cache when the method is cacheable, otherwise
     * forwards it to the delegate.
     */
    @Override
    @SuppressWarnings("unchecked") // see CacheLoaderImpl.loadCache
    public V invoke(
            final Object proxy,
            final Method method,
            final Object[] args) throws Exception {
        Cacheable cacheable = getCacheable(method);
        if (cacheable != null) {
            K key = toKey(method, args);
            return cache.get(key, cacheable, new CacheLoaderImpl(method, args));
        } else {
            return (V) invokeDelegate(method, args);
        }
    }

    /**
     * Invokes the method on the delegate and unwraps the reflection wrapper.
     * Without this, an exception thrown by the delegate would leave the proxy
     * as an InvocationTargetException, which the proxy turns into an
     * UndeclaredThrowableException.
     */
    private Object invokeDelegate(Method method, Object[] args) throws Exception {
        try {
            return method.invoke(delegate, args);
        } catch (java.lang.reflect.InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e; // neither Exception nor Error: keep the wrapper
        }
    }

    // returns the first Cacheable among the annotations, or null
    private Cacheable getCacheable(Annotation[] ann) {
        for (int i = 0; ann != null && i < ann.length; i++) {
            if (ann[i] instanceof Cacheable) {
                return (Cacheable) ann[i];
            }
        }
        return null;
    }

    // resolves the effective annotation for a call; null means "do not cache"
    private Cacheable getCacheable(Method method) throws
            Exception {
        // get method from actual class because interface method will not be annotated.
        method = delegate.getClass().getMethod(method.getName(), method.getParameterTypes());
        // first search method level annotation
        Cacheable cacheable = getCacheable(method.getDeclaredAnnotations());
        if (cacheable == null) {
            cacheable = classcacheable; // if not found search class level annotation
        }
        if (cacheable != null && !cacheable.cache()) {
            cacheable = null;
        }
        return cacheable;
    }

    // lets the delegate build the key when it implements CacheKeyable itself
    @SuppressWarnings({"unchecked", "rawtypes"})
    private K toKey(Method method, Object[] args) {
        if (delegate instanceof CacheKeyable) {
            return (K) ((CacheKeyable) delegate).toKey(method, args);
        } else {
            return (K) keyable.toKey(method, args);
        }
    }
}

Instead of Java 1.3 proxy, we can also use aspects, e.g.

import java.util.*;
import java.lang.reflect.*;
import java.lang.annotation.*;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;

@Aspect
public class CacheAspect<K, V> {
    private final CacheMap<K, V> cache;
    private final CacheKeyable<String> keyable;
    class CacheLoaderImpl implements CacheLoader<K, V> {
        ProceedingJoinPoint pjp;
        CacheLoaderImpl(ProceedingJoinPoint pjp) {
            this.pjp = pjp;
        }
        @Override
        public V loadCache(Object key) {
            try {
                return pjp.proceed();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    // Constructor
    public CacheAspect() {
        this.cache = new CacheMap<>(null);
        this.keyable = new SimpleKeyable();
    }

    @Around("@target(com.plexobject.Cacheable)")
    public Object proxy(ProceedingJoinPoint pjp) throws Exception {
        Object target = pjp.getTarget();
        Cacheable cacheable = getCacheable(pjp);
        //
        if (cacheable != null) {
            Method method = ((MethodSignature)pjp.getStaticPart().getSignature()).getMethod();
            Object key = toKey(target, method, pjp.getArgs());
            return cache.get(key, cacheable, new CacheLoaderImpl(pjp)); 51
        } else {
            try {
                return pjp.proceed();
            } catch (Exception e) {
                throw e;
            } catch (Throwable e) {
                throw new Error(e);
            }
        }
    }
    private Cacheable getCacheable(Annotation[] ann) {
        for (int i=0; ann != null && i<ann.length; i++) {
            if (ann[i] instanceof Cacheable) {
                return (Cacheable) ann[i];
            }
        }
        return null;
    }
    private Cacheable getCacheable(ProceedingJoinPoint pjp) throws Exception {
        if (pjp.getStaticPart().getSignature() instanceof MethodSignature == false) {
            return null;
        }
        Method method = ((MethodSignature)pjp.getStaticPart().getSignature()).getMethod();
        // first search method level annotation
        Cacheable cacheable = getCacheable(method.getDeclaredAnnotations());
        if (cacheable != null && cacheable.cache() == false) {
            cacheable = null;
        }
        return cacheable;
    }
    private K toKey(Object target, Method method, Object[] args) {
        if (target instanceof CacheKeyable) {
            return (K) ((CacheKeyable) target).toKey(method, args);
        } else {
            return (K) keyable.toKey(method, args);
        }
    }
}
 And finally a unit test to test everything:

import junit.framework.*;
import java.lang.reflect.*;
import java.util.*;
/**
 * Exercises CacheProxy end to end: cached versus uncached methods, separate
 * entries per argument, and LRU eviction once the class-level capacity is
 * exceeded.
 */
public class CacheProxyTest extends TestCase {
    private static final int MAX_CAPACITY = 100;

    interface IService {
        Date getCacheTime(boolean junk);
        Date getCacheTime(int junk);
        Date getRealtime(boolean junk);
    }

    @Cacheable(maxCapacity = MAX_CAPACITY)
    static class ServiceImpl implements IService {
        @Override
        @Cacheable(timeoutInSecs = 1, synchronizeAccess = true)
        public Date getCacheTime(boolean junk) {
            return new Date();
        }

        @Override
        @Cacheable(timeoutInSecs = 1, synchronizeAccess = true)
        public Date getCacheTime(int junk) {
            return new Date();
        }

        @Override
        @Cacheable(cache = false)
        public Date getRealtime(boolean junk) {
            return new Date();
        }
    }

    public void testCache() throws Exception {
        ServiceImpl real = new ServiceImpl();
        Date started = new Date();
        IService svc = (IService) CacheProxy.proxy(real, IService.class);
        Date cacheDate = svc.getCacheTime(true);
        assertTrue(cacheDate.getTime() >= started.getTime());
        Date realDate = svc.getRealtime(true);
        assertTrue(realDate.getTime() >= started.getTime());
        Thread.sleep(200); // sleep is static; stays well below the 1 second cache timeout
        // same argument within the timeout: served from the cache
        assertTrue(cacheDate.getTime() == svc.getCacheTime(true).getTime());
        // different argument: separate entry, loaded after the sleep
        assertTrue(svc.getCacheTime(false).getTime() > cacheDate.getTime());
        // cache = false: always the real call
        assertTrue(svc.getRealtime(true).getTime() > realDate.getTime());

        cacheDate = svc.getCacheTime(0);
        // Start at 1: asking for 0 again would mark it most recently used and
        // it would never be evicted. Inserting more new keys than the capacity
        // guarantees that the entry for 0 has been pushed out.
        for (int i = 1; i <= MAX_CAPACITY + 3; i++) {
            svc.getCacheTime(i).getTime();
        }
        // after max capacity
        assertNotSame(cacheDate, svc.getCacheTime(0));
    }
}
« Newer PostsOlder Posts »

Powered by WordPress