Implementing Actor-based message passing using Object-oriented methods

I have blogged earlier about using actor-based programming for parallel and concurrent programming [1, 2]. An actor has its own thread of execution and a mailbox. It communicates with other actors by sending messages, which are delivered to their mailboxes. In this model, message passing takes a literal form rather than the form of method invocation. I studied actor-based parallel computing during my postgraduate research on parallel and distributed computing in the mid-90s, and I learned that message-passing systems such as Actors, MPI and MPL were much more scalable than shared memory. This explains the popularity of languages like Erlang and Scala, which have adopted the actor model to tackle concurrency. I wrote about the object-oriented nature of Erlang more than a year ago. According to Ralph Johnson, concurrent Erlang is object-oriented. Alan Kay has also argued that message passing is more important than objects in object-oriented programming, and regrets that he chose the term “object-oriented” instead of “message-passing”. Even Joe Armstrong, who generally dislikes object-oriented programming, admits the similarity between object-oriented programming and Erlang’s message passing.

However, despite the effectiveness of message passing in the new era of concurrency, I find that pure message passing is a bit intimidating to the average programmer. In this blog, I will show how object-oriented methods can be used to abstract actor-based message passing. Since the primary benefit of message passing, along with immutability, is its support for concurrency, the following example demonstrates message passing across multiple threads. Let’s suppose we are designing a simple chat service to send and receive messages, where a message looks like the following:

package actor;

 public class ChatMessage {
     public final String fromUser;
     public final String toUser;
     public final String message;

     public ChatMessage(String fromUser, String toUser, String message) {
         this.fromUser = fromUser;
         this.toUser = toUser;
         this.message = message;
     }
 }
 

The following is the interface for the chat service:

package actor;

 import java.util.Collection;

 public interface ChatService {
     /**
      * sends a message to another user
      * @param message - message object
      */
     @Asynchronous
     public void sendMessage(ChatMessage message);

     /**
      * retrieves messages that were sent to the user.
      * @param user - target user who received the messages
      * @return - collection of messages
      */
     public Collection<ChatMessage> getMessages(String user);

     /**
      * remove all messages
      */
     @Asynchronous
     public void clear();

     /**
      * return number of messages
      */
     public long countMessages();

     /**
      * @throws IllegalArgumentException
      */
     public void raiseError() throws IllegalArgumentException;

 }
 

Note that the above interface uses the “@Asynchronous” annotation for the “sendMessage” and “clear” methods. One of the benefits of message passing is that you can optionally send messages asynchronously. So I defined an annotation to declare a particular method as asynchronous, e.g.

package actor;
 import java.lang.reflect.*;
 import java.lang.annotation.*;

 @Target({ElementType.TYPE, ElementType.METHOD})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface Asynchronous {
 }
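
To make the role of the annotation concrete, here is a minimal sketch of how such a marker can be detected through reflection. The names `AsyncMarkerDemo`, `FireAndForget` and `Mailer` are made up for this illustration and are not part of the library in this post; the only point is that a marker with RUNTIME retention is visible via `Method.isAnnotationPresent`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical names throughout; this only mirrors the idea of the @Asynchronous marker.
class AsyncMarkerDemo {
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME) // without RUNTIME retention, reflection cannot see the marker
    @interface FireAndForget {
    }

    interface Mailer {
        @FireAndForget
        void send(String text);

        int pending();
    }

    // Returns true when the named (non-overloaded) method of Mailer carries the marker.
    static boolean isFireAndForget(String methodName) {
        for (Method m : Mailer.class.getMethods()) {
            if (m.getName().equals(methodName)) {
                return m.isAnnotationPresent(FireAndForget.class);
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println("send: " + isFireAndForget("send"));
        System.out.println("pending: " + isFireAndForget("pending"));
    }
}
```

A dynamic proxy's InvocationHandler receives the Method object of the interface, so the annotation has to sit on the interface method (as it does in ChatService) rather than on the implementing class.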
 

The following code implements the chat service in a non-thread-safe fashion:

package actor;

 import java.util.Collection;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;

 /**
  * Simple non-thread-safe implementation of ChatService
  */
 public class ChatServiceImpl implements ChatService {
     private long count;
     private Map<String, Collection<ChatMessage>> messagesByUsers;

     public ChatServiceImpl() {
         messagesByUsers = new HashMap<String, Collection<ChatMessage>>();
     }

     /**
      * sends a message to another user
      * @param message - message object
      */
     public void sendMessage(ChatMessage message) {
         getMessages(message.toUser).add(message);
         count++;
     }

     /**
      * retrieves messages that were sent to the user.
      * @param user - target user who received the messages
      * @return - collection of messages
      */
     public Collection<ChatMessage> getMessages(String user) {
         Collection<ChatMessage> messages = messagesByUsers.get(user);
         if (messages == null) {
             messages = new ArrayList<ChatMessage>();
             messagesByUsers.put(user, messages);
         }
         return messages;
     }

     /**
      * remove all messages
      */
     public void clear() {
         messagesByUsers.clear();
         count = 0; // keep countMessages() consistent with the emptied store
     }

     /**
      * return number of messages
      */
     public long countMessages() {
         return count;
     }

     /**
      * @throws IllegalArgumentException
      */
     public void raiseError() throws IllegalArgumentException {
         throw new IllegalArgumentException();
     }

 }
 

Note that the above code cannot be used safely in a multi-threaded environment. If we were building a thread-safe service in Java, we would have to add synchronization or locking, such as:

package actor;

 import java.util.Collection;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;

 /**
  * Simple thread-safe implementation of ChatService
  */
 public class SafeChatServiceImpl implements ChatService {
     private ChatService delegate;

     public SafeChatServiceImpl(ChatService delegate) {
         this.delegate = delegate;
     }

     /**
      * sends a message to another user
      * @param message - message object
      */
     public synchronized void sendMessage(ChatMessage message) {
         delegate.sendMessage(message);
     }

     /**
      * retrieves messages that were sent to the user.
      * @param user - target user who received the messages
      * @return - collection of messages
      */
     public synchronized Collection<ChatMessage> getMessages(String user) {
         return delegate.getMessages(user);
     }

     /**
      * remove all messages
      */
     public synchronized void clear() {
         delegate.clear();
     }

     /**
      * return number of messages
      */
     public synchronized long countMessages() {
         return delegate.countMessages();
     }

     /**
      * @throws IllegalArgumentException
      */
     public synchronized void raiseError() throws IllegalArgumentException {
         delegate.raiseError();
     }

 }
 

The above code demonstrates the use of composition to convert a non-thread-safe class into a thread-safe class.
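
The same delegate-and-lock idea can be tried in isolation with a much smaller service. The sketch below is only an illustration with made-up names (`SynchronizedWrapperDemo`, `Counter`, `PlainCounter`, `SafeCounter`); it wraps a non-thread-safe counter in a synchronized decorator and hits it from several threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names; a stripped-down version of the delegate-and-lock idea.
class SynchronizedWrapperDemo {
    interface Counter {
        void increment();
        long value();
    }

    // Not thread-safe: count++ is a read-modify-write sequence.
    static class PlainCounter implements Counter {
        private long count;
        public void increment() { count++; }
        public long value() { return count; }
    }

    // Thread-safe by composition: every call takes the wrapper's lock first.
    static class SafeCounter implements Counter {
        private final Counter delegate;
        SafeCounter(Counter delegate) { this.delegate = delegate; }
        public synchronized void increment() { delegate.increment(); }
        public synchronized long value() { return delegate.value(); }
    }

    // Starts `threads` threads that each call increment() `perThread` times.
    static long hammer(final Counter counter, int threads, final int perThread) {
        List<Thread> workers = new ArrayList<Thread>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < perThread; j++) {
                        counter.increment();
                    }
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.value();
    }

    public static void main(String[] args) {
        System.out.println("safe:  " + hammer(new SafeCounter(new PlainCounter()), 8, 10000));
        // The unwrapped counter may report less than 80000 because updates can be lost.
        System.out.println("plain: " + hammer(new PlainCounter(), 8, 10000));
    }
}
```

The wrapper is only safe as long as nobody keeps a reference to the unwrapped delegate and calls it directly.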

Now, the meat of this library is the following factory class, which converts any Java object into an actor:

package actor;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;

 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ArrayBlockingQueue;

 /**
  * Factory class to convert any POJO class into an Actor.
  */
 public class ActorFactory {
     private static final int MAIL_BOX_SIZE = 100;
     private static class MethodRequest {
         private final Object object;
         private final Method method;
         private final Object[] args;
         private final BlockingQueue<MethodResponse> replyQueue;
         private MethodRequest(Object object, Method method, Object[] args, BlockingQueue<MethodResponse> replyQueue) {
             this.object = object;
             this.method = method;
             this.args = args;
             this.replyQueue = replyQueue;
         }
     }

     private static class MethodResponse {
         private Object response;
         private Exception exception;
         private MethodResponse(Object response) {
             this.response = response;
         }
         private MethodResponse(Exception exception) {
             this.exception = exception;
         }
         Object getResponse() throws Exception {
             if (exception != null) throw exception;
             return response;
         }
     }

     private static class ActorThread extends Thread {
         private volatile boolean shutdown;
         private BlockingQueue<MethodRequest> queue;
         private ActorThread(BlockingQueue<MethodRequest> queue) {
             this.queue = queue;
             setDaemon(true);
         }
         public void run() {
             while (!shutdown) {
                 try {
                     MethodRequest request = queue.take();
                     try {
                         Object response = request.method.invoke(request.object, request.args);
                         if (request.replyQueue != null) {
                             request.replyQueue.put(new MethodResponse(response));
                         }
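                     } catch (InvocationTargetException e) {
                         // unwrap the target's exception; wrap non-Exception causes (e.g. Error) so the cast cannot fail
                         Throwable cause = e.getCause();
                         Exception failure = (cause instanceof Exception) ? (Exception) cause : new RuntimeException(cause);
                         if (request.replyQueue != null) {
                             request.replyQueue.put(new MethodResponse(failure));
                         } else {
                             e.printStackTrace();
                         }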
                     } catch (Exception e) {
                         if (request.replyQueue != null) {
                             request.replyQueue.put(new MethodResponse(e));
                         } else {
                             e.printStackTrace();
                         }
                     }
                 } catch (InterruptedException e) {
                     // interrupted() is static and only clears the flag; restore it and stop the actor loop
                     Thread.currentThread().interrupt();
                     return;
                 }
             }
         }
     }

     public static Object create(final Class ifaceClass, final Object object) {
         final BlockingQueue<MethodRequest> queue = new ArrayBlockingQueue<MethodRequest>(MAIL_BOX_SIZE);
         new ActorThread(queue).start();
         //
         final InvocationHandler handler = new InvocationHandler() {
             public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
                 if (method.isAnnotationPresent(Asynchronous.class)) {
                     queue.put(new MethodRequest(object, method, args, null));
                     return null;
                 } else {
                     final BlockingQueue<MethodResponse> replyQueue = new ArrayBlockingQueue<MethodResponse>(1);
                     queue.put(new MethodRequest(object, method, args, replyQueue));
                     return replyQueue.take().getResponse();
                 }
             }
         };
         return ifaceClass.cast(Proxy.newProxyInstance(ifaceClass.getClassLoader(), new Class<?>[] {ifaceClass}, handler));
     }

     public static Object createWithoutActor(final Class ifaceClass, final Object object) {
         final InvocationHandler handler = new InvocationHandler() {
             public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
                 Object response = method.invoke(object, args);
                 return response;
             }
         };
         return ifaceClass.cast(Proxy.newProxyInstance(ifaceClass.getClassLoader(), new Class<?>[] {ifaceClass}, handler));
     }
 }
 

The ActorFactory uses the reflection and dynamic proxy features of Java. The client code creates an actor by invoking

      public static Object create(final Class ifaceClass, final Object object) {
 

e.g.

         ChatService service = (ChatService) ActorFactory.create(ChatService.class, new ChatServiceImpl());
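
The cast in the call above comes from the raw `Class` parameter. As a rough sketch of an alternative, a generic signature lets the compiler infer the proxy type. The code below is not the ActorFactory from this post: `TypedActorSketch`, `Greeter` and `GreeterImpl` are made-up names, every call is treated as synchronous, and the hand-written mailbox thread is swapped for a single-threaded `ExecutorService` to keep the example short.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical variant, not the ActorFactory from this post.
class TypedActorSketch {
    interface Greeter {
        String greet(String name);
    }

    static class GreeterImpl implements Greeter {
        public String greet(String name) {
            if (name == null) throw new IllegalArgumentException("name is required");
            return "hello " + name + " from " + Thread.currentThread().getName();
        }
    }

    // Generic signature: the compiler ties the proxy's type to the interface token.
    static <T> T create(Class<T> iface, final T target) {
        final ExecutorService mailbox = Executors.newSingleThreadExecutor(new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "actor-thread");
                t.setDaemon(true); // do not keep the JVM alive
                return t;
            }
        });
        InvocationHandler handler = new InvocationHandler() {
            public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
                try {
                    // Every call is queued and executed one at a time on the actor thread.
                    return mailbox.submit(new Callable<Object>() {
                        public Object call() throws Exception {
                            return method.invoke(target, args);
                        }
                    }).get();
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    // Unwrap reflection's wrapper so callers see the original exception.
                    if (cause instanceof InvocationTargetException) throw cause.getCause();
                    throw cause;
                }
            }
        };
        return iface.cast(Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    public static void main(String[] args) {
        Greeter greeter = create(Greeter.class, new GreeterImpl()); // no cast needed
        System.out.println(greeter.greet("jane"));
    }
}
```

The trade-off in this sketch is that the executor's internal queue is unbounded, whereas the mailbox in ActorFactory is a bounded queue.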
 

The create method instantiates a blocking queue that serves as the mailbox for the actor and then starts a thread. It then instantiates an InvocationHandler that intercepts all method calls. Finally, the proxy object is returned to the client. When the client invokes any method, it is intercepted by the proxy, which then sends a message to the mailbox of the actor. The proxy also checks whether the method is declared asynchronous, in which case it does not wait for the reply. Because the mailbox is a bounded queue of 100 entries, even an asynchronous call blocks the caller while the mailbox is full. The actor thread simply waits until there is a message in the mailbox. The message for the actor is defined in MethodRequest, which encapsulates the reflection information needed to invoke a method. The actor thread invokes the method using reflection and puts the reply in the queue supplied by the proxy. The proxy waits for the reply and then passes it back to the client. As an actor serves one request at a time, there is no need for locking or synchronization.
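
The mailbox and reply-queue handshake described above can also be seen without any reflection. The following is a minimal sketch under simplifying assumptions: the names (`MailboxSketch`, `Request`, `add`, `addAndGet`) are invented for this example, the actor only keeps a running total, and the mailbox is an unbounded LinkedBlockingQueue rather than the bounded queue used by ActorFactory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names; shows only the mailbox and reply-queue handshake.
class MailboxSketch {
    // A request carries its payload and, for synchronous calls, a one-slot reply queue.
    static final class Request {
        final int amount;
        final BlockingQueue<Long> replyQueue; // null means fire-and-forget
        Request(int amount, BlockingQueue<Long> replyQueue) {
            this.amount = amount;
            this.replyQueue = replyQueue;
        }
    }

    private final BlockingQueue<Request> mailbox = new LinkedBlockingQueue<Request>();
    private long total; // touched only by the actor thread, so no lock is needed

    private MailboxSketch() {
    }

    // Creates the actor and starts its daemon thread.
    static MailboxSketch start() {
        final MailboxSketch actor = new MailboxSketch();
        Thread thread = new Thread(new Runnable() {
            public void run() {
                actor.processMessages();
            }
        }, "mailbox-actor");
        thread.setDaemon(true);
        thread.start();
        return actor;
    }

    private void processMessages() {
        try {
            while (true) {
                Request request = mailbox.take(); // blocks until a message arrives
                total += request.amount;
                if (request.replyQueue != null) {
                    request.replyQueue.put(total);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop the loop when interrupted
        }
    }

    // Asynchronous send: enqueue and return immediately.
    void add(int amount) {
        mailbox.add(new Request(amount, null));
    }

    // Synchronous send: enqueue, then block on a private reply queue.
    long addAndGet(int amount) {
        BlockingQueue<Long> replyQueue = new ArrayBlockingQueue<Long>(1);
        mailbox.add(new Request(amount, replyQueue));
        try {
            return replyQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        MailboxSketch counter = MailboxSketch.start();
        counter.add(1);
        counter.add(2);
        System.out.println(counter.addAndGet(0)); // prints 3: the mailbox is FIFO
    }
}
```

Messages sent from one thread are processed in the order they were sent, which is why a synchronous call issued after two asynchronous ones observes both of their effects.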

The following code shows the client test code:

package actor;
 import java.util.Collection;

 import junit.framework.*;

 public class ActorTest extends TestCase {
     private static ChatService service;
     private static final int NUM_THREADS = 100;
     private static final int NUM_MESSAGES = 100;
     protected void setUp() throws Exception {
         super.setUp();
         
     }

     public void testSendGetMessages() throws Exception {
         for (int i=0; i<NUM_MESSAGES; i++) {
             final String suffix = Thread.currentThread().getName() + "--" + i;
             ChatMessage janeToAdamMessage = new ChatMessage("jane" + suffix, "adam" + suffix, "hi there" + suffix);
             service.sendMessage(janeToAdamMessage);
             ChatMessage adamToJaneMessage = new ChatMessage("adam" + suffix, "jane" + suffix, "hi there too"  +suffix);
             service.sendMessage(adamToJaneMessage);
             Collection<ChatMessage> janeMessages = service.getMessages("jane" + suffix);
             assertEquals(1, janeMessages.size());
             assertEquals("hi there too" + suffix, janeMessages.iterator().next().message);

             Collection<ChatMessage> adamMessages = service.getMessages("adam" + suffix);
             assertEquals(1, adamMessages.size());
             assertEquals("hi there" + suffix, adamMessages.iterator().next().message);
         }
     }

     public void testError() throws Exception {
         try {
             service.raiseError();
             fail("should have thrown IllegalArgumentException");
         } catch (IllegalArgumentException e) {
         }
     }
     public static Test suite ( ) {
         TestSuite suite= new TestSuite(ActorTest.class);
         return suite;
     }

     private static void runTestsWith(ChatService svc, String logMessage) throws InterruptedException {
         service = svc;
         long started = System.currentTimeMillis();
         Thread[] threads = new Thread[NUM_THREADS];
         for (int i=0; i<NUM_THREADS; i++) {
             threads[i] = new Thread(new Runnable() {
                 public void run() {
                     junit.textui.TestRunner.run(suite());
               }
             });
             threads[i].start();
         }
         for (int i=0; i<NUM_THREADS; i++) {
             threads[i].join();
         }
         assertEquals(NUM_THREADS*NUM_MESSAGES*2, service.countMessages());
         long elapsed = System.currentTimeMillis() - started;
         System.out.println("Completed " + logMessage + " in " + elapsed + " millis");
     }

     public static void main (String[] args) throws Exception {
         runTestsWith(new SafeChatServiceImpl(new ChatServiceImpl()), "thread-safe direct");
         runTestsWith((ChatService) ActorFactory.create(ChatService.class, new ChatServiceImpl()), "Actor model");
         runTestsWith((ChatService) ActorFactory.createWithoutActor(ChatService.class, new SafeChatServiceImpl(new ChatServiceImpl())), "thread-safe reflection");
     }
 }
 

The above class demonstrates a variety of ways you can instantiate the service class, e.g. by simply invoking the constructor, using reflection and using actors. Note that you cannot simply run the non-thread-safe implementation because it would produce incorrect results. For each type of service, the test creates 100 threads that send and receive messages. By the end of each test, 20,000 messages (100 threads × 100 iterations × 2 messages) have been sent and retrieved. Following are the runtimes for each kind of service invocation:

 Completed thread-safe direct in 286 millis
 Completed Actor model in 1412 millis
 Completed thread-safe reflection in 693 millis
 

As expected, using built-in synchronization yields the fastest response, reflection adds some overhead, and actor-based communication adds considerably more overhead. One of the features of Erlang is its seamless support for networking. An abstraction layer like the one above could be enhanced to start an actor on a remote machine, though in that case the service would exist only on the remote server. Because threads in Java are native and map to light-weight processes of the operating system, they are much more heavyweight than processes in Erlang, each of which needs only roughly 300 words of memory. I have found that native threads are more suitable for CPU-bound processing, while green or user-level threads suit IO-bound processing well.

In the end, the biggest gain of the above design from the programmer’s perspective is that the client code looks exactly like any other object-oriented code and, unlike Erlang, you get real type safety in message passing. Another big advantage is polymorphism: the above technique uses the dependency inversion principle to keep interface and implementation separate, so the actual implementation can be changed as needed.

You can download all code from here

contact me using:

"bhatti AT plexobject DOT com", or tweet me at "http://twitter.com/bhatti_shahzad".