Shahzad Bhatti Welcome to my ramblings and rants!

November 4, 2020

Structured Concurrency in modern programming languages – Part-II

Filed under: Computing,Erlang,Languages — admin @ 8:46 pm

In this second part of the series on structured concurrency (Part-I, Part-III, Part-IV), I will review Elixir and Erlang languages for writing concurrent applications and their support for structured concurrency:

Erlang

The Erlang language was created by late Joe Armstrong when he worked at Ericsson and it is designed for massive concurrency by means of very light weight processes that are based on actors. Each process has its own mailbox for storing incoming messages of various kinds. The receive block in Erlang is triggered upon new message arrival and the message is removed and executed when it matches specific pattern match. The Erlang language uses supervisors for monitoring processes and immutable functional paradigm for writing robust concurrent systems. Following is high-level architecture of Erlang system:

As the cost of each process or actor is only few hundred bytes, you can create millions of these processes for writing highly scalable concurrent systems. Erlang is a functional language where all data is immutable by default and the state within each actor is held private so there is no shared state or race conditions.

An actor keeps a mailbox for incoming messages and processes one message at a time using the receive API. Erlang doesn’t provide native async/await primitives but you can simulate async by sending an asynchronous message to an actor, which can then reply back to the sender using its process-id. The requester process can then block using receive API until reply is received. Erlang process model has better support for timeouts with receive API to exit early if it doesn’t receive response within a time period. Erlang system uses the mantra of let it crash for building fault tolerant applications and you can terminate a process and all children processes connected.

Using actor model in Erlang

Following code shows how native send and receive primitives can be used to build the toy web crawler:

-module(erlcrawler).

-export([start_link/0, crawl_urls/3, total_crawl_urls/1]).

-record(request, {clientPid, ref, url, depth, timeout, created_at=erlang:system_time(millisecond)}).
-record(result, {url, status=pending, child_urls=0, started_at=erlang:system_time(millisecond), completed_at, error}).

-define(MAX_DEPTH, 4).
-define(MAX_URL, 11).
-define(DOMAINS, [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "yz.com"]).

make_request(ClientPid, Ref, Url, Depth, Timeout) ->
    #request{clientPid=ClientPid, ref=Ref, url=Url, depth=Depth, timeout=Timeout}.

make_result(Req) ->
    Url = Req#request.url,
    #result{url=Url}.

%%% Client API
start_link() ->
    spawn_link(fun init/0).

%%%%%%%%%%%% public method for crawling %%%%%%%%%%%%
%%% calling private method for crawling
%%% Pid - process-id of actor
%%% 0 - current depth
%%% Urls - list of urls to crawl
%%% Timeout - max timeout
crawl_urls(Pid, Urls, Timeout) when is_pid(Pid), is_list(Urls)  ->
    %% Boundary for concurrency and it will not return until all
    %% child URLs are crawled up to MAX_DEPTH limit.
    do_crawl_urls(Pid, 0, Urls, [], Timeout, 0).

total_crawl_urls(Pid) when is_pid(Pid) ->
    Self = self(),
    Pid ! {total, Self},
    receive {total_reply, Self, N} ->
        N
    end.

%%% Server functions
init() ->
    {ok, DownloaderPid} = downloader:start_link(),
    {ok, IndexerPid} = indexer:start_link(),
    loop(DownloaderPid, IndexerPid, 0).

%%% Main server loop
loop(DownloaderPid, IndexerPid, N) ->
    receive
        {crawl, Req} ->
            CrawlerPid = self(),
            spawn_link(fun() -> handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) end),
            debug_print(N),
            loop(DownloaderPid, IndexerPid, N+1);
        {total, Pid} ->
            Pid ! {total_reply, Pid, N},
            loop(DownloaderPid, IndexerPid, N);
        terminate ->
            ok
    end.


%%% Internal client functions
debug_print(N) when N rem 10000 == 0 ->
    io:format("~p...~n", [{N}]);
debug_print(_) ->
    ok.

%% Go through URLs to crawl, send asynchronous request to crawl and
%% then add request to a list to monitor that will be used to receive
%% reply back from the crawling actor.
do_crawl_urls(_, _, [], [], _, ChildURLs) ->
    ChildURLs; % all done
do_crawl_urls(_, ?MAX_DEPTH, _, _, _, _) ->
    0; % reached max depth, stop more crawling
do_crawl_urls(Pid, Depth, [Url|T], SubmittedRequests, Timeout, 0) when is_pid(Pid), is_integer(Depth), is_integer(Timeout) ->
    %%% monitoring actor so that we are notified when actor process dies
    Ref = erlang:monitor(process, Pid),
    %%% crawling next url to process
    Req = make_request(self(), Ref, Url, Depth, Timeout),
    Pid ! {crawl, Req},
    do_crawl_urls(Pid, Depth, T, SubmittedRequests ++ [Req], Timeout, 0);
do_crawl_urls(Pid, Depth, [], [Req|T], Timeout, ChildURLs) when is_pid(Pid) ->
    %%% receiving response from the requests that were previously stored
    Ref = Req#request.ref,
    receive
        {crawl_done, Ref, Res} ->
            erlang:demonitor(Ref, [flush]),
            do_crawl_urls(Pid, Depth, [], T, Timeout, Res#result.child_urls+ChildURLs+1);
        {'DOWN', Ref, process, Pid, Reason} ->
            erlang:error(Reason)
    after Timeout ->
        erlang:error({crawl_timeout, Timeout})
    end.


%%% Internal server functions called by actor to process the crawling request
handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) ->
    Res = make_result(Req),
    ClientPid = Req#request.clientPid,
    Url = Req#request.url,
    Ref = Req#request.ref,
    Depth = Req#request.depth,
    Timeout = Req#request.timeout,

    case downloader:download(DownloaderPid, Url) of
        {ok, Contents} ->
        {ok, Contents1} = downloader:jsrender(DownloaderPid, Url, Contents),
        Changed = has_content_changed(Url, Contents1),
        Spam = is_spam(Url, Contents1),
        if Changed and not Spam ->
            indexer:index(IndexerPid, Url, Contents1), % asynchronous call
        Urls = parse_urls(Url, Contents1),
                %% Crawling child urls synchronously before returning
                ChildURLs = do_crawl_urls(CrawlerPid, Depth+1, Urls, [], Timeout, 0) + 1,
                Res1 = Res#result{completed_at=erlang:system_time(millisecond), child_urls=ChildURLs},
                ClientPid ! {crawl_done, Ref, Res1};
            true ->
                Res1 = Res#result{completed_at=erlang:system_time(millisecond)},
                ClientPid ! {crawl_done, Ref, Res1}
            end;
        Err ->
            Res1 = Res#result{completed_at=erlang:system_time(millisecond), error = Err},
            ClientPid ! {crawl_done, Ref, Res1}
        end,
    ok.

%%%%%%%%%%%%%%% INTERNAL METHODS FOR CRAWLING %%%%%%%%%%%%%%%%
parse_urls(_Url, _Contents) ->
    % tokenize contents and extract href/image/script urls
    random_urls(?MAX_URL).

random_urls(N) ->
    [random_url() || _ <- lists:seq(1, N)].

has_content_changed(_Url, _Contents) ->
     % calculate hash digest and compare it with last digest
    true.

is_spam(_Url, _Contents) ->
     % apply standardize, stem, ngram, etc for indexing
    false.

random_url() ->
    "https://" ++ random_domain() ++ "/" ++ random_string(20).

random_domain() ->
    lists:nth(random:uniform(length(?DOMAINS)), ?DOMAINS).

random_string(Length) ->
    AllowedChars = "abcdefghijklmnopqrstuvwxyz",
    lists:foldl(fun(_, Acc) -> [lists:nth(random:uniform(length(AllowedChars)), AllowedChars)] ++ Acc end, [], lists:seq(1, Length)).

In above implementation, crawl_urls method takes list of URLs and time out and waits until all URLs are crawled. It uses spawn_link to create a process, which invokes handle_crawl method to process requests concurrently. The handle_crawl method recursively crawl the URL and its children up to MAX_DEPTH limit. This implementation uses separate Erlang OTP processes for downloading, rendering and indexing contents. The handle_crawl sends back the response with number of child URLs that it crawled.

-module(erlcrawler_test).
-include_lib("eunit/include/eunit.hrl").

-define(ROOT_URLS, ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]).

crawl_urls_test() ->
    {spawn, {timeout,30, do_crawl_urls(10000)}}.

%% Testing timeout and by default, it will terminate the test process so we will instead convert
%% kill signal into a message using erlang:exit
crawl_urls_with_timeout_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Timeout = 10, % We know that processing takes longer than 10 milliseconds
    Pid = erlcrawler:start_link(),
    process_flag(trap_exit, true),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout)
    end),
    {{crawl_timeout, _}, _} = receive
        {'EXIT', _, Reason} -> Reason
    after 1000 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_timeout_test: timedout as expected in millis ~p ~n", [{Elapsed}]).

%% Testing terminate/cancellation and killing a process will kill all its children
crawl_urls_with_terminate_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, 1000) % crawl_urls is synchronous method so calling in another process
    end),
    receive
    after 15 -> % waiting for a bit before terminating (canceling) process
        exit(Pid, {test_terminated})
    end,
    {test_terminated} = receive
        {'EXIT', Pid, Reason} -> Reason
    after 200 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_terminate_test: terminated as expected in millis ~p ~n", [{Elapsed}]).

do_crawl_urls(Timeout) ->
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    N = erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout),
    N1 = erlcrawler:total_crawl_urls(Pid),
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("do_crawl_urls: Crawled URLs in millis: ~p ~n", [{N, N1, Elapsed}]),
    ?assertEqual(N1, 19032).

Above tests show three ways to try out the crawl_urls API. First test crawl_urls_test tests happy case of crawling URLs within 10 seconds. The crawl_urls_with_timeout_test tests the timeout behavior to make sure proper error message is returned and all Erlang processes are terminated. The crawl_urls_with_terminate_test tests cancellation behavior by terminating the main crawling process. You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/erl_actor.

Following are major benefits of using this process model to implement structured concurrency:

  • The main crawl_urls method defines high level scope of concurrency and it waits for the completion of child tasks.
  • crawl_urls method takes a timeout parameter so that the crawling all URLs must complete with the time period.
  • Erlang allows parent-child relationship between processes where you can monitor child processes and get notified when a child process dies. You can use this feature to cancel the asynchronous task. However, it will abruptly end all processes and all state within the process will be lost.
  • Erlang implementation captures the error within the response so the client can handle all error handling using pattern matching or other approach common in Erlang applications.

Following are shortcomings using this approach for structured concurrency:

  • The terminate API is not suitable for clean cancellation so you will need to implement a cooperative cancellation to persist any state or clean up underlying resources.
  • Though, you can combine processes in groups or parent child relationships manually but Erlang doesn’t give you a lot of flexibility to specify the context for execution.
  • Unlike async declared methods in Typescript, Erlang code is not easily composable but you can define client code to wrap send/receive messages so that high level code can be comprehended easily. Also, Erlang processes can be connected with parent-child relationships and you can manage composition via process-supervisor hierarchy.
  • Above code creates a new process for crawling each URL and though the overhead of each process is small but it may use other expensive resources such as network resource. We won’t use such approach for real crawler as it will strain the resources on the website being crawled. Instead, we may need to limit how many concurrent requests can be sent to a given website or maintain delay between successive requests.

Using pmap in Erlang

We can generalize above approach into a general purpose pmap that processes an array (similar to map function in functional languages) concurrently and then waits for their response such as:

-module(pmap).

-export([pmap/3]).

pmap(F, Es, Timeout) ->
   Parent = self(),
   Running = [exec(Parent, F, E) || E <- Es],
   collect(Running, Timeout).

exec(Parent, F, E) ->
    spawn_monitor(fun() -> Parent ! {self(), F(E)} end).

collect([], _Timeout) -> [];
collect([{Pid, MRef} | Next], Timeout) ->
  receive
    {Pid, Res} ->
      erlang:demonitor(MRef, [flush]),
      [{ok, Res} | collect(Next, Timeout)];
    {'DOWN', MRef, process, Pid, Reason} ->
      [{error, Reason} | collect(Next, Timeout)]
  after Timeout ->
    erlang:error({pmap_timeout, Timeout})
  end.

You can download full pmap example from https://github.com/bhatti/concurency-katas/tree/main/erl_pmap.

Elixir

The Elixir language is built upon Erlang BEAM VM and was created by Jose Valim to improve usability of Erlang language and introduce Rubyist syntax instead of Prologist syntax in Erlang language. It also removes some of the boilerplate that you needed in Erlang language and adds higher level abstractions for writing highly concurrent, distributed and fault tolerant applications.

Using a worker-pool and OTP in Elixir

As Elixir uses Erlang VM and runtime system, the application behavior will be similar to Erlang applications but following approach uses a worker pool design where the parent process keeps a list of child-processes and delegates the crawling work to child processes in a round-robin fashion:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  # {:ok, pid} = Crawler.start_link(100000)
  def start_link(size) when is_integer(size) do
    GenServer.start_link(__MODULE__, size)
  end

  def total_crawl_urls(pid) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, 30000)
  end

  ### Public client APIs
  def crawl_urls(pid, urls) when is_pid(pid) and is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self())
  end

  ### Internal client APIs
  def crawl_urls(pid, urls, depth, clientPid) when is_pid(pid) and is_list(urls) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ## init method create pool of workers based on given size
  def init(size) when is_integer(size) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    pids = Map.keys(pid_to_workers)
    {:ok, {pid_to_workers, pids, 0}}
  end

  ## handles crawling
  def handle_cast({:crawl, request}, {pid_to_workers, [pid|rest], total_in}) do
    GenServer.cast(pid, {:crawl, request}) # send request to workers in round-robin fashion
    {:noreply, {pid_to_workers, rest ++ [pid], total_in+1}}
  end

  def handle_call({:total_crawl_urls}, _from, {_, _, total_in} = state) do
    {:reply, total_in, state}
  end

  ## OTP Callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, _, total_in}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)
    pids = Map.keys(new_pid_to_workers)
    {:noreply, {new_pid_to_workers, pids, total_in}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

The parent process in above example defines crawl_urls method for crawling URLs, which is defined as an asynchronous API (handle_cast) and forwards the request to next worker. Following is implementation of the worker:

defmodule Worker do
  @moduledoc """
  Documentation for crawling worker.
  """
  @max_url 11
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"

  use GenServer

  # Client APIs
  def start_link(crawler_pid) when is_pid(crawler_pid) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {crawler_pid, downloader_pid, indexer_pid})
  end

  @doc """
  Crawls web url asynchronously
  """
  def handle_cast({:crawl, request}, {crawler_pid, downloader_pid, indexer_pid}=state) do
    handle_crawl(crawler_pid, downloader_pid, indexer_pid, request)
    {:noreply, state}
  end

  def init(crawler_pid) do
      {:ok, crawler_pid}
  end

  # Internal private methods
  defp handle_crawl(crawler_pid, downloader_pid, indexer_pid, req) do
    res = Result.new(req)
    contents = Downloader.download(downloader_pid, req.url)
    new_contents = Downloader.jsrender(downloader_pid, req.url, contents)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(indexer_pid, req.url, new_contents)
      urls = parse_urls(req.url, new_contents)
      Crawler.crawl_urls(crawler_pid, urls, req.depth+1, req.clientPid)
      send req.clientPid, {:crawl_done, Result.completed(res)}
    else
      send req.clientPid, {:crawl_done, Result.failed(req, :skipped_crawl)}
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

The worker process starts downloader and indexer processes upon start and crawls the URL upon receiving the next request. It then sends back the response to the originator of request using process-id in the request. Following unit tests are used to test the behavior of normal processing, timeouts and cancellation:

defmodule CrawlerTest do
  use ExUnit.Case
  doctest Crawler
  @max_processes 10000
  @max_wait_messages 19032
  @root_urls ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]

  test "test crawling urls" do
    started = System.system_time(:millisecond)
    {:ok, pid} = Crawler.start_link(@max_processes)
    Crawler.crawl_urls(pid, @root_urls)
    wait_until_total_crawl_urls(pid, @max_wait_messages, started)
  end

  defp wait_until_total_crawl_urls(pid, 0, started) do
    n = Crawler.total_crawl_urls(pid)
    elapsed = System.system_time(:millisecond) - started
    IO.puts("Crawled URLs in millis: #{n} #{elapsed}")
    assert n >= @max_wait_messages
  end

  defp wait_until_total_crawl_urls(pid, max, started) do
    if rem(max, 1000) == 0 do
      IO.puts("#{max}...")
    end
    receive do
      {:crawl_done, _} -> wait_until_total_crawl_urls(pid, max-1, started)
    end
  end

end

Following are major benefits of this approach for its support of structured concurrency:

  • The crawl_urls method in parent process defines high level scope of concurrency and it waits for the completion of child tasks.
  • Above implementation also uses timeout similar to the Erlang example to ensure task is completed within given time period.
  • Above implementation also captures the error within the response similar to Erlang for error handling.
  • This approach addresses some of the shortcomings of previous approach in Erlang implementation where a new process was created for each request. Instead a pool of process is used to manage the capacity of resources.

Following are shortcomings using this approach for structured concurrency:

  • This approach also suffers the same drawbacks as Erlang approach regarding cancellation behavior and you will need to implement a cooperative cancellation to cleanup the resources properly.
  • Similar to Erlang, Elixir also doesn’t give you a lot of flexibility to specify the context for execution and it’s not easily composable.

Using async-await in Elixir

Elixir defines abstracts Erlang process with Task when you only need to execute a single action throughout its lifetime. Here is an example that combines Task async/await with pmap implementation:

defmodule Parallel do
  def pmap(collection, func, timeout) do
    collection
    |> Enum.map(&(Task.async(fn -> func.(&1) end)))
    |> Enum.map(fn t -> Task.await(t, timeout) end)
  end
end
defmodule Crawler do
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "ef.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"
  @max_depth 4
  @max_url 11

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def crawl_urls(urls, timeout) when is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    ## Starting external services using OTP for downloading and indexing
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    res = crawl_urls(urls, downloader_pid, indexer_pid, 0, timeout)
    ## Stopping external services using OTP for downloading and indexing
    Process.exit(downloader_pid, :normal)
    Process.exit(indexer_pid, :normal)
    res
  end

  def crawl_urls(urls, downloader_pid, indexer_pid, depth, timeout) when is_list(urls) and is_pid(downloader_pid) and is_pid(indexer_pid) and is_integer(depth) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, downloader_pid, indexer_pid, depth, timeout)))
      Parallel.pmap(requests, &(handle_crawl/1), timeout)
    else
      []
    end
  end

  # Internal private methods
  defp handle_crawl(req) do
    {:ok, contents} = Downloader.download(req.downloader_pid, req.url, req.timeout)
    {:ok, new_contents} = Downloader.jsrender(req.downloader_pid, req.url, contents, req.timeout)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(req.indexer_pid, req.url, new_contents, req.timeout)
      urls = parse_urls(req.url, new_contents)
      res = Crawler.crawl_urls(urls, req.downloader_pid, req.indexer_pid, req.depth+1, req.timeout)
      Enum.reduce(res, 0, &(&1 + &2)) + 1
    else
      0
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

Above example is a bit shorter due to the high level Task abstraction but its design has similar pros/cons as actor and pmap implementation of Erlang example. You can download full source code for this implementation from https://github.com/bhatti/concurency-katas/tree/main/elx_pmap.

Using Queue in Elixir

Following example shows web crawler implementation using queue:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def start_link(size) when is_integer(size) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {size, downloader_pid, indexer_pid})
  end

  ## crawl list of url
  def crawl_urls(pid, urls, timeout) when is_pid(pid) and is_list(urls) and is_integer(timeout) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self(), timeout)
  end

  # returns number of urls crawled
  def total_crawl_urls(pid, timeout) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, timeout)
  end

  ## dequeue returns pops top request from the queue and returns it
  def dequeue(pid) when is_pid(pid) do
    GenServer.call(pid, {:dequeue})
  end

  ###########################################
  ## internal api to crawl urls
  def crawl_urls(pid, urls, depth, clientPid, timeout) when is_pid(pid) and is_list(urls) and is_pid(clientPid) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid, timeout)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ###########################################
  ## init method create pool of workers based on given size
  def init({size, downloader_pid, indexer_pid}) when is_integer(size) and is_pid(downloader_pid) and is_pid(indexer_pid) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    {:ok, {pid_to_workers, :queue.new, 0, 0, downloader_pid, indexer_pid}}
  end

  ## asynchronous server handler for adding request to crawl in the queue
  def handle_cast({:crawl, request}, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    new_queue = :queue.in(request, queue)
    {:noreply, {pid_to_workers, new_queue, total_in+1, total_out, downloader_pid, indexer_pid}}
  end

  ## synchronous server handler for returning total urls crawled
  def handle_call({:total_crawl_urls}, _from, {_, _, _total_in, total_out, _, _} = state) do
    {:reply, total_out, state}
  end

  ## synchronous server handler to pop top request from the queue and returning it
  def handle_call({:dequeue}, _from, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    {head, new_queue} = :queue.out(queue)
    if head == :empty do
      {:reply, {head, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out, downloader_pid, indexer_pid}}
    else
      if rem(:queue.len(queue), 1000) == 0 or rem(total_out+1, 1000) == 0do
        IO.puts("#{total_out+1}...")
      end
      {:value, req} = head
      {:reply, {req, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out+1, downloader_pid, indexer_pid}}
    end
  end

  ## OTP helper callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, queue, total_in, total_out}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)

    {:noreply, {new_pid_to_workers, queue, total_in, total_out}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

You can download full source code of this example from https://github.com/bhatti/concurency-katas/tree/main/elx_queue.

Using Actor model as Abstract Data Structure

As the cost of actors is very small, you can also use it as an abstract data structure or objects that maintains internal state. Alan Kay, the pioneer in object-oriented programming described message-passing, isolation and state encapsulation as foundation of object-oriented design and Joe Armstrong described Erlang as the only object-oriented language. For example, let’s say you need to create a cache of stock quotes using dictionary data structure, which is updated from another source and provides easy access to the latest quotes. You would need to protect access to shared data in multi-threaded environment with synchronization. However, with actor-based design, you may define an actor for each stock symbol that keeps latest value internally and provides API to access or update quote data. This design will remove the need to synchronize shared data structure and will result in better performance.

Overall, Erlang process model is a bit low-level compared to async/await syntax and lacks composition in asynchronous code but it can be designed to provide structured scope, error handling and termination. Further, immutable data structures and message passing obviates the need for locks to protect shared state. Another benefit of Erlang/Elixir is its support of distributed services so it can be used for automatically distributing tasks to remote machines seamlessly.

May 10, 2010

Building a stock quote server in Erlang using Ejabberd, XMPP, Bosh, Exmpp, Strophe and Yaws

Filed under: Erlang — admin @ 1:40 pm

Recently, I have been building a stock quote server at work that publishes financial data using using Ejabberd, XMPP, PubSub, Exmpp and Bosh on the server side and Strophe library on the web application front. I will describe a simplified implementation of the quote server using Yahoo Quotes.

Installation

Download Ejabberd and go through the installation wizad. You will be asked your host name, admin account/password and whether ejabberd would be running in a clustered environment. For this tutorial, we will be running ejabberd on a single. Once installed, you can start the ejabbered server using

 /Applications/ejabberd-2.1.3/bin/ejabberdctl start
 

As, I am using Mac, the actual path on your machine may be different. The ejabbered comes with a web baesd admin tool, that you can access using

 http://<your-host-name>:5280/admin
 

and you would be able to see available nodes, users, etc.


Registering Users

We will be creating two users: producer and consumer, where the former would be used for publishing stock quotes and latter would be used for subscribing quotes on the web side, i.e.,

 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register producer  producer
 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register consumer  consumer
 

Debuging with Psi

You can debug XMPP communications using a jabber client such as Psi, which you can download. After you download, you can install and specify your local hostname as a server, e.g.



You can then login using consumer@<your-host-name> with password consumer. As, we will be using PubSub protocol, you can discover available nodes or topics using General->Service Discovery from the menu, e.g.


Downloading Sample Code

I have stored all code needed for this example on http://github.com/bhatti/FQPubSub, that you can checkout using:

 git clone git@github.com:bhatti/FQPubSub.git
 

The sample code depends on exmpp, lhttpc, jsonerl, and yaws modules so after downloading the code, checkout dependent modules using

 git submodule init
 git submodule update
 

Above commands will checkout dependent modules in deps directory.

Building Sample Code

Before building, ensure you have make and autoconf tools installed, then replace <paraclete.local> with your <your-host-name> in docroot/index.html and src/quote_utils.hrl. Then type following command

 make
 

to build all sample code and dependent libraries

Starting Web Server

Though, the web code including Srophe library and Javascript can be run directly in the browser, but you can start Yaws to serve the application as follows:

 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run web_server start 
 

Note, that the web server will be continuously running, so you can open a separate shell before typing above command.

Publishing Quotes

Create two separate shells and type following command in first shell:

   erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start AAPL
 

and following command in second shell

   erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start IBM
 

Above commands will start Erlang processes, that will poll Yahoo Quotes every second and publish the quotes on the node AAPL and IBM respectively.

Next point your browser to http://<your-host-name>:8000/, and add “IBM” and “AAPL” symbols, you would then see quotes for both symbols, e.g.

Code under the hood

Now that you are able to run the example, let’s take a look at the code how it works:

Client library for Yahoo Finance

Though, at work we use our own real time stock quote feed, but for this sample I implemented stock quote feed using Yahoo Finance. The src/yquote_client.hrl and src/yquote_client.erl define client API for accessing Yahoo finance service. Here is the Erlang code for requesting the quote using HTTP request and parsing it:

  1 %%%-------------------------------------------------------------------
 
  2 %%% File : yquote_client.erl
  3 %%% Author : Shahzad Bhatti
  4 %%% Purpose : Wrapper Library for Yahoo Stock Quotes
 
  5 %%% Created : May 8, 2010
  6 %%%-------------------------------------------------------------------
  7 
  8 -module(yquote_client).
 
  9 
 10 -author('bhatti@plexobject.com').
 11 
 12 -export([
 13          quote/1
 14         ]).
 
 15 
 16 -record(quote, {
 17         symbol,
 18         price,
 19         change,
 20         volume,
 
 21         avg_daily_volume,
 22         stock_exchange,
 23         market_cap,
 24         book_value,
 25         ebitda,
 26         dividend_per_share,
 
 27         dividend_yield,
 28         earnings_per_share,
 29         week_52_high,
 30         week_52_low,
 31         day_50_moving_avg,
 32         day_200_moving_avg,
 
 33         price_earnings_ratio,
 34         price_earnings_growth_ratio,
 35         price_sales_ratio,
 36         price_book_ratio,
 37         short_ratio}).
 38 
 
 39 
 40 
 41 quote(Symbol) ->
 42     inets:start(),
 43     {ok,{_Status, _Headers, Response}} = http:request(get, {url(Symbol), []},
 
 44         [{timeout, 5000}], [{sync, true}]),
 45 
 46     Values = re:split(Response, "[,\r\n]"),
 47     #quote{
 
 48         symbol = list_to_binary(Symbol),
 49         price = to_float(lists:nth(1, Values)),
 50         change = to_float(lists:nth(2, Values)),
 51         volume = to_integer(lists:nth(3, Values)),
 
 52         avg_daily_volume = to_integer(lists:nth(4, Values)),
 53         stock_exchange = lists:nth(5, Values), % to_string
 54         market_cap = to_float(lists:nth(6, Values)), % B
 
 55         book_value = to_float(lists:nth(7, Values)),
 56         ebitda = to_float(lists:nth(8, Values)), % B
 57         dividend_per_share = to_float(lists:nth(9, Values)),
 
 58         dividend_yield = to_float(lists:nth(10, Values)),
 59         earnings_per_share = to_float(lists:nth(11, Values)),
 60         week_52_high = to_float(lists:nth(12, Values)),
 61         week_52_low = to_float(lists:nth(13, Values)),
 
 62         day_50_moving_avg = to_float(lists:nth(14, Values)),
 63         day_200_moving_avg = to_float(lists:nth(15, Values)),
 64         price_earnings_ratio = to_float(lists:nth(16, Values)),
 65         price_earnings_growth_ratio = to_float(lists:nth(17, Values)),
 
 66         price_sales_ratio = to_float(lists:nth(18, Values)),
 67         price_book_ratio = to_float(lists:nth(19, Values)),
 68         short_ratio = to_float(lists:nth(20, Values))}.
 69 
 
 70 url(Symbol) ->
 71     "http://finance.yahoo.com/d/quotes.csv?s=" ++ Symbol ++ "&f=l1c1va2xj1b4j4dyekjm3m4rr5p5p6s7".
 72 
 
 73 to_float(<<"N/A">>) ->
 74     -1;
 75 to_float(Bin) ->
 76     {Multiplier, Bin1} = case bin_ends_with(Bin, <<$B>>) of
 
 77         true ->
 78             {1000000000, bin_replace(Bin, <<$B>>, <<>>)};
 79         false ->
 80             case bin_ends_with(Bin, <<$M>>) of
 
 81                 true ->
 82                     {1000000, bin_replace(Bin, <<$M>>, <<>>)};
 83                 false ->
 84                     {1,Bin}
 
 85             end
 86     end,
 87     L = binary_to_list(Bin1),
 88     list_to_float(L) * Multiplier.
 
 89 
 90 
 91 
 

Note that I am omitting some code in above listing, as I just wanted to highlight HTTP request and parsing code.

Publishing the Stock Quote

I used exmpp library to communicate with the XMPP server in Erlang. Here is the code for publishing the quotes using Bosh/XMPP protocol:

  1 %%%-------------------------------------------------------------------
 
  2 %%% File : quote_publisher.erl
  3 %%% Author : Shahzad Bhatti
  4 %%% Purpose : OTP server for publishing quotes
 
  5 %%% Created : May 8, 2010
  6 %%%-------------------------------------------------------------------
  7 -module(quote_publisher).
 
  8 
  9 -export([
 10     start/1,
 11     start/5,
 12     stop/1]).
 13 
 
 14 -export([init/5]).
 15 
 16 -include_lib("quote_utils.hrl").
 17 
 18 -record(state, {session, jid, service=?TEST_XMPP_PUBSUB, symbol}).
 
 19 
 20 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 21 %% APIs
 22 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
 
 23 start(Symbol) ->
 24     start(?TEST_XMPP_SERVER, ?TEST_XMPP_PORT, ?PRODUCER_USERNAME,
 25         ?PRODUCER_PASSWORD, Symbol).
 
 26 
 27 start(Host, Port, User, Password, Symbol) ->
 28     spawn(?MODULE, init, [Host, Port, User, Password, Symbol]).
 
 29 
 30 stop(Pid) ->
 31     Pid ! stop.
 32   
 33 init(Host, Port, User, Password, Symbol) ->
 
 34     {ok, {MySession, MyJID}} = quote_utils:connect(Host, Port, User, Password),
 35     State = #state{session=MySession, jid=MyJID, symbol = Symbol},
 
 36     create_symbol_node(State),
 37     loop(State).
 38 
 39 loop(#state{session=MySession, jid=_MyJID, service = _Service,
 
 40         symbol = _Symbol}=State) ->
 41     receive
 42         stop ->
 43             quote_utils:disconnect(MySession);
 
 44         Record = #received_packet{packet_type=message, raw_packet=_Packet} ->
 45             loop(State);
 46         Record ->
 
 47             loop(State)
 48     after 2000 ->
 49         publish_quote(State),
 50         loop(State)
 
 51     end.
 52 
 53 create_symbol_node(#state{session=MySession, jid=MyJID, service = Service,
 
 54         symbol = Symbol}) ->
 55     IQ = exmpp_client_pubsub:create_node(Service, Symbol),
 56     PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
 
 57     PacketId2 = erlang:binary_to_list(PacketId),
 58     receive #received_packet{id=PacketId2, raw_packet=Raw} ->
 
 59       case exmpp_iq:is_error(Raw) of
 60         true -> {error, Raw};
 61         _ -> ok
 
 62       end
 63     end.
 64   
 65 publish_quote(#state{session=MySession, jid=MyJID, service = Service, symbol = Symbol}) ->
 
 66     Quote = yquote_client:quote(Symbol),
 67     JsonQuote = ?record_to_json(quote, Quote),
 68     M = exmpp_xml:element(?QUOTE_DATA),
 
 69     IQ = exmpp_client_pubsub:publish(Service, Symbol, exmpp_xml:append_cdata(M,
 70             JsonQuote)),
 71     Xml = exmpp_stanza:set_id(exmpp_stanza:set_sender(IQ, MyJID), Symbol),
 
 72     PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
 73     PacketId2 = erlang:binary_to_list(PacketId),
 
 74     receive #received_packet{id=PacketId2, raw_packet=Raw} ->
 75       case exmpp_iq:is_error(Raw) of
 
 76         true -> error;
 77         _ -> ok
 78       end
 79     end.
 
 80 
 81 
 82 
 

In above code, a process is created for each symbol, which periodically polls stock quote and publishes it to the XMPP node using pubsub/bosh protocol. Note that a unique node is created for each symbol and node must be created before anyone can publish or subscribe. Also, note that publish/subscribe APIs use request/ack protocol, so after sending the request, the process retrieves the acknowledgement of the request.

Here are some utility functions used by the publisher:

  1 -module(quote_utils).
 
  2   
  3 -include_lib("quote_utils.hrl").
  4 
  5 -export([
  6     init_session/2,
 
  7     connect/4,
  8     disconnect/1]).
  9 
 10 bosh_url(Host, Port) ->
 
 11     "http://" ++ Host ++ ":" ++ integer_to_list(Port) ++ "/http-bind".
 12 
 
 13 
 14 connect(Host, _Port, User, Password) ->
 15     safe_start_apps(),
 
 16     MySession = exmpp_session:start({1,0}),
 17     exmpp_xml:start_parser(), %% Create XMPP ID (Session Key):
 18     MyJID = exmpp_jid:make(User, Host, random),
 
 19     %% Create a new session with basic (digest) authentication:
 20     exmpp_session:auth_basic_digest(MySession, MyJID, Password),
 21     
 
 22     
 23     {ok, _StreamId, _Features} = exmpp_session:connect_BOSH(MySession, bosh_url(Host, 5280), Host, []),
 
 24     try quote_utils:init_session(MySession, Password)
 25     catch
 26         _:Error -> io:format("got error: ~p~n", [Error]), {error, Error}
 
 27     end,
 28     {ok, {MySession, MyJID}}.
 29 
 30 init_session(MySession, Password) ->
 
 31     %% Login with defined JID / Authentication:
 32     try exmpp_session:login(MySession, "PLAIN")
 33     catch
 
 34         throw:{auth_error, 'not-authorized'} ->
 35         %% Try creating a new user:
 36         io:format("Register~n",[]),
 37         %% In a real life client, we should trap error case here
 
 38         %% and print the correct message.
 39         exmpp_session:register_account(MySession, Password),
 40         %% After registration, retry to login:
 
 41         exmpp_session:login(MySession)
 42     end,
 43     %% We explicitely send presence:
 44     exmpp_session:send_packet(MySession, exmpp_presence:set_status(exmpp_presence:available(), "Ready to publish!!!")),
 
 45     ok.
 46 
 47 disconnect(MySession) ->
 48     exmpp_session:stop(MySession).
 49 
 
 50 safe_start_apps() ->
 51     try start_apps()
 52     catch
 53         _:Error -> io:format("apps already started : ~p~n", [Error]), {error, Error}
 
 54     end.
 55 
 56 start_apps() ->
 57     ok = application:start(exmpp),
 58     ok = application:start(crypto),
 59     ok = application:start(ssl),
 
 60     ok = application:start(lhttpc).
 61 
 

Note that above code auto-registers users, which is not recommended for production use.

Javascript code using Strophe library

The web application depends on jQuery, Strophe and Strophe Pubsub. These libraries are included in docroot directory that are imported by index.html. The Strophe library and ejabbered 2.1.3 version supports cross domain scripting so that bosh service here doesn’t need to be on the same domain/port, but it must have a /crossdomain.xml policy file that allows access from wherever index.html lives. The Javascript initializes the connection parameter as follows (you would have to change Host):

   1 <script type="text/javascript">
 
   2     // The BOSH_SERVICE here doesn't need to be on the same domain/port, but
 
   3     // it must have a /crossdomain.xml policy file that allows access from
 
   4     // wherever crossdomain.html lives.
   5     // TODO: REPLACE <paraclete.local> with your <host-name>
 
   6     var HOST = 'paraclete.local';
   7     var JID = 'consumer@' + HOST;
 
   8     var PASSWORD = 'consumer';
   9     var BOSH_SERVICE = 'http://' + HOST + ':5280/http-bind'; //'/xmpp-httpbind'
 
  10     var PUBSUB = 'pubsub.' + HOST;
  11     var connection = null;
 
  12     var autoReconnect = true;
  13     var hasQuotes = [];
  14     var subscriptions = [];
 
  15   
  16     function log(msg) {
  17         $('#log').append('<div></div>').append(document.createTextNode(msg));
 
  18     }
  19   
  20     function rawInput(data) {
  21         //log('RECV: ' + data);
 
  22     }
  23     
  24     function rawOutput(data) {
  25         //log('SENT: ' + data);
 
  26     }
  27     function onQuote(stanza) {
  28         //log('onQuote###### ' + stanza);
 
  29         try {
  30             $(stanza).find('event items item data').each(function(idx, elem) {
  31                 quote = jQuery.parseJSON($(elem).text());
 
  32                 //{"price":235.86,"change":-10.39,"volume":59857756,"avg_daily_volume":20775600,"stock_exchange":[78,97,115,100,97,113,78,77],"market_cap":2.146e+11,
 
  33                 //"book_value":43.257,"ebitda":1.5805e+10,"dividend_per_share":0.0,"dividend_yield":-1,"earnings_per_share":11.796,"week_52_high":272.46,"week_52_low":119.38,
 
  34                 //"day_50_moving_avg":245.206,"day_200_moving_avg":214.119,"price_earnings_ratio":20.88,"price_earnings_growth_ratio":1.05,"price_sales_ratio":4.38,
 
  35                 //"price_book_ratio":5.69,"short_ratio":0.7}
  36                 if (hasQuotes[quote.symbol] != undefined) {
 
  37                     $('price_' + quote.symbol).innerHTML = quote.price;
  38                     $('change_' + quote.symbol).innerHTML = quote.change;
  39                     $('volume_' + quote.symbol).innerHTML = quote.volume;
 
  40                 } else {
  41                     hasQuotes[quote.symbol] = true;
  42                     $('#quotesTable > tbody:last').append('<tr id="quote_' +
 
  43                         quote.symbol + '"><td>' + quote.symbol +
  44                         '</td><td id="price_' + quote.symbol + '">' + quote.price +
 
  45                         '</td><td id="change_' + quote.symbol + '" class="class_change_' + quote.symbol + '">' +
  46                         quote.change + '</td><td id="volume_' +
 
  47                         quote.symbol + '">' +
  48                         quote.volume + '</td></tr>');
  49                 }
 
  50 
  51                 if(quote.change < 0) {
  52                     $('.class_change_' + quote.symbol).css('color', 'red');
 
  53                 } else {
  54                     $('.class_change_' + quote.symbol).css('color', 'green');
 
  55                 }
  56             });
  57         } catch (e) {
  58             log(e)
 
  59         }
  60         return true;
  61     }
  62 
 
  63     function handleSubscriptionChange (stanza) {
  64         //log("***handleSubscriptionChange Received: " + stanza);
 
  65     }
  66         
  67     function onConnect(status) {
  68         if (status == Strophe.Status.CONNECTING) {
 
  69             log('Strophe is connecting.');
  70         } else if (status == Strophe.Status.CONNFAIL) {
  71             log('Strophe failed to connect.');
 
  72             $('#connect').get(0).value = 'connect';
  73         } else if (status == Strophe.Status.DISCONNECTING) {
 
  74             log('Strophe is disconnecting.');
  75         } else if (status == Strophe.Status.DISCONNECTED) {
  76             if (autoReconnect) {
 
  77                 log( "Streaming disconnected. Trying to reconnect...", METHODNAME );
  78                 connection.connect($('#jid').get(0).value, $('#pass').get(0).value, onConnect);
  79                 log( "Streaming reconnected.", METHODNAME );
 
  80             } else {
  81                 log('Strophe is disconnected.');
  82                 $('#connect').get(0).value = 'connect';
 
  83                 //publishEvent( "streamingDisconnected" );
  84             }
  85         } else if (status == Strophe.Status.CONNECTED) {
 
  86             log('Strophe is connected.');
  87             //log('QUOTE_BOT: Send a message to ' + connection.jid + ' to talk to me.');
 
  88             connection.addHandler(onMessage, null, 'message', null, null, null);
  89             connection.send($pres().tree());
 
  90             publishEvent( "streamingConnected" );
  91         }
  92     }
  93 
 
  94     function subscribe(symbol) {
  95         if (subscriptions[symbol]) return;
  96         try {
 
  97             connection.pubsub.subscribe(JID, PUBSUB, symbol, [], onQuote, handleSubscriptionChange);
  98             subscriptions[symbol] = true;
  99             log("Subscribed to " + symbol);
 
 100         } catch (e) {
 101             alert(e)
 102         }
 103     }
 104     function unsubscribe(symbol) {
 
 105         if (!subscriptions[symbol]) return;
 106         try {
 107             connection.pubsub.unsubscribe(JID, PUBSUB, symbol, handleSubscriptionChange);
 108             subscriptions[symbol] = false;
 
 109             log("Unsubscribed from " + symbol);
 110         } catch (e) {
 111             alert(e)
 112         }
 
 113     }
 114   
 115     function onMessage(msg) {
 116         var to = msg.getAttribute('to');
 
 117         var from = msg.getAttribute('from');
 118         var type = msg.getAttribute('type');
 119         var elems = msg.getElementsByTagName('body');
 
 120   
 121         if (type == "chat" && elems.length > 0) {
 122             var body = elems[0];
 
 123             log('QUOTE_BOT: I got a message from ' + from + ': ' + Strophe.getText(body));
 124             var reply = $msg({to: from, from: to, type: 'chat'}).cnode(Strophe.copyElement(body));
 125             connection.send(reply.tree());
 
 126             log('QUOTE_BOT: I sent ' + from + ': ' + Strophe.getText(body));
 127         }
 128         // we must return true to keep the handler alive.
 
 129         // returning false would remove it after it finishes.
 
 130         return true;
 131     }
 132  
 133     $(document).ready(function () {
 
 134         connection = new Strophe.Connection(BOSH_SERVICE);
 135         connection.rawInput = rawInput;
 136         connection.rawOutput = rawOutput;
 137         connection.connect(JID, PASSWORD, onConnect);
 138         //connection.disconnect();
 
 139         $('#add_symbol').bind('click', function () {
 140             var symbol = $('#symbol').get(0).value;
 
 141             subscribe(symbol);
 142         });
 143     });
 144 
 145 </script>
 146 
 
 

When the document is loaded, the connection to the ejabberd server is established. Here is the form and table that is used to add subscription and display current quote information for the symbols:

  1 <form name='symbols'>
 
  2     <label for='symbol'>Symbol:</label>
  3     <input type='text' id='symbol'/>
 
  4     <input type='button' id='add_symbol' value='add' />
 
  5 </form>
  6 <hr />
  7 <div id='log'></div>
 
  8 <table id="quotesTable" width="600" border="2" bordercolor="#333333">
 
  9     <thead>
 10         <tr>
 11             <th>Symbol</th>
 
 12             <th>Price</th>
 13             <th>Change</th>
 14             <th>Volume</th>
 
 15         </tr>
 16     </thead>
 17     <tbody>
 18     </tbody>
 
 19 </table>
 20 
 

When the form is submitted, it calls subscribe method, which in turn sends request to the ejabbered server for subscription. When a new quote is received, it calls onQuote function, which inserts a row in the table when a new symbol is added or updates the quote information if it already exists.

Conclusion

The ejabberd, XMPP, exmpp, Bosh and Strophe provides a robust and mature solution for messaging and are especially suitable for web applications that want to build highly scalable and interactive applications. Though, above code is fairly simple, but same design principles can be used to support large number of stock quotes updates. As, we need to send stock quotes from tens of thousands symbols for every tick within a fraction of a second, the Erlang provides very scalable solution, where each symbol is simply served by an Erlang process. Finally, I am still learning more about Ejabberd’s clustering, security, and other features so that it can truly survive the production load, so I would love to hear any feedback you might have with similar systems.

References


May 20, 2008

Rebooting philosophy in Erlang

Filed under: Erlang — admin @ 10:49 am

I just read “Let It Crash” Programming, which talks about how Erlang is designed as a fault tolerant language from ground up. I have been learning Erlang since Joe Armstrong’s book came out and have heard Joe a few times talk about fault tolerance. Steve Vionski has also talked about Erlang: It.s About Reliability in flame war between him and Ted Neward. For me, Erlang reminds of Microsoft Windows, i.e. when Windows stops working I just reboot the machine. Erlang does the same thing, when some process fails, it just restarts the processes. About sixteen years ago, I started my career in old VAX, Mainframe and UNIX environments and my managers used to say that he never had to restart Mainframe if something fails, but somehow bugs on Windows get fixed after reboot. When I worked at Fermilab in mid 90s, we had server farms of hundreds of machines and fault tolerance was quite important. Though, Google didn’t invent server farms, but it scaled them to new level, where failure of machines don’t stop the entire application. Erlang takes the same philosophy to the programming language. Obviously, in order to make truly fault tolerant application, the Erlang processes will need to be spawned on separate machines. Erlang’s support of CSP style communication and distributed computing such as OTP makes it trivial. You can further increase fault tolerance and high availibility by using machines on separate racks, networks, power sources or data centers. No wonder, Facebook is using Erlang in its Chat application.

December 23, 2007

Released ErlSDB 0.1

Filed under: Erlang,SimpleDB,Web Services — admin @ 7:09 pm

I started working on an Erlang library to access Amazon’s SimpleDB web service and I released an early version of the library this weekend. Here are some notes on its usage:
Installing

svn checkout http://erlsdb.googlecode.com/svn/trunk/ erlsdb-read-only

Building

make

Testing

edit Makefile and add access key and secret key, then type make test

Usage

Take a look at test/erlsdb_test.erl to learn usage, here is a sample code

Starting Server

erlsdb:start(type,
    	[#sdb_state{
		access_key = "YourAccessKey",
		secret_key = "YourSecretKey",
		domain = "YourDomain"
		}
	])

Creating Domain

    erlsdb:create_domain()

Note that the server will use the domain that was passed during initialization.

Listing all Domains

    {ok, List, _} = erlsdb:list_domains()

Deleting Domain

    erlsdb:delete_domain()

Adding an item

    Attributes = lists:sort([
	["StreetAddress", "705 5th Ave"],
        ["City", "Seattle"],
        ["State", "WA"],
        ["Zip", "98101"]
	]),
    erlsdb:put_attributes("TccAddr", Attributes)

Retrieving an item

    {ok, UnsortedAttrs} = erlsdb:get_attributes("TccAddr")

Deleting an item

    erlsdb:delete_attributes("TccAddr"),

Powered by WordPress