Shahzad Bhatti Welcome to my ramblings and rants!

September 6, 2008

Rails saves the day

Filed under: Ruby — admin @ 12:52 pm

In our daily scrum meeting at work last week, we identified an application that had been crashing often. It was a fairly simple web application written in Java using Spring MVC, Hibernate, Spring and Apache Commons libraries. However, the developer who released it had embedded it in another, much bigger web application to avoid some deployment issues, which can be a real pain at Amazon. So I volunteered to address the issue and rewrite the application in JRuby on Rails (2.1). It took me a day to rewrite the entire application and migrate all the data from the old one, and along the way I also added a couple of features. The original application took a couple of weeks to develop and consisted of over 6,500 lines of code; the Ruby version is a little less than 800 lines. I have been using Ruby on my own since 2004 and started learning Rails in 2005, though I only started using Rails more on side projects after I attended the 2006 Rails conference. I still use Java for most of my work, so I was delighted to show the speed of development with Rails at work. Since deployment is still a pain with Rails, I chose JRuby, which allows Rails apps to be deployed on Tomcat, the container we use for most of our projects. Hopefully, that will fix the stability issues and save me from getting paged, as I will be on call next week.

August 27, 2008

Implementing HTTP Proxy Service with XSL Transformation

Filed under: Java — admin @ 10:19 pm

Recently I had to implement a portal that embedded content from other web applications. I tried to build a framework that made it easy to add remote applications without requiring any changes to them. The framework had the following design:

  • For each feature or page, we stored a feature link and its URL in configuration; that configuration told us where to
    get the HTML/XHTML page or form to render.

  • However, instead of displaying raw HTML, I first converted it into valid XHTML using Tidy and then transformed it so that all
    form submissions go through our proxy server. As part of the transformation, I added the original form URLs, methods and
    cookies as hidden fields. This kept the proxy server free of per-client state, which made clustering easy.

  • When a form is submitted, the proxy server intercepts it, makes the request to the remote application and sends back the
    response. The proxy server reads the original target URL, method type and cookies from the form so that the remote
    application can still manage state if it uses sessions.
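The feature-link configuration in the first point could be as simple as a properties file that maps each feature link to a remote page. A minimal sketch, assuming a hypothetical FeatureRegistry class and a made-up guestbook.sign key (neither comes from the original framework); the URL is the guest book page used in the service example below:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical registry mapping portal feature links to remote page URLs,
// loaded from properties-style configuration ("featureLink=url" per line).
class FeatureRegistry {
    private final Properties links = new Properties();

    FeatureRegistry(String config) {
        try {
            links.load(new StringReader(config));
        } catch (IOException e) {
            // StringReader does no real I/O, but Properties.load() declares IOException
            throw new IllegalArgumentException("unreadable configuration", e);
        }
    }

    // Returns the URL to fetch for a feature link, or null if it is not configured
    String urlFor(String featureLink) {
        return links.getProperty(featureLink);
    }
}
```

Adding a remote application then only means adding a line to the configuration, which matches the goal of not touching the remote applications themselves.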

Here are a few simplified classes that I developed for the proxy framework:

Content Transformation

First, I implemented the HTML-to-XHTML conversion and an XSL transformation that rewrites forms. Here is the XSLT I used:

    <xsl:stylesheet
      xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
      version="1.0"
      xmlns:xhtml="http://www.w3.org/1999/xhtml"
      xmlns:java="http://xml.apache.org/xslt/java"
      xmlns="http://www.w3.org/1999/xhtml"
      exclude-result-prefixes="xhtml java">

      <!-- Parameters supplied from Java via Transformer.setParameter() -->
      <xsl:param name="callbackHandler"/>
      <xsl:param name="callbackUser"/>
      <xsl:param name="callbackState"/>

      <!-- Identity template: copy everything no other template matches -->
      <xsl:template match="@* | node()">
        <xsl:copy>
          <xsl:apply-templates select="@* | node()"/>
        </xsl:copy>
      </xsl:template>

      <!-- Rewrite every form (with or without the XHTML namespace) to post to the proxy -->
      <xsl:template match="form | xhtml:form">
        <xsl:copy>
          <xsl:apply-templates select="@*"/>
          <!-- The java: calls record the original action/method for the Java caller;
               they return "", so they add nothing to the attribute value. -->
          <xsl:attribute name="action">
            <xsl:text>/web/myproxy?myarg=xxx&amp;otherarg=2</xsl:text>
            <xsl:value-of select="java:com.plexobject.transform.XslContentTransformer.setAction($callbackHandler, string(@action))"/>
          </xsl:attribute>
          <xsl:attribute name="method">
            <xsl:text>POST</xsl:text>
            <xsl:value-of select="java:com.plexobject.transform.XslContentTransformer.setMethod($callbackHandler, string(@method))"/>
          </xsl:attribute>
          <xsl:attribute name="id">_Form</xsl:attribute>
          <xsl:attribute name="name">_Form</xsl:attribute>
          <input type="hidden" name="_user" value="{$callbackUser}"/>
          <input type="hidden" name="_originalActionUrl" value="{@action}"/>
          <input type="hidden" name="_orginalMethodType" value="{@method}"/>
          <input type="hidden" name="_userState" value="{$callbackState}"/>
          <xsl:apply-templates select="node()"/>
        </xsl:copy>
      </xsl:template>

      <!-- Drop the page title -->
      <xsl:template match="title | xhtml:title"/>

    </xsl:stylesheet>
 

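A few things to note:

  • xsl:param allows passing parameters from the runtime (Java).
  • The form template matches each form tag, replaces its action and method attributes and adds id/name attributes. It then
    adds a few hidden input fields that carry the user, the original action URL and method, and the serialized state.
  • The Xalan-style java: extension calls hand the original action and method back to the Java code.
  • Finally, the empty title template removes the title tag.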
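To see the xsl:param mechanism on its own, here is a small, self-contained sketch that uses only JAXP. The stylesheet is a cut-down, hypothetical version of the one above: it keeps the identity template, the form rewrite, one hidden field fed from callbackState and the title removal, but leaves out the Xalan extension calls so it runs on the JDK's built-in processor:

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

class FormRewriteDemo {
    // Hypothetical cut-down stylesheet; /web/myproxy is the placeholder path from the post
    static final String XSL =
          "<xsl:stylesheet version='1.0' xmlns:xsl='http://www.w3.org/1999/XSL/Transform'>"
        + "<xsl:param name='callbackState'/>"
        + "<xsl:template match='@*|node()'><xsl:copy>"
        + "<xsl:apply-templates select='@*|node()'/></xsl:copy></xsl:template>"
        + "<xsl:template match='form'><xsl:copy>"
        + "<xsl:apply-templates select='@*'/>"
        + "<xsl:attribute name='action'>/web/myproxy</xsl:attribute>"
        + "<xsl:attribute name='method'>POST</xsl:attribute>"
        + "<input type='hidden' name='_originalActionUrl' value='{@action}'/>"
        + "<input type='hidden' name='_userState' value='{$callbackState}'/>"
        + "<xsl:apply-templates select='node()'/></xsl:copy></xsl:template>"
        + "<xsl:template match='title'/>"
        + "</xsl:stylesheet>";

    // Applies the stylesheet, passing callbackState in as an xsl:param
    static String rewrite(String xhtml, String callbackState) {
        try {
            Transformer t = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(XSL)));
            t.setParameter("callbackState", callbackState);
            StringWriter out = new StringWriter();
            t.transform(new StreamSource(new StringReader(xhtml)), new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("transformation failed", e);
        }
    }
}
```

Because the form template has a higher default priority than the identity template, only forms are rewritten; everything else is copied unchanged, apart from the title.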

ContentTransformer interface

    package com.plexobject.transform;

    import java.util.Map;

    public interface ContentTransformer {
        /**
         * This method transforms given contents
         * 
         * @param contents
         *            - input contents
         * @param properties
         *            - input/output properties for transformation
         * @return transformed contents
         * @throws TransformationException
         *             - when error occurs while transforming content.
         */
        public String transform(String contents, Map<String, String> properties)
                throws TransformationException;
    }
 

ContentTransformer implementation

A few things to note in the following implementation:

  • I use JTidy to convert HTML to XHTML.
  • I pass some parameters to the XSL stylesheet and also read a few properties back. Reading properties back is a bit kludgy, but it works.

    package com.plexobject.transform;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.io.StringReader;
    import java.io.StringWriter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import javax.xml.transform.Source;
    import javax.xml.transform.Templates;
    import javax.xml.transform.Transformer;
    import javax.xml.transform.TransformerConfigurationException;
    import javax.xml.transform.TransformerException;
    import javax.xml.transform.TransformerFactory;
    import javax.xml.transform.stream.StreamResult;
    import javax.xml.transform.stream.StreamSource;

    import org.w3c.tidy.Tidy;

    public class XslContentTransformer implements ContentTransformer {
        public static final String ACTION = "form_action";
        public static final String METHOD = "form_method";

        // Values reported back by the stylesheet's extension calls, keyed by
        // callback handler; concurrent because many threads may transform at once.
        private static final Map<String, Map<String, String>> xslProperties =
                new ConcurrentHashMap<String, Map<String, String>>();

        // Compiled stylesheet. Templates is thread-safe but Transformer is not,
        // so a fresh Transformer is created for every call.
        private volatile Templates templates;
        private final String xslUri;
        private final boolean useTidy;

        public XslContentTransformer(final String xslUri, final boolean useTidy) {
            this.xslUri = xslUri;
            this.useTidy = useTidy;
        }

        // Called from the XSLT; returns "" so nothing is written to the output
        public static final String setAction(final String callbackHandler,
                final String action) {
            getPropertiesForCallback(callbackHandler).put(ACTION, action);
            return "";
        }

        public static final String getAction(final String callbackHandler) {
            return getPropertiesForCallback(callbackHandler).get(ACTION);
        }

        // Called from the XSLT; returns "" so nothing is written to the output
        public static final String setMethod(final String callbackHandler,
                final String method) {
            getPropertiesForCallback(callbackHandler).put(METHOD, method);
            return "";
        }

        public static final String getMethod(final String callbackHandler) {
            return getPropertiesForCallback(callbackHandler).get(METHOD);
        }

        /**
         * This method transforms given contents
         * 
         * @param contents
         *            - input contents
         * @param properties
         *            - input/output properties for transformation
         * @return transformed contents
         * @throws TransformationException
         *             - when error occurs while transforming content.
         */
        public String transform(String contents, Map<String, String> properties)
                throws TransformationException {
            final Transformer transformer = newTransformer();

            // strip HTML comments, including ones that span several lines
            contents = contents.replaceAll("(?s)<!--.*?-->", "");

            final Source xmlSource;
            if (useTidy) {
                // JTidy works on byte streams
                xmlSource = new StreamSource(tidy(
                        new ByteArrayInputStream(contents.getBytes()), contents.length()));
            } else {
                xmlSource = new StreamSource(new StringReader(contents));
            }
            // write characters, so the output encoding does not have to be guessed
            final StringWriter out = new StringWriter(contents.length());

            String callbackHandler = properties.get("callbackHandler");
            if (callbackHandler == null) {
                callbackHandler = Thread.currentThread().getName();
            }
            xslProperties.put(callbackHandler, new HashMap<String, String>());
            try {
                transformer.setParameter("callbackHandler", callbackHandler);
                for (Map.Entry<String, String> e : properties.entrySet()) {
                    transformer.setParameter(e.getKey(), e.getValue());
                }
                transformer.transform(xmlSource, new StreamResult(out));
                properties.put(ACTION, getAction(callbackHandler));
                properties.put(METHOD, getMethod(callbackHandler));
                return out.toString();
            } catch (TransformerException e) {
                throw new TransformationException("Failed to transform " + contents, e);
            } finally {
                // clean up even when the transformation fails
                xslProperties.remove(callbackHandler);
            }
        }

        private static final Map<String, String> getPropertiesForCallback(
                String callbackHandler) {
            final Map<String, String> props = xslProperties.get(callbackHandler);
            if (props == null) {
                throw new NullPointerException(
                        "Failed to find properties for callback " + callbackHandler);
            }
            return props;
        }

        // Compiles the stylesheet on first use. No locking: if two threads race,
        // both compile it and one result wins, which is harmless.
        private Transformer newTransformer() {
            try {
                Templates t = templates;
                if (t == null) {
                    InputStream in = getClass().getResourceAsStream(xslUri);
                    if (in == null) {
                        throw new TransformationException("failed to find xslt " + xslUri);
                    }
                    t = TransformerFactory.newInstance().newTemplates(new StreamSource(in));
                    templates = t;
                }
                return t.newTransformer();
            } catch (TransformerConfigurationException e) {
                throw new TransformationException(
                        "Failed to initialize XSL transformer", e);
            }
        }

        // Converts (possibly malformed) HTML into well-formed XHTML using JTidy
        private InputStream tidy(InputStream in, int length) {
            ByteArrayOutputStream out = new ByteArrayOutputStream(length);
            Tidy converter = new Tidy();
            converter.setTidyMark(false);
            converter.setXmlOut(true);
            converter.setXmlPi(true);
            converter.setXmlPIs(true);
            converter.setNumEntities(true);
            converter.setDocType("omit");
            converter.setUpperCaseTags(false);
            converter.setUpperCaseAttrs(false);
            converter.setFixComments(true);
            converter.parse(in, out);
            return new ByteArrayInputStream(out.toByteArray());
        }
    }
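If the callback approach feels too kludgy, one alternative (not what the class above does) is to read the original action and method straight from the XHTML with XPath before running the transformation. A minimal sketch, assuming the Tidy output is well-formed XHTML without a namespace declaration; FormInfoReader is a hypothetical name:

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical helper: reads attributes of the first form in a namespace-free XHTML page.
class FormInfoReader {
    // XPath objects are not thread-safe, so each reader owns its own instance
    private final XPath xpath = XPathFactory.newInstance().newXPath();

    // Returns the attribute value, or "" when the form or attribute is missing
    String firstFormAttribute(String xhtml, String attribute) {
        try {
            return xpath.evaluate("(//form)[1]/@" + attribute,
                    new InputSource(new StringReader(xhtml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("cannot evaluate XPath on input", e);
        }
    }
}
```

The trade-off is an extra parse of the page, in exchange for not needing processor-specific extension functions or a static map shared between threads.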
 

Proxy

The following interfaces and classes show how GET and POST requests are proxied:

Proxy Interface

    package com.plexobject.web.proxy;

    import java.io.IOException;
    import java.util.Map;

    public interface HttpProxy {
        /**
         * This method issues a GET or POST request based on the method and URI
         * specified in the ProxyState and adds the given parameters to the request.
         * 
         * @param state
         *            - proxy state
         * @param params
         *            - name/value pairs of parameters that are sent with the
         *            request
         */
        public ProxyResponse request(ProxyState state, Map<String, String[]> params)
                throws IOException;
    }
 

Proxy Implementation

The following class implements the HttpProxy interface using the Commons HttpClient library:

    package com.plexobject.web.proxy;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.httpclient.Cookie;
    import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.HttpMethodBase;
    import org.apache.commons.httpclient.HttpState;
    import org.apache.commons.httpclient.NameValuePair;
    import org.apache.commons.httpclient.cookie.CookiePolicy;
    import org.apache.commons.httpclient.methods.GetMethod;
    import org.apache.commons.httpclient.methods.PostMethod;
    import org.apache.commons.httpclient.params.HttpMethodParams;

    import com.plexobject.io.IoUtil;

    public class HttpProxyImpl implements HttpProxy {
        private static final int CONNECTION_TIMEOUT_MILLIS = 30000;

        /**
         * This method issues a GET or POST request based on the method and URI
         * specified in the ProxyState and adds the given parameters to the request.
         * 
         * @param state
         *            - proxy state
         * @param params
         *            - name/value pairs of parameters that are sent with the
         *            request (may be null)
         */
        public ProxyResponse request(ProxyState state, Map<String, String[]> params)
                throws IOException {
            if (state.getMethod() == MethodType.GET) {
                return get(state, params);
            } else {
                return post(state, params);
            }
        }

        /**
         * Issues a GET request on the URI specified in the ProxyState, sending
         * the given parameters as the query string.
         */
        private ProxyResponse get(ProxyState state, Map<String, String[]> params)
                throws IOException {
            GetMethod method = new GetMethod(state.getUri());
            if (params != null && !params.isEmpty()) {
                // like a browser, form fields replace any query string in the URI
                method.setQueryString(toNameValues(params));
            }
            return doRequest(state, method);
        }

        /**
         * Issues a POST request on the URI specified in the ProxyState, sending
         * the given parameters as a form-encoded request body.
         */
        private ProxyResponse post(ProxyState state, Map<String, String[]> params)
                throws IOException {
            PostMethod method = new PostMethod(state.getUri());
            method.setRequestBody(toNameValues(params));
            return doRequest(state, method);
        }

        private ProxyResponse doRequest(ProxyState proxyState, HttpMethodBase method)
                throws IOException {
            HttpClient client = new HttpClient();
            client.getHttpConnectionManager().getParams().setConnectionTimeout(
                    CONNECTION_TIMEOUT_MILLIS);
            client.getParams().setCookiePolicy(CookiePolicy.BROWSER_COMPATIBILITY);
            // retry up to 3 times on recoverable I/O errors, but not once the request was sent
            method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
                    new DefaultHttpMethodRetryHandler(3, false));

            // replay the cookies collected so far for this user
            HttpState initialState = new HttpState();
            for (Cookie cookie : proxyState.getCookies()) {
                initialState.addCookie(cookie);
            }
            client.setState(initialState);

            try {
                int statusCode = client.executeMethod(method);
                InputStream body = method.getResponseBodyAsStream();
                // some responses (e.g. 204 or 304) have no body at all
                String contents = body == null ? "" : IoUtil.read(body);
                // remember any cookies set by the remote application
                for (Cookie cookie : client.getState().getCookies()) {
                    proxyState.addCookie(cookie);
                }
                return new ProxyResponse(statusCode, contents, proxyState);
            } catch (RuntimeException e) {
                throw e;
            } catch (IOException e) {
                throw e;
            } catch (Exception e) {
                throw new IOException("failed to process request", e);
            } finally {
                method.releaseConnection();
            }
        }

        // One name/value pair per value, so multi-valued fields are preserved
        private NameValuePair[] toNameValues(Map<String, String[]> params) {
            List<NameValuePair> nvPairs = new ArrayList<NameValuePair>();
            if (params != null) {
                for (Map.Entry<String, String[]> e : params.entrySet()) {
                    for (String value : e.getValue()) {
                        nvPairs.add(new NameValuePair(e.getKey(), value));
                    }
                }
            }
            return nvPairs.toArray(new NameValuePair[nvPairs.size()]);
        }
    }
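toNameValues() turns the servlet-style parameter map, where each name maps to an array of values, into one name/value pair per value; HttpClient then form-encodes those pairs into the query string (GET) or the request body (POST). The same flattening with only the JDK looks roughly like this; FormEncoder is a hypothetical helper and UTF-8 is an assumption:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

// Hypothetical helper: flattens name -> values[] into application/x-www-form-urlencoded text
class FormEncoder {
    static String encode(Map<String, String[]> params) {
        StringBuilder sb = new StringBuilder();
        try {
            for (Map.Entry<String, String[]> e : params.entrySet()) {
                // a multi-valued field becomes several name=value pairs
                for (String value : e.getValue()) {
                    if (sb.length() > 0) {
                        sb.append('&');
                    }
                    sb.append(URLEncoder.encode(e.getKey(), "UTF-8"))
                      .append('=')
                      .append(URLEncoder.encode(value, "UTF-8"));
                }
            }
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
        return sb.toString();
    }
}
```

For example, a guest field with "Jane Doe" and a tag field with two values encode to guest=Jane+Doe&tag=a&tag=b%26c when the second tag is "b&c".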
 

ProxyState

The following class maintains the URL, method type and cookies related to a proxied web request:

    package com.plexobject.web.proxy;

    import java.io.Serializable;
    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.net.URLEncoder;
    import java.util.Collection;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    import org.apache.commons.httpclient.Cookie;

    /**
     * Class: ProxyState
     * 
     * Description: This class stores the state needed to make a proxy request,
     * including the URI, method type and cookies.
     */
    public class ProxyState implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final String DATA_DELIMITER = "\n";
        private static final String COOKIE_DELIMITER = ";";
        private static final String NULL = "null";
        private static final String ENCODING = "UTF-8";

        private String uri;
        private MethodType method;
        private Map<String, Cookie> cookies;

        /**
         * Constructors for ProxyState
         */
        public ProxyState(String uri, String method) {
            this(uri, toMethodType(method));
        }

        public ProxyState(String uri, MethodType method) {
            this.uri = uri;
            this.method = method;
            this.cookies = new HashMap<String, Cookie>();
        }

        // HTML forms may use lower-case methods ("post") or omit the method
        // attribute entirely, in which case browsers default to GET.
        private static MethodType toMethodType(String method) {
            if (method == null || method.trim().length() == 0) {
                return MethodType.GET;
            }
            return MethodType.valueOf(method.trim().toUpperCase(Locale.ENGLISH));
        }

        /**
         * @return uri
         */
        public String getUri() {
            return this.uri;
        }

        /**
         * @return method
         */
        public MethodType getMethod() {
            return this.method;
        }

        /**
         * @return cookies
         */
        public Collection<Cookie> getCookies() {
            return this.cookies.values();
        }

        /**
         * @param cookies
         *            - to add
         */
        public void addCookies(Collection<Cookie> cookies) {
            for (Cookie cookie : cookies) {
                addCookie(cookie);
            }
        }

        /**
         * @param cookie
         *            - to add; replaces any cookie with the same name
         */
        public void addCookie(Cookie cookie) {
            this.cookies.put(cookie.getName(), cookie);
        }

        // One line per cookie: domain;name;value;path;expiry-millis;secure
        // ("null" marks a missing domain, path or expiry date)
        public String getCookieString() {
            StringBuilder sb = new StringBuilder(512);
            for (Cookie cookie : cookies.values()) {
                if (cookie.getDomain() != null) {
                    sb.append(cookie.getDomain()).append(COOKIE_DELIMITER);
                } else {
                    sb.append(NULL).append(COOKIE_DELIMITER);
                }
                sb.append(cookie.getName()).append(COOKIE_DELIMITER).append(
                        cookie.getValue()).append(COOKIE_DELIMITER);

                if (cookie.getPath() != null) {
                    sb.append(cookie.getPath()).append(COOKIE_DELIMITER);
                } else {
                    sb.append(NULL).append(COOKIE_DELIMITER);
                }
                if (cookie.getExpiryDate() != null) {
                    sb.append(String.valueOf(cookie.getExpiryDate().getTime()))
                            .append(COOKIE_DELIMITER);
                } else {
                    sb.append(NULL).append(COOKIE_DELIMITER);
                }
                sb.append(String.valueOf(cookie.getSecure()))
                        .append(DATA_DELIMITER);
            }
            return sb.toString();
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(512);
            sb.append(uri).append(DATA_DELIMITER);
            sb.append(method).append(DATA_DELIMITER);
            sb.append(getCookieString());
            return sb.toString();
        }

        /**
         * This method converts proxy state into string based serialized state
         * 
         * @return string based serialized state
         */
        public String toExternalFormat() {
            try {
                return URLEncoder.encode(toString(), ENCODING);
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException("failed to encode", e);
            }
        }

        /**
         * This method converts a string based serialized state into the proxy state
         * 
         * @param ser
         *            - string based serialized state
         * @return ProxyState
         * @throws IllegalArgumentException
         *             - if serialized state is null or corrupted.
         */
        public static ProxyState valueOf(String ser) {
            if (ser == null)
                throw new IllegalArgumentException("Null serialized object");
            String decoded;
            try {
                decoded = URLDecoder.decode(ser, ENCODING);
            } catch (UnsupportedEncodingException e) {
                throw new IllegalArgumentException("Unsupported encoding " + ser, e);
            }
            String[] lines = decoded.split(DATA_DELIMITER);
            if (lines.length < 2)
                throw new IllegalArgumentException(
                        "Insufficient number of tokens in serialized object ["
                                + decoded + "]");
            ProxyState state = new ProxyState(lines[0], lines[1]);
            for (int i = 2; i < lines.length; i++) {
                String[] cookieFields = lines[i].split(COOKIE_DELIMITER);
                if (cookieFields.length < 6)
                    throw new IllegalArgumentException(
                            "Insufficient number of tokens 6 in serialized cookies ["
                                    + lines[i] + "]/[" + decoded + "]");
                String domain = cookieFields[0];
                if (NULL.equals(domain)) {
                    domain = null;
                }
                String name = cookieFields[1];
                String value = cookieFields[2];
                String path = cookieFields[3];
                if (NULL.equals(path)) {
                    path = null;
                }
                Date expires = null;
                if (!NULL.equals(cookieFields[4])) {
                    expires = new Date(Long.parseLong(cookieFields[4]));
                }
                boolean secure = Boolean.parseBoolean(cookieFields[5]);
                Cookie cookie = new Cookie(domain, name, value, path, expires,
                        secure);
                state.addCookie(cookie);
            }
            return state;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o)
                return true;
            if (!(o instanceof ProxyState))
                return false;
            final ProxyState other = (ProxyState) o;
            if (uri != null ? !uri.equals(other.uri) : other.uri != null)
                return false;
            if (method != null ? !method.equals(other.method)
                    : other.method != null)
                return false;
            return true;
        }

        @Override
        public int hashCode() {
            int result;
            result = (uri != null ? uri.hashCode() : 0);
            result = 29 * result + (method != null ? method.hashCode() : 0);
            return result;
        }
    }
 

ProxyResponse

The following class stores the response returned by the HttpProxy interface:

    package com.plexobject.web.proxy;

    import java.io.Serializable;

    /**
     * Class: ProxyResponse
     * 
     * Description: This class stores proxy state and response.
     */
    public class ProxyResponse implements Serializable {
        private static final long serialVersionUID = 1L;
        private int responseCode;
        private String contents;
        private ProxyState state;

        /**
         * Constructor for ProxyResponse
         */
        public ProxyResponse(int responseCode, String contents, ProxyState state) {
            this.responseCode = responseCode;
            this.contents = contents;
            this.state = state;
        }

        /**
         * @return http response code
         */
        public int getResponseCode() {
            return this.responseCode;
        }

        /**
         * @return XHTML contents
         */
        public String getContents() {
            return this.contents;
        }

        /**
         * @return state associated with the proxy web request
         */
        public ProxyState getState() {
            return this.state;
        }

        @Override
        public String toString() {
            return this.responseCode + "\n" + this.state + "\n" + this.contents;
        }
    }
 

MethodType

The following enum defines the HTTP method types supported by the proxy:

    package com.plexobject.web.proxy;

    /**
     * Class: MethodType
     * 
     * Description: Defines supported method types for proxy request.
     */
    public enum MethodType {
        GET, POST;
    }
 

Service Example

The following classes show how the HttpProxy and ContentTransformer interfaces above can be used with the Servlet/Portlet APIs:

ProxyService Interface

    package com.plexobject.web.service;

    import javax.servlet.http.*;
    import java.io.*;

    public interface ProxyService {
        public void render(HttpServletRequest request, HttpServletResponse response) throws IOException;
        public void submit(HttpServletRequest request, HttpServletResponse response) throws IOException;
    }
 

ProxyService Implementation

    package com.plexobject.web.service;

    import com.plexobject.web.proxy.*;
    import com.plexobject.transform.ContentTransformer;
    import javax.servlet.http.*;
    import java.io.*;
    import java.util.*;

    public class ProxyServiceImpl implements ProxyService {
        // Names of the hidden fields added by the XSLT, spelled exactly as in the stylesheet
        private static final String USER = "_user";
        private static final String ORIGINAL_ACTION_URL = "_originalActionUrl";
        private static final String ORIGINAL_METHOD_TYPE = "_orginalMethodType";
        private static final String USER_STATE = "_userState";

        private HttpProxy httpProxy;
        private ContentTransformer contentTransformer;

        public ProxyServiceImpl(HttpProxy httpProxy, ContentTransformer contentTransformer) {
            this.httpProxy = httpProxy;
            this.contentTransformer = contentTransformer;
        }

        public void render(HttpServletRequest request, HttpServletResponse response) throws IOException {
            String url = "http://plexrails.plexobject.com/guest_book/sign";
            ProxyState state = new ProxyState(url, MethodType.GET);
            // HttpProxyImpl records any cookies set by the remote application in state
            String inputXhtml = httpProxy.request(state, null).getContents();
            Map<String, String> properties = new HashMap<String, String>();
            properties.put("callbackState", state.toExternalFormat());
            String transformedXhtml = contentTransformer.transform(inputXhtml, properties);
            response.getWriter().println(transformedXhtml);
        }

        public void submit(HttpServletRequest request, HttpServletResponse response) throws IOException {
            String originalActionUrl = request.getParameter(ORIGINAL_ACTION_URL);
            String originalMethodType = request.getParameter(ORIGINAL_METHOD_TYPE);
            ProxyState userState = ProxyState.valueOf(request.getParameter(USER_STATE));

            // forward the user's form fields, but not the proxy's own hidden fields
            @SuppressWarnings("unchecked")
            Map<String, String[]> submitted = request.getParameterMap();
            Map<String, String[]> params = new HashMap<String, String[]>(submitted);
            params.keySet().removeAll(
                    Arrays.asList(USER, ORIGINAL_ACTION_URL, ORIGINAL_METHOD_TYPE, USER_STATE));

            ProxyState state = new ProxyState(originalActionUrl, originalMethodType);
            state.addCookies(userState.getCookies());
            ProxyResponse proxyResponse = httpProxy.request(state, params);
            response.getWriter().println(proxyResponse.getContents());
        }
    }
 

Download Code

You can download the above code from here.

Acknowledgement

I would like to thank the folks at XSLT forum of Programmer-to-Programmer (http://p2p.wrox.com/forum.asp?FORUM_ID=79) for answering my XSLT questions.

August 14, 2008

Benefits of REST based services

Filed under: REST — admin @ 10:47 pm

I saw Damien Katz’s blog post “REST, I just don’t get it” and was a bit surprised that he doesn’t get REST, especially since he built CouchDB around REST. I admit there are a lot of bad examples of REST services that are really RPC over POST, but resource-oriented services can be quite simple and powerful. REST principles are the building blocks of the web, which has proven to be quite scalable and efficient. I have been developing REST-based services for a number of years, in some ways since before I learned about Roy Fielding’s thesis and REST principles. Back in the 90s, I worked on traffic websites and used CORBA to publish and subscribe to traffic events. We also published that data on the website, but we soon found that a number of people were scraping it, so I wrote a simple XML-over-HTTP service that other groups could use to download the data. I have found the following benefits when using REST-based services:

  • separating reads from writes: I have worked on large ecommerce and travel websites, and one of the lessons is to keep your read/query services separate from your transactional services. REST APIs define separate operations for reads and writes/updates.
  • caching: you can find tons of off-the-shelf solutions, including hardware appliances, for caching GET requests, and HTTP features such as ETags and cache headers support this directly.
  • compression: since REST uses HTTP, you can use compression such as gzip, which can improve the performance of the services.
  • idempotency: GET, PUT, DELETE and HEAD are idempotent, which means that, if the service is designed correctly, a request can be retried without worrying about side effects. POST, on the other hand, is not idempotent and may have side effects.
  • bookmarking: GET requests can be easily bookmarked. It is important not to use GET to change the state of the application.
  • security: security has been the weakest area of REST compared to SOAP, but HTTPS and simple authentication usually suffice, and there are better standards such as OAuth.
  • big response size: REST/HTTP is the only service platform I have seen that supports responses of gigabytes. I built a lot of CORBA based services in the 90s, EJBs/SOAP in the early 2000s and messaging based services for over ten years, and none of those platforms supports large responses.
  • simplicity: I find this is the biggest reason for using REST. I can use a browser to call GET based requests and write clients in any language.
  • resources: a REST response can include URIs for other APIs, and clients can change state through these resources. You can use XHTML to embed all these resources so they can be easily tested with browsers.
  • no need for additional jars: when I used CORBA, EJBs, RMI or JINI, I always had to distribute client stub/skeleton jar files. In large companies, where I had to import dozens of these jar files, they became a maintenance problem. With REST, I can simply call the service without importing anything.
  • error codes: HTTP comes with a number of response codes for real-life services, including codes for throttling requests, such as server busy (503 Service Unavailable).
  • metadata: as opposed to CORBA, JINI and RMI services, I can pass metadata easily because HTTP supports predefined and user-defined headers. These headers can carry information on authentication, quality of service, timeouts or other context related data. Occasionally, I add a Map<String,String> parameter to APIs when I use Java based services, but it pollutes otherwise pure APIs.
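To make the caching bullet concrete, here is a minimal Ruby sketch of how a service might answer a conditional GET with an ETag. The `conditional_get` helper, the MD5-based ETag and the 60-second `max-age` are illustrative assumptions rather than part of any framework, and a real service would also handle `If-None-Match` lists and `*`.

```ruby
require 'digest'

# Hypothetical helper: answers a GET for a representation, honoring the
# client's If-None-Match header. Returns [status, headers, body].
# Simplification: If-None-Match is compared as a single ETag, not a list.
def conditional_get(body, if_none_match = nil)
  # Strong ETag derived from the representation's bytes (quoted, as HTTP requires).
  etag = %("#{Digest::MD5.hexdigest(body)}")
  headers = { 'ETag' => etag, 'Cache-Control' => 'public, max-age=60' }

  if if_none_match == etag
    # The client's cached copy is still current: headers only, empty body.
    [304, headers, '']
  else
    [200, headers, body]
  end
end

# First request gets the full response; the second replays the ETag.
status, headers, _body = conditional_get('<user username="bob"/>')
puts status
status, _headers, _body = conditional_get('<user username="bob"/>', headers['ETag'])
puts status
```

Running it prints 200 and then 304: the second request costs the server a hash comparison instead of a full response, and any HTTP cache in between can do the same.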

The only real drawbacks I see for REST based services are that they are generally synchronous and blocking, which can waste threads (though some of that can be solved with async I/O or event based dispatching). Personally, I like to use messaging underneath REST services, which provides asynchrony, persistence and better reliability.

July 31, 2008

Love and Hate with Java

Filed under: Java — admin @ 10:46 pm

For the past few years, bashing Java has been really popular, though some of the criticism has merit. But in general, due to its popularity, Java has become “the man” who tries to bring everyone down. There are millions of programmers who work for “Java the man”. I saw a recent post about an NYU professor who called Java-savvy college grads tomorrow’s pizza delivery men. I know Joel Spolsky often mentions teaching C in universities to help students understand pointers and memory management. I agree with the notion of teaching multiple languages in universities so that graduates have a broad understanding of different programming paradigms. I started learning programming back in the 80s on an Atari and learned BASIC. I then moved to the PC and learned GW-BASIC, and then learned C, FORTRAN, Assembler, COBOL, Pascal and C++ in college. I also learned Lisp, Prolog and Perl on my own. In the late 80s and early 90s, I also learned dBase III, RPG and SAS, which were called fourth-generation languages. Similarly, C, FORTRAN, Pascal, etc. were called third-generation languages, and assembly languages were second-generation languages. I learned Java in ’95 when it came out and found it much easier to program in than C/C++. I have also learned Python, Ruby and Erlang over the past few years and have been learning Haskell and Scala these days.

For the most part, Java has been my primary language, with some use of C++, Perl, Ruby and Python/Jython (and Erlang on my own). I wish I could use more Erlang, but I don’t have the same experience with Erlang as I have with Java. Over time, Java managed to take a lot of C/C++’s market share. Java has also managed to build a large ecosystem of open source and commercial libraries and frameworks. I often hear that Java is enterprisey and popular in large companies, but the truth is that Java has proven itself to be a reliable language. Steve Yegge also mentioned in his blog that Google primarily uses Java, C++, Python and JavaScript.

I like the polyglot environment, where I write performance-critical code in a system language like Java and use Ruby/Python for high-level glue code or the web tier.
I often find the criticism of Java dishonest. For example, people rave about metaprogramming in Ruby but forget to mention all the overhead that goes with it, not to mention security holes and memory leak issues. The truth is that none of the hot languages like Python, Ruby, Erlang and Haskell provide the same performance as Java; in fact, Java’s HotSpot compiler beats C++ in production. I am going to ignore the static vs. dynamic language debate, but I’ve found that static languages work better with a large number of developers. Again, I like these languages, but I would prefer to see a balanced comparison. The real reason Java is popular is that there are tons of jobs. Here is a quick comparison of jobs in Java, C++, C#, Erlang, Haskell, OCaml, Ruby, Python and Factor:

As Bruce Lee said:


I fear not the man who has practiced 10,000 kicks once, but I fear the man who has practiced one kick 10,000 times.

I find that the way to distinguish yourself is by learning more about the design and architecture of systems and learning more about the ecosystem. It takes years to learn the ins and outs of a programming language and all the tools and libraries that go with it. I totally agree with learning a number of different languages like Haskell, Factor, Erlang, Scala and Groovy, and I have been trying to learn all of them for many years. However, for a system language my first choice is still Java, simply because I have found it to be a reliable and efficient language. As James Gosling said, Java is a blue-collar language. Sure, it does not have closures (yet), actors, transactional memory, metaprogramming or AST macros, but it is well suited for building large applications with hundreds or thousands of programmers. I just started a large project in my division at Amazon, and sure enough I chose Java because I have been using it for over twelve years and I know it can do the job. It wasn’t simply because Java is a safe choice (no one got fired for choosing IBM); practically, Java has more mature solutions for business needs. For example, my project needs to integrate with 20+ applications and is aimed at reducing manual work, so it needed a portal server, workflow engine, rules engine and messaging service, and there are tons of options for those in the Java community.

Finally, the JVM is proving to be a neat platform for building new languages like JRuby, Jython, Groovy, Scala and Clojure that can bring cool features and high interoperability with existing systems. As Guy Steele said in a recent interview, you can’t expect one language to solve all problems.

July 7, 2008

Reaction vs Preparedness

Filed under: Methodologies — admin @ 8:36 pm

I have struggled with the culture of fire-fighting and heroism and discussed these issues several times in my blog [1], [2], [3], [4]. Over the weekend, I saw Bruce Schneier’s talk on security. He talked about Hurricane Katrina’s devastation and said that politicians don’t like to invest in infrastructure because it does not get noticed. He showed how reaction is considered more important than preparedness. Another point he made was that investing in new projects is received better than investing in existing ones. I find that IT culture has a very similar value system, where heroes or martyrs who react (on a daily basis) to emerging crises and fires are noticed by managers and receive promotions and accolades. The same culture does not value investing in better preparation for disasters or emergencies. Unfortunately, once people get promoted they move on and leave the pile of shit for someone else. Of course, the next team rewrites the system rather than keeping the old one. In the last two years, I had to rewrite two major systems whose previous versions had been written less than two years earlier. Maybe they will be rewritten by someone else when I leave. The cycle will continue…

July 1, 2008

Designing Microblogging system for Scalability

Filed under: Computing — admin @ 5:12 pm

Introduction

I have been a Twitter user for a while and have observed or heard about Twitter’s downtime and scalability problems. The scalability of Twitter has become a topic of a lot of discussions and blogs, and it has also offered a useful exercise in designing scalable systems. A common root cause, as identified from Twitter’s blogs, is that the architecture is based on a CMS because it was written in Ruby on Rails, and that is what Rails is good at. The solution to the scalability problem, as pointed out by other people, is a messaging based architecture. There has also been a lot of blame for Twitter’s problems on Ruby and Rails, because Ruby is a slow language compared to other static and dynamic languages and Rails is not built for scalability. Though there is some truth to that, I don’t think they are the sole bottlenecks. In fact, I am going to show a small prototype written in Ruby and (partially) Rails that integrates with messaging middleware and can be scaled easily. I have been using messaging middleware such as the CORBA Event Service, IBM MQSeries, WebSphere, WebLogic, Tibco and ActiveMQ for over ten years and have long been a proponent of messaging based systems for scalability [1] [2]. So, I spent a few hours putting together a prototype based on messaging middleware and Ruby on Rails to see how such a system can be developed.

Design Principles

Before describing my design, I am going to review some of the design principles that I have used for building large scale systems ([1], [2]), which are:

  • Coupling/Cohesion – loosely coupled and shared nothing architecture and partitioning based on functionality.
  • Messaging or SEDA architecture to implement reliable and scalable services and avoid temporal dependencies.
  • Resource management – good old practices of avoiding leaks of memory, database connections, etc.
  • Data replication especially read-only data.
  • Partition data (Sharding) – using multiple databases to partition the data.
  • Avoid single point of failure.
  • Bring processing closer to the data.
  • Design for service failures and crashes.
  • Dynamic Resources – Design service frameworks so that resources can be removed or added automatically and clients can automatically discover them. Use virtualization and horizontal scalability whenever possible.
  • Smart Caching – Cache expensive operations and contents as much as possible.
  • Avoid distributed transactions, use optimistic compensating transactions (See CAP principle).

Architecture & Design

Following is high level architecture for the Microblogging system:

First, I selected a REST architecture as the entry point to our system for both the Web UI and 3rd party applications, and used messaging middleware for implementing the services. This gives us ease of access with REST APIs and scalability with messaging. In my implementation, I chose JRuby/Rails to implement most of the code, Derby for the database and ActiveMQ for the messaging store. In addition to scalability, messaging middleware gives you many of the advantages of functional languages like Erlang, such as immutability, message passing and fault tolerance (via persistent queues). You can even build support for versioning and hot code swapping by adding a version number to each message and creating routers (see integration patterns) to direct messages to different handlers.
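A router that dispatches on a message's version number can be sketched in a few lines of Ruby. This is a hypothetical in-process version: `VersionRouter` and the `:version`, `:body` and `:channel` message keys are made up for illustration, whereas a real deployment would place the router in front of queues (for example as a content-based router in an ESB).

```ruby
# Hypothetical content-based router: each message is a Hash carrying a
# :version key, and each version has its own handler.
class VersionRouter
  def initialize
    @handlers = {}
    @lock = Mutex.new
  end

  # Handlers can be (re)registered while messages are flowing, which is
  # what allows new code to be swapped in without a restart.
  def register(version, &handler)
    @lock.synchronize { @handlers[version] = handler }
  end

  def route(message)
    handler = @lock.synchronize { @handlers[message[:version]] }
    raise ArgumentError, "no handler for version #{message[:version]}" unless handler
    handler.call(message)
  end
end

router = VersionRouter.new
router.register(1) { |m| "v1 status: #{m[:body]}" }
router.register(2) { |m| "v2 status: #{m[:body]} (#{m[:channel]})" }

puts router.route({ version: 1, body: 'hello' })
puts router.route({ version: 2, body: 'hello', channel: 'bob' })
```

Old and new message formats can coexist this way: producers switch to version 2 at their own pace while version 1 messages still have a handler.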

APIs

Following are REST APIs that will be exposed to 3rd party apps, Web and other kind of UI:

Create User

POST /users
where the user information is passed in the body in the form of parameters.

Login/Authenticate User

POST /users/userid/sessions
This will authenticate the user and create a session. Note that most of the following APIs send back the session-id in an HTTP header; sessions are stored in the (sharded) database and used to retrieve user information.

Logout

HTTP-HEADER
session-id

DELETE /users/userid/sessions

Get User information

HTTP-HEADER
session-id

GET /users/userid
This API will return detailed user information

Anonymous User information

GET /users/userid
Without a session-id, this API will return public user information.

List of Followings

HTTP-HEADER
session-id

GET /followings/userid
This API will return a summary of the people the user is following.

Create Followers

HTTP-HEADER
session-id

POST /followers/followerid
This API will create a one-way follower relationship between the user and the follower.

Enable notification for Followers

HTTP-HEADER
session-id

POST /followers/followerid/notifications

Disable notification for Followers

HTTP-HEADER
session-id

DELETE /followers/followerid/notifications

Block Followers

HTTP-HEADER
session-id

POST /followers/followerid/blocking

Unblock Followers

HTTP-HEADER
session-id

DELETE /followers/followerid/blocking

Follower Exist

HTTP-HEADER
session-id

GET /followers/followerid
This API will return HTTP code 200 if the follower exists.

List of Followers

HTTP-HEADER
session-id

GET /followers
This API will return a summary of the people following the user.

Archive Messages

HTTP-HEADER
session-id

GET /messages?offset=xxx&limit=yyy&since=date
This API will return archived messages for the user, where offset and limit will be optional.

DELETE Messages

HTTP-HEADER
session-id

DELETE /messages/message-id
This API will delete the given message.

Send Direct Messages

HTTP-HEADER
session-id

POST /directmessages/targetuserid
This API will send a direct message to the given user.

Send Reply

HTTP-HEADER
session-id

POST /reply/message-id
This API will send a reply to the given message-id; the contents of the reply are passed in the body (as parameters).

Direct Messages Received

HTTP-HEADER
session-id

GET /directmessages/userid?offset=xxx&limit=yyy&since=date
This API will return messages received by the user.

Replies Received

HTTP-HEADER
session-id

GET /replies?offset=xxx&limit=yyy&since=date
This API will return replies received by the user.

Update Status

HTTP-HEADER
session-id

POST /statuses
This API will update the status of the user; the contents of the message are passed in the body (as parameters).

Get Statuses

HTTP-HEADER
session-id

GET /statuses?offset=xxx&limit=yyy&since=date
This API will return the status updates of the user.

User Timeline

HTTP-HEADER
session-id

GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return the timeline of the user. It compares the given username with the authenticated username and returns the detailed timeline if they match; otherwise, it returns the public timeline.

Public Timeline

GET /timeline/username?offset=xxx&limit=yyy&since=date
Without a session-id, this API will return the public timeline of the user.

Friends Timeline

HTTP-HEADER
session-id

GET /friendstimeline?offset=xxx&limit=yyy&since=date
This API will return the timeline of the user’s friends.
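As a rough illustration of how these endpoints could be dispatched, here is a Ruby sketch of a route table covering a few of the APIs above. The action names, the `dispatch` helper and the rule that a missing `session-id` header selects the public variant (as with the user information and timeline APIs) are assumptions for illustration, not the prototype's code.

```ruby
# Each route maps an HTTP verb and path pattern to an action. public_action is
# what runs without a session-id header; nil means a session is required.
Route = Struct.new(:verb, :pattern, :action, :public_action)

ROUTES = [
  Route.new('POST',   %r{\A/users\z},                      :create_user,      :create_user),
  Route.new('GET',    %r{\A/users/([^/]+)\z},              :user_details,     :public_user),
  Route.new('POST',   %r{\A/followers/([^/]+)\z},          :create_follower,  nil),
  Route.new('DELETE', %r{\A/followers/([^/]+)/blocking\z}, :unblock_follower, nil),
  Route.new('GET',    %r{\A/timeline/([^/]+)\z},           :user_timeline,    :public_timeline)
].freeze

# Returns [action, id_from_path]; [:unauthorized, nil] when a session is
# required but missing; [:not_found, nil] when nothing matches.
def dispatch(verb, path, headers = {})
  path = path.split('?', 2).first # offset/limit/since are left to the action
  ROUTES.each do |route|
    next unless route.verb == verb
    match = route.pattern.match(path)
    next unless match
    return [route.action, match[1]] if headers['session-id']
    return [route.public_action, match[1]] if route.public_action
    return [:unauthorized, nil]
  end
  [:not_found, nil]
end

p dispatch('GET', '/users/bob', { 'session-id' => 'abc' })
p dispatch('GET', '/timeline/bob?offset=0&limit=20')
p dispatch('DELETE', '/followers/alice/blocking')
```

Keeping the authenticated and public variants on the same URI, and choosing between them by the presence of the session header, matches how the user information and timeline APIs are described above.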

Request Flow

Here is an illustration of how information flows through the different components:

I am not showing the request flow of every API, but they all follow a similar pattern.

Detailed Design

Domain Classes

Primary domain classes are:

  • User
  • Message, which has four subclasses DirectMessage, ReplyMessage, Tweet and Status for various kind of messages in the system.
  • Follower – creates one-way relationship between two users, where follower can choose to be notified when the user changes his/her status.

I identify each message with a special GUID. I use a simple scheme to generate GUIDs for request-ids and message-ids, but for a real project I would recommend a better library such as UUIDTools.
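If adding a gem is not an option, Ruby’s standard library can also produce random (version 4) UUIDs through `SecureRandom.uuid`. The sketch below is an alternative to a counter-based scheme, assuming the same `message_id`/`request_id` prefixes; the `UuidIds` module name is hypothetical. The resulting ids are 47 characters long, which fits the 64-character `message_id` column in the schema.

```ruby
require 'securerandom'

# Hypothetical replacement for a counter-plus-timestamp id scheme: random
# version 4 UUIDs are unique across processes and machines without coordination.
module UuidIds
  def self.next_message_id
    "message_id-#{SecureRandom.uuid}"
  end

  def self.next_request_id
    "request_id-#{SecureRandom.uuid}"
  end
end

puts UuidIds.next_message_id
puts UuidIds.next_request_id
```

Unlike a per-process counter, this needs no locking and stays unique when several JRuby processes generate ids for different shards at the same second.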

Schema

Followers

class CreateFollowers < ActiveRecord::Migration
  def self.up
    create_table :followers do |t|
      t.column :username,            :string, :limit => 16
      t.column :follower_username,   :string, :limit => 16
      t.column :relation_type,       :string, :default => 'Follower', :limit => 32
      t.column :blocked,             :boolean, :default => false
      t.column :notifications,       :boolean, :default => false
      t.column :created_at,          :datetime
      t.column :updated_at,          :datetime
      t.column :deleted_at,          :datetime
    end
    add_index :followers, :username
    add_index :followers, :follower_username
  end

  def self.down
    drop_table :followers
  end
end

Messages

class CreateMessages < ActiveRecord::Migration
  def self.up
    create_table :messages do |t|
      t.column :message_id,          :string, :limit => 64
      t.column :type,                :string, :limit => 32
      t.column :message_type,        :string, :default => 'Say', :limit => 32
      t.column :reply_message_id,    :string, :limit => 64
      t.column :username,            :string, :limit => 16
      t.column :channel_name,        :string, :limit => 32
      t.column :message_body,        :string, :limit => 140
      t.column :favorite,            :boolean, :default => false
      t.column :sent_at,             :datetime, :default => Time.now.utc
      t.column :created_at,          :datetime
      t.column :deleted_at,          :datetime
    end
    add_index :messages, :message_id
    add_index :messages, :username
  end

  def self.down
    drop_table :messages
  end
end

Users

class CreateUsers < ActiveRecord::Migration
  def self.up
    create_table :users do |t|
      t.column :username,            :string, :limit => 16
      t.column :password,            :string, :limit => 16
      t.column :name,                :string, :limit => 64
      t.column :email,               :string, :limit => 64
      t.column :time_zone_id,        :string, :limit => 32
      t.column :created_at,          :datetime
      t.column :updated_at,          :datetime
      t.column :deleted_at,          :datetime
    end
    add_index :users, :username
  end

  def self.down
    drop_table :users
  end
end

Persistence

I used Rails’ ActiveRecord library to provide persistence, though I could alternatively have used ActiveHibernate. These libraries provide a quick way to add persistence with minimal configuration and boilerplate. This prototype uses multiple levels of partitioning: first at the service level, and second at the persistence level. I am using multiple Derby databases to store objects, with a simple hashing scheme for load distribution. This prototype also shows how to connect to multiple databases in Rails, which was difficult in the early implementation of Twitter.
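A simple hashing scheme for picking a database might look like the following Ruby sketch. The shard names are hypothetical and the choice of CRC32 is my assumption; the important detail is to use a hash that is stable across processes, which Ruby’s built-in `String#hash` is not.

```ruby
require 'zlib'

# Hypothetical shard names; in the prototype each would correspond to a
# separate Derby database configured for ActiveRecord.
SHARDS = %w[shard0 shard1 shard2 shard3].freeze

# Maps a business key (here the username) to a shard. Zlib.crc32 gives the
# same answer in every process and on every machine; String#hash is seeded
# per process, so two servers could disagree about where a user lives.
def shard_for(username, shards = SHARDS)
  shards[Zlib.crc32(username) % shards.size]
end

%w[alice bob carol].each { |name| puts "#{name} -> #{shard_for(name)}" }
```

In Rails, the chosen name could then select a database configuration for a model class via `establish_connection`. Note that changing the number of shards remaps most keys, which is why adding or removing servers with this scheme requires migrating data.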

Domain Services

The core model and services use domain-driven design and apply the principle of fat models and thin services (as opposed to fat services and anemic models). The domain services implement the external REST APIs and use the underlying ActiveRecord models for most of the functionality.

Messaging Middleware

The REST based web services don’t invoke domain services directly; instead, they use messaging middleware. In a real application, I might use ESB/integration patterns such as intelligent routing to partition the system across multiple machines and send each request to the suitable queue. In this prototype, I am simply using ActiveMQ, which is fairly robust and easy to use. I am also using separate queues for different kinds of operations. Another lesson I have learned in building large systems is to separate reads from writes so that you can scale them independently and offer different qualities of service, e.g. read queues can be non-persistent, but write queues can be persistent.

Business Delegate

The REST based web services don’t interact with the messaging middleware directly; instead, they use business delegates that hide all the details of sending messages, creating temporary queues and receiving replies. The interface of the business delegates is the same as that of the services.
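The business delegate and the separate read/write queues can be simulated in-process to show the request/reply pattern. In this Ruby sketch, Ruby’s thread-safe `Queue` stands in for the ActiveMQ queues and a per-call reply `Queue` stands in for a temporary queue; `UserService`, `UserServiceDelegate` and the queue layout are hypothetical. A real delegate would also time out while waiting for a reply, which is omitted here.

```ruby
require 'securerandom'

# Hypothetical domain service; a Mutex guards the Hash because the read and
# write workers below run on separate threads.
class UserService
  def initialize
    @users = {}
    @lock = Mutex.new
  end

  def create_user(username, name)
    @lock.synchronize { @users[username] = name }
    username
  end

  def get_user(username)
    @lock.synchronize { @users[username] }
  end
end

# Service side: a worker pops request messages from one queue, invokes the
# service and posts the result to the request's reply queue.
def start_worker(queue, service)
  Thread.new do
    loop do
      request = queue.pop
      break if request == :shutdown
      result = service.public_send(request[:operation], *request[:args])
      request[:reply_to] << { correlation_id: request[:correlation_id], result: result }
    end
  end
end

# Business delegate: same interface as UserService, but each call is sent as
# a message (writes and reads on separate queues) and the caller blocks on a
# private reply queue, which plays the role of a temporary queue.
class UserServiceDelegate
  def initialize(reads, writes)
    @reads = reads
    @writes = writes
  end

  def create_user(username, name)
    request(@writes, :create_user, username, name)
  end

  def get_user(username)
    request(@reads, :get_user, username)
  end

  private

  def request(queue, operation, *args)
    reply_to = Queue.new
    correlation_id = SecureRandom.uuid
    queue << { operation: operation, args: args, reply_to: reply_to,
               correlation_id: correlation_id }
    reply = reply_to.pop
    raise 'mismatched reply' unless reply[:correlation_id] == correlation_id
    reply[:result]
  end
end

reads = Queue.new
writes = Queue.new
service = UserService.new
workers = [start_worker(reads, service), start_worker(writes, service)]

delegate = UserServiceDelegate.new(reads, writes)
delegate.create_user('bob', 'Bob Smith')
puts delegate.get_user('bob')

[reads, writes].each { |q| q << :shutdown }
workers.each(&:join)
```

Because callers only see the delegate’s interface, the in-process queues could be replaced by real broker queues (persistent for writes, non-persistent for reads) without changing the REST layer.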

Benchmark Results

Performance was not the objective of my prototype, but I tried to check how many requests I could process on my development machine. I chose to benchmark only the messaging middleware and not the REST server, because the JVM uses native threads and web containers such as Tomcat use a small thread pool to serve requests. Since our architecture is heavily I/O-bound, that would not scale. Alternatively, I could have built reactive or event based APIs for HTTP, or used Yaws/MochiWeb as a container for the REST based web services, because creating a process in Erlang is pretty cheap. For example, an Erlang process takes roughly 300 words of memory, whereas a Java thread takes 2 MB of stack by default (though it can be reduced to 256 KB on most machines). Here are the results of running a simple server with embedded ActiveMQ and a load test, both running on JRuby on my Pentium Linux machine. I used the default VM size for both JRuby processes and didn’t tune any options:

Test                                       Elapsed (s)  Throughput (req/s)  Invocations
load_test_create_users                          99.041               10.10         1000
load_test_bad_authenticate                      79.407               12.59         1000
load_test_good_authenticate                     91.802               10.89         1000
load_test_get_users                             91.362               10.95         1000
load_test_create_followers                     143.560                6.96          999
load_test_follower_exists                       95.140               10.50          999
load_test_get_followers                         99.881               10.00          999
load_test_get_followings                       104.737                9.54          999
load_test_block_follower                       156.106                6.40          999
load_test_unblock_follower                     161.950                6.17          999
load_test_follower_enable_notifications        157.637                6.34          999
load_test_follower_disable_notifications       161.714                6.18          999
load_test_send_direct_message                 1673.494                5.97         9990
load_test_messages_sent_receive                157.832                6.32          998
load_test_send_direct_message                 1725.717                5.79         9990
load_test_send_replies                          21.546                5.11          110
load_test_get_user_status                       86.952               11.50         1000
load_test_update_status                        213.298                4.69         1000
load_test_get_user_status                       88.987               11.24         1000
load_test_archive_messages                      80.982               12.35         1000
load_test_public_timeline                       98.143               10.19         1000
load_test_user_timeline                         92.871               10.77         1000
load_test_friends_timeline                     140.155                7.13         1000
load_test_user_and_friends_timeline            193.257                5.17         1000

I was not impressed with the results, and I may implement a similar prototype in Erlang using MochiWeb, Mnesia and ejabberd.

Summary

Though the technologies I chose are somewhat arbitrary, the same design principles can be applied to other technology stacks. Although technologies are generally selected for political reasons or personal preference, it is important to consider the team’s familiarity with the technology stack. For example, Twitter tried ejabberd but had problems troubleshooting it because they were not familiar with Erlang.
Again, I used architecture principles such as stateless services, though in real life I may have to store some state, which can be kept in the sharded database. I used embedded services, such as an embedded database server and messaging server, because you can then easily start a single process on a machine and replicate machines as the load increases. I used partitioning/sharding heavily, which is key for scalability. I also used replication, especially replicating messages for each follower, so that reads are fast and we don’t spend a lot of time querying the database. This does add cost on the write side and increases disk requirements, but I think we can control those by limiting the number of messages per user per minute and the maximum number of notifications. We can also remove old messages from the database. Finally, I applied the principle of bringing computation closer to the data by using an embedded database.
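The fan-out on write and the per-minute limit described above can be sketched as follows. This is a hypothetical in-memory Ruby model: each shard is a Hash of timelines, the shard is chosen with CRC32 (an assumption), and the limit uses a fixed one-minute window per author.

```ruby
require 'zlib'

# Hypothetical in-memory model: each shard is a Hash from username to that
# user's timeline (newest first).
class FanOut
  def initialize(shard_count, max_per_minute)
    @shards = Array.new(shard_count) { Hash.new { |h, k| h[k] = [] } }
    @max_per_minute = max_per_minute
    @sent = {} # username => [minute, messages sent in that minute]
  end

  # Copies the status into the author's timeline and each follower's timeline,
  # each on that reader's own shard, so a timeline read touches one shard only.
  # Returns false, writing nothing, once the author exceeds the per-minute limit.
  def post(username, followers, body, now = Time.now)
    minute = now.to_i / 60
    last_minute, count = @sent[username]
    count = 0 unless last_minute == minute
    return false if count >= @max_per_minute

    @sent[username] = [minute, count + 1]
    message = { from: username, body: body, sent_at: now }.freeze
    ([username] + followers).uniq.each { |reader| timeline(reader).unshift(message) }
    true
  end

  def timeline(username)
    @shards[Zlib.crc32(username) % @shards.size][username]
  end
end

feed = FanOut.new(4, 2)
feed.post('bob', %w[amy carl], 'hello')
p feed.timeline('amy').map { |m| m[:body] }
```

The write cost grows with the number of followers, while reading a timeline is a single lookup; the rate limit bounds how much write amplification one user can cause per minute.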

Other Improvements

I tried to show that scalability problems are best solved at the architecture level, independent of a particular technology or language. In fact, I may implement a similar design in Erlang using Mnesia as the database, ejabberd as the messaging middleware, MochiWeb for REST and OTP for implementing services. Other improvements I would suggest are a reverse proxy server for caching, along with a DMZ for security. We can also add some throttling to the reverse proxy servers. For better fault tolerance, I might have multiple reverse proxy servers and use DNS round robin; I don’t like DNS based load balancing for real load distribution because it does not take a server’s capacity and load into account, but it would be suitable in this case. Also, I didn’t implement any caching, but we can use caching solutions such as EHCache, Tangosol, Terracotta, Memcached, etc., though caching highly dynamic contents may be difficult and less reusable. I also didn’t use any ESB (lightweight providers include Mule and ServiceMix), but one can be used to abstract routing to the services, load balancing, aggregation, replication, transformation, etc. We can also build support for versioning and hot code swapping by adding a version number to each message and changing routing schemes. This prototype uses a simple partitioning scheme that hashes business keys such as the username; however, it is difficult to manage when you need to add or remove servers because it requires migrating data. Another solution is the MD5 scheme that we use at Amazon for S3: calculate the MD5 of the username and of each replicated queue, and select the queue whose MD5 sum is the next one higher than the username’s, wrapping around to the lowest when none is higher (i.e., consistent hashing). For further fault tolerance, we could replicate data in a master/slave fashion.
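The MD5 scheme described above is a form of consistent hashing. Below is a minimal Ruby sketch of it, assuming queues are identified by name; `HashRing` is a hypothetical class, and production implementations usually also place several virtual points per queue to even out the load.

```ruby
require 'digest'

# Queues and usernames are hashed onto the same 128-bit ring; a user goes to
# the first queue whose hash is at or above the user's, wrapping around to
# the lowest queue when no hash is higher.
class HashRing
  def initialize(queues)
    @ring = queues.map { |q| [md5(q), q] }.sort_by(&:first)
  end

  def queue_for(username)
    h = md5(username)
    entry = @ring.find { |point, _queue| point >= h } || @ring.first
    entry.last
  end

  # Returns a new ring; only users between the new queue's predecessor and
  # the new queue move, everyone else keeps their queue.
  def add(queue)
    HashRing.new(@ring.map(&:last) + [queue])
  end

  private

  def md5(key)
    Digest::MD5.hexdigest(key).to_i(16)
  end
end

ring = HashRing.new(%w[queue-a queue-b queue-c])
puts ring.queue_for('bob')
bigger = ring.add('queue-d')
puts bigger.queue_for('bob') # either the same queue as before or queue-d
```

This is what makes adding a server cheaper than with a modulo hash: only the keys claimed by the new queue have to be migrated.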

PS: I noticed “NativeException: javax.jms.JMSException: PermGen space” a couple of times while running the long load test. This is generally caused by reloading classes (I have observed it for many years in WebLogic and Tomcat). I need to investigate this further, but my guess is that JRuby is doing something dumb.

Download

Source Code

Appendix

Source Code Model

./model/user.rb
###
# User of the system.
# In our system, a user can have many messages that he/she sent to update
# his/her status or to other users, and can also have followers and followings,
# but we are not showing those relationships because we cannot populate them
# due to sharding and scalability concerns.
###
class User < ActiveRecord::Base
    validates_presence_of       :name
    # validates_length_of (not validates_presence_of) is what honors :within
    validates_length_of         :username,      :within => 3..16
    validates_length_of         :password,      :within => 3..16
    validates_length_of         :email,         :within => 3..64
    validates_uniqueness_of     :email,         :case_sensitive => false
    # \A and \z anchor the whole string; ^ and $ only anchor a single line
    validates_format_of         :email,         :with => /\A([^@\s]+)@((?:[-a-z0-9]+\.)+[a-z]{2,})\z/i
    #
    ### all messages for the same user will be stored in the same database.
    ### also, all messages from the users he/she is following will be copied to the user's database
    ### if they lie in different databases.
    ### limiting rows returned from these relationships to 100 so that we don't overwhelm the server.
    #
    has_many :statuses, :class_name => 'Message', :foreign_key => :username, :limit => 100, :order => 'sent_at desc'
    has_many :tweets, :class_name => 'Message', :foreign_key => :channel_name, :limit => 100, :order => 'sent_at desc'

    #
    ### We will replicate follower/following to each database, and though we may not have the
    ### corresponding users, we can have their usernames
    #
    def self.follower_usernames(username, offset, limit)
       Follower.followers(username, offset, limit).map {|f| f.follower_username }
    end

    def self.following_usernames(username, offset, limit)
       Follower.followings(username, offset, limit).map {|f| f.username }
    end

    def self.follower_count(username)
       Follower.followers_count(username)
    end

    # followings are the rows in which this user is the follower
    def self.following_count(username)
       Follower.count(:conditions => ['follower_username = ?', username])
    end

    #
    ### user's timeline, newest first
    #
    def self.timeline(username, offset, limit)
       Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
    end

    def to_s
      "#{username}"
    end

    def to_external_xml(xml)
      xml.user(:username => self.username, :name => self.name, :email => self.email)
    end
end

./model/message.rb
  1 #
  2 ###
  3 # Message stores any message sent by user to update his/her status, tweets
  4 # from other users that he/she is following, direct message sent to another
  5 # user or reply message sent in response to tweet or direct message.
# Note that we use a GUID-based message_id in addition to the database column 'id'.
# Because the database is sharded, the 'id' column can be duplicated across databases,
# but the GUID-based message_id is meant to be unique across all of them.
###
class Message < ActiveRecord::Base
    MESSAGE_TYPE_SAY = "Say"
    MESSAGE_TYPE_SHOUT = "Shout"
    #
    ### :within is an option of validates_length_of; validates_presence_of
    ### silently ignores it, so the length limits are declared separately
    #
    validates_presence_of       :message_id
    validates_length_of         :message_id,    :within => 1..64
    validates_presence_of       :username
    #
    ### channel_name will be the username of the target user for now,
    ### though in the future it could be a group or a topic
    #
    validates_presence_of       :channel_name
    validates_length_of         :channel_name,  :within => 1..32
    validates_presence_of       :message_body
    validates_length_of         :message_body,  :within => 1..140
    validates_presence_of       :message_type
    validates_length_of         :message_type,  :within => 1..32
    validates_presence_of       :sent_at
    validates_uniqueness_of     :message_id
    #
    ### all messages for the same user will be stored in the same database;
    ### messages from the users he/she is following are also copied to the
    ### user's database if they live in different databases.
    #
    belongs_to :user, :class_name => 'User', :foreign_key => :username

    def self.messages_for(username, offset, limit)
       Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
    end

    def self.destroy_message(id)
       message = Message.find_by_message_id(id)
       if message
          message.destroy
          message
       else
          nil
       end
    end

    def to_s
      "#{username} -> #{channel_name}: #{message_body}"
    end

    def to_external_xml(xml)
      xml.message(message_body, :message_id => self.message_id, :message_type => self.message_type, :from => self.username, :to => self.channel_name, :sent_at => sent_at.httpdate)
    end
end
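The comments above say that all of a user's messages live in one database and that the auto-increment `id` can repeat across shards. The shard lookup itself (`SchemaHelper.setup_schema_for`) is not shown in this listing, so the following is only a minimal sketch, assuming a fixed list of shards and a stable hash of the username; `ShardRouter` and the shard names are hypothetical.

```ruby
require 'zlib'

# Hypothetical router (not part of the post's code): maps a username to one
# of a fixed set of shards. Zlib.crc32 is stable across processes (unlike
# String#hash), so every server picks the same shard for the same user.
class ShardRouter
  def initialize(shards)
    @shards = shards
  end

  def shard_for(username)
    @shards[Zlib.crc32(username) % @shards.size]
  end
end

router = ShardRouter.new(%w[messages_db_0 messages_db_1 messages_db_2])
# The same username always maps to the same shard, so one user's
# messages stay together even though ids may repeat across shards.
puts router.shard_for("alice")
puts router.shard_for("bob")
```

With a modulo scheme like this, changing the number of shards moves most users to a different shard, so the shard list is assumed to be fixed.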
 
 

./model/reply_message.rb
###
# Message sent in response to another tweet, message or direct message.
#
# The association below is commented out because the original message
# could be in a different database.
# belongs_to :reply_message_id, :class_name => Message, :dependent => :destroy
###

require 'direct_message'

class ReplyMessage < DirectMessage
  validates_presence_of       :reply_message_id
  validates_length_of         :reply_message_id,    :within => 1..64
  def self.replies(username, offset, limit)
     ReplyMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
end
 
 

./model/guid_generator.rb
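require 'thread'

###
# Note: ids are only unique within one process. The counter is per JVM and
# restarts at 1, so two servers can generate the same id in the same second.
###
class GuidGenerator
  @@count = 1
  @@lock = Mutex.new    # request threads share @@count under JRuby
  def self.guid(prefix)
    count = @@lock.synchronize { @@count += 1 }
    "#{prefix}#{count}#{Time.now.to_i}"
  end
  def self.next_message_id
    guid("message_id")
  end
  def self.next_request_id
    guid("request_id")
  end
end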
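`GuidGenerator` combines a per-process counter with the current second, which is unique within one JVM but not across hosts. If the message_id really has to be unique across all shards, one option (an assumption, not what the post uses) is a random UUID from Ruby's standard `securerandom` library, which needs Ruby 1.9.2 or later; on older JRuby, `java.util.UUID.randomUUID.to_s` serves the same purpose. `UuidGenerator` below is a hypothetical drop-in with the same class methods.

```ruby
require 'securerandom'

# Hypothetical drop-in for GuidGenerator: random (version 4) UUIDs do not
# depend on a shared counter, so separate processes and hosts need no
# coordination to avoid collisions.
class UuidGenerator
  def self.guid(prefix)
    "#{prefix}#{SecureRandom.uuid}"
  end

  def self.next_message_id
    guid("message_id")
  end

  def self.next_request_id
    guid("request_id")
  end
end

id = UuidGenerator.next_message_id
puts id          # "message_id" followed by a 36-character UUID
puts id.length   # 46, within the 1..64 range validated on Message
```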
 
 

./model/follower.rb
#
####
# Follower stores a one-way relation between two users. However, in order to
# support sharding, we don't connect users directly; instead we store only
# the users' usernames.
###
class Follower < ActiveRecord::Base
    RELATION_TYPE_FOLLOWER = "Follower"
    RELATION_TYPE_FRIEND = "Friend"
    RELATION_TYPE_FAMILY = "Family"
    RELATION_TYPE_COLLEAGUE = "Colleague"
    RELATION_TYPE_ACQUAINTENCE = "Acquaintance"
    RELATION_TYPE_FAN = "Fan"
    RELATION_TYPE_OTHER = "Other"

    validates_presence_of       :relation_type
    #validates_presence_of       :blocked
    #validates_presence_of       :notifications

    def self.create_follower(follower_attrs)
        Follower.create!(follower_attrs) unless exists?(follower_attrs[:username], follower_attrs[:follower_username])
    end

    def self.followers(username, offset, limit)
        self.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
    end
    def self.follower_count(username)
        self.count(:conditions => ['username = ?', username])
    end
    def self.following_count(username)
        self.count(:conditions => ['follower_username = ?', username])
    end

    def self.followings(username, offset, limit)
        self.find(:all, :conditions => ['follower_username = ?', username], :offset => offset, :limit => limit)
    end
    def self.destroy_follower(username, follower_username)
        record = self.get_record(username, follower_username)
        # raise the error FollowersService#destroy_follower rescues, instead of a NoMethodError on nil
        raise ActiveRecord::RecordNotFound, "#{follower_username} does not follow #{username}" if record.nil?
        record.destroy
    end
    def self.exists?(username, follower_username)
        !self.get_record(username, follower_username).nil?
    end
    def self.enable_notifications(username, follower_username)
        self.update_record(username, follower_username, :notifications => true)
    end
    def self.disable_notifications(username, follower_username)
        self.update_record(username, follower_username, :notifications => false)
    end
    def self.block(username, follower_username)
        self.update_record(username, follower_username, :blocked => true)
    end
    def self.unblock(username, follower_username)
        self.update_record(username, follower_username, :blocked => false)
    end
    def self.get_record(username, follower_username)
        self.find(:first, :conditions => ['username = ? and follower_username = ?', username, follower_username])
    end
    # returns false when the relation does not exist, so callers such as
    # BlockFollowerHandler can answer 404 instead of failing with a NoMethodError
    def self.update_record(username, follower_username, attrs)
        record = self.get_record(username, follower_username)
        record ? record.update_attributes(attrs) : false
    end
    def to_s
      "#{username} -> #{follower_username}"
    end
    def to_external_xml(xml)
      xml.follower(:username => self.username, :follower_username => self.follower_username)
    end

end
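Because `Follower` stores only the two usernames, the direction of a row is easy to misread: a row `(username, follower_username)` means that *follower_username follows username*. The plain-Ruby sketch below (no database; `FollowerGraph` is a hypothetical stand-in) mirrors the conditions that `followers` and `followings` query on, to make that direction explicit.

```ruby
# Hypothetical in-memory stand-in for the followers table: each row is
# [username, follower_username], meaning follower_username follows username.
class FollowerGraph
  def initialize
    @rows = []
  end

  # like Follower.create_follower: duplicates are ignored
  def follow(follower_username, username)
    row = [username, follower_username]
    @rows << row unless @rows.include?(row)
  end

  # like Follower.followers: rows WHERE username = ?
  def followers(username)
    @rows.select { |u, _| u == username }.map { |_, f| f }
  end

  # like Follower.followings: rows WHERE follower_username = ?
  def followings(username)
    @rows.select { |_, f| f == username }.map { |u, _| u }
  end
end

graph = FollowerGraph.new
graph.follow("bob", "alice")    # bob follows alice
graph.follow("carol", "alice")  # carol follows alice
graph.follow("bob", "alice")    # duplicate, ignored
p graph.followers("alice")      # => ["bob", "carol"]
p graph.followings("bob")       # => ["alice"]
```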
 
 

./model/direct_message.rb
####
# Direct message is sent directly to another user
####
class DirectMessage < Message
  def to_user(user)
     self.channel_name = user.username
  end
  def self.direct_messages_sent(username, offset, limit)
     DirectMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
  def self.direct_messages_received(username, offset, limit)
     DirectMessage.find(:all, :conditions => ['channel_name = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
  end
end
 
 

Source Code Services

./service/base_service.rb
class BaseService
protected
  def initialize(username)
    SchemaHelper.setup_schema_for(username)
  end

  def serialize(obj, format)
    if obj
      format ||= 'xml'
      case format
        when 'xml'
          obj.to_xml
        when 'yaml'
          obj.to_yaml
        when 'json'
          obj.to_json
        else
          obj
      end
    end
  end
end
 
 

./service/users_service.rb
#
## UsersService provides basic functionality for querying/storing users.
## It assumes the connection to the right database is already set up.
#
class UsersService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end


  #
  ### retrieves user record
  #
  def get_user(username, options={})
    User.find_by_username(username)
  end

  #
  ### return usernames of people whom the user is following
  #
  def followings(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.following_usernames(username, offset, limit)
  end


  #
  ### return usernames of followers of the authenticated user
  #
  def followers(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.follower_usernames(username, offset, limit)
  end


  def authenticate(username, password, options={})
    User.find_by_username_and_password(username, password)
  end


  def create_user(user_attrs, options={})
    user = User.new()
    user.attributes = user_attrs
    user.save!
    user
  end

  def archive_messages(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    Message.messages_for(username, offset, limit)
  end
end
 
 

./service/followers_service.rb
#
## FollowersService provides basic functionality for creating/querying followers.
## It assumes all users are already authenticated before calling these methods.
#
class FollowersService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end
  #
  ### create follower
  #
  def create_follower(follower_attrs, options={})
    Follower.create_follower(follower_attrs)
  end

  #
  ### enable notifications
  #
  def enable_notifications(username, follower_username, options={})
    Follower.enable_notifications(username, follower_username)
  end

  #
  ### disable notifications
  #
  def disable_notifications(username, follower_username, options={})
    Follower.disable_notifications(username, follower_username)
  end


  #
  ### blocks a user
  #
  def block(username, follower_username, options={})
    Follower.block(username, follower_username)
  end

  #
  ### unblocks a user
  #
  def unblock(username, follower_username, options={})
    Follower.unblock(username, follower_username)
  end


  #
  ### delete follower relation
  #
  def destroy_follower(username, follower_username, options={})
    Follower.destroy_follower(username, follower_username)
    true
  rescue ActiveRecord::RecordNotFound => e
    false
  end


  #
  ### check whether follower_username follows username
  #
  def exists?(username, follower_username, options={})
    Follower.exists?(username, follower_username)
  end
end
 
 

./service/messages_service.rb
#
## MessagesService provides basic functionality for querying/storing statuses/messages.
## It assumes all users are already authenticated before calling these methods.
#
class MessagesService < BaseService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end


  #
  ### get a single status
  #
  def get_status(message_id, options={})
    Message.find_by_message_id(message_id)
  end

  #
  ### update status
  #
  def update_status(message_attrs, options={})
    message = Tweet.new
    message.attributes = message_attrs
    safe_save(message)
  end

  #
  ### delete status
  #
  def destroy_message(message_id, options={})
    Message.destroy_message(message_id)
  end


  #
  ### retrieves most recent statuses for public
  #
  def public_timeline(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 10
    User.timeline(username, offset, limit)
  end

  #
  ### retrieves most recent statuses for user
  #
  def user_timeline(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.timeline(username, offset, limit)
  end


  #
  ### retrieves most recent statuses for friends
  #
  def friends_timeline(username, options={})
    offset = options[:offset]
    limit = options[:limit]
    get_friends_timeline(username, offset, limit)
  end


  #
  ### retrieves most recent statuses for user and his/her friends, newest first
  #
  def user_and_friends_timeline(username, options={})
    offset = options[:offset]
    limit = options[:limit]
    # User.timeline gets the same defaults as user_timeline
    messages = User.timeline(username, offset || 0, limit || 100)
    messages += get_friends_timeline(username, offset, limit)
    messages.sort_by { |message| message.sent_at }.reverse
  end

private

  #
  ### retrieves most recent statuses for friends without any conversion, newest first
  #
  def get_friends_timeline(username, offset, limit)
    offset ||= 0
    following_count = Follower.following_count(username)
    # by default, spread roughly 100 messages across the followed users,
    # fetching at least one message per followed user
    limit ||= (following_count > 0 && following_count < 100) ? 100 / following_count : 1

    following = User.following_usernames(username, 0, 200)
    messages = []
    # a distinct block variable keeps the username argument from being overwritten (Ruby 1.8)
    following.each do |followed_username|
      messages += Message.messages_for(followed_username, offset, limit)
    end
    messages.sort_by { |message| message.sent_at }.reverse
  end
protected
  def safe_save(message)
    old_message = Message.find_by_message_id(message.message_id)
    if old_message.nil?
       message.save!
       message
    else
       puts "Message with id #{message.message_id} already exists #{message.inspect}"
       nil
    end
  end
end
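`get_friends_timeline` fetches up to `limit` messages per followed user, and each per-user list comes back newest-first (`:order => 'sent_at desc'`). Since the lists are already sorted, they can also be combined with a k-way merge that stops after the requested number of messages instead of re-sorting everything. The sketch below is illustrative only: `Msg` and `merge_timelines` are hypothetical names, and it works on plain Ruby structs rather than ActiveRecord rows.

```ruby
# Minimal stand-in for a Message row: only the fields the merge needs.
Msg = Struct.new(:username, :sent_at)

# Merges per-user lists that are each sorted newest-first (as
# Message.messages_for returns them) into one newest-first timeline,
# stopping after `limit` messages. Picks the newest head each step: O(limit * k).
def merge_timelines(lists, limit)
  cursors = Array.new(lists.size, 0)
  merged = []
  while merged.size < limit
    best = nil
    lists.each_with_index do |list, i|
      head = list[cursors[i]]
      next if head.nil?
      best = i if best.nil? || head.sent_at > lists[best][cursors[best]].sent_at
    end
    break if best.nil?   # every list is exhausted
    merged << lists[best][cursors[best]]
    cursors[best] += 1
  end
  merged
end

t = Time.at(1_220_000_000)
alice = [Msg.new("alice", t + 30), Msg.new("alice", t + 10)]
bob   = [Msg.new("bob", t + 20), Msg.new("bob", t)]
# prints "alice 30", "bob 20", "alice 10", one per line
merge_timelines([alice, bob], 3).each { |m| puts "#{m.username} #{(m.sent_at - t).to_i}" }
```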
 
 

./service/direct_messages_service.rb
#
## DirectMessagesService provides functionality for sending/receiving direct messages
#
class DirectMessagesService < MessagesService
  #
  ### initialize service
  #
  def initialize(username)
    super
  end
  #
  ### sends back the most recent direct messages received by the user (20 by default);
  ### it assumes the user is already authenticated.
  #
  def direct_messages_received(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 20
    DirectMessage.direct_messages_received(username, offset, limit)
  end
  #
  ### sends back the most recent direct messages sent by the user (20 by default);
  ### it assumes the user is already authenticated.
  #
  def direct_messages_sent(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 20
    DirectMessage.direct_messages_sent(username, offset, limit)
  end

  #
  ### send direct message
  #
  def send_direct_message(message_attrs, options={})
    message = DirectMessage.new()
    message.attributes = message_attrs
    safe_save(message)
  end

  #
  ### send reply message
  #
  def send_reply(message_attrs, options={})
    message = ReplyMessage.new()
    message.attributes = message_attrs
    safe_save(message)
  end

  #
  ### sends back the most recent replies for the user (100 by default);
  ### it assumes the user is already authenticated.
  #
  def replies(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    ReplyMessage.replies(username, offset, limit)
  end


  #
  ### delete direct message
  #
  def destroy_direct_message(id, options={})
    DirectMessage.destroy_message(id)
  end
end
 
 

Source Code Business Delegate

./delegate/base_delegate.rb
#
## BaseDelegate provides the client-side interface for connecting to the business
## services using the Business Delegate pattern.
#
class BaseDelegate
protected
  def initialize(jms_helper, read_queue_name, write_queue_name)
     @read_queue_name = read_queue_name
     @write_queue_name = write_queue_name
     @jms_helper = jms_helper
     @jms_helper.create_producers(@read_queue_name, @write_queue_name)
  end


  def new_jms_message(properties={})
    jms_msg = @jms_helper.create_message("")    ### ActiveMQTextMessage.new
    jms_msg.setStringProperty('request_id', GuidGenerator.next_request_id)
    properties.each do |name, value|
        jms_msg.setStringProperty(name.to_s, value.to_s)
    end
    jms_msg
  end


  def send_read_request(jms_msg)
    @jms_helper.send_message(@read_queue_name, jms_msg, true).text
  end


  def send_write_request(jms_msg, reply=true)
    response = @jms_helper.send_message(@write_queue_name, jms_msg, reply)
    if response.respond_to? :text
        response.text
    else
        response
    end
  end
end
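Because `new_jms_message` stores every value with `setStringProperty(name.to_s, value.to_s)`, numbers arrive at the handler as strings (`20` becomes `"20"`) and `nil` becomes `""`, which is truthy in Ruby. Any handler that forwards such values to a service (for example as `:offset` or `:limit`) would therefore need to convert them first, or an `options[:limit] || 100` default would not apply. A minimal sketch, assuming a plain Hash in place of the JMS message; `to_string_properties` and `int_option` are hypothetical helpers, not part of the post's code.

```ruby
# Simulates what BaseDelegate#new_jms_message does: every value becomes a String.
def to_string_properties(options)
  options.inject({}) { |props, (name, value)| props[name.to_s] = value.to_s; props }
end

# Hypothetical handler-side helper: returns the property as an Integer, or
# the default when it is missing, blank, or not a non-negative whole number.
def int_option(props, name, default)
  raw = props[name.to_s].to_s.strip
  raw =~ /\A\d+\z/ ? raw.to_i : default
end

props = to_string_properties(:offset => 20, :limit => nil, :username => "alice")
p props["limit"]                  # => ""
p int_option(props, :offset, 0)   # => 20
p int_option(props, :limit, 100)  # => 100
```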
 
 
./delegate/users_delegate.rb
#
## UsersDelegate provides basic functionality for querying/storing users
## by sending requests over JMS to the users queues.
#
class UsersDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_USERS, JmsHelper::QUEUE_WRITE_USERS)
  end
  def create_user(user_attrs, options={})
      args = options.merge(user_attrs).merge(:action => 'Users.create_user')
      jms_msg = new_jms_message(args)
      send_write_request(jms_msg)
  end

  #
  ### retrieves user record
  #
  def get_user(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.get_user'))
      send_read_request(jms_msg)
  end

  #
  ### return usernames of people the user is following
  #
  def followings(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followings'))
      send_read_request(jms_msg)
  end


  #
  ### return usernames of followers of the authenticated user (100 by default)
  #
  def followers(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followers'))
      send_read_request(jms_msg)
  end


  def authenticate(username, password, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :password => password, :action => 'Users.authenticate'))
      send_read_request(jms_msg)
  end


  def archive_messages(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.archive_messages'))
      send_read_request(jms_msg)
  end
end
 
 

./delegate/messages_delegate.rb
#
## MessagesDelegate provides basic functionality for querying/storing statuses/messages.
## It assumes all users are already authenticated before calling these methods.
#
class MessagesDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_MESSAGES, JmsHelper::QUEUE_WRITE_MESSAGES)
  end
  #
  ### get a single status
  #
  def get_status(message_id, options={})
      jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.get_status'))
      send_read_request(jms_msg)
  end

  #
  ### update status
  #
  def update_status(message_attrs, options={})
      jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'Messages.update_status'}.merge(options).merge(message_attrs))
      send_write_request(jms_msg)
  end

  #
  ### delete status
  #
  def destroy_message(message_id, options={})
      jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.destroy_message'))
      send_write_request(jms_msg)
  end


  #
  ### retrieves most recent statuses for public
  #
  def public_timeline(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.public_timeline'))
      send_read_request(jms_msg)
  end

  #
  ### retrieves most recent statuses for user
  #
  def user_timeline(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_timeline'))
      send_read_request(jms_msg)
  end


  #
  ### retrieves most recent statuses for friends
  #
  def friends_timeline(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.friends_timeline'))
      send_read_request(jms_msg)
  end


  #
  ### retrieves most recent statuses for user and his/her friends
  #
  def user_and_friends_timeline(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_and_friends_timeline'))
      send_read_request(jms_msg)
  end
end
 
 

./delegate/followers_delegate.rb
#
## FollowersDelegate provides basic functionality for creating/querying followers.
## It assumes all users are already authenticated before calling these methods.
#
class FollowersDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_FOLLOWERS, JmsHelper::QUEUE_WRITE_FOLLOWERS)
  end
  #
  ### create follower
  #
  def create_follower(follower_attrs, options={})
      jms_msg = new_jms_message(options.merge(follower_attrs).merge(:action => 'Followers.create_follower'))
      send_write_request(jms_msg)
  end

  #
  ### enable notifications
  #
  def enable_notifications(username, follower_username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.enable_notifications'))
      send_write_request(jms_msg)
  end

  #
  ### disable notifications
  #
  def disable_notifications(username, follower_username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.disable_notifications'))
      send_write_request(jms_msg)
  end


  #
  ### blocks a user
  #
  def block(username, follower_username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.block'))
      send_write_request(jms_msg)
  end

  #
  ### unblocks a user
  #
  def unblock(username, follower_username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.unblock'))
      send_write_request(jms_msg)
  end


  #
  ### delete follower relation
  #
  def destroy_follower(username, follower_username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.destroy_follower'))
      send_write_request(jms_msg)
  end


  #
  ### check whether follower_username follows username
  #
  def exists?(username, follower_username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.exists?'))
      send_read_request(jms_msg)
  end
end
 
 

./delegate/direct_messages_delegate.rb
#
## DirectMessagesDelegate provides functionality for sending/receiving direct messages
#
class DirectMessagesDelegate < MessagesDelegate
  def initialize(jms_helper)
    super(jms_helper)
  end

  #
  ### sends back the most recent direct messages received by the user (20 by default);
  ### it assumes the user is already authenticated.
  #
  def direct_messages_received(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_received'))
      send_read_request(jms_msg)
  end
  #
  ### sends back the most recent direct messages sent by the user (20 by default);
  ### it assumes the user is already authenticated.
  #
  def direct_messages_sent(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_sent'))
      send_read_request(jms_msg)
  end

  #
  ### send direct message
  #
  def send_direct_message(message_attrs, options={})
      jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_direct_message'}.merge(options).merge(message_attrs))
      send_write_request(jms_msg)
  end

  #
  ### send reply message
  #
  def send_reply(message_attrs, options={})
      jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_reply'}.merge(options).merge(message_attrs))
      send_write_request(jms_msg)
  end

  #
  ### sends back the most recent replies for the user (100 by default);
  ### it assumes the user is already authenticated.
  #
  def replies(username, options={})
      jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.replies'))
      send_read_request(jms_msg)
  end


  #
  ### delete direct message
  #
  def destroy_direct_message(message_id, options={})
      jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'DirectMessages.destroy_direct_message'))
      send_write_request(jms_msg)
  end
end
 
 

Source Code Message Handlers

./messaging/base_handler.rb
#
## BaseHandler provides common functionality for handling messages
#
class BaseHandler
  attr_reader :username
  attr_reader :message_text
  attr_reader :timings
  def initialize
    @timings = []
    @started_at = Time.now
    @host_id = Digest::MD5.hexdigest("localhost")       #lookup real host
  end

  def handle(jms_msg, options={})
    add_timing_begin
    msg_attrs = {}
    pnames = jms_msg.getPropertyNames()
    for pname in pnames
        msg_attrs[pname] = msg_attrs[pname.to_sym] = jms_msg.getObjectProperty(pname)
        case pname
        when 'username'
          @username = msg_attrs[pname]
        when 'auth_user'
          @auth_user = msg_attrs[pname]
        when 'request_id'
          @request_id = msg_attrs[pname]
        end
    end
    if @username.nil?
        raise "username missing in #{jms_msg.inspect} options #{options.inspect}"
    end
    if jms_msg.respond_to? :text
        #msg_attrs.merge!(xml_to_dict(jms_msg.text))
        msg_attrs[:message_text] = jms_msg.text
    end
    ##
    add_timing_for_input_xml
    response = do_handle(msg_attrs)
    ##
    add_timing_end
    response
  rescue ActiveRecord::RecordInvalid => invalid
    response = xml_error("400", "handle", invalid.record.errors)
    add_timing_end
    response
  rescue java.sql.SQLException => e
    puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n"
    # walk the chained SQL exceptions with a separate variable, so that e still
    # refers to the original exception when the error response is built
    cause = e
    while cause
      puts cause.backtrace << "\n\n"
      cause = cause.getNextException()
    end
    response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
    add_timing_end
    response
  rescue Exception => e
    puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n#{e.backtrace}"
    response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
    add_timing_end
    response
  end

protected
  def username_for_service
    @auth_user || @username
  end

  def add_timing(what)
    # store [label, elapsed milliseconds] pairs; to_xml destructures them as |what, duration|
    @timings << [what, ((Time.now - @started_at) * 1000).to_i]
  end
  def add_timing_begin
    add_timing "#{self.class.name} begin"
  end
  def add_timing_end
    add_timing "#{self.class.name} end"
  end
  def add_timing_for_db
    add_timing "#{self.class.name} connected to db"
  end
  def add_timing_for_output_xml
    add_timing "#{self.class.name} converting output xml"
  end
  def add_timing_for_input_xml
    add_timing "#{self.class.name} converting input xml"
  end

  def do_handle(msg_attrs, options={})
    raise NotImplementedError, "#{self.class.name} must implement do_handle"
  end
  def xml_to_dict(xml)
    Hash.from_xml(xml)
  end
  def xml_error(response_code, action, errors, options={})
    #logger.error "Failed to #{action} due to #{error_code} by #{username} -- #{error_message}"
    to_xml(response_code, "Error response for #{action}", options) do |xml|
      xml.errors do
        errors.each do |error_code, error_message|
          xml.error(:error_code => error_code, :error_message => error_message)
        end
      end
    end
  end

  def to_xml(response_code, comment="", options={})
    buffer = options[:buffer] ||= ''

    xml = options[:builder] ||= Builder::XmlMarkup.new(:target => buffer, :indent => options[:indent])
    xml.instruct! unless options[:skip_instruct]
    xml.comment! "Response #{comment} in reply for #{username} as of #{Time.now.utc}"
    xml.response(:request_id => @request_id, :response_code => response_code, :host_id => @host_id, :version => '1.0') do
      yield xml if block_given?
      xml.timings do
         @timings.each do |what, duration|
            xml.timing(:what => what, :duration_millis => duration)
         end
      end
    end
    buffer
  end
end
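`BaseHandler` collects `[what, duration]` pairs that `to_xml` writes out as `<timing what="..." duration_millis="..."/>` elements. The self-contained sketch below shows the same bookkeeping in isolation, assuming Ruby 2.1 or later for `Process.clock_gettime`; it uses a monotonic clock so that a wall-clock adjustment cannot produce negative durations. `Stopwatch` is a hypothetical name, not part of the post's code.

```ruby
# Records [label, elapsed milliseconds since start] pairs, like BaseHandler#add_timing.
class Stopwatch
  attr_reader :timings

  def initialize
    @started_at = now_millis
    @timings = []
  end

  def mark(what)
    @timings << [what, now_millis - @started_at]
  end

  private

  # Monotonic clock in whole milliseconds (unaffected by wall-clock changes).
  def now_millis
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  end
end

watch = Stopwatch.new
watch.mark("handler begin")
sleep 0.05
watch.mark("handler end")
watch.timings.each { |what, millis| puts "#{what}: #{millis} ms" }
```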
 
 

./messaging/archive_messages_handler.rb
#
## ArchiveMessagesHandler sends back archive messages for the user
#
class ArchiveMessagesHandler < BaseHandler
protected
  def do_handle(msg_attrs, options={})
    svc = UsersService.new(self.username_for_service)
    add_timing_for_db
    messages = svc.archive_messages(self.username, options)
    add_timing_for_output_xml
    to_xml("200", "Archived Messages", options) do |xml|
      xml.messages do
         messages.each do |message|
           message.to_external_xml(xml)
         end
      end
    end
  end
end
 
 

./messaging/authenticate_handler.rb
## AuthenticateHandler authenticates user
#
class AuthenticateHandler < BaseHandler
protected
  def do_handle(msg_attrs, options={})
    password = msg_attrs[:password]
    svc = UsersService.new(self.username_for_service)
    add_timing_for_db
    user = svc.authenticate(username, password, options)
    add_timing_for_output_xml
    if user
      to_xml("200", "Authentication", options) do |xml|
         user.to_external_xml(xml)
      end
    else
      xml_error("401", "Authentication", {"AUTH_FAILURE" => "Invalid username/password"})
    end
  end
end
 
 

./messaging/block_follower_handler.rb
## BlockFollowerHandler blocks follower
#
class BlockFollowerHandler < BaseHandler
protected
  def do_handle(msg_attrs, options={})
    svc = FollowersService.new(self.username_for_service)
    follower_username = msg_attrs[:follower_username]
    add_timing_for_db
    blocked = svc.block(self.username, follower_username, options)
    response_code = blocked ? "200" : "404"
    add_timing_for_output_xml
    to_xml(response_code, "Block Follower", options) do |xml|
      xml.follower(:username => self.username, :follower_username => follower_username)
    end
  end
end
 
 

Messaging Helpers

./messaging/message_forwarder.rb
class MessageForwarder
  include java.lang.Runnable
  include javax.jms.MessageListener

  def initialize(jms_helper, queue_name, handlers)
    @jms_helper = jms_helper
    @queue_name = queue_name
    @handlers = handlers
  end

  def run
    @consumer = @jms_helper.get_consumer(@queue_name)
    @consumer.set_message_listener(self)
    puts "Starting listener for queue #{@queue_name}"
  end

  def onMessage(jms_msg)
    # the delegates set 'action' with setStringProperty, so read it back the same way
    action = jms_msg.getStringProperty('action')
    if action.nil?
       puts "Unknown message #{jms_msg.inspect}"
    else
       handler = @handlers[action]
       if handler.nil?
          puts "No handler for #{action} -- #{jms_msg.inspect}"
       else
          # a new handler per message, so per-request state (username, timings) is not shared
          response = handler.new().handle(jms_msg)

          # reply only when the sender asked for one (request/reply)
          reply_queue = jms_msg.getJMSReplyTo()
          if reply_queue
             reply_producer = @jms_helper.create_producer(reply_queue)
             reply_message = @jms_helper.create_message(response)
             reply_message.setJMSCorrelationID(jms_msg.getJMSMessageID())
             reply_producer.send(reply_message)
          end
       end
    end
  end

  def close
    @consumer.close()
  end
end
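`MessageForwarder` dispatches on the `action` property and, when the sender set a reply-to destination, answers with the request's JMS message ID as the correlation ID, which is how a waiting caller can tell its reply apart from others. The sketch below simulates that request/reply flow in a single Ruby process, using `Queue` in place of JMS destinations; `FakeMessage`, `EchoHandler`, and the `'Users.echo'` action are hypothetical, and none of this is the JMS API.

```ruby
require 'thread'
require 'securerandom'

# In-process stand-in for a JMS message (hypothetical, not javax.jms.Message).
FakeMessage = Struct.new(:message_id, :correlation_id, :properties, :text, :reply_to)

# Hypothetical handler: echoes back who asked.
class EchoHandler
  def handle(msg)
    "<response user='#{msg.properties['username']}'/>"
  end
end

HANDLERS = { 'Users.echo' => EchoHandler }

requests = Queue.new
replies  = Queue.new

# Forwarder loop: dispatch on the 'action' property and reply with the
# request's message id as the correlation id, as MessageForwarder#onMessage does.
worker = Thread.new do
  while (msg = requests.pop) != :stop
    handler = HANDLERS[msg.properties['action']]
    next if handler.nil?
    response = handler.new.handle(msg)
    if msg.reply_to
      msg.reply_to << FakeMessage.new(SecureRandom.hex(8), msg.message_id, {}, response, nil)
    end
  end
end

request = FakeMessage.new(SecureRandom.hex(8), nil,
                          { 'action' => 'Users.echo', 'username' => 'alice' }, "", replies)
requests << request
reply = replies.pop
puts reply.text                                  # <response user='alice'/>
puts reply.correlation_id == request.message_id  # true
requests << :stop
worker.join
```

With a single request in flight the correlation check is trivial; it matters when several callers share one reply destination and each must pick out the reply whose correlation ID equals its own request's message ID.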
 
 

./messaging/jms_helper.rb
require 'java'
# JRuby imports for the ActiveMQ/JMS classes used below; the *Handler classes
# referenced in the handler maps are assumed to be required before this file.
java_import 'org.apache.activemq.ActiveMQConnectionFactory'
java_import 'org.apache.activemq.broker.BrokerService'
java_import 'javax.jms.Session'

class JmsHelper
  # Required so that `self` can be registered with set_exception_listener.
  include javax.jms.ExceptionListener

  # http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS6.html
  #
  ### A key tip for scalability is to separate your reads from your writes; that way you can scale them independently and
  ### also offer different qualities of service, e.g. read queues can be non-persistent, but write queues can be persistent.
  ###
  QUEUE_READ_MESSAGES = 'read_messages'
  QUEUE_WRITE_MESSAGES = 'write_messages'
  QUEUE_READ_FOLLOWERS = 'read_followers'
  QUEUE_WRITE_FOLLOWERS = 'write_followers'
  QUEUE_READ_USERS = 'read_users'
  QUEUE_WRITE_USERS = 'write_users'

  READ_MESSAGES_HANDLERS = {
    'DirectMessages.direct_messages_received' => DirectMessagesReceivedHandler,
    'DirectMessages.direct_messages_sent' => DirectMessagesSentHandler,
    'DirectMessages.replies' => RepliesHandler,
    'Messages.get_status' => GetStatusHandler,
    'Messages.public_timeline' => PublicTimelineHandler,
    'Messages.user_timeline' => UserTimelineHandler,
    'Messages.friends_timeline' => FriendsTimelineHandler,
    'Messages.user_and_friends_timeline' => UserFriendTimelineHandler,
  }
  WRITE_MESSAGES_HANDLERS = {
    'DirectMessages.send_direct_message' => SendDirectMessageHandler,
    'DirectMessages.send_reply' => SendReplyMessageHandler,
    'DirectMessages.destroy_direct_message' => DestroyDirectMessageHandler,
    'Messages.update_status' => UpdateStatusHandler,
    'Messages.destroy_message' => DestroyMessageHandler,
  }
  READ_FOLLOWERS_HANDLERS = {
    'Followers.exists?' => FollowerExistsHandler,
  }
  WRITE_FOLLOWERS_HANDLERS = {
    'Followers.create_follower' => CreateFollowerHandler,
    'Followers.enable_notifications' => EnableNotificationsHandler,
    'Followers.disable_notifications' => DisableNotificationsHandler,
    'Followers.block' => BlockFollowerHandler,
    'Followers.unblock' => UnblockFollowerHandler,
    'Followers.destroy_follower' => DestroyFollowerHandler,
  }
  READ_USERS_HANDLERS = {
    'Users.get_user' => GetUserHandler,
    'Users.followings' => FollowingsHandler,
    'Users.followers' => FollowersHandler,
    'Users.authenticate' => AuthenticateHandler,
    'Users.archive_messages' => ArchiveMessagesHandler,
  }
  WRITE_USERS_HANDLERS = {
    'Users.create_user' => CreateUserHandler,
  }

  def initialize(connection=nil)
    if connection
      @connection = connection
    else
      factory = ActiveMQConnectionFactory.new("tcp://localhost:61616")
      @connection = factory.create_connection()
      @connection.set_exception_listener(self)
      @connection.start()
    end
    @queues = {}
    @producers = {}
    @consumers = {}
  end

  def start_consumers
    start_forwarder(QUEUE_READ_MESSAGES, READ_MESSAGES_HANDLERS)
    start_forwarder(QUEUE_WRITE_MESSAGES, WRITE_MESSAGES_HANDLERS)
    start_forwarder(QUEUE_READ_FOLLOWERS, READ_FOLLOWERS_HANDLERS)
    start_forwarder(QUEUE_WRITE_FOLLOWERS, WRITE_FOLLOWERS_HANDLERS)
    start_forwarder(QUEUE_READ_USERS, READ_USERS_HANDLERS)
    start_forwarder(QUEUE_WRITE_USERS, WRITE_USERS_HANDLERS)
    # Block the main thread forever; the listeners run on JMS delivery threads.
    sleep
  end

  def onException(jms_ex)
    puts "JMS exception occurred: #{jms_ex.inspect}"
  end

  def close
    @session.close if @session
    @connection.close()
  end

  def create_message(text)
    get_session().createTextMessage(text)
  end

  # Sends jms_msg to queue_name. With reply=true, blocks until the handler
  # answers on a temporary queue and returns that reply message.
  def send_message(queue_name, jms_msg, reply=false)
    ## TODO add error_queue/dead letter queue
    # The cached producer is already bound to its queue, so use send(message).
    unless reply
      get_producer(queue_name).send(jms_msg)
      return nil
    end

    reply_queue = get_session().createTemporaryQueue()
    jms_msg.setJMSReplyTo(reply_queue)
    reply_consumer = get_session().createConsumer(reply_queue)
    begin
      get_producer(queue_name).send(jms_msg)
      reply_consumer.receive()     # blocks until the handler replies
    ensure
      reply_consumer.close()       # a temporary queue can only be deleted once its consumers are closed
      reply_queue.delete()
    end
  end

  def get_producer(queue_name)
    producer = @producers[queue_name]
    if producer.nil?
      producer = @producers[queue_name] = get_session().createProducer(get_queue(queue_name))
      #producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)     # this can be changed for write queues
      #producer.setTimeToLive(60000)
    end
    producer
  end

  def get_queue(queue_name)
    @queues[queue_name] ||= get_session().create_queue(queue_name)
  end

  def get_consumer(queue_name)
    @consumers[queue_name] ||= get_session().create_consumer(get_queue(queue_name))
  end

  # Pre-creates (and caches) the queue and producer for each name.
  def create_producers(*queue_names)
    queue_names.each { |queue_name| get_producer(queue_name) }
  end

  def create_producer(queue)
    get_session().createProducer(queue)
  end

  def self.start_broker
    broker = BrokerService.new()
    broker.addConnector("tcp://localhost:61616")
    broker.start()
    sleep 2    # let the broker start up
  end

  protected

  def get_session
    @session ||= @connection.create_session(false, Session::AUTO_ACKNOWLEDGE)
  end

  # Each forwarder gets its own JmsHelper, and therefore its own session, because
  # a JMS session cannot be shared across threads, though the connection can be.
  def start_forwarder(queue_name, handlers)
    raise "null connection" if @connection.nil?
    jms_helper = JmsHelper.new(@connection)
    Thread.new { MessageForwarder.new(jms_helper, queue_name, handlers).run }
  end
end

if $PROGRAM_NAME == __FILE__
  JmsHelper.start_broker
  JmsHelper.new.start_consumers
end
 
 

I am not showing all message handlers because they look pretty much the same.
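Each handler only has to provide a no-argument constructor and a `handle(jms_msg)` method, because MessageForwarder calls `handler.new().handle(jms_msg)` and sends the returned string back as the text of the reply. The sketch below is hypothetical: `EchoStatusHandler` and its payload are made up, and `FakeTextMessage` stubs the two `javax.jms.TextMessage` methods it uses (`getStringProperty` and `getText`) so the dispatch logic can run in plain Ruby without a broker.

```ruby
# Hypothetical stand-in for a javax.jms.TextMessage so the sketch runs
# without a broker; only the two methods used below are stubbed.
FakeTextMessage = Struct.new(:properties, :text) do
  def getStringProperty(name)
    properties[name]
  end

  def getText
    text
  end
end

# A hypothetical handler: the returned string becomes the reply text.
class EchoStatusHandler
  def handle(jms_msg)
    "status:#{jms_msg.getText}"
  end
end

# The same lookup MessageForwarder#onMessage performs, minus the JMS reply plumbing.
def dispatch(handlers, jms_msg)
  action = jms_msg.getStringProperty('action')
  handler = action && handlers[action]
  handler ? handler.new.handle(jms_msg) : nil
end

handlers = { 'Messages.get_status' => EchoStatusHandler }
msg = FakeTextMessage.new({ 'action' => 'Messages.get_status' }, '42')
puts dispatch(handlers, msg)   # prints status:42
```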

Database

./db/schema_helper.rb
require 'rubygems'
require 'fileutils'
require 'zlib'
require 'active_record'
require 'create_followers'
require 'create_messages'
require 'create_users'

if defined?(JRUBY_VERSION)
  gem 'activerecord-jdbc-adapter'
  require 'jdbc_adapter'
  require 'java'
end

class SchemaHelper
  MAX_DB = 10

  def self.create_schema()
    for i in 0 ... MAX_DB
      connect_to(i)
    end
  end

  # Maps a username to one of MAX_DB shards. Zlib.crc32 gives the same value in
  # every process; String#hash is seeded randomly per process in modern Ruby
  # implementations, which would send the same user to different shards.
  def self.hash_for(username)
    Zlib.crc32(username) % MAX_DB
  end

  def self.setup_schema_for(username)
    connect_to(hash_for(username))
  end

  def self.connect_to(hash)
    dbdir = "#{FileUtils.pwd}/data/db#{hash}"
    dbfile = "#{dbdir}/database.sqlite"
    # Derby creates dbdir on first connect (create=true), so check for it beforehand.
    new_db = !File.directory?(dbdir)
    #ActiveRecord::Base.establish_connection(:adapter => "sqlite", :dbfile => dbfile)
    ActiveRecord::Base.establish_connection(
      :adapter => "jdbc",
      :dbfile => dbfile,
      #:driver => "SQLite.JDBCDriver",         ####:driver => "org.sqlite.JDBC",
      #:url => "jdbc:sqlite:#{dbfile}"
      :driver => "org.apache.derby.jdbc.EmbeddedDriver",
      :url => "jdbc:derby:#{dbdir};create=true"
    )
    if new_db
      CreateUsers.migrate(:up)
      CreateMessages.migrate(:up)
      CreateFollowers.migrate(:up)
    end
  end

  def self.test
    SchemaHelper.setup_schema_for("shahbhat")
    require 'user'
    for i in 0 ... MAX_DB do
      SchemaHelper.connect_to(i)
      puts User.count
    end
  end
end
 
 

May 30, 2008

Challenges of multicore programming

Filed under: Computing — admin @ 12:28 pm

Multicore processors have put parallel and concurrent programming at the forefront. In his Dr. Dobb’s article “A Fundamental Turn Toward Concurrency in Software”, Herb Sutter warned programmers that the free lunch is over. This has spurred ongoing debates about future languages and how they will rescue software development. Here are a few approaches that are being touted as the panacea for multicore:

Multi-threading

The Java, C++ and C# camp has had support for native threads for a while and claims that these threads will allow programmers to take advantage of the multitude of cores. After doing multi-threaded programming for over ten years, I must admit that multi-threading is not easy. Concurrent programming based on threads and locks is error prone and can lead to nasty problems such as deadlocks, starvation and busy waiting. As the number of cores reaches thousands or millions, shared memory will become a bottleneck.
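To make the threads-and-locks style concrete, here is a minimal Ruby sketch (the counter and thread counts are illustrative): four threads increment a shared counter, and a Mutex serializes each read-modify-write. Every piece of shared state needs this kind of discipline, and once several locks are involved, acquiring them in inconsistent orders is exactly how deadlocks creep in.

```ruby
# Four threads increment a shared counter. The Mutex serializes the
# read-modify-write of `count += 1`; without it, concurrent increments can
# be lost, especially on JRuby, where Ruby threads run in parallel.
count = 0
lock = Mutex.new

threads = 4.times.map do
  Thread.new do
    10_000.times do
      lock.synchronize { count += 1 }
    end
  end
end
threads.each(&:join)

puts count   # prints 40000
```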

Software Transactional Memory

Languages like Haskell and Clojure are adding support for software transactional memory (STM), which treats memory like a database and uses optimistic transactions. Instead of locking, each thread can change any data, but when it commits, it verifies that the data has not been changed by another thread and retries the transaction if it has. This area is relatively new, but it still resembles shared memory, so it will probably face the same scalability issues.
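Ruby has no built-in STM, so the following is only a toy sketch of the optimistic cycle described above (read a snapshot, compute, validate that nobody else committed, then commit or retry), using a hypothetical `VersionedRef` class that holds a single value. A brief lock guards only the validate-and-commit step, standing in for the atomic commit a real STM runtime provides; real implementations such as Haskell's or Clojure's also coordinate many references in one transaction, which this does not.

```ruby
# A toy versioned reference. This is NOT a real STM (single value, no
# multi-reference transactions); it only shows optimistic commit-or-retry.
class VersionedRef
  def initialize(value)
    @value = value
    @version = 0
    @commit_lock = Mutex.new   # guards only the short snapshot and commit steps
  end

  # Snapshot the current value and its version.
  def read
    @commit_lock.synchronize { [@value, @version] }
  end

  def value
    read.first
  end

  # Run the block against a snapshot; commit only if the version is unchanged,
  # otherwise retry with fresh data. Returns how many retries were needed.
  def atomically
    retries = 0
    loop do
      value, version = read
      new_value = yield(value)
      committed = @commit_lock.synchronize do
        if @version == version
          @value = new_value
          @version += 1
          true
        else
          false
        end
      end
      return retries if committed
      retries += 1
    end
  end
end

balance = VersionedRef.new(0)
workers = 4.times.map do
  Thread.new { 1_000.times { balance.atomically { |v| v + 1 } } }
end
workers.each(&:join)
puts balance.value   # prints 4000
```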

Actor Based Model with Message Passing

I learned about actor-based programming from reading Gul Agha’s papers in school. In this model, each process owns a private mailbox and processes communicate with each other by sending messages. Languages like Erlang and, more recently, Scala use the actor model for parallel programming. This model is very scalable because no locking is involved. Also, messages are immutable in Erlang (though not necessarily in Scala), so data corruption is unlikely. The only drawback of this model is that splitting your application or algorithm into independent processes that communicate via message passing is not trivial.
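A minimal Ruby sketch of an actor, assuming a hypothetical `CounterActor` class: the mailbox is a thread-safe `Queue`, a single thread drains it, and the counter is only ever touched by that thread, so no lock protects it. Messages are frozen hashes to echo Erlang’s immutable messages, and replies come back as messages on a queue supplied by the sender.

```ruby
# Each actor owns a private mailbox (a thread-safe Queue) and one thread that
# processes a message at a time, so its state needs no lock.
class CounterActor
  def initialize
    @mailbox = Queue.new
    @count = 0                       # private state, only touched by @thread
    @thread = Thread.new { run }
  end

  # Asynchronous send: enqueue a frozen message and return immediately.
  def send_message(msg)
    @mailbox << msg.freeze
  end

  def join
    @thread.join
  end

  private

  def run
    loop do
      msg = @mailbox.pop
      case msg[:type]
      when :increment then @count += msg[:by]
      when :get       then msg[:reply_to] << @count   # the reply is a message too
      when :stop      then break
      end
    end
  end
end

counter = CounterActor.new
10.times { counter.send_message({ type: :increment, by: 2 }) }
reply = Queue.new
counter.send_message({ type: :get, reply_to: reply })
total = reply.pop
puts total   # prints 20
counter.send_message({ type: :stop })
counter.join
```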

TupleSpace based model

In the tuple space model, processes communicate with each other by storing messages (tuples) in a tuple space. The tuple space provides simple APIs like get, put, read and eval: get and read are blocking operations that can be used to build dataflow-based applications (get removes the matching tuple, while read leaves it in place), and eval creates a new process to execute a task and stores its result in the tuple space. I built a similar system called JavaNOW for my Ph.D. project. There are also a number of open source and commercial frameworks available, such as JavaSpaces, GigaSpaces and Blitz.
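The get, put, read and eval operations can be sketched as a toy, single-process tuple space in Ruby. This is an illustration under simplifying assumptions: the `TupleSpace` class is hypothetical, a `nil` field in a template acts as a wildcard, get removes a match while read leaves it in place, and eval runs a Ruby thread rather than a separate process. It is not how JavaSpaces or GigaSpaces are implemented.

```ruby
# A toy in-process tuple space. get and read block until a matching tuple
# exists; a nil in the template matches any value in that position.
class TupleSpace
  def initialize
    @tuples = []
    @lock = Mutex.new
    @changed = ConditionVariable.new
  end

  def put(tuple)
    @lock.synchronize do
      @tuples << tuple
      @changed.broadcast             # wake up any blocked get/read
    end
  end

  # Returns a matching tuple, leaving it in the space.
  def read(template)
    wait_for(template) { |i| @tuples[i] }
  end

  # Returns a matching tuple and removes it from the space.
  def get(template)
    wait_for(template) { |i| @tuples.delete_at(i) }
  end

  # Runs the task on a new thread and puts [tag, result] into the space.
  def eval(tag, &task)
    Thread.new { put([tag, task.call]) }
  end

  private

  def matches?(template, tuple)
    template.size == tuple.size &&
      template.zip(tuple).all? { |want, have| want.nil? || want == have }
  end

  def wait_for(template)
    @lock.synchronize do
      loop do
        index = @tuples.index { |t| matches?(template, t) }
        return yield(index) if index
        @changed.wait(@lock)         # releases the lock while waiting
      end
    end
  end
end

space = TupleSpace.new
space.eval(:sum) { (1..100).sum }       # computed on another thread
space.put([:greeting, "hello"])
greeting = space.read([:greeting, nil]) # the tuple stays in the space
sum = space.get([:sum, nil])            # blocks until the eval task has finished
p greeting   # prints [:greeting, "hello"]
p sum        # prints [:sum, 5050]
```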

Fork/Join

This is a standard pattern from grid computing, also known as Master-Slave, Leader/Follower or JobJar, and it is somewhat similar to Map/Reduce: a master process adds work to a queue, workers pick up the work and store the results in another queue, and the master process collects the results. It also uses other concurrent/parallel programming constructs such as barriers and futures. There are plenty of libraries and frameworks available for this, such as Globus, Hadoop, MPI/MPL and PVM. Java 7 will also include a Fork/Join framework.
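A minimal sketch of the master/worker queue pattern with plain Ruby threads; the squaring task and the worker count are illustrative. It shows the two-queue style described above, not Java 7’s work-stealing Fork/Join framework.

```ruby
# Master/worker with two queues: the master enqueues work, a fixed pool of
# workers pushes results onto a result queue, and the master collects them.
# One :done sentinel per worker tells the pool to stop.
worker_count = 4
work    = Queue.new
results = Queue.new

workers = worker_count.times.map do
  Thread.new do
    while (n = work.pop) != :done
      results << [n, n * n]            # the "task": square a number
    end
  end
end

# Fork: the master hands out the work...
(1..20).each { |n| work << n }
worker_count.times { work << :done }

# ...join: wait for the workers, then gather every result.
workers.each(&:join)
squares = Array.new(results.size) { results.pop }.sort.to_h
puts squares[12]   # prints 144
```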

Messaging Middleware(ESB)

Messaging middleware offers another way to build actor-based systems, where a thread listens on a queue and other threads or processes communicate with it via queues or pub/sub. Java’s JMS has a lot of support for this, and frameworks like Mule, Camel and ServiceMix can help build applications that use SEDA (Staged Event-Driven Architecture). Though this is more prominent in service-oriented architectures, there is no reason why it can’t be used to leverage multicores.
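The SEDA idea can be sketched without any middleware: each stage owns an inbound queue and a thread, and stages only talk to each other by passing messages. The `Stage` class and the parse, price and store stages below are hypothetical; with JMS, the in-memory queues would be replaced by broker queues.

```ruby
# Each stage owns an inbound queue and a thread that handles events and
# forwards the result to the next stage's queue.
class Stage
  attr_reader :inbox

  def initialize(downstream = nil, &handler)
    @inbox = Queue.new
    @thread = Thread.new do
      while (event = @inbox.pop) != :stop
        output = handler.call(event)
        downstream.inbox << output if downstream
      end
      downstream.inbox << :stop if downstream   # propagate shutdown down the pipeline
    end
  end

  def join
    @thread.join
  end
end

collected = Queue.new
store = Stage.new { |order| collected << order }
price = Stage.new(store) { |order| order.merge(total: order[:qty] * order[:unit_price]) }
parse = Stage.new(price) do |line|
  sku, qty, unit_price = line.split(",")
  { sku: sku, qty: Integer(qty), unit_price: Integer(unit_price) }
end

["A1,2,5", "B7,1,30"].each { |line| parse.inbox << line }
parse.inbox << :stop
[parse, price, store].each(&:join)

totals = Array.new(collected.size) { collected.pop[:total] }
p totals   # prints [10, 30]
```

Because each stage has its own queue and thread, a slow stage can be given more threads without touching the others, which is the main appeal of SEDA.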

Compiler based parallelism

High Performance Fortran and Parallel Fortran use data flow analysis to create parallel applications. This might be the simplest solution, but the generated code is often not as efficient as a hand-coded algorithm.

Conclusion

Though parallel and concurrent programming is a new model for the vast majority of programmers, the problem is not new to the high performance computing (HPC) community. They have already tackled it and know what works and what does not. In the early 90s, I worked on Silicon Graphics machines, which were built on the NUMA architecture and provided a shared memory model for computing. Due to the inherent nature of shared memory, they were not as scalable. In contrast, IBM built the SP1 and SP2 systems, which used message passing APIs (MPL, similar to MPI) and let you add as many nodes as you needed. These systems were much more scalable. These two systems nicely show the difference between the shared memory and message passing models of programming. A key advantage of message passing is that it avoids any kind of locking. This is the reason I like Erlang: it supports immutability and efficient message passing. However, the biggest challenge I found with parallel programming was breaking an algorithm or application down into smaller parts that run in parallel. Amdahl’s law shows that the speedup of an application from multiple processors is limited by its serial portion. Donald Knuth pointed out in a recent interview that the vast majority of programs are serial in nature. In the end, there are no simple solutions on the horizon, though there are some proven techniques from HPC and functional languages that can help.
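Amdahl’s law can be made concrete: if a fraction p of a program can run in parallel on n processors, the speedup is 1 / ((1 - p) + p / n). A small Ruby helper (the name `amdahl_speedup` is just illustrative) shows how quickly the serial part dominates.

```ruby
# Amdahl's law: speedup = 1 / ((1 - p) + p / n), where p is the parallel
# fraction of the work and n is the number of processors.
def amdahl_speedup(parallel_fraction, processors)
  serial = 1.0 - parallel_fraction
  1.0 / (serial + parallel_fraction / processors.to_f)
end

[2, 8, 64, 1024].each do |n|
  printf("95%% parallel on %4d cores: %.2fx\n", n, amdahl_speedup(0.95, n))
end
```

With p = 0.95, the speedup can never exceed 1 / 0.05 = 20x, no matter how many cores are added.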

May 23, 2008

Chasing the bright lights

Filed under: Computing — admin @ 10:59 am

I have been an IT enthusiast and professional for over twenty years, and I consider myself an “Innovator” when it comes to technology and programming. I have seen a number of changes over the years. One of my managers used to say that we like to chase bright lights. Unfortunately, many of these trends die off naturally or fail to cross the chasm. Here are some of the things I chased that died off or faded away:

Mainframe

I worked on mainframes for a couple of years early in my career and programmed in COBOL and CICS. Mainframes are not quite dead, but I am actually glad that they have faded away.

VAX/OpenVMS

I also worked on VAX/VMS and OpenVMS systems. They were rock solid, and I am a bit disappointed that they could not evolve.

PASCAL/ICON/PROLOG/FORTRAN/BASIC

BASIC was my first programming language, but I haven’t used it since the early DOS days. I learned PASCAL in college and found it better than C, but I didn’t see much real-world use of it. I also learned ICON and PROLOG, but never found a real use for them, and I have not used FORTRAN since the old VAX days.

NUMA based servers

In the early 90s, Silicon Graphics built very powerful machines based on the NUMA architecture, which provided a shared memory programming model across a number of processors. Unfortunately, there were limits to how big they could become, not to mention that all the locking slowed down shared memory access. Around the same time, IBM built its SP2 systems based on message passing (MPL), which were a lot more scalable. These two programming models are now coming to the front row as multicore programming becomes essential. I got to work with both kinds of systems at Fermilab, Metromail, TransUnion and Argonne Lab. I am sure the lessons from these early models will not be lost, and that message passing based programming will win.

PowerPC NT/Solaris/AIX

Back in 94-95, Motorola created PowerPC machines that could run NT or Solaris, and I thought they were pretty cool, so I spent all my savings and bought one. Unfortunately, Sun abandoned Solaris on PowerPC soon after, and Microsoft did the same with NT. I finally got AIX to run on it, but it just didn’t go anywhere.

BeOS

Back when Apple was looking for a next-generation operating system for Macs, they seriously considered BeOS, which was pretty cool. I played with it and bought a few books on programming for it. Unfortunately, Apple went with Steve Jobs’s NeXTSTEP, and BeOS just faded away.

Java Ring

In the early days of Java, Sun announced huge support for smart cards, which came with strong cryptography and small memory. I spent several hundred dollars on SDKs, Java rings and smart cards from ibutton.com. This too just didn’t cross the chasm and died off.

CORBA

I did a lot of CORBA in the 90s, which I thought was a lot better than the socket-based networking I had done before. CORBA had a lot of limitations, though, and despite the standards, implementations were very hard to integrate. Now it has moved out of the limelight.

Voyager

Voyager was an ORB from ObjectSpace that had very nice features for agent-based computing. It also had nice concepts like facets, or dynamic composition. It inspired me to build my own ORB, “JavaNOW”, which supported seamless networking and agent-based computing. It too failed to cross the chasm and died off.

EJB

One of the difficult things with CORBA was maintaining the servers, because each server ran in its own process. I had to write a fairly elaborate monitoring and lifecycle system to start these servers in the right order and restart them if they failed. I thought application servers like WebLogic and WebSphere solved that problem. A lot of people who were not familiar with distributed systems tried to use EJBs as if they were local method calls and failed miserably. I built proper value objects before there was a pattern named after them, and used EJBGen to create all the stubs. I don’t miss the elaborate programming, but I still see a need for application servers to host services.

MDA/MDD/Software Factories

In the early 2000s, I was very interested in model-driven architecture and development and thought that they might improve the software development process. I had seen CASE tools fail in the early 90s, but I thought these techniques were better. I still hope that better generative programming and metaprogramming can help cut some of the development cost.

Aspect Oriented Programming

I learned about AOP in the 90s when I was looking for PhD projects; it became popular in the early 2000s. Now it too has faded away.

Methodologies/UML

Agile methodologies have killed off a number of methodologies like the Rational Unified Process (RUP), Catalyst and ICONIX, along with UML modeling. I never liked heavyweight processes, but I do see the value of some high-level architecture and modeling.

May 20, 2008

Rebooting philosophy in Erlang

Filed under: Erlang — admin @ 10:49 am

I just read “Let It Crash” Programming, which talks about how Erlang was designed from the ground up as a fault-tolerant language. I have been learning Erlang since Joe Armstrong’s book came out and have heard Joe talk about fault tolerance a few times. Steve Vinoski has also talked about this in Erlang: It’s About Reliability, during the flame war between him and Ted Neward. For me, Erlang is reminiscent of Microsoft Windows, i.e. when Windows stops working I just reboot the machine. Erlang does the same thing: when a process fails, it just restarts the process. About sixteen years ago, I started my career in old VAX, mainframe and UNIX environments, and one of my managers used to say that he never had to restart a mainframe when something failed, but somehow bugs on Windows got fixed after a reboot. When I worked at Fermilab in the mid 90s, we had server farms of hundreds of machines and fault tolerance was quite important. Google didn’t invent server farms, but it scaled them to a new level, where the failure of a machine doesn’t stop the entire application. Erlang brings the same philosophy to the programming language. Obviously, in order to make a truly fault-tolerant application, the Erlang processes will need to be spawned on separate machines. Erlang’s CSP-style message passing and its support for distributed computing, such as OTP, make this straightforward. You can further increase fault tolerance and high availability by using machines on separate racks, networks, power sources or data centers. No wonder Facebook is using Erlang in its Chat application.
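The restart idea can be sketched in Ruby, with threads standing in for Erlang processes. The hypothetical `Supervisor` class below loosely mimics an OTP one_for_one supervisor with a single child: the worker makes no attempt to recover from errors, and the supervisor simply starts a fresh one after a crash, up to a restart limit. Real OTP supervisors also bound restarts within a time window and supervise many children.

```ruby
# A toy "let it crash" supervisor: the worker never handles its own errors;
# the supervisor notices the crash and starts a fresh worker, up to a limit.
class Supervisor
  attr_reader :restarts

  def initialize(max_restarts:, &worker_body)
    @max_restarts = max_restarts
    @worker_body = worker_body
    @restarts = 0
  end

  # Runs the worker until it finishes normally or the restart budget is spent.
  def run
    loop do
      worker = Thread.new(@restarts) do |attempt|
        Thread.current.report_on_exception = false   # the supervisor logs crashes itself
        @worker_body.call(attempt)
      end
      begin
        return worker.value            # Thread#value re-raises the worker's exception
      rescue StandardError => e
        raise if @restarts >= @max_restarts   # give up and escalate the failure
        @restarts += 1
        puts "worker crashed (#{e.message}), restart ##{@restarts}"
      end
    end
  end
end

# A worker that crashes on its first two attempts, then succeeds.
supervisor = Supervisor.new(max_restarts: 5) do |attempt|
  raise "flaky dependency" if attempt < 2
  "ok after #{attempt} restarts"
end
result = supervisor.run
puts result   # prints ok after 2 restarts
```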

May 19, 2008

Sprint like Hell!

Filed under: Methodologies — admin @ 9:44 pm

I think “sprint” is the wrong metaphor for an iteration, because managers seem to think that it means developers will run like hell to finish the work. Although my project recently adopted Scrum, it is far from the true practices, principles and values of agility. For the past several months, we have been developing like hell, and as a result our quality has suffered.
