In our daily scrum meeting at work last week, we identified an application that had been crashing often. It was a fairly simple web application written in Java using Spring MVC, Hibernate, Spring and the Apache Commons libraries. However, the developer who released it had embedded it in another, much bigger web application in order to avoid some deployment issues, which can be a real pain at Amazon. So I volunteered to address the issue and rewrite the application in JRuby on Rails (2.1). It took me a day to rewrite the whole application and migrate all the data from the old one. Along the way, I also added a couple of features. The original application took a couple of weeks to develop and consisted of over 6,500 lines of code; the Ruby version consists of a little less than 800 lines. I have been using Ruby on my own since 2004 and started learning Rails in 2005, though I only started using Rails more on side projects after I attended the Rails conference in 2006. I still use Java for most of my work, and I was delighted to show the speed of development with Rails at work. Since deployment is still a pain with Rails, I chose JRuby, which allows Rails apps to be deployed on Tomcat, which we use for most of our projects. Hopefully, that will fix the stability issues and save me from the pager, as I will be on call next week.
September 6, 2008
August 27, 2008
Implementing HTTP Proxy Service with XSL Transformation
Recently I had to implement a portal that embedded content from other web applications. I tried to build a framework for easily adding remote applications without any changes to those applications. The framework had the following design:
- For each feature and page, we stored a feature link and URL in configuration. That configuration told us where to get the HTML/XHTML page or form to render.
- Instead of displaying the raw HTML, I first converted it into valid XHTML using Tidy and then transformed it so that all form submissions go through our proxy server. As part of the transformation, I added the original form URLs, methods and cookies as hidden fields. This allowed me to keep the proxy server free of client state, which was nice for clustering.
- When the form is submitted, the proxy server intercepts it, makes the request to the remote application and sends back the response. The proxy server reads the original target URL, method type and cookies from the form so that the remote application can manage state if it is using sessions.
Following are a few simplified classes that I developed for the proxy framework:
Content Transformation
First, I developed the HTML-to-XHTML conversion and the transformation that modifies forms. The following is the XSLT that I used:

    <xsl:stylesheet
        xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
        version="1.0"
        xmlns:xhtml="http://www.w3.org/1999/xhtml"
        xmlns="http://www.w3.org/1999/xhtml"
        exclude-result-prefixes="xhtml">

      <xsl:param name="callbackHandler"/>
      <xsl:param name="callbackUser"/>
      <xsl:param name="callbackState"/>

      <!-- identity template: copy everything that has no more specific rule -->
      <xsl:template match="@* | node()">
        <xsl:copy>
          <xsl:apply-templates select="@* | node()"/>
        </xsl:copy>
      </xsl:template>

      <!-- point every form at the proxy and keep the original target in hidden fields -->
      <xsl:template match="form">
        <xsl:copy>
          <xsl:apply-templates select="@*"/>
          <xsl:attribute name="action" xmlns:java="http://xml.apache.org/xslt/java">
            <xsl:param name="callbackOriginalActionUrl" select="@action"/>
            <xsl:text disable-output-escaping="yes">/web/myproxy?myarg=xxx&amp;otherarg=2</xsl:text>
            <xsl:value-of select="java:com.plexobject.transform.XslContentTransformer.setAction($callbackHandler, string(@action))" />
          </xsl:attribute>
          <xsl:attribute name="method" xmlns:java="http://xml.apache.org/xslt/java">
            <xsl:param name="callbackOriginalMethodType" select="@method"/>
            <xsl:text disable-output-escaping="yes">POST</xsl:text>
            <xsl:value-of select="java:com.plexobject.transform.XslContentTransformer.setMethod($callbackHandler, string(@method))" />
          </xsl:attribute>
          <xsl:attribute name="id">_Form</xsl:attribute>
          <xsl:attribute name="name">_Form</xsl:attribute>
          <input type="hidden" name="_user" value="{$callbackUser}"/>
          <input type="hidden" name="_originalActionUrl" value="{@action}"/>
          <input type="hidden" name="_orginalMethodType" value="{@method}"/>
          <input type="hidden" name="_userState" value="{$callbackState}"/>
          <xsl:apply-templates select="node()"/>
        </xsl:copy>
      </xsl:template>

      <!-- drop the remote page's title -->
      <xsl:template match="title"/>

    </xsl:stylesheet>
A few things to note:
- xsl:param allows passing parameters from the runtime (Java).
- The xsl:template that matches the “form” tag replaces the action/method attributes and adds id/name attributes. It then adds a few hidden input fields.
- Finally, I remove the title tag.
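To make the parameter passing and the form rewrite concrete, here is a minimal sketch that runs a cut-down version of the stylesheet with only the JDK's built-in XSLT support. The class name FormRewriteDemo, the inline stylesheet and the sample form are assumptions made for illustration; the sketch leaves out JTidy and the Java callbacks, so it is not the framework's actual transformer.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Hypothetical demo class; it is not part of the proxy framework.
class FormRewriteDemo {
    // Cut-down stylesheet: identity copy plus a form template that
    // redirects the action and keeps the original URL in a hidden field.
    static final String XSL =
          "<xsl:stylesheet version='1.0'"
        + " xmlns:xsl='http://www.w3.org/1999/XSL/Transform'>"
        + "<xsl:output method='xml' omit-xml-declaration='yes'/>"
        + "<xsl:param name='callbackUser'/>"
        + "<xsl:template match='@* | node()'>"
        + "<xsl:copy><xsl:apply-templates select='@* | node()'/></xsl:copy>"
        + "</xsl:template>"
        + "<xsl:template match='form'>"
        + "<xsl:copy>"
        + "<xsl:apply-templates select='@*'/>"
        + "<xsl:attribute name='action'>/web/myproxy</xsl:attribute>"
        + "<input type='hidden' name='_user' value='{$callbackUser}'/>"
        + "<input type='hidden' name='_originalActionUrl' value='{@action}'/>"
        + "<xsl:apply-templates select='node()'/>"
        + "</xsl:copy>"
        + "</xsl:template>"
        + "</xsl:stylesheet>";

    // Applies the stylesheet to well-formed XHTML; 'user' becomes $callbackUser.
    static String rewrite(String xhtml, String user) {
        try {
            Transformer t = TransformerFactory.newInstance()
                    .newTransformer(new StreamSource(new StringReader(XSL)));
            t.setParameter("callbackUser", user); // value of the top-level xsl:param
            StringWriter out = new StringWriter();
            t.transform(new StreamSource(new StringReader(xhtml)),
                    new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("Transformation failed", e);
        }
    }

    public static void main(String[] args) {
        String page = "<html><body>"
                + "<form action='http://remote/sign' method='get'>"
                + "<input name='msg'/></form></body></html>";
        System.out.println(rewrite(page, "alice"));
    }
}
```

The form that comes out submits to /web/myproxy, while the remote URL survives only in the hidden _originalActionUrl field, which is what lets the proxy stay stateless.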
ContentTransformer interface
    package com.plexobject.transform;

    import java.util.Map;

    public interface ContentTransformer {
        /**
         * This method transforms given contents
         *
         * @param contents
         *            - input contents
         * @param properties
         *            - input/output properties for transformation
         * @return transformed contents
         * @throws TransformationException
         *             - when error occurs while transforming content.
         */
        public String transform(String contents, Map<String, String> properties)
                throws TransformationException;
    }
ContentTransformer implementation
A few things to note in the following implementation:
- I use JTidy to convert HTML to XHTML.
- I pass some parameters to the XSL stylesheet and also read a few properties back. Reading properties back is a bit kludgy, but it works.
    package com.plexobject.transform;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import javax.xml.transform.Result;
    import javax.xml.transform.Source;
    import javax.xml.transform.Transformer;
    import javax.xml.transform.TransformerException;
    import javax.xml.transform.TransformerFactory;
    import javax.xml.transform.stream.StreamResult;
    import javax.xml.transform.stream.StreamSource;

    import org.w3c.tidy.Tidy;

    public class XslContentTransformer implements ContentTransformer {
        public static final String ACTION = "form_action";
        public static final String METHOD = "form_method";

        // Shared by all threads and written to from the XSLT callbacks,
        // so a concurrent map is used instead of a plain HashMap
        private static final Map<String, Map<String, String>> xslProperties =
                new ConcurrentHashMap<String, Map<String, String>>();
        private volatile Transformer transformer;
        private final String xslUri;
        private final boolean useTidy;

        public XslContentTransformer(final String xslUri, final boolean useTidy) {
            this.xslUri = xslUri;
            this.useTidy = useTidy;
        }

        // Called from the stylesheet; returns "" so nothing is added to the output
        public static final String setAction(final String callbackHandler,
                final String action) {
            getPropertiesForCallback(callbackHandler).put(ACTION, action);
            return "";
        }

        public static final String getAction(final String callbackHandler) {
            return getPropertiesForCallback(callbackHandler).get(ACTION);
        }

        // Called from the stylesheet; returns "" so nothing is added to the output
        public static final String setMethod(final String callbackHandler,
                final String method) {
            getPropertiesForCallback(callbackHandler).put(METHOD, method);
            return "";
        }

        public static final String getMethod(final String callbackHandler) {
            return getPropertiesForCallback(callbackHandler).get(METHOD);
        }

        /**
         * This method transforms given contents
         *
         * @param contents
         *            - input contents
         * @param properties
         *            - input/output properties for transformation
         * @return transformed contents
         * @throws TransformationException
         *             - when error occurs while transforming content.
         */
        public String transform(String contents, Map<String, String> properties)
                throws TransformationException {
            initTransformer();

            // strip HTML comments; (?s) lets '.' match newlines so that
            // multi-line comments are removed as well
            contents = contents.replaceAll("(?s)<!--.*?-->", "");
            InputStream in = new ByteArrayInputStream(contents.getBytes());
            if (useTidy) {
                in = tidy(in, (int) contents.length());
            }

            final Source xmlSource = new StreamSource(in);
            final ByteArrayOutputStream out = new ByteArrayOutputStream(
                    (int) contents.length());

            final Result result = new StreamResult(out);
            String callbackHandler = properties.get("callbackHandler");
            if (callbackHandler == null) {
                callbackHandler = Thread.currentThread().getName();
            }
            // per-call scratch map that the stylesheet fills in through setAction/setMethod
            final Map<String, String> props = new HashMap<String, String>();
            xslProperties.put(callbackHandler, props);
            transformer.setParameter("callbackHandler", callbackHandler);
            for (Map.Entry<String, String> e : properties.entrySet()) {
                transformer.setParameter(e.getKey(), e.getValue());
            }
            try {
                transformer.transform(xmlSource, result);
                // read back what the stylesheet recorded
                properties.put(ACTION, getAction(callbackHandler));
                properties.put(METHOD, getMethod(callbackHandler));
            } catch (TransformerException e) {
                throw new TransformationException("Failed to transform " + contents, e);
            } finally {
                // always drop the per-call entry, even when the transformation fails
                xslProperties.remove(callbackHandler);
            }
            return new String(out.toByteArray());
        }

        private static final Map<String, String> getPropertiesForCallback(
                String callbackHandler) {
            final Map<String, String> props = xslProperties.get(callbackHandler);
            if (props == null) {
                throw new NullPointerException(
                        "Failed to find properties for callback " + callbackHandler);
            }
            return props;
        }

        // no synchronization needed, multiple initialization is acceptable
        private final void initTransformer() {
            if (transformer == null) {
                try {
                    TransformerFactory transFact = TransformerFactory.newInstance();
                    InputStream in = getClass().getResourceAsStream(xslUri);
                    if (in == null) {
                        throw new TransformationException("failed to find xslt "
                                + xslUri);
                    }
                    Source xsltSource = new StreamSource(in);
                    transformer = transFact.newTransformer(xsltSource);
                } catch (TransformationException e) {
                    throw e;
                } catch (RuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new TransformationException(
                            "Failed to initialize XSL transformer", e);
                }
            }
        }

        // Converts possibly malformed HTML into well-formed XML using JTidy
        private final InputStream tidy(InputStream in, int length) {
            ByteArrayOutputStream out = new ByteArrayOutputStream(length);
            Tidy converter = new Tidy();
            converter.setTidyMark(false);
            converter.setXmlOut(true);
            converter.setXmlPi(true);
            converter.setXmlPIs(true);
            converter.setNumEntities(true);
            converter.setDocType("omit");
            converter.setUpperCaseTags(false);
            converter.setUpperCaseAttrs(false);
            converter.setFixComments(true);
            converter.parse(in, out);
            return new ByteArrayInputStream(out.toByteArray());
        }
    }
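One caveat about the implementation above: the javax.xml.transform documentation says a Transformer instance may not be used by multiple threads concurrently, while a compiled Templates object is safe to share. The following is a minimal sketch of that alternative, assuming a hypothetical CompiledStylesheet class; it is not what the original code does, and it leaves out JTidy and the callback bookkeeping.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Map;
import javax.xml.transform.Templates;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;

// Hypothetical helper, not part of the original framework.
class CompiledStylesheet {
    // Compiled once; a Templates object can be shared between threads.
    private final Templates templates;

    CompiledStylesheet(String xsl) {
        try {
            templates = TransformerFactory.newInstance()
                    .newTemplates(new StreamSource(new StringReader(xsl)));
        } catch (TransformerException e) {
            throw new IllegalArgumentException("Invalid stylesheet", e);
        }
    }

    // Creates a fresh Transformer per call, so parameters set by one
    // request can never leak into another request.
    String transform(String xml, Map<String, String> params) {
        try {
            Transformer t = templates.newTransformer();
            for (Map.Entry<String, String> e : params.entrySet()) {
                t.setParameter(e.getKey(), e.getValue());
            }
            StringWriter out = new StringWriter();
            t.transform(new StreamSource(new StringReader(xml)),
                    new StreamResult(out));
            return out.toString();
        } catch (TransformerException e) {
            throw new IllegalStateException("Transformation failed", e);
        }
    }
}
```

Creating a Transformer from Templates is cheap compared with parsing the stylesheet, so this keeps the one-time compilation cost while avoiding shared mutable state.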
Proxy
The following interfaces and classes show how GET/POST requests are proxied:
Proxy Interface

    package com.plexobject.web.proxy;

    import java.io.IOException;
    import java.util.Map;

    public interface HttpProxy {
        /**
         * This method issues a GET or POST request based on the method and URI
         * specified in the ProxyState and adds given parameters to the request.
         *
         * @param state
         *            - proxy state
         * @param params
         *            - name/value pairs of parameters that are sent with the
         *            request
         */
        public ProxyResponse request(ProxyState state, Map<String, String[]> params)
                throws IOException;
    }
Proxy Implementation
The following class implements the HttpProxy interface using the Apache Commons HttpClient library:

    package com.plexobject.web.proxy;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.httpclient.Cookie;
    import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.HttpMethodBase;
    import org.apache.commons.httpclient.HttpState;
    import org.apache.commons.httpclient.NameValuePair;
    import org.apache.commons.httpclient.cookie.CookiePolicy;
    import org.apache.commons.httpclient.methods.GetMethod;
    import org.apache.commons.httpclient.methods.PostMethod;
    import org.apache.commons.httpclient.params.HttpMethodParams;

    import com.plexobject.io.IoUtil;

    public class HttpProxyImpl implements HttpProxy {
        private static final int CONNECTION_TIMEOUT_MILLIS = 30000;

        /**
         * This method issues a GET or POST request based on the method and URI
         * specified in the ProxyState and adds given parameters to the request.
         *
         * @param state
         *            - proxy state
         * @param params
         *            - name/value pairs of parameters that are sent with the
         *            request
         */
        public ProxyResponse request(ProxyState state, Map<String, String[]> params)
                throws IOException {
            if (state.getMethod() == MethodType.GET) {
                return get(state, params);
            } else {
                return post(state, params);
            }
        }

        /**
         * This method issues a GET request on the URI specified in the ProxyState
         * and adds given parameters to the request.
         *
         * @param state
         *            - proxy state
         * @param params
         *            - name/value pairs of parameters that are sent to the get
         *            request
         */
        private ProxyResponse get(ProxyState state, Map<String, String[]> params)
                throws IOException {
            GetMethod method = new GetMethod(state.getUri());
            method.setQueryString(toNameValues(params));
            return doRequest(state, params, method);
        }

        /**
         * This method issues a POST request on the URI specified in the ProxyState
         * and adds given parameters to the request.
         *
         * @param state
         *            - proxy state
         * @param params
         *            - name/value pairs of parameters that are sent to the POST
         *            request
         */
        private ProxyResponse post(ProxyState state, Map<String, String[]> params)
                throws IOException {
            PostMethod method = new PostMethod(state.getUri());
            method.setRequestBody(toNameValues(params));
            return doRequest(state, params, method);
        }

        private ProxyResponse doRequest(ProxyState proxyState,
                Map<String, String[]> params, HttpMethodBase method)
                throws IOException {
            HttpClient client = new HttpClient();
            client.getHttpConnectionManager().getParams().setConnectionTimeout(
                    CONNECTION_TIMEOUT_MILLIS);
            client.getParams().setCookiePolicy(CookiePolicy.BROWSER_COMPATIBILITY);
            method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
                    new DefaultHttpMethodRetryHandler(3, false));

            // replay the cookies that were carried in the form's hidden state
            HttpState initialState = new HttpState();
            for (Cookie cookie : proxyState.getCookies()) {
                initialState.addCookie(cookie);
            }
            client.setState(initialState);

            try {
                int statusCode = client.executeMethod(method);
                String contents = IoUtil.read(method.getResponseBodyAsStream());
                // remember cookies set by the remote application for the next request
                Cookie[] cookies = client.getState().getCookies();
                for (Cookie cookie : cookies) {
                    proxyState.addCookie(cookie);
                }

                return new ProxyResponse(statusCode, contents, proxyState);
            } catch (RuntimeException e) {
                throw e;
            } catch (IOException e) {
                throw e;
            } catch (Exception e) {
                throw new IOException("failed to process request", e);
            } finally {
                method.releaseConnection();
            }
        }

        // Flattens multi-valued parameters into HttpClient name/value pairs
        private NameValuePair[] toNameValues(Map<String, String[]> params) {
            if (params == null || params.size() == 0) {
                return new NameValuePair[0];
            }
            List<NameValuePair> nvPairs = new ArrayList<NameValuePair>();
            for (Map.Entry<String, String[]> e : params.entrySet()) {
                String[] values = e.getValue();
                for (int j = 0; j < values.length; j++) {
                    nvPairs.add(new NameValuePair(e.getKey(), values[j]));
                }
            }
            return nvPairs.toArray(new NameValuePair[nvPairs.size()]);
        }
    }
ProxyState
The following class maintains the URL, method type, cookies and other information related to a web request:

    package com.plexobject.web.proxy;

    import java.io.Serializable;
    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.net.URLEncoder;
    import java.util.Collection;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    import org.apache.commons.httpclient.Cookie;

    /**
     * Class: ProxyState
     *
     * Description: This class stores state needed to make a proxy request including
     * method type and cookies.
     *
     */
    public class ProxyState implements Serializable {
        private static final long serialVersionUID = 1L;
        private static final String DATA_DELIMITER = "\n";
        private static final String COOKIE_DELIMITER = ";";
        private static final String NULL = "null";

        private String uri;
        private MethodType method;
        private Map<String, Cookie> cookies;

        /**
         * Constructors for ProxyState
         */
        public ProxyState(String uri, String method) {
            // HTML forms may use lowercase method names such as "post",
            // while the enum constants are uppercase
            this(uri, MethodType.valueOf(method.trim().toUpperCase(Locale.ENGLISH)));
        }

        public ProxyState(String uri, MethodType method) {
            this.uri = uri;
            this.method = method;
            this.cookies = new HashMap<String, Cookie>();
        }

        /**
         * @return uri
         */
        public String getUri() {
            return this.uri;
        }

        /**
         * @return method
         */
        public MethodType getMethod() {
            return this.method;
        }

        /**
         * @return cookies
         */
        public Collection<Cookie> getCookies() {
            return this.cookies.values();
        }

        /**
         * @param cookies
         */
        public void addCookies(Collection<Cookie> cookies) {
            for (Cookie cookie : cookies) {
                addCookie(cookie);
            }
        }

        /**
         * @param cookie
         *            - to add
         */
        public void addCookie(Cookie cookie) {
            this.cookies.put(cookie.getName(), cookie);
        }

        // One line per cookie: domain;name;value;path;expiry-millis;secure
        public String getCookieString() {
            StringBuilder sb = new StringBuilder(512);
            for (Cookie cookie : cookies.values()) {
                if (cookie.getDomain() != null) {
                    sb.append(cookie.getDomain()).append(COOKIE_DELIMITER);
                } else {
                    sb.append(NULL).append(COOKIE_DELIMITER);
                }
                sb.append(cookie.getName()).append(COOKIE_DELIMITER).append(
                        cookie.getValue()).append(COOKIE_DELIMITER);

                if (cookie.getPath() != null) {
                    sb.append(cookie.getPath()).append(COOKIE_DELIMITER);
                } else {
                    sb.append(NULL).append(COOKIE_DELIMITER);
                }
                if (cookie.getExpiryDate() != null) {
                    sb.append(String.valueOf(cookie.getExpiryDate().getTime()))
                            .append(COOKIE_DELIMITER);
                } else {
                    sb.append(NULL).append(COOKIE_DELIMITER);
                }
                sb.append(String.valueOf(cookie.getSecure()))
                        .append(DATA_DELIMITER);
            }
            return sb.toString();
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(512);
            sb.append(uri.toString()).append(DATA_DELIMITER);
            sb.append(method.toString()).append(DATA_DELIMITER);
            sb.append(getCookieString());
            return sb.toString();
        }

        /**
         * This method converts proxy state into string based serialized state
         *
         * @return string based serialized state
         */
        public String toExternalFormat() {
            try {
                return URLEncoder.encode(toString(), "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException("failed to encode", e);
            }
        }

        /**
         * This method converts a string based serialized state into the proxy state
         *
         * @param ser
         *            - string based serialized state
         * @return ProxyState
         * @throws IllegalArgumentException
         *             - if serialized state is null or corrupted.
         */
        public static ProxyState valueOf(String ser) {
            if (ser == null)
                throw new IllegalArgumentException("Null serialized object");
            String decoded;
            try {
                decoded = URLDecoder.decode(ser, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalArgumentException("Unsupported encoding " + ser, e);
            }
            String[] lines = decoded.split(DATA_DELIMITER);
            if (lines.length < 2)
                throw new IllegalArgumentException(
                        "Insufficient number of tokens in serialized object ["
                                + decoded + "]");
            ProxyState state = new ProxyState(lines[0], lines[1]);
            for (int i = 2; i < lines.length; i++) {
                String[] cookieFields = lines[i].split(COOKIE_DELIMITER);
                if (cookieFields.length < 6)
                    throw new IllegalArgumentException(
                            "Insufficient number of tokens 6 in serialized cookies ["
                                    + lines[i] + "]/[" + decoded + "]");
                String domain = cookieFields[0];
                if (NULL.equals(domain)) {
                    domain = null;
                }
                String name = cookieFields[1];
                String value = cookieFields[2];
                String path = cookieFields[3];
                if (NULL.equals(path)) {
                    path = null;
                }
                Date expires = null;
                if (!NULL.equals(cookieFields[4])) {
                    expires = new Date(Long.parseLong(cookieFields[4]));
                }
                boolean secure = Boolean.parseBoolean(cookieFields[5]);
                Cookie cookie = new Cookie(domain, name, value, path, expires,
                        secure);
                state.addCookie(cookie);
            }
            return state;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o)
                return true;
            if (!(o instanceof ProxyState))
                return false;
            final ProxyState other = (ProxyState) o;
            if (uri != null ? !uri.equals(other.uri) : other.uri != null)
                return false;
            if (method != null ? !method.equals(other.method)
                    : other.method != null)
                return false;
            return true;
        }

        @Override
        public int hashCode() {
            int result;
            result = (uri != null ? uri.hashCode() : 0);
            result = 29 * result + (method != null ? method.hashCode() : 0);
            return result;
        }
    }
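The round trip through a hidden form field can be tried without the HttpClient dependency. The sketch below is a simplified stand-in that keeps only the URI, the method and cookie name/value pairs; the class name HiddenFieldState and its fields are assumptions for illustration, and unlike ProxyState it does not carry domain, path, expiry or the secure flag.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free stand-in for ProxyState.
class HiddenFieldState {
    final String uri;
    final String method;
    final Map<String, String> cookies = new LinkedHashMap<String, String>();

    HiddenFieldState(String uri, String method) {
        this.uri = uri;
        this.method = method;
    }

    // One line per item (uri, method, then name;value per cookie); the whole
    // text is URL-encoded so it can sit safely in a hidden field's value.
    String toExternalFormat() {
        StringBuilder sb = new StringBuilder();
        sb.append(uri).append('\n').append(method).append('\n');
        for (Map.Entry<String, String> c : cookies.entrySet()) {
            sb.append(c.getKey()).append(';').append(c.getValue()).append('\n');
        }
        try {
            return URLEncoder.encode(sb.toString(), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 not supported", e);
        }
    }

    // Rebuilds the state from the submitted hidden field value.
    static HiddenFieldState valueOf(String ser) {
        String decoded;
        try {
            decoded = URLDecoder.decode(ser, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 not supported", e);
        }
        String[] lines = decoded.split("\n");
        if (lines.length < 2) {
            throw new IllegalArgumentException("Corrupted state [" + decoded + "]");
        }
        HiddenFieldState state = new HiddenFieldState(lines[0], lines[1]);
        for (int i = 2; i < lines.length; i++) {
            String[] fields = lines[i].split(";", 2); // name;value
            if (fields.length < 2) {
                throw new IllegalArgumentException("Corrupted cookie [" + lines[i] + "]");
            }
            state.cookies.put(fields[0], fields[1]);
        }
        return state;
    }
}
```

Because the state travels through the browser, the server that receives it needs nothing in memory between the render and submit calls, which is the property that made clustering easy. A value in this format is client-controlled, so it should be treated as untrusted input.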
ProxyResponse
The following class stores the response from the HttpProxy interface:

    package com.plexobject.web.proxy;

    import java.io.Serializable;

    /**
     * Class: ProxyResponse
     *
     * Description: This class stores proxy state and response.
     */
    public class ProxyResponse implements Serializable {
        private static final long serialVersionUID = 1L;
        private int responseCode;
        private String contents;
        private ProxyState state;

        /**
         * Constructor for ProxyResponse
         */
        public ProxyResponse(int responseCode, String contents, ProxyState state) {
            this.responseCode = responseCode;
            this.contents = contents;
            this.state = state;
        }

        /**
         * @return http response code
         */
        public int getResponseCode() {
            return this.responseCode;
        }

        /**
         * @return XHTML contents
         */
        public String getContents() {
            return this.contents;
        }

        /**
         * @return state associated with the proxy web request
         */
        public ProxyState getState() {
            return this.state;
        }

        @Override
        public String toString() {
            return this.responseCode + "\n" + this.state + "\n" + this.contents;
        }
    }
MethodType
The following class defines an enum for HTTP method types:

    package com.plexobject.web.proxy;

    /**
     * Class: MethodType
     *
     * Description: Defines supported method types for proxy request.
     *
     */
    public enum MethodType {
        GET, POST;
    }
Service Example
The following classes show how the above HttpProxy and ContentTransformer interfaces can be used with the Servlet/Portlet APIs:
ProxyService Interface
    package com.plexobject.web.service;

    import javax.servlet.http.*;
    import java.io.*;

    public interface ProxyService {
        public void render(HttpServletRequest request, HttpServletResponse response) throws IOException;
        public void submit(HttpServletRequest request, HttpServletResponse response) throws IOException;
    }
ProxyService Implementation
    package com.plexobject.web.service;

    import com.plexobject.web.proxy.*;
    import com.plexobject.transform.ContentTransformer;
    import javax.servlet.http.*;
    import java.io.*;
    import java.util.*;

    public class ProxyServiceImpl implements ProxyService {
        private HttpProxy httpProxy;
        private ContentTransformer contentTransformer;

        public ProxyServiceImpl(HttpProxy httpProxy, ContentTransformer contentTransformer) {
            this.httpProxy = httpProxy;
            this.contentTransformer = contentTransformer;
        }

        // Fetches the remote page and rewrites its forms to post back to the proxy
        public void render(HttpServletRequest request, HttpServletResponse response) throws IOException {
            String url = "http://plexrails.plexobject.com/guest_book/sign";
            ProxyState state = new ProxyState(url, MethodType.GET);
            String inputXhtml = httpProxy.request(state, null).getContents();
            Map<String, String> properties = new HashMap<String, String>();
            properties.put("callbackState", state.toExternalFormat());
            String transformedXhtml = contentTransformer.transform(inputXhtml, properties);
            response.getWriter().println(transformedXhtml);
        }

        // Replays a submitted form against the original target recorded in the hidden fields
        @SuppressWarnings("unchecked")
        public void submit(HttpServletRequest request, HttpServletResponse response) throws IOException {
            // parameter names must match the hidden fields added by the stylesheet,
            // including the leading underscore and the "_orginalMethodType" spelling
            String originalActionUrl = request.getParameter("_originalActionUrl");
            String orginalMethodType = request.getParameter("_orginalMethodType");
            ProxyState userState = ProxyState.valueOf(request.getParameter("_userState"));
            Map<String, String[]> params = request.getParameterMap();
            ProxyState state = new ProxyState(originalActionUrl, orginalMethodType);
            state.addCookies(userState.getCookies());
            ProxyResponse proxyResponse = httpProxy.request(state, params);
            response.getWriter().println(proxyResponse.getContents());
        }
    }
Download Code
You can download the above code from here.
Acknowledgement
I would like to thank the folks at XSLT forum of Programmer-to-Programmer (http://p2p.wrox.com/forum.asp?FORUM_ID=79) for answering my XSLT questions.
August 14, 2008
Benefits of REST based services
I saw Damien Katz’s blog post on REST, “REST, I just don’t get it”, and I was a bit surprised that he doesn’t get REST, especially since he wrote CouchDB based on REST. I admit there are a lot of bad examples of REST services that use REST sort of like RPC over POST, but resource-oriented services can be quite simple and powerful. REST principles are the building blocks of the web, which has proven to be quite scalable and efficient. I have been developing REST-based services for a number of years, in some ways since before I learned about Roy Fielding’s thesis and REST principles. Back in the 90s, I worked on building traffic sites and used CORBA to subscribe to and publish traffic events. We also published that data on the website, but soon we found that a number of people were scraping the website, so I wrote a simple XML-over-HTTP service for downloading the data that other groups could use. I have found the following benefits when using REST-based services:
- separating reads from writes: I have worked on large ecommerce and travel websites, and one of the lessons is to keep your read/query services separate from your transactional services. REST APIs define separate operations for reads and for writes/updates.
- caching: you can find tons of off-the-shelf solutions for caching GET requests, including hardware solutions. HTTP features such as ETags and cache headers support this.
- compression: Since REST uses HTTP, you can use compression such as gzip. This can improve the performance of the services.
- idempotency: GET, PUT, DELETE and HEAD are idempotent, which means that, if the service is designed correctly, the request can be retried without any worries about side effects. POST, on the other hand, is not idempotent and may have side effects.
- bookmarking: GET requests can be easily bookmarked. It is important not to use GET to change the state of the application.
- security: Security has been the weakest area of REST compared to SOAP, but HTTPS and simple authentication suffice, and there are better standards like OAuth.
- big response size: REST/HTTP is the only service platform I have seen that supports gigabytes of response data. I did a lot of CORBA-based services in the 90s, EJBs/SOAP in the early 2000s and messaging-based services for over ten years. None of those platforms supported large responses.
- simplicity: I find this the biggest reason for using REST. I can use a browser to call GET-based requests and write a client in any language.
- resources: A REST response can include URIs for other APIs, and the client can change state through these resources. You can use XHTML to embed all these resources, which can then be easily tested with browsers.
- No need for additional jars: When I used CORBA, EJBs, RMI or JINI, I always had to include client/skeleton jar files. In the large companies where I have worked, importing dozens of these jar files became a maintenance problem. With REST, I can simply call the service without importing anything.
- Error codes: HTTP comes with a number of response codes suited to real-life services, including ones for throttling requests, such as server busy (503).
- Meta data: As opposed to CORBA, JINI and RMI services, I can pass meta data easily, as HTTP supports predefined and user-defined headers. These headers can include information on authentication, quality of service, timeouts or other context-related data. Occasionally, I add a Map<String,String> to APIs when I use Java-based services, but it pollutes otherwise pure APIs.
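The idempotency point in the list above can be sketched as a small retry helper that only repeats a failed call when the HTTP method is one of the idempotent ones. The class name IdempotentRetry and its call signature are assumptions for illustration; this is not taken from any library, and a real client would usually add a delay between attempts.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.Callable;

// Hypothetical helper for illustration; not taken from any library.
class IdempotentRetry {
    // Methods that HTTP defines as idempotent (POST is deliberately absent).
    static final Set<String> IDEMPOTENT =
            new HashSet<String>(Arrays.asList("GET", "HEAD", "PUT", "DELETE"));

    // Runs the request, retrying on IOException only for idempotent methods.
    static <T> T call(String method, int maxAttempts, Callable<T> request)
            throws Exception {
        boolean retryable = IDEMPOTENT.contains(method.toUpperCase(Locale.ENGLISH));
        int attempts = retryable ? Math.max(1, maxAttempts) : 1;
        IOException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return request.call();
            } catch (IOException e) {
                last = e; // transient I/O failure: try again if attempts remain
            }
        }
        throw last; // every permitted attempt failed
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        String body = call("GET", 3, new Callable<String>() {
            public String call() throws IOException {
                if (++calls[0] < 3) {
                    throw new IOException("server busy");
                }
                return "traffic data";
            }
        });
        System.out.println(body + " after " + calls[0] + " attempts");
    }
}
```

A POST passed to the same helper is attempted exactly once, since repeating it could apply the side effect twice.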
The only real drawback I see with REST-based services is that they are generally synchronous and blocking, which can waste threads (though some of that can be solved with async I/O or event-based dispatching). Personally, I like to use messaging underneath REST services, which provides asynchronous processing, persistence and better reliability.
July 31, 2008
Love and Hate with Java
For the past few years, bashing Java has been really popular, though some of the criticism has merit. But in general, due to its popularity, Java has become “the man” who tries to bring everyone down. There are millions of programmers who work for “Java the man”. I saw a recent post from an NYU professor who likened Java-savvy college grads to tomorrow’s pizza delivery men. I know Joel Spolsky often mentions teaching C in universities to help students understand pointers and memory management. I agree with the notion of teaching multiple languages in universities so that graduates have a broad understanding of different programming paradigms. I started learning programming back in the 80s on an Atari and learned BASIC. I then moved to the PC and learned GW-BASIC, and then learned C, FORTRAN, Assembler, COBOL, Pascal and C++ in college. I also learned Lisp, Prolog and Perl on my own. In the late 80s and early 90s, I also learned dBase III, RPG and SAS, which were called fourth-generation languages. Similarly, C, FORTRAN, Pascal, etc. were called third-generation languages, and assembly languages were second-generation languages. I learned Java in ’95 when it came out and found it much easier to program in than C/C++. I have also learned Python, Ruby and Erlang over the past few years and have been learning Haskell and Scala these days.
For the most part, Java has been my primary language, with some use of C++, Perl, Ruby and Python/Jython (and Erlang on my own). I wish I could use Erlang more, but I don’t have the same experience with Erlang as I have with Java. Over time, Java managed to take a lot of C/C++’s share of the market. Java has also managed to build a large ecosystem of open source and commercial libraries and frameworks. I often hear that Java is “enterprisey” and popular only in large companies, but the truth is that Java has proven itself to be a reliable language. Steve Yegge also mentioned in his blog that Google primarily uses Java, C++, Python and JavaScript.
I like a polyglot environment, where I write performance-critical code in a system language like Java and use Ruby/Python for high-level glue code or the web tier.
I often find the criticism of Java dishonest. For example, people rave about metaprogramming in Ruby but forget to mention all the overhead that goes with it, not to mention security holes and memory leak issues. The truth is that none of the hot languages like Python, Ruby, Erlang or Haskell provides the same performance as Java; in fact, Java’s HotSpot compiler can beat C++ in production. I am going to ignore the static vs. dynamic language debate, but I’ve found that static languages work better with a large number of developers. Again, I like these languages, but I prefer to see a balanced comparison. The real reason Java is popular is that there are tons of jobs. Here is a quick comparison of jobs in Java, C++, C#, Erlang, Haskell, OCaml, Ruby, Python and Factor:

As Bruce Lee said:
“I fear not the man who has practiced 10,000 kicks once, but I fear the man who has practiced one kick 10,000 times.”
I find that the way you can distinguish yourself is by learning more about the design and architecture of systems and about the ecosystem. It takes years to learn the ins and outs of a programming language and all the tools and libraries that go with it. I totally agree with learning a number of different languages like Haskell, Factor, Erlang, Scala and Groovy, and I have been trying to learn all of those for many years. However, for a system language my first choice is still Java, simply because I have found it to be a reliable and efficient language. As James Gosling said, Java is a blue collar language. Sure, it does not have closures (yet), actors, transactional memory, metaprogramming or AST/macros, but it is well suited for building large applications by hundreds or thousands of programmers. I just started a large project in my division at Amazon, and sure enough I chose Java because I have been using it for over twelve years and I know it can do the job. It wasn’t simply because Java is the safe choice (no one got fired for choosing IBM); practically, Java has more mature solutions for business needs. For example, my project needs to integrate with 20+ applications and is aimed at reducing manual work, so it needed a portal server, workflow engine, rules engine and messaging service, and there are tons of options for those in the Java community.
Finally, the JVM is proving to be a neat platform for building new languages like JRuby, Jython, Groovy, Scala, Clojure, etc. that can bring cool features and high interoperability with existing systems. As Guy Steele said in a recent interview, you can’t expect one language to solve all problems.
July 7, 2008
Reaction vs Preparedness
I’ve struggled with the culture of fire-fighting and heroism and discussed these issues several times in my blog [1], [2], [3], [4]. Over the weekend, I saw Bruce Schneier’s talk on security. He talked about Hurricane Katrina’s devastation and said that politicians don’t like to invest in infrastructure because it does not get noticed. He showed how reaction is considered more important than preparedness. Another thing he mentioned was that investing in new projects is received better than investing in existing projects. I find that IT culture has a very similar value system, where heroes or martyrs who react (on a daily basis) to emerging crises and fires are noticed by the managers and receive promotions and accolades. The same culture does not value investing in better preparation for disasters or emergencies. Unfortunately, once people get promoted they move on and leave the pile of shit for someone else. Of course, the next team will rewrite rather than keep the old system. In the last two years, I had to rewrite two major systems where the previous systems had been written less than two years earlier. Maybe they will be rewritten by someone else when I leave. The cycle will continue…
July 1, 2008
Designing Microblogging system for Scalability
Introduction
I have been a Twitter user for a while and have observed or heard about downtime and scalability problems with Twitter. The scalability of Twitter has become a topic of a lot of discussions and blogs and has also offered a useful exercise in designing scalable systems. A common root cause, as identified from Twitter’s blogs, is that the architecture is based on a CMS because it was written in Ruby on Rails and that is what Rails is good at. The solution to the scalability problem, as pointed out by other people, is a messaging based architecture. There’s also been a lot of blame for Twitter’s problems placed on Ruby and Rails, because Ruby is a slow language compared to other static and dynamic languages and Rails is not built for scalability. Though there is some truth to it, I don’t think they are the sole bottlenecks. In fact, I am going to show a small prototype written (partially) in Ruby and Rails that integrates with messaging middleware and can be scaled easily. I have been using messaging middleware such as CORBA Event Service, IBM MQ Series, Websphere, Weblogic, Tibco and ActiveMQ for over ten years and have long been a proponent of messaging based systems for scalability [1] [2]. So, I spent a few hours putting together a prototype based on messaging middleware and Ruby on Rails to see how such a system can be developed.
Design Principles
Before describing my design, I am going to review some of the design principles that I have used for building large scale systems ([1], [2]), which are:
- Coupling/Cohesion – loosely coupled and shared nothing architecture and partitioning based on functionality.
- Messaging or SEDA architecture to implement reliable and scalable services and avoid temporal dependencies.
- Resource management – good old practices of avoiding leaks of memory, database connections, etc.
- Data replication especially read-only data.
- Partition data (Sharding) – using multiple databases to partition the data.
- Avoid single point of failure.
- Bring processing closer to the data.
- Design for service failures and crashes.
- Dynamic Resources – Design service frameworks so that resources can be removed or added automatically and clients can automatically discover them. Use virtualization and horizontal scalability whenever possible.
- Smart Caching – Cache expensive operations and contents as much as possible.
- Avoid distributed transactions, use optimistic compensating transactions (See CAP principle).
Architecture & Design
The following is the high-level architecture for the microblogging system:

First, I selected a REST architecture as the entry point to our system for both the Web UI and 3rd party applications, and used messaging middleware for implementing the services. This gives us ease of access with REST APIs and scalability with messaging. In my implementation I chose JRuby/Rails to implement most of the code, Derby for the database and ActiveMQ for the messaging store. In addition to scalability, the messaging middleware gives you a lot of the advantages of functional languages like Erlang, such as immutability, message passing and fault tolerance (via persistent queues). You can even build support for versioning and hot code swapping by adding a version number to each message and creating routers (see integration patterns) to direct messages to different handlers.
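The versioning idea can be sketched in a few lines of plain Ruby. This is only an illustration, not code from the prototype: the `VersionRouter` class and the message layout (a Hash with `:version`, `:operation` and `:payload` keys) are assumptions made up for this example.

```ruby
# Hypothetical router: each message carries a :version key, and the router
# hands the message to whichever handler was registered for that version.
class VersionRouter
  def initialize
    @handlers = {}
  end

  # Register a handler (any object responding to #call) for one version.
  def register(version, handler)
    @handlers[version] = handler
  end

  # Dispatch a message to the handler registered for its version.
  def route(message)
    handler = @handlers.fetch(message[:version]) do
      raise ArgumentError, "no handler for version #{message[:version].inspect}"
    end
    handler.call(message)
  end
end

router = VersionRouter.new
router.register(1, lambda { |m| "v1 status: #{m[:payload]}" })
router.register(2, lambda { |m| "v2 status: #{m[:payload].strip}" })

puts router.route(:version => 1, :operation => 'update_status', :payload => 'hello')
puts router.route(:version => 2, :operation => 'update_status', :payload => ' hello ')
```

Old and new handlers can then run side by side, and a new version is rolled out by registering another handler rather than restarting every consumer.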
APIs
The following REST APIs will be exposed to 3rd party apps, the Web UI and other kinds of UI:
Create User
POST /users
where the user information is passed in the body in the form of parameters.
Login/Authenticate User
POST /users/userid/sessions
This will authenticate the user and create a session. Note that most of the following APIs send back the session-id, which will be stored in the database (sharded) and used to retrieve user information.
Logout
HTTP-HEADER
session-id
DELETE /users/userid/sessions
Get User information
HTTP-HEADER
session-id
GET /users/userid
This API will return detailed user information
Anonymous User information
GET /users/userid
This API will return public user information
List of Followings
HTTP-HEADER
session-id
GET /followings/userid
This API will return a summary of the people the user is following.
Create Followers
HTTP-HEADER
session-id
POST /followers/followerid
This API will create one-way follower relationship between the user and follower.
Enable notification for Followers
HTTP-HEADER
session-id
POST /followers/followerid/notifications
Disable notification for Followers
HTTP-HEADER
session-id
DELETE /followers/followerid/notifications
Block Followers
HTTP-HEADER
session-id
POST /followers/followerid/blocking
Unblock Followers
HTTP-HEADER
session-id
DELETE /followers/followerid/blocking
Follower Exist
HTTP-HEADER
session-id
GET /followers/followerid
This API will return the 200 HTTP code if the follower exists.
List of Followers
HTTP-HEADER
session-id
GET /followers
This API will return a summary of the people who are following the user.
Archive Messages
HTTP-HEADER
session-id
GET /messages?offset=xxx&limit=yyy&since=date
This API will return archived messages for the user, where offset and limit will be optional.
DELETE Messages
HTTP-HEADER
session-id
DELETE /messages/message-id
This API will delete the message with the given message-id.
Send Direct Messages
HTTP-HEADER
session-id
POST /directmessages/targetuserid
This API will send a direct message to the given user.
Send Reply
HTTP-HEADER
session-id
POST /reply/message-id
This API will send a reply to the given message-id and pass the contents of the message in the body (as parameters).
Direct Messages Received
HTTP-HEADER
session-id
GET /directmessages/userid?offset=xxx&limit=yyy&since=date
This API will return messages received by the user.
Replies Received
HTTP-HEADER
session-id
GET /replies?offset=xxx&limit=yyy&since=date
This API will return replies received by the user.
Update Status
HTTP-HEADER
session-id
POST /statuses
This API will update status of the user and pass the contents of the message in the body (as parameters).
Get Statuses
HTTP-HEADER
session-id
GET /statuses?offset=xxx&limit=yyy&since=date
This API will return the statuses of the user, where offset and limit will be optional.
User Timeline
HTTP-HEADER
session-id
GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return timeline of the user. This API will compare given username with the authenticated username and will return detailed timeline if match, otherwise it will return public timeline.
Public Timeline
GET /timeline/username?offset=xxx&limit=yyy&since=date
This API will return public timeline of the user.
Friends Timeline
HTTP-HEADER
session-id
GET /friendstimeline?offset=xxx&limit=yyy&since=date
This API will return timeline of the friends of the user.
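As a rough illustration of how a client might call one of these APIs, the sketch below builds (but does not send) the friends timeline request with Ruby's standard `net/http` library. The host name and the helper `friends_timeline_request` are assumptions for this example; only the path, the query parameters and the `session-id` header come from the list above.

```ruby
require 'net/http'
require 'uri'

# Hypothetical host; the post does not say where the prototype was deployed.
BASE_URL = 'http://localhost:3000'

# Build the GET request for the friends timeline without sending it.
def friends_timeline_request(session_id, options = {})
  uri = URI.parse("#{BASE_URL}/friendstimeline")
  uri.query = URI.encode_www_form(options) unless options.empty?
  request = Net::HTTP::Get.new(uri.request_uri)
  request['session-id'] = session_id   # session returned by the login API
  request
end

request = friends_timeline_request('abc123', :offset => 0, :limit => 20)
puts "#{request.method} #{request.path}"
```

Sending it would be a matter of passing the request to `Net::HTTP#request` on a connection to the server.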
Request Flow
Here is an illustration of how information flows through the different components:
I am not showing the request flow of all APIs, but they follow a similar pattern.
Detailed Design
Domain Classes
Primary domain classes are:
- User
- Message, which has four subclasses DirectMessage, ReplyMessage, Tweet and Status for the various kinds of messages in the system.
- Follower – creates one-way relationship between two users, where follower can choose to be notified when the user changes his/her status.
I identify each message with a special GUID, using a simple scheme to generate GUIDs for request-ids and message-ids, but for a real project I would recommend better libraries such as UUIDTools.
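As one possible alternative to the simple counter scheme, the sketch below uses `SecureRandom.uuid` from Ruby's standard library. This is a substitute I am using for illustration, not UUIDTools and not the prototype's code, and it assumes a Ruby runtime recent enough to ship `SecureRandom.uuid`. The `UuidGenerator` module name is hypothetical.

```ruby
require 'securerandom'

# Hypothetical replacement for a counter-based generator: random (version 4)
# UUIDs, which stay unique across processes and database shards.
module UuidGenerator
  def self.next_message_id
    "message_id-#{SecureRandom.uuid}"
  end

  def self.next_request_id
    "request_id-#{SecureRandom.uuid}"
  end
end

puts UuidGenerator.next_message_id
```

The prefixed id is 47 characters long, so it still fits the 64-character `message_id` column used in the schema.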
Schema
Followers
    class CreateFollowers < ActiveRecord::Migration
      def self.up
        create_table :followers do |t|
          t.column :username, :string, :limit => 16
          t.column :follower_username, :string, :limit => 16
          t.column :relation_type, :string, :default => 'Follower', :limit => 32
          t.column :blocked, :boolean, :default => false
          t.column :notifications, :boolean, :default => false
          t.column :created_at, :datetime
          t.column :updated_at, :datetime
          t.column :deleted_at, :datetime
        end
        add_index :followers, :username
        add_index :followers, :follower_username
      end

      def self.down
        drop_table :followers
      end
    end
Messages
    class CreateMessages < ActiveRecord::Migration
      def self.up
        create_table :messages do |t|
          t.column :message_id, :string, :limit => 64
          t.column :type, :string, :limit => 32
          t.column :message_type, :string, :default => 'Say', :limit => 32
          t.column :reply_message_id, :string, :limit => 64
          t.column :username, :string, :limit => 16
          t.column :channel_name, :string, :limit => 32
          t.column :message_body, :string, :limit => 140
          t.column :favorite, :boolean, :default => false
          t.column :sent_at, :datetime, :default => Time.now.utc
          t.column :created_at, :datetime
          t.column :deleted_at, :datetime
        end
        add_index :messages, :message_id
        add_index :messages, :username
      end

      def self.down
        drop_table :messages
      end
    end
Users
    class CreateUsers < ActiveRecord::Migration
      def self.up
        create_table :users do |t|
          t.column :username, :string, :limit => 16
          t.column :password, :string, :limit => 16
          t.column :name, :string, :limit => 64
          t.column :email, :string, :limit => 64
          t.column :time_zone_id, :string, :limit => 32
          t.column :created_at, :datetime
          t.column :updated_at, :datetime
          t.column :deleted_at, :datetime
        end
        add_index :users, :username
      end

      def self.down
        drop_table :users
      end
    end
Persistence
I used Rails’ ActiveRecord library to provide persistence, though alternatively I could have used ActiveHibernate. These libraries provide a quick way to add persistence capabilities with minimum configuration and boilerplate. This prototype uses multiple levels of partitioning, first at the service level and second at the persistence level. I am using multiple Derby databases to store objects, with a simple hashing scheme for load distribution. This prototype also shows how to connect to multiple databases in Rails, which was difficult in the early implementation of Twitter.
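A simple hashing scheme of this kind might look like the sketch below. The shard names and the `shard_for` helper are assumptions for illustration; the prototype's own `SchemaHelper.setup_schema_for` is referenced in the appendix but its implementation is not shown here, so this is not a reconstruction of it.

```ruby
require 'zlib'

# Hypothetical shard names; the prototype's real database names are not shown.
SHARDS = %w[microblog_0 microblog_1 microblog_2 microblog_3]

# Map a business key such as a username to one shard.
# CRC32 is used because String#hash is randomized per Ruby process and
# would send the same user to different databases after a restart.
def shard_for(username, shards = SHARDS)
  shards[Zlib.crc32(username.downcase) % shards.size]
end

puts shard_for('alice')
```

Because the shard depends only on the username, every service instance picks the same database for a given user without any coordination.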
Domain Services
The core model and services use domain driven design and apply the principle of fat model and thin service (as opposed to fat services and anemic model). The domain services implement the external REST APIs and use the underlying ActiveRecord for most of the functionality.
Messaging Middleware
The REST based web services don’t invoke domain services directly; instead they use messaging middleware. In a real application, I might use ESB/integration patterns such as intelligent routing to partition the system across multiple machines and send each request to a suitable queue. In this prototype, I am simply using ActiveMQ, which is fairly robust and easy to use. I am also using separate queues for different kinds of operations. Another lesson I have learned in building large systems is to separate reads from writes so that you can scale them independently and also offer different qualities of service, e.g. read queues can be non-persistent, but write queues can be persistent.
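The read/write split can be expressed as a small lookup that decides which queue an operation goes to and whether it must be persistent. The queue names, the operation list and the `QueueSettings` struct are all assumptions for this sketch; it does not talk to ActiveMQ.

```ruby
# Hypothetical list of operations that change state; everything else is a read.
WRITE_OPERATIONS = [:create_user, :create_follower, :update_status, :send_direct_message]

QueueSettings = Struct.new(:name, :persistent)

# Writes go to a persistent queue so they survive a broker restart;
# reads go to a non-persistent queue because they can simply be retried.
def queue_for(operation)
  if WRITE_OPERATIONS.include?(operation)
    QueueSettings.new('microblog.writes', true)
  else
    QueueSettings.new('microblog.reads', false)
  end
end

puts queue_for(:update_status).inspect
puts queue_for(:friends_timeline).inspect
```

With two queues, more consumers can be attached to the read side without touching the write side.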
Business Delegate
The REST based web services don’t interact with the messaging middleware directly; instead they use business delegates that hide all the details of sending out messages, creating temporary queues and receiving messages. The interface of the business delegates is the same as that of the services.
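The delegate pattern can be sketched with Ruby's in-process `Queue` standing in for the message broker. Everything here is an assumption for illustration: `EchoUsersService`, `UsersDelegate` and `start_worker` are made-up names, and a real implementation would use JMS queues and temporary reply queues instead of in-memory ones.

```ruby
require 'thread'

# Stand-in for a domain service; the real prototype would call UsersService.
class EchoUsersService
  def get_user(username)
    { :username => username }
  end
end

# Consumer side: takes requests off the queue, invokes the service and
# replies on the per-request reply queue carried in the message.
def start_worker(request_queue, service)
  Thread.new do
    while (message = request_queue.pop) != :shutdown
      result = service.send(message[:operation], *message[:args])
      message[:reply_to].push(result)
    end
  end
end

# Business delegate: same method names as the service, but it hides the
# send / temporary reply queue / receive steps from the REST layer.
class UsersDelegate
  def initialize(request_queue)
    @request_queue = request_queue
  end

  def get_user(username)
    call(:get_user, username)
  end

  private

  def call(operation, *args)
    reply_queue = Queue.new   # plays the role of a temporary queue
    @request_queue.push(:operation => operation, :args => args, :reply_to => reply_queue)
    reply_queue.pop           # block until the worker replies
  end
end

requests = Queue.new
worker = start_worker(requests, EchoUsersService.new)
puts UsersDelegate.new(requests).get_user('alice').inspect
requests.push(:shutdown)
worker.join
```

The REST controller only sees `get_user`, so the transport can change without touching the web tier.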
Benchmark Results
Though performance was not the objective of my prototype, I tried to check how many requests I could process on my development machine. I chose to benchmark only the messaging middleware and not the REST server, because the JVM uses native threads and web containers such as Tomcat use a small thread pool to serve requests. Since our architecture is heavily IO-bound, that would not scale. Alternatively, I could have built reactive or event-based APIs for HTTP, or used Yaws/Mochiweb as a container for the REST based web services, because creating a process in Erlang is pretty cheap. For example, an Erlang process takes 300 bytes, whereas a Java thread takes 2M by default (though it can be reduced to 256K on most machines). Here are the results of running a simple server with embedded ActiveMQ and a load test, both running on JRuby on my Pentium Linux machine. I used the default VM size for both JRuby processes and didn’t tune any options:
| What | Elapsed Time (secs) | Throughput (requests/sec) | Invocations |
| load_test_create_users | 99.041000 | 10.09682857311318/s | 1000 |
| load_test_bad_authenticate | 79.407000 | 12.593348183199506/s | 1000 |
| load_test_good_authenticate | 91.802000 | 10.893008861477503/s | 1000 |
| load_test_get_users | 91.362000 | 10.945469671474584/s | 1000 |
| load_test_create_followers | 143.560000 | 6.958714408811979/s | 999 |
| load_test_follower_exists | 95.140000 | 10.50020495537709/s | 999 |
| load_test_get_followers | 99.881000 | 10.002002391423325/s | 999 |
| load_test_get_followings | 104.737000 | 9.538176576655392/s | 999 |
| load_test_block_follower | 156.106000 | 6.399497779340769/s | 999 |
| load_test_unblock_follower | 161.950000 | 6.168532449211547/s | 999 |
| load_test_follower_enable_notifications | 157.637000 | 6.337344665142635/s | 999 |
| load_test_follower_disable_notifications | 161.714000 | 6.1775727524053545/s | 999 |
| load_test_send_direct_message | 1673.494000 | 5.969546350480188/s | 9990 |
| load_test_messages_sent_receive | 157.832000 | 6.323179075798668/s | 998 |
| load_test_send_direct_message | 1725.717000 | 5.788898179687536/s | 9990 |
| load_test_send_replies | 21.546000 | 5.105592926919201/s | 110 |
| load_test_get_user_status | 86.952000 | 11.500598043963544/s | 1000 |
| load_test_update_status | 213.298000 | 4.688276498896948/s | 1000 |
| load_test_get_user_status | 88.987000 | 11.23759650430517/s | 1000 |
| load_test_archive_messages | 80.982000 | 12.348575563593377/s | 1000 |
| load_test_public_timeline | 98.143000 | 10.189213685309506/s | 1000 |
| load_test_user_timeline | 92.871000 | 10.767623902461311/s | 1000 |
| load_test_friends_timeline | 140.155000 | 7.1349577268319555/s | 1000 |
| load_test_user_and_friends_timeline | 193.257000 | 5.174430032905596/s | 1000 |
I was not impressed with the results I got, and I may implement a similar prototype in Erlang using MochiWeb, Mnesia and ejabberd.
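The throughput column is simply the number of invocations divided by the elapsed time. A minimal sketch of such a measurement with Ruby's standard `Benchmark` module is shown below; the `measure` helper is hypothetical and is not the load test used for the table.

```ruby
require 'benchmark'

# Run the block `invocations` times and report elapsed seconds and throughput.
def measure(invocations)
  elapsed = Benchmark.realtime { invocations.times { |i| yield i } }
  { :elapsed => elapsed, :throughput => invocations / elapsed }
end

# Checking one row of the table: 1000 invocations in 99.041 seconds,
# which is roughly 10.1 requests per second.
puts 1000 / 99.041

result = measure(1000) { |i| i.to_s }
puts result[:throughput]
```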
Summary
Though the technologies I chose are somewhat arbitrary, the same design principles can be applied to other technology stacks. Though technologies are generally selected for political reasons or personal preference, it is important to consider the team’s familiarity with the technology stack. For example, Twitter tried ejabberd but had problems troubleshooting because they were not familiar with Erlang.
Again, I used architecture principles such as stateless services, though in real life I may have to store some state, which can be kept in the sharded database. I used embedded services such as an embedded database server and messaging server because you can then easily start a single process on a machine and replicate machines as the load increases. I used partitioning/sharding heavily, which is key for scalability. I also used replication, specifically replicating messages for each follower so that reads are fast and we don’t spend a lot of time querying the database. It does add cost on the write side and adds disk requirements. I think we can control those by limiting the number of messages per user per minute and the maximum number of notifications. Also, we can remove old messages from the database. I also used the principle of bringing computation closer to the data by using an embedded database.
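Limiting the number of messages per user per minute could be done with a small sliding-window counter like the one below. This is a sketch under my own assumptions: the `MessageRateLimiter` class is hypothetical, it keeps its state in memory, and a sharded deployment would need to keep the counters next to the user's data.

```ruby
# Hypothetical limiter: at most `limit` messages per user in any sliding window.
class MessageRateLimiter
  def initialize(limit, window_seconds = 60)
    @limit = limit
    @window = window_seconds
    @sent = Hash.new { |hash, user| hash[user] = [] }
  end

  # Returns true and records the message if the user is under the limit.
  def allow?(username, now = Time.now)
    timestamps = @sent[username]
    timestamps.reject! { |t| now - t >= @window }   # forget expired sends
    return false if timestamps.size >= @limit
    timestamps << now
    true
  end
end

limiter = MessageRateLimiter.new(2)
puts limiter.allow?('alice')
```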
Other Improvements
I tried to show that scalability problems are best solved by an architecture that is independent of a particular technology or language. In fact, I may implement a similar design in Erlang using Mnesia as the database, ejabberd as the messaging middleware, Mochiweb for REST and OTP for implementing services. Other improvements that I would suggest are the use of a reverse proxy server for caching, along with a DMZ for security. We can also add some throttling to the reverse proxy servers. In order to have better fault tolerance, I might have multiple reverse proxy servers and use DNS round robin. I don’t like DNS based load balancing for real load distribution because it does not take a server’s capacity and load into account, but it would be suitable in this case. Also, I didn’t implement any caching, but we can use caching solutions such as EHCache, Tangosol, Terracotta, Memcache, etc., though caching highly dynamic contents may be difficult and less reusable. Also, I didn’t use any ESB (lightweight providers such as Mule or ServiceMix), but one can be used to abstract routing to the services, load balancing, aggregation, replication, transformation, etc. We can also build support for versioning and hot code swapping by adding a version number to each message and changing routing schemes. This prototype uses a simple scheme for partitioning by creating a hash of business keys such as username; however, it is difficult to manage when you need to add or remove servers because it requires migrating data. Another solution is to use the MD5 scheme that we use at Amazon for S3, which calculates the MD5 of the username and of each replicated queue and selects the queue whose MD5 sum is higher than the username’s sum. For further fault tolerance, we could replicate data in Master/Slave fashion.
PS: I noticed “NativeException: javax.jms.JMSException: PermGen space” a couple of times while running a long load test, which is generally caused by reloading classes (as I have observed for many years in Weblogic and Tomcat). I probably need to investigate this further, but my guess is that JRuby is doing something dumb.
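The MD5 scheme described above can be sketched as follows. This is my reading of the paragraph, not code from the prototype or from any Amazon system: the queue names and the `queue_for_user` helper are hypothetical, and the wrap-around to the lowest queue when no queue has a higher sum is an assumption I added so that every username maps somewhere.

```ruby
require 'digest/md5'

# Hypothetical queue names used only for this illustration.
QUEUES = %w[queue-a queue-b queue-c]

# MD5 of a key as a large integer so that sums can be compared.
def md5_value(key)
  Digest::MD5.hexdigest(key).to_i(16)
end

# Pick the first queue whose MD5 value is higher than the username's,
# wrapping around to the lowest queue when none is higher.
def queue_for_user(username, queues = QUEUES)
  ring = queues.sort_by { |q| md5_value(q) }
  target = md5_value(username)
  ring.find { |q| md5_value(q) > target } || ring.first
end

puts queue_for_user('alice')
```

The benefit over a plain modulo hash is that removing a queue only moves the users that were assigned to that queue; everyone else keeps their assignment.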
Download
Appendix
Source Code Model
./model/user.rb
    ###
    # User of the system.
    # Though, in our system user can have many messages that it sent to update
    # his/her status or other users and can also have followers and followings, but
    # we are not showing those relationships because we cannot populate them due to
    # sharding and scalability concerns.
    ###
    class User < ActiveRecord::Base
      validates_presence_of :name
      validates_presence_of :username, :within => 3..16
      validates_presence_of :password, :within => 3..16
      validates_length_of :email, :within => 3..64
      validates_uniqueness_of :email, :case_sensitive => false
      validates_format_of :email, :with => /^([^@\s]+)@((?:[-a-z0-9]+\.)+[a-z]{2,})$/i
      #
      ### all messages for the same user will be stored in the same database.
      ### also, all messages from the users he/she is following will be copied to the user's database
      ### if they lie in different databases.
      ### limiting rows returned from these relationships to 100 so that we don't overwhelm server.
      #
      has_many :statuses, :class_name => 'Message', :foreign_key => :username, :limit => 100, :order => 'sent_at desc'
      has_many :tweets, :class_name => 'Message', :foreign_key => :channel_name, :limit => 100, :order => 'sent_at desc'

      #
      ### We will replicate follower/following to each database and though we may not have corresponding users,
      ### but we can have usernames
      #
      def self.follower_usernames(username, offset, limit)
        Follower.followers(username, offset, limit).map {|f| f.follower_username }
      end

      def self.following_usernames(username, offset, limit)
        Follower.followings(username, offset, limit).map {|f| f.username }
      end

      # Delegate to the counting methods that Follower actually defines.
      def self.follower_count(username)
        Follower.follower_count(username)
      end
      def self.following_count(username)
        Follower.following_count(username)
      end

      #
      ### user's timeline
      #
      def self.timeline(username, offset, limit)
        Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
      end

      def to_s
        "#{username}"
      end

      def to_external_xml(xml)
        xml.user(:username => self.username, :name => self.name, :email => self.email)
      end
    end
./model/message.rb
    #
    ###
    # Message stores any message sent by user to update his/her status, tweets
    # from other users that he/she is following, direct message sent to another
    # user or reply message sent in response to tweet or direct message.
    # Note that we use a GUID based message_id in conjunction with the database column 'id'
    # because we are using sharding the database column can be duplicate, but the GUID
    # based message_id will always be unique across all databases.
    ###
    class Message < ActiveRecord::Base
      MESSAGE_TYPE_SAY = "Say"
      MESSAGE_TYPE_SHOUT = "Shout"
      #
      validates_presence_of :message_id, :within => 1..64
      validates_presence_of :username
      #
      ### channel_name will be username of the target user for now,
      ### though, in future it could be a group or a topic
      #
      validates_presence_of :channel_name, :within => 1..32
      validates_presence_of :message_body, :within => 1..140
      validates_presence_of :message_type, :within => 1..32
      validates_presence_of :sent_at
      validates_uniqueness_of :message_id
      #
      ### all messages for the same user will be stored in the same database.
      ### also, all messages from the users he/she is following will be copied to the user's database
      ### if they lie in different databases.
      #
      belongs_to :user, :class_name => 'User', :foreign_key => :username

      def self.messages_for(username, offset, limit)
        Message.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
      end

      def self.destroy_message(id)
        message = Message.find_by_message_id(id)
        if message
          message.destroy
          message
        else
          nil
        end
      end

      def to_s
        "#{username} -> #{channel_name}: #{message_body}"
      end

      def to_external_xml(xml)
        xml.message(message_body, :message_id => self.message_id, :message_type => self.message_type, :from => self.username, :to => self.channel_name, :sent_at => sent_at.httpdate)
      end
    end
./model/reply_message.rb
    ###
    # Message sent in response to another tweet, message or direct message.
    #
    # commented below because message could be in different database.
    # belongs_to :reply_message_id, :class_name => Message, :dependent => :destroy
    ###

    require 'direct_message'

    class ReplyMessage < DirectMessage
      validates_presence_of :reply_message_id, :within => 1..64
      def self.replies(username, offset, limit)
        ReplyMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
      end
    end
./model/guid_generator.rb
    class GuidGenerator
      @@count = 1
      def self.guid(prefix)
        @@count += 1
        "#{prefix}#{@@count}#{Time.now.to_i}"
      end
      def self.next_message_id
        guid("message_id")
      end
      def self.next_request_id
        guid("request_id")
      end
    end
./model/follower.rb
    #
    ####
    # Follower stores one-way relation between two users. However, in order to
    # support sharding, we don't directly connect users instead we only store
    # users' usernames.
    ###
    class Follower < ActiveRecord::Base
      RELATION_TYPE_FOLLOWER = "Follower"
      RELATION_TYPE_FRIEND = "Friend"
      RELATION_TYPE_FAMILY = "Family"
      RELATION_TYPE_COLLEAGUE = "Colleague"
      RELATION_TYPE_ACQUAINTENCE = "Acquaintance"
      RELATION_TYPE_FAN = "Fan"
      RELATION_TYPE_OTHER = "Other"

      validates_presence_of :relation_type
      #validates_presence_of :blocked
      #validates_presence_of :notifications

      def self.create_follower(follower_attrs)
        Follower.create!(follower_attrs) unless exists?(follower_attrs[:username], follower_attrs[:follower_username])
      end

      def self.followers(username, offset, limit)
        self.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit)
      end
      def self.follower_count(username)
        self.count(:conditions => ['username = ?', username])
      end
      def self.following_count(username)
        self.count(:conditions => ['follower_username = ?', username])
      end

      def self.followings(username, offset, limit)
        self.find(:all, :conditions => ['follower_username = ?', username], :offset => offset, :limit => limit)
      end
      def self.destroy_follower(username, follower_username)
        self.get_record(username, follower_username).destroy
      end
      def self.exists?(username, follower_username)
        !self.get_record(username, follower_username).nil?
      end
      def self.enable_notifications(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:notifications => true)
      end
      def self.disable_notifications(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:notifications => false)
      end
      def self.block(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:blocked => true)
      end
      def self.unblock(username, follower_username)
        self.get_record(username, follower_username).update_attributes(:blocked => false)
      end
      def self.get_record(username, follower_username)
        self.find(:first, :conditions => ['username = ? and follower_username = ?', username, follower_username])
      end
      def to_s
        "#{username} -> #{follower_username}"
      end
      def to_external_xml(xml)
        xml.follower(:username => self.username, :follower_username => self.follower_username)
      end

    end
./model/direct_message.rb
    ####
    # Direct message is sent directly to another user
    ####
    class DirectMessage < Message
      def to_user(user)
        self.channel_name = user.username
      end
      def self.direct_messages_sent(username, offset, limit)
        DirectMessage.find(:all, :conditions => ['username = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
      end
      def self.direct_messages_received(username, offset, limit)
        DirectMessage.find(:all, :conditions => ['channel_name = ?', username], :offset => offset, :limit => limit, :order => 'sent_at desc')
      end
    end
Source Code Services
./service/base_service.rb
    class BaseService
      protected
      def initialize(username)
        SchemaHelper.setup_schema_for(username)
      end

      def serialize(obj, format)
        if obj
          format ||= 'xml'
          case format
          when 'xml'
            obj.to_xml
          when 'yaml'
            obj.to_yaml
          when 'json'
            obj.to_json
          else
            obj
          end
        end
      end
    end
./service/users_service.rb
    #
    ## UsersService provides basic functionality for querying/storing users
    ## It assumes connection to the right database is already setup.
    #
    class UsersService < BaseService
      #
      ### initialize service
      #
      def initialize(username)
        super
      end

      #
      ### retrieves user record
      #
      def get_user(username, options={})
        User.find_by_username(username)
      end

      #
      ### return usernames of people whom the user is following.
      #
      def followings(username, options={})
        offset = options[:offset] || 0
        limit = options[:limit] || 100
        User.following_usernames(username, offset, limit)
      end

      #
      ### return usernames of followers of authenticated user
      #
      def followers(username, options={})
        offset = options[:offset] || 0
        limit = options[:limit] || 100
        User.follower_usernames(username, offset, limit)
      end

      def authenticate(username, password, options={})
        User.find_by_username_and_password(username, password)
      end

      def create_user(user_attrs, options={})
        user = User.new()
        user.attributes = user_attrs
        user.save!
        user
      end

      def archive_messages(username, options={})
        offset = options[:offset] || 0
        limit = options[:limit] || 100
        Message.messages_for(username, offset, limit)
      end
    end
./service/followers_service.rb
    #
    ## FollowersService provides basic functionality for creating/querying followers
    ## It assumes all users are already authenticated before calling these methods.
    #
    class FollowersService < BaseService
      #
      ### initialize service
      #
      def initialize(username)
        super
      end
      #
      ### create follower
      #
      def create_follower(follower_attrs, options={})
        Follower.create_follower(follower_attrs)
      end

      #
      ### enable notifications
      #
      def enable_notifications(username, follower_username, options={})
        Follower.enable_notifications(username, follower_username)
      end

      #
      ### disable notifications
      #
      def disable_notifications(username, follower_username, options={})
        Follower.disable_notifications(username, follower_username)
      end

      #
      ### blocks a user
      #
      def block(username, follower_username, options={})
        Follower.block(username, follower_username)
      end

      #
      ### unblock user
      #
      def unblock(username, follower_username, options={})
        Follower.unblock(username, follower_username)
      end

      #
      ### delete follower
      #
      def destroy_follower(username, follower_username, options={})
        Follower.destroy_follower(username, follower_username)
        true
      rescue ActiveRecord::RecordNotFound => e
        false
      end

      #
      ### check whether the follower relationship exists
      #
      def exists?(username, follower_username, options={})
        Follower.exists?(username, follower_username)
      end
    end
./service/messages_service.rb
# MessagesService provides basic functionality for querying/storing statuses/messages.
# It assumes all users are already authenticated before calling these methods.
class MessagesService < BaseService
  # initialize service
  def initialize(username)
    super
  end

  # get a single status
  def get_status(message_id, options={})
    Message.find_by_message_id(message_id)
  end

  # update status
  def update_status(message_attrs, options={})
    message = Tweet.new
    message.attributes = message_attrs
    safe_save(message)
  end

  # delete status
  def destroy_message(message_id, options={})
    Message.destroy_message(message_id)
  end

  # retrieves most recent statuses for public
  def public_timeline(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 10
    User.timeline(username, offset, limit)
  end

  # retrieves most recent statuses for user
  def user_timeline(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    User.timeline(username, offset, limit)
  end

  # retrieves most recent statuses for friends
  def friends_timeline(username, options={})
    offset = options[:offset]
    limit = options[:limit]
    get_friends_timeline(username, offset, limit)
  end

  # retrieves most recent statuses for user and his/her friends
  def user_and_friends_timeline(username, options={})
    offset = options[:offset]
    limit = options[:limit]
    messages = User.timeline(username, offset, limit)
    messages += get_friends_timeline(username, offset, limit)
  end

  private

  # retrieves most recent statuses for friends without any conversion
  def get_friends_timeline(username, offset, limit)
    offset = offset || 0
    following_count = Follower.following_count(username)
    # note: as written, this default evaluates to 1 for any following_count
    limit = limit || (following_count < 100 ? following_count / 100 + 1 : 1)

    following = User.following_usernames(username, 0, 200)
    messages = []
    # block variable renamed so it does not clobber the username argument
    following.each do |following_username|
      messages += Message.messages_for(following_username, offset, limit)
    end
    messages
  end

  protected

  # saves the message unless one with the same message_id already exists
  def safe_save(message)
    old_message = Message.find_by_message_id(message.message_id)
    if old_message.nil?
      message.save!
      message
    else
      puts "Message with id #{message.message_id} already exists #{message.inspect}"
      nil
    end
  end
end
./service/direct_messages_service.rb
# DirectMessagesService provides functionality for sending/receiving direct messages
class DirectMessagesService < MessagesService
  # initialize service
  def initialize(username)
    super
  end

  # sends back the most recent direct messages received by the user (20 by default);
  # it assumes user is already authenticated.
  def direct_messages_received(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 20
    DirectMessage.direct_messages_received(username, offset, limit)
  end

  # sends back the most recent direct messages sent by the user (20 by default);
  # it assumes user is already authenticated.
  def direct_messages_sent(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 20
    DirectMessage.direct_messages_sent(username, offset, limit)
  end

  # send direct message
  def send_direct_message(message_attrs, options={})
    message = DirectMessage.new()
    message.attributes = message_attrs
    safe_save(message)
  end

  # send reply message
  def send_reply(message_attrs, options={})
    message = ReplyMessage.new()
    message.attributes = message_attrs
    safe_save(message)
  end

  # sends back the most recent replies for the user (100 by default);
  # it assumes user is already authenticated.
  def replies(username, options={})
    offset = options[:offset] || 0
    limit = options[:limit] || 100
    ReplyMessage.replies(username, offset, limit)
  end

  # delete direct message
  def destroy_direct_message(id, options={})
    DirectMessage.destroy_message(id)
  end
end
Source Code Business Delegate
./delegate/base_delegate.rb
# BaseDelegate provides client side interface for connecting to the business
# service using BusinessDelegate pattern.
class BaseDelegate
  protected

  def initialize(jms_helper, read_queue_name, write_queue_name)
    @read_queue_name = read_queue_name
    @write_queue_name = write_queue_name
    @jms_helper = jms_helper
    @jms_helper.create_producers(@read_queue_name, @write_queue_name)
  end

  # builds an empty text message and copies the given properties onto it
  def new_jms_message(properties={})
    jms_msg = @jms_helper.create_message("") ### ActiveMQTextMessage.new
    jms_msg.setStringProperty('request_id', GuidGenerator.next_request_id)
    properties.each do |name, value|
      jms_msg.setStringProperty(name.to_s, value.to_s)
    end
    jms_msg
  end

  # read requests always wait for a reply
  def send_read_request(jms_msg)
    @jms_helper.send_message(@read_queue_name, jms_msg, true).text
  end

  # write requests wait for a reply only when reply is true
  def send_write_request(jms_msg, reply=true)
    response = @jms_helper.send_message(@write_queue_name, jms_msg, reply)
    if response.respond_to? :text
      response.text
    else
      response
    end
  end
end
./delegate/users_delegate.rb
# UsersDelegate provides basic functionality for querying/storing users.
# It assumes connection to the right database is already set up.
class UsersDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_USERS, JmsHelper::QUEUE_WRITE_USERS)
  end

  def create_user(user_attrs, options={})
    args = options.merge(user_attrs).merge(:action => 'Users.create_user')
    jms_msg = new_jms_message(args)
    send_write_request(jms_msg)
  end

  # retrieves user record
  def get_user(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.get_user'))
    send_read_request(jms_msg)
  end

  # return usernames of people the user is following
  def followings(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followings'))
    send_read_request(jms_msg)
  end

  # return 100 usernames of followers of authenticated user
  def followers(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.followers'))
    send_read_request(jms_msg)
  end

  def authenticate(username, password, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :password => password, :action => 'Users.authenticate'))
    send_read_request(jms_msg)
  end

  def archive_messages(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Users.archive_messages'))
    send_read_request(jms_msg)
  end
end
./delegate/messages_delegate.rb
# MessagesDelegate provides basic functionality for querying/storing statuses/messages.
# It assumes all users are already authenticated before calling these methods.
class MessagesDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_MESSAGES, JmsHelper::QUEUE_WRITE_MESSAGES)
  end

  # get a single status
  def get_status(message_id, options={})
    jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.get_status'))
    send_read_request(jms_msg)
  end

  # update status
  def update_status(message_attrs, options={})
    jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'Messages.update_status'}.merge(options).merge(message_attrs))
    send_write_request(jms_msg)
  end

  # delete status
  def destroy_message(message_id, options={})
    jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'Messages.destroy_message'))
    send_write_request(jms_msg)
  end

  # retrieves most recent statuses for public
  def public_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.public_timeline'))
    send_read_request(jms_msg)
  end

  # retrieves most recent statuses for user
  def user_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_timeline'))
    send_read_request(jms_msg)
  end

  # retrieves most recent statuses for friends
  def friends_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.friends_timeline'))
    send_read_request(jms_msg)
  end

  # retrieves most recent statuses for user and his/her friends
  def user_and_friends_timeline(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'Messages.user_and_friends_timeline'))
    send_read_request(jms_msg)
  end
end
./delegate/followers_delegate.rb
# FollowersDelegate provides basic functionality for creating/querying followers.
# It assumes all users are already authenticated before calling these methods.
class FollowersDelegate < BaseDelegate
  def initialize(jms_helper)
    super(jms_helper, JmsHelper::QUEUE_READ_FOLLOWERS, JmsHelper::QUEUE_WRITE_FOLLOWERS)
  end

  # create follower
  def create_follower(follower_attrs, options={})
    jms_msg = new_jms_message(options.merge(follower_attrs).merge(:action => 'Followers.create_follower'))
    send_write_request(jms_msg)
  end

  # enable notifications
  def enable_notifications(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.enable_notifications'))
    send_write_request(jms_msg)
  end

  # disable notifications
  def disable_notifications(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.disable_notifications'))
    send_write_request(jms_msg)
  end

  # blocks a user
  def block(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.block'))
    send_write_request(jms_msg)
  end

  # unblock user
  def unblock(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.unblock'))
    send_write_request(jms_msg)
  end

  # delete follower
  def destroy_follower(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.destroy_follower'))
    send_write_request(jms_msg)
  end

  # check whether the follower relationship exists
  def exists?(username, follower_username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :follower_username => follower_username, :action => 'Followers.exists?'))
    send_read_request(jms_msg)
  end
end
./delegate/direct_messages_delegate.rb
# DirectMessagesDelegate provides functionality for sending/receiving direct messages
class DirectMessagesDelegate < MessagesDelegate
  def initialize(jms_helper)
    super(jms_helper)
  end

  # sends back the most recent direct messages received by the user;
  # it assumes user is already authenticated.
  def direct_messages_received(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_received'))
    send_read_request(jms_msg)
  end

  # sends back the most recent direct messages sent by the user;
  # it assumes user is already authenticated.
  def direct_messages_sent(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.direct_messages_sent'))
    send_read_request(jms_msg)
  end

  # send direct message
  def send_direct_message(message_attrs, options={})
    jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_direct_message'}.merge(options).merge(message_attrs))
    send_write_request(jms_msg)
  end

  # send reply message
  def send_reply(message_attrs, options={})
    jms_msg = new_jms_message({:message_id => GuidGenerator.next_message_id, :action => 'DirectMessages.send_reply'}.merge(options).merge(message_attrs))
    send_write_request(jms_msg)
  end

  # sends back the most recent replies for the user;
  # it assumes user is already authenticated.
  def replies(username, options={})
    jms_msg = new_jms_message(options.merge(:username => username, :action => 'DirectMessages.replies'))
    send_read_request(jms_msg)
  end

  # delete direct message
  def destroy_direct_message(message_id, options={})
    jms_msg = new_jms_message(options.merge(:message_id => message_id, :action => 'DirectMessages.destroy_direct_message'))
    send_write_request(jms_msg)
  end
end
Source Code Message Handlers
./messaging/base_handler.rb
require 'digest/md5'

# BaseHandler provides common functionality for handling messages
class BaseHandler
  attr_reader :username
  attr_reader :message_text
  attr_reader :timings

  def initialize
    @timings = []
    @started_at = Time.now
    @host_id = Digest::MD5.hexdigest("localhost") #lookup real host
  end

  def handle(jms_msg, options={})
    add_timing_begin
    msg_attrs = {}
    pnames = jms_msg.getPropertyNames()
    for pname in pnames
      # store each property under both its string and symbol key
      msg_attrs[pname] = msg_attrs[pname.to_sym] = jms_msg.getObjectProperty(pname)
      case pname
      when 'username'
        @username = msg_attrs[pname]
      when 'auth_user'
        @auth_user = msg_attrs[pname]
      when 'request_id'
        @request_id = msg_attrs[pname]
      end
    end
    if @username.nil?
      raise "username missing in #{jms_msg.inspect} options #{options.inspect}"
    end
    if jms_msg.respond_to? :text
      #msg_attrs.merge!(xml_to_dict(jms_msg.text))
      msg_attrs[:message_text] = jms_msg.text
    end
    add_timing_for_input_xml
    response = do_handle(msg_attrs)
    add_timing_end
    response
  rescue ActiveRecord::RecordInvalid => invalid
    response = xml_error("400", "handle", invalid.record.errors)
    add_timing_end
    response
  rescue java.sql.SQLException => e
    puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n"
    # keep the message before walking the chain, because e is nil after the loop
    error_message = e.to_s
    while (e)
      puts e.backtrace
      puts "\n\n"
      e = e.getNextException()
    end
    response = xml_error("500", "handle", {"UNKNOWN_ERROR" => error_message})
    add_timing_end
    response
  rescue Exception => e
    puts "Failed to handle #{self.class.name}(#{jms_msg}) due to #{e.inspect}\n\n#{e.backtrace}"
    response = xml_error("500", "handle", {"UNKNOWN_ERROR" => e.to_s})
    add_timing_end
    response
  end

  protected

  def username_for_service
    @auth_user || @username
  end

  # records a [label, elapsed milliseconds] pair so to_xml can emit both values
  def add_timing(what)
    @timings << [what, ((Time.now - @started_at) * 1000).to_i]
  end

  def add_timing_begin
    add_timing "#{self.class.name} begin"
  end

  def add_timing_end
    add_timing "#{self.class.name} end"
  end

  def add_timing_for_db
    add_timing "#{self.class.name} connected to db"
  end

  def add_timing_for_output_xml
    add_timing "#{self.class.name} converting output xml"
  end

  def add_timing_for_input_xml
    add_timing "#{self.class.name} converting input xml"
  end

  # subclasses override this with the actual work
  def do_handle(msg_attrs, options={})
    raise "Implement handle"
  end

  def xml_to_dict(xml)
    Hash.from_xml(xml)
  end

  def xml_error(response_code, action, errors, options={})
    #logger.error "Failed to #{action} due to #{error_code} by #{username} -- #{error_message}"
    to_xml(response_code, "Error response for #{action}", options) do |xml|
      xml.errors do
        errors.each do |error_code, error_message|
          xml.error(:error_code => error_code, :error_message => error_message)
        end
      end
    end
  end

  def to_xml(response_code, comment="", options={})
    buffer = options[:buffer] ||= ''

    xml = options[:builder] ||= Builder::XmlMarkup.new(:target => buffer, :indent => options[:indent])
    xml.instruct! unless options[:skip_instruct]
    xml.comment! "Response #{comment} in reply for #{username} as of #{Time.now.utc}"
    xml.response(:request_id => @request_id, :response_code => response_code, :host_id => @host_id, :version => '1.0') do
      yield xml if block_given?
      xml.timings do
        @timings.each do |what, duration|
          xml.timing(:what => what, :duration_millis => duration)
        end
      end
    end
    buffer
  end
end
./messaging/archive_messages_handler.rb
# ArchiveMessagesHandler sends back archive messages for the user
class ArchiveMessagesHandler < BaseHandler
  protected

  def do_handle(msg_attrs, options={})
    svc = UsersService.new(self.username_for_service)
    add_timing_for_db
    messages = svc.archive_messages(self.username, options)
    add_timing_for_output_xml
    to_xml("200", "Archived Messages", options) do |xml|
      xml.messages do
        messages.each do |message|
          message.to_external_xml(xml)
        end
      end
    end
  end
end
./messaging/authenticate_handler.rb
# AuthenticateHandler authenticates user
class AuthenticateHandler < BaseHandler
  protected

  def do_handle(msg_attrs, options={})
    password = msg_attrs[:password]
    svc = UsersService.new(self.username_for_service)
    add_timing_for_db
    user = svc.authenticate(username, password, options)
    add_timing_for_output_xml
    if user
      to_xml("200", "Authentication", options) do |xml|
        user.to_external_xml(xml)
      end
    else
      xml_error("401", "Authentication", {"AUTH_FAILURE" => "Invalid username/password"})
    end
  end
end
./messaging/block_follower_handler.rb
# BlockFollowerHandler blocks follower
class BlockFollowerHandler < BaseHandler
  protected

  def do_handle(msg_attrs, options={})
    svc = FollowersService.new(self.username_for_service)
    follower_username = msg_attrs[:follower_username]
    add_timing_for_db
    blocked = svc.block(self.username, follower_username, options)
    response_code = blocked ? "200" : "404"
    add_timing_for_output_xml
    to_xml(response_code, "Block Follower", options) do |xml|
      xml.follower(:username => self.username, :follower_username => follower_username)
    end
  end
end
Messaging Helpers
./messaging/message_forwarder.rb
class MessageForwarder
  include java.lang.Runnable
  include javax.jms.MessageListener

  def initialize(jms_helper, queue_name, handlers)
    @jms_helper = jms_helper
    @queue_name = queue_name
    @handlers = handlers
  end

  def run
    @consumer = @jms_helper.get_consumer(@queue_name)
    @consumer.set_message_listener(self)
    puts "Starting listener for queue #{@queue_name}"
  end

  # looks up the handler for the message's action and, if the sender
  # asked for a reply, sends the handler's response to the reply queue
  def onMessage(jms_msg)
    action = jms_msg.getProperty('action')
    if action.nil?
      puts "Unknown message #{jms_msg.inspect}"
    else
      handler = @handlers[action]
      if handler.nil?
        puts "No handler for #{action} -- #{jms_msg.inspect}"
      else
        response = handler.new().handle(jms_msg)

        if jms_msg.getJMSReplyTo()
          reply_queue = jms_msg.getJMSReplyTo()
          reply_producer = @jms_helper.create_producer(reply_queue)
          reply_message = @jms_helper.create_message(response)
          reply_message.setJMSCorrelationID(jms_msg.getJMSMessageID())
          reply_producer.send(reply_message)
        end
      end
    end
  end

  def close
    @consumer.close()
  end
end
./messaging/jms_helper.rb
class JmsHelper
  #Object.module_eval("::#{i}")
  #http://java.sun.com/j2ee/1.4/docs/tutorial/doc/JMS6.html
  #
  ### A key tip for scalability is to separate your reads from writes; that way you can scale them independently and also
  ### offer different quality of services, e.g. read queues can be non-persistent, but write queues can be persistent.
  #
  QUEUE_READ_MESSAGES = 'read_messages'
  QUEUE_WRITE_MESSAGES = 'write_messages'
  QUEUE_READ_FOLLOWERS = 'read_followers'
  QUEUE_WRITE_FOLLOWERS = 'write_followers'
  QUEUE_READ_USERS = 'read_users'
  QUEUE_WRITE_USERS = 'write_users'

  READ_MESSAGES_HANDLERS = {
    'DirectMessages.direct_messages_received' => DirectMessagesReceivedHandler,
    'DirectMessages.direct_messages_sent' => DirectMessagesSentHandler,
    'DirectMessages.replies' => RepliesHandler,
    'Messages.get_status' => GetStatusHandler,
    'Messages.public_timeline' => PublicTimelineHandler,
    'Messages.user_timeline' => UserTimelineHandler,
    'Messages.friends_timeline' => FriendsTimelineHandler,
    'Messages.user_and_friends_timeline' => UserFriendTimelineHandler,
  }
  WRITE_MESSAGES_HANDLERS = {
    'DirectMessages.send_direct_message' => SendDirectMessageHandler,
    'DirectMessages.send_reply' => SendReplyMessageHandler,
    'DirectMessages.destroy_direct_message' => DestroyDirectMessageHandler,
    'Messages.update_status' => UpdateStatusHandler,
    'Messages.destroy_message' => DestroyMessageHandler,
  }
  READ_FOLLOWERS_HANDLERS = {
    'Followers.exists?' => FollowerExistsHandler,
  }
  WRITE_FOLLOWERS_HANDLERS = {
    'Followers.create_follower' => CreateFollowerHandler,
    'Followers.enable_notifications' => EnableNotificationsHandler,
    'Followers.disable_notifications' => DisableNotificationsHandler,
    'Followers.block' => BlockFollowerHandler,
    'Followers.unblock' => UnblockFollowerHandler,
    'Followers.destroy_follower' => DestroyFollowerHandler,
  }
  READ_USERS_HANDLERS = {
    'Users.get_user' => GetUserHandler,
    'Users.followings' => FollowingsHandler,
    'Users.followers' => FollowersHandler,
    'Users.authenticate' => AuthenticateHandler,
    'Users.archive_messages' => ArchiveMessagesHandler,
  }
  WRITE_USERS_HANDLERS = {
    'Users.create_user' => CreateUserHandler,
  }

  def initialize(connection=nil)
    if connection
      @connection = connection
    else
      factory = ActiveMQConnectionFactory.new("tcp://localhost:61616")
      @connection = factory.create_connection()
      @connection.set_exception_listener(self)
      @connection.start()
    end
    @queues = {}
    @producers = {}
    @consumers = {}
  end

  def start_consumers
    ### Each forwarder gets its own instance of jms helper because the helper keeps a private session,
    ### which cannot be shared across threads, though the connection can be shared.
    start_forwarder(QUEUE_READ_MESSAGES, READ_MESSAGES_HANDLERS)
    start_forwarder(QUEUE_WRITE_MESSAGES, WRITE_MESSAGES_HANDLERS)
    start_forwarder(QUEUE_READ_FOLLOWERS, READ_FOLLOWERS_HANDLERS)
    start_forwarder(QUEUE_WRITE_FOLLOWERS, WRITE_FOLLOWERS_HANDLERS)
    start_forwarder(QUEUE_READ_USERS, READ_USERS_HANDLERS)
    start_forwarder(QUEUE_WRITE_USERS, WRITE_USERS_HANDLERS)
    # block the main thread so the listeners keep running
    java.lang.Thread.currentThread().join
  end

  def onException(jmsEx)
    puts "JMS Exception occurred #{jmsEx.inspect}, shutting down"
  end

  def close
    @connection.close()
    @session.close if @session
  end

  def create_message(text)
    get_session().createTextMessage(text)
  end

  # sends the message; when reply is true, waits on a temporary queue
  # and returns the reply message
  def send_message(queue_name, jms_msg, reply=false)
    ## TODO add error_queue/dead letter queue
    if reply
      reply_queue = get_session().createTemporaryQueue()
      jms_msg.setJMSReplyTo(reply_queue)
    end

    get_producer(queue_name).send(get_queue(queue_name), jms_msg)
    if reply_queue
      reply_consumer = get_session().createConsumer(reply_queue)
      reply_consumer.receive()
    end
  end

  def get_producer(queue_name)
    producer = @producers[queue_name]
    if producer.nil?
      producer = @producers[queue_name] = get_session().createProducer(get_queue(queue_name))
      #producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT) # this can be changed for write queues
      #producer.setTimeToLive(60000)
    end
    producer
  end

  def get_queue(queue_name)
    queue = @queues[queue_name]
    if queue.nil?
      queue = @queues[queue_name] = get_session().create_queue(queue_name)
    end
    queue
  end

  def get_consumer(queue_name)
    consumer = @consumers[queue_name]
    if consumer.nil?
      consumer = @consumers[queue_name] = get_session().create_consumer(get_queue(queue_name))
    end
    consumer
  end

  def create_producers(*queue_names)
    for queue_name in queue_names
      queue = get_queue(queue_name)
      producer = get_producer(queue_name)
    end
  end

  def create_producer(queue)
    get_session().createProducer(queue)
  end

  def self.start_broker
    broker = BrokerService.new()
    broker.addConnector("tcp://localhost:61616")
    broker.start()
    sleep 2 # let broker startup
  end

  protected

  def get_session
    @session ||= @connection.create_session(false, Session::AUTO_ACKNOWLEDGE)
  end

  def start_forwarder(queue_name, handlers)
    raise "null connection" if @connection.nil?
    jms_helper = JmsHelper.new(@connection)
    Thread.new(){MessageForwarder.new(jms_helper, queue_name, handlers).run}
  end
end

if $PROGRAM_NAME == __FILE__
  JmsHelper.start_broker
  JmsHelper.new.start_consumers
end
I am not showing all message handlers because they look pretty much the same.
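Since every handler follows the same shape, the routing idea can be shown without JMS at all. The following is only a minimal in-memory sketch, assuming plain Ruby threads and `Queue` objects in place of ActiveMQ: `EchoHandler`, `UpcaseHandler`, `HANDLERS`, `start_forwarder` and `request` are hypothetical names made up for this illustration, and the per-request reply queue stands in for the JMS temporary queue.

```ruby
require 'thread'

# Hypothetical handlers; each one turns a request hash into a response string.
class EchoHandler
  def handle(message)
    "echo: #{message[:body]}"
  end
end

class UpcaseHandler
  def handle(message)
    message[:body].upcase
  end
end

# Action name => handler class, in the spirit of the *_HANDLERS tables above.
HANDLERS = { 'Demo.echo' => EchoHandler, 'Demo.upcase' => UpcaseHandler }.freeze

# Consumes requests from the queue until it sees :shutdown.
def start_forwarder(queue, handlers)
  Thread.new do
    while (message = queue.pop) != :shutdown
      handler = handlers[message[:action]]
      response = handler ? handler.new.handle(message) : "no handler for #{message[:action]}"
      message[:reply_to] << response if message[:reply_to]
    end
  end
end

# Sends a request and blocks until the reply arrives on a private queue.
def request(queue, action, body)
  reply_to = Queue.new # plays the role of the JMS temporary queue
  queue << { action: action, body: body, reply_to: reply_to }
  reply_to.pop
end

queue = Queue.new
forwarder = start_forwarder(queue, HANDLERS)
echo_reply = request(queue, 'Demo.echo', 'hi')
upcase_reply = request(queue, 'Demo.upcase', 'hi')
missing_reply = request(queue, 'Demo.missing', 'hi')
queue << :shutdown
forwarder.join
puts echo_reply, upcase_reply, missing_reply
```

Unlike the real forwarder, this sketch replies even when no handler is found, so a caller never blocks forever on an unknown action.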
Database
./db/schema_helper.rb
require 'rubygems'
require 'un'
require 'active_record'
require 'create_followers'
require 'create_messages'
require 'create_users'

if defined?(JRUBY_VERSION)
  gem 'activerecord-jdbc-adapter'
  require 'jdbc_adapter'
  require 'java'
end

class SchemaHelper
  MAX_DB = 10

  def self.create_schema()
    for i in 0 ... MAX_DB
      connect_to(i)
    end
  end

  # maps a username to one of the MAX_DB databases
  def self.hash_for(username)
    hash = username.hash % MAX_DB
  end

  def self.setup_schema_for(username)
    hash = hash_for(username)
    connect_to(hash)
  end

  def self.connect_to(hash)
    dbdir = "#{FileUtils.pwd}/data/db#{hash}"
    dbfile = "#{dbdir}/database.sqlite"
    #ActiveRecord::Base.establish_connection(:adapter => "sqlite", :dbfile => dbfile)
    ActiveRecord::Base.establish_connection(
      :adapter => "jdbc",
      :dbfile => dbfile,
      #:driver => "SQLite.JDBCDriver", ####:driver => "org.sqlite.JDBC",
      #:url => "jdbc:sqlite:#{dbfile}"
      :driver => "org.apache.derby.jdbc.EmbeddedDriver",
      :url => "jdbc:derby:#{dbdir};create=true"
    )
    # run the migrations the first time this database is used
    unless File.directory?(dbdir)
      CreateUsers.migrate(:up)
      CreateMessages.migrate(:up)
      CreateFollowers.migrate(:up)
    end
  end

  def self.test
    SchemaHelper.setup_schema_for("shahbhat")
    require 'user'
    for i in 0 ... 10 do
      SchemaHelper.connect_to(i)
      puts User.count
    end
  end
end
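One caveat with picking a database from `username.hash`: in current versions of Ruby, `String#hash` is seeded per process, so the same username can land in a different database after a restart, and other runtimes may behave the same way. Here is a minimal sketch of a stable alternative, assuming a CRC32 checksum is acceptable as the hash; `stable_shard_for` is a hypothetical helper, not part of the code above.

```ruby
require 'zlib'

# Hypothetical helper: maps a username to a shard number in 0...shard_count.
# Zlib.crc32 returns the same value in every process, unlike String#hash.
def stable_shard_for(username, shard_count = 10)
  Zlib.crc32(username) % shard_count
end

%w[shahbhat alice bob].each do |name|
  puts "#{name} -> db#{stable_shard_for(name)}"
end
```

Any deterministic hash would do here; CRC32 is used only because it ships with the standard library.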
May 30, 2008
Challenges of multicore programming
Multicore processors have put parallel and concurrent programming at the forefront. Herb Sutter’s Dr. Dobb’s article, “The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software”, warned programmers that the free lunch is over. This has spurred ongoing debates about future languages and how they will rescue software development. Here are a few features that are being put forward as the panacea for multicores:
Multi-threading
The Java, C++ and C# camp has had support for native threads for a while, and claims that these native threads will allow programmers to take advantage of the multitude of cores. After having done multi-threading for over ten years, I must admit that multi-threading is not easy. Concurrent programming based on threads and locks is error prone and can lead to nasty problems such as deadlocks, starvation and idle spinning. As the number of cores reaches thousands or millions, shared memory will become a bottleneck.
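To make the deadlock risk concrete, here is a minimal sketch of the usual defence, which is to always acquire locks in a fixed global order. The `Account` class and `transfer` method are hypothetical names for this illustration, and the sketch assumes the two accounts are distinct, because Ruby's `Mutex` is not reentrant.

```ruby
# Hypothetical account guarded by its own lock.
class Account
  attr_reader :id, :balance, :lock

  def initialize(id, balance)
    @id = id
    @balance = balance
    @lock = Mutex.new
  end

  def deposit(amount)
    @balance += amount
  end
end

# Locks both accounts in ascending id order, so two opposite transfers
# can never each hold one lock while waiting for the other.
def transfer(from, to, amount)
  first, second = [from, to].sort_by(&:id)
  first.lock.synchronize do
    second.lock.synchronize do
      from.deposit(-amount)
      to.deposit(amount)
    end
  end
end

a = Account.new(1, 1000)
b = Account.new(2, 1000)
threads = [
  Thread.new { 1000.times { transfer(a, b, 1) } },
  Thread.new { 1000.times { transfer(b, a, 1) } }
]
threads.each(&:join)
puts "a=#{a.balance} b=#{b.balance}"
```

If each thread instead locked `from` first and `to` second, the two threads could end up waiting on each other forever, which is exactly the kind of bug that is hard to reproduce and debug.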
Software Transactional Memory
Languages like Haskell and Clojure are adding support for STM, which treats memory like a database and uses optimistic transactions. Instead of locking, each thread can change any data, but when it commits it verifies that the data has not been changed by another thread and retries the transaction if it has. This area is relatively new, but it resembles shared memory, so it will probably face the same scalability issues.
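The commit-or-retry cycle can be sketched in a few lines. This is not real STM, which tracks many memory locations inside one transaction; it is a simplified single-value version, assuming a version counter is enough to detect a conflicting commit. `VersionedRef` is a hypothetical class for this illustration.

```ruby
# A reference whose value is replaced only if nobody else committed first.
class VersionedRef
  def initialize(value)
    @value = value
    @version = 0
    @lock = Mutex.new # guards only the short read and commit steps
  end

  # Returns a consistent [value, version] snapshot.
  def read
    @lock.synchronize { [@value, @version] }
  end

  # Returns true if the commit succeeded, false if the version moved on.
  def commit(expected_version, new_value)
    @lock.synchronize do
      return false unless @version == expected_version
      @value = new_value
      @version += 1
      true
    end
  end

  # Computes a new value from a snapshot and retries until the commit succeeds.
  def update
    loop do
      value, version = read
      return if commit(version, yield(value))
    end
  end
end

balance = VersionedRef.new(0)
threads = 8.times.map do
  Thread.new { 500.times { balance.update { |v| v + 1 } } }
end
threads.each(&:join)
final_value, commits = balance.read
puts "value=#{final_value} commits=#{commits}"
```

The block passed to `update` may run more than once, so it should be free of side effects, which is the same restriction real STM systems place on transactions.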
Actor Based Model with Message Passing
I learned about Actor based programming from reading Gul Agha’s papers in school. In this model each process owns a private mailbox, and processes communicate by sending messages to each other. Languages like Erlang and, more recently, Scala use the Actor based model for parallel programming. This model is very scalable because there is no locking involved. Also, messages are immutable in Erlang (though they may not be in Scala), so data corruption is not likely. The only drawback of this model is that splitting your application or algorithm into independent processes that communicate via message passing is not trivial.
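Here is a minimal sketch of the mailbox idea, assuming a Ruby thread plus a `Queue` is a fair stand-in for an Erlang process. `CounterActor` and its message names are hypothetical. The counter lives only inside the actor's thread, so no other thread can touch it directly.

```ruby
require 'thread'

# Minimal actor: a thread that owns a private mailbox and private state.
class CounterActor
  def initialize
    @mailbox = Queue.new
    @thread = Thread.new { run }
  end

  # Asynchronous send; the message array is frozen so it cannot be altered later.
  def send_message(type, payload = nil, reply_to = nil)
    @mailbox << [type, payload, reply_to].freeze
  end

  def stop
    send_message(:stop)
    @thread.join
  end

  private

  # Processes one message at a time, so the counter needs no lock.
  def run
    count = 0
    loop do
      type, payload, reply_to = @mailbox.pop
      case type
      when :add then count += payload
      when :get then reply_to << count
      when :stop then break
      end
    end
  end
end

actor = CounterActor.new
senders = 4.times.map do
  Thread.new { 100.times { actor.send_message(:add, 1) } }
end
senders.each(&:join)

reply = Queue.new
actor.send_message(:get, nil, reply)
total = reply.pop
actor.stop
puts total
```

The `:get` message carries its own reply queue, which is how a caller gets an answer back without sharing any state with the actor.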
TupleSpace based model
In a tuple space, processes communicate with each other by storing messages in a shared space. The tuple space provides simple APIs such as get, put, read and eval: get and read are blocking operations, which makes it possible to build dataflow-style applications, and eval creates a new process to execute a task and store its result in the tuple space. I built a similar system called JavaNOW for my Ph.D. project, and there are a number of open source and commercial frameworks available, such as JavaSpaces, GigaSpaces and Blitz.
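The four operations fit in a small class. This is only a toy sketch, assuming tuples are arrays, `nil` in a pattern acts as a wildcard, and eval runs in a thread rather than a separate process. `TinyTupleSpace` and `eval_tuple` are hypothetical names and do not reflect the API of JavaNOW or any of the frameworks mentioned.

```ruby
# Toy tuple space: tuples are arrays, nil in a pattern matches anything.
class TinyTupleSpace
  def initialize
    @tuples = []
    @lock = Mutex.new
    @changed = ConditionVariable.new
  end

  # Stores a tuple and wakes up any waiting readers.
  def put(tuple)
    @lock.synchronize do
      @tuples << tuple
      @changed.broadcast
    end
  end

  # Blocks until a tuple matches, then removes and returns it.
  def get(pattern)
    wait_for(pattern, true)
  end

  # Blocks until a tuple matches and returns it without removing it.
  def read(pattern)
    wait_for(pattern, false)
  end

  # Runs the block in a new thread and stores [tag, result] as a tuple.
  def eval_tuple(tag, &task)
    Thread.new { put([tag, task.call]) }
  end

  private

  def wait_for(pattern, remove)
    @lock.synchronize do
      loop do
        index = @tuples.index { |tuple| match?(pattern, tuple) }
        if index
          return remove ? @tuples.delete_at(index) : @tuples[index]
        end
        @changed.wait(@lock)
      end
    end
  end

  def match?(pattern, tuple)
    pattern.size == tuple.size &&
      pattern.zip(tuple).all? { |expected, actual| expected.nil? || expected == actual }
  end
end

space = TinyTupleSpace.new
space.eval_tuple(:square) { 7 * 7 }
_tag, result = space.get([:square, nil]) # blocks until the task has finished
space.put([:config, "retries", 3])
peeked = space.read([:config, "retries", nil])
still_there = space.read([:config, nil, nil])
puts result
```

The blocking `get` is what gives the dataflow behaviour: the consumer simply waits until the producer has stored the tuple it needs.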
Fork/Join
This is a standard pattern from grid computing, also known as Master-Slave, Leader/Follower or JobJar, and it is somewhat similar to Map/Reduce: a master process adds work to a queue, workers pick up the work and store their results in another queue, and the master process collects the results. It also uses other constructs of concurrent/parallel programming such as barriers and futures. There are plenty of libraries and frameworks available for this, such as Globus, Hadoop, MPI/MPL and PVM. Java 7 will also have a Fork/Join framework.
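The queue-based flow described above can be sketched with two queues and a handful of worker threads. `parallel_sum_of_squares` is a hypothetical example task, and the sketch assumes threads as workers; on the standard Ruby interpreter the global lock keeps CPU-bound threads from running truly in parallel, so this shows the structure of the pattern rather than a speedup.

```ruby
require 'thread'

# Master puts chunks of work on one queue; workers put partial results on another.
def parallel_sum_of_squares(numbers, worker_count = 4, chunk_size = 100)
  work = Queue.new
  results = Queue.new

  # Fork: split the input into chunks and enqueue them, plus one stop marker per worker.
  chunks = numbers.each_slice(chunk_size).to_a
  chunks.each { |chunk| work << chunk }
  worker_count.times { work << :done }

  workers = worker_count.times.map do
    Thread.new do
      while (chunk = work.pop) != :done
        results << chunk.sum { |n| n * n }
      end
    end
  end
  workers.each(&:join)

  # Join: collect exactly one partial result per chunk and combine them.
  chunks.size.times.sum { results.pop }
end

total = parallel_sum_of_squares((1..1000).to_a)
puts total
```

The master never shares the input array with the workers directly; everything flows through the two queues, which is what makes the pattern easy to spread across machines.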
Messaging Middleware(ESB)
Messaging middleware offers another way to build an actor based model, where a thread listens on a queue and other threads or processes communicate with it via queues or pub/sub. Java’s JMS has a lot of support for this, and frameworks like Mule, Camel and ServiceMix can help build applications that use SEDA (Staged Event-Driven Architecture). Though this is more prominent in service oriented architectures, there is no reason why it can’t be used for leveraging multicores.
Compiler based parallelism
High Performance Fortran and other parallel Fortran compilers use data flow analysis to create parallel applications. Though this might be the simplest solution, the generated code is often not as efficient as a hand coded algorithm.
Conclusion
Though parallel and concurrent programming is a new model for the vast majority of programmers, this problem is not new to the high performance computing area. People there have already tackled it and know what works and what does not. In the early 90s, I worked on machines from Silicon Graphics, which built large computers based on the NUMA architecture and provided a shared memory model for computing. Due to the inherent nature of shared memory, they were not as scalable. By contrast, IBM built the SP1 and SP2 systems, which used message passing based APIs (MPL, similar to MPI) and let you add as many nodes as you needed. These systems were much more scalable. These two different systems quite nicely show the difference between the shared memory model and the message passing model of programming. A key advantage of message passing is that it avoids any kind of locking. This is the reason I like Erlang: it supports immutability and efficient message passing. However, the biggest challenge I found with parallel programming was breaking an algorithm or application down into smaller parts that run in parallel. Amdahl’s law shows that the speedup of an application from multiple processors is limited by its serial portion. Donald Knuth pointed out in a recent interview that the vast majority of programs are serial in nature. In the end, there are no simple solutions on the horizon, though there are some proven techniques from HPC and functional languages that can help.
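Amdahl’s law is easy to put into numbers. If a fraction p of a program can run in parallel and the rest is serial, the speedup on n processors is 1 / ((1 - p) + p / n). A minimal sketch follows; `amdahl_speedup` is a hypothetical helper name.

```ruby
# Speedup predicted by Amdahl's law for a program whose parallel_fraction
# (between 0.0 and 1.0) can be spread over the given number of processors.
def amdahl_speedup(parallel_fraction, processors)
  serial_fraction = 1.0 - parallel_fraction
  1.0 / (serial_fraction + parallel_fraction.to_f / processors)
end

[2, 8, 64, 1024].each do |n|
  puts "90% parallel on #{n} processors: #{amdahl_speedup(0.9, n).round(2)}x"
end
```

With 90% of the work parallel, the speedup approaches but never reaches 10x no matter how many processors are added, because the serial 10% always has to run on one of them.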
May 23, 2008
Chasing the bright lights
I have been an IT enthusiast and professional for over twenty years, and I consider myself to be an “Innovators” type when it comes to technology and programming. I have seen a number of changes over the years. One of my managers used to say that we like to chase bright lights. Unfortunately, many trends die off naturally or fail to cross the chasm. Here are some of the things I chased that died off or faded away:
Mainframe
I worked on mainframes for a couple of years early in my career and programmed in COBOL and CICS. Though mainframes are not quite dead, I am actually glad that they have faded away.
VAX/OpenVMS
I also worked on VAX/VMS and OpenVMS systems. They were rock solid, and I am a bit disappointed that they could not evolve.
PASCAL/ICON/PROLOG/FORTRAN/BASIC
BASIC was my first programming language, but I haven’t used it since the early DOS days. I learned PASCAL in college and found it better than C, but didn’t see a lot of usage in real life. I also learned ICON and PROLOG, but didn’t find any real use for them, and I have not used FORTRAN since the old VAX days.
NUMA based servers
In the early 90s, Silicon Graphics built very powerful machines based on NUMA architecture, which gave a shared memory model of programming across a number of processors. Unfortunately, there were limits to how big they could become, not to mention that all the locking slowed down shared memory access. Around the same time, IBM built SP2 systems based on message passing (MPL), which were a lot more scalable. These two programming models are now coming to the front row as multicore programming becomes essential. I got to play with both kinds of systems at Fermilab, Metromail, TransUnion and Argonne Lab. I am sure the lessons from these early models will not be lost and message passing based programming will win.
PowerPC NT/Solaris/AIX
Back in 94-95, Motorola created PowerPC machines that could run NT or Solaris, and I thought they were pretty cool. So I spent all my savings and bought one. Unfortunately, Sun abandoned Solaris on it soon after and Microsoft did the same with NT. I finally got AIX to run on it, but it just didn’t go anywhere.
BeOS
Back when Apple was looking for a next generation operating system for Macs, they seriously considered BeOS, which was pretty cool. I played with it and bought a few books on programming for it. Unfortunately, Steve Jobs went with his NextStep system and BeOS just faded away.
Java Ring
In the early days of Java, Sun announced huge support for smart cards, which came with strong cryptography and a small amount of memory. I spent several hundred dollars on SDK kits, Java rings and smart cards from ibutton.com. This too just didn’t cross the chasm and died off.
CORBA
I did a lot of CORBA in the 90s, which I thought was a lot better than the socket based networking I did before that. It had a lot of limitations and, despite having standards, it was very hard to integrate. Now, it has moved out of the limelight.
Voyager
Voyager was an ORB from ObjectSpace that had very nice features for agent based computing. It also had nice concepts like Facets, or dynamic composition. It inspired me to build my own ORB, “JavaNOW”, which supported seamless networking and agent based computing. It too failed to cross the chasm and died off.
EJB
One of the difficult things with CORBA was the maintenance of servers, because each server ran in its own process. I had to write a pretty elaborate monitoring and lifecycle system to start these servers in the right order and restart them if they failed. I thought application servers like Weblogic and Websphere solved that problem. A lot of people who were not familiar with distributed systems tried to use EJBs as local methods and failed miserably. I built proper value objects before there was a pattern named after them and used EJBGEN to create all the stubs. I actually don’t miss the elaborate programming, but I still see a need for application servers to host services.
MDA/MDD/Software Factories
In the early 2000s, I was very interested in model driven architecture and development and thought that they might improve the software development process. Though I had seen the failure of CASE tools in the early 90s, I thought these techniques were better. I still hope that better generative programming and metaprogramming will help cut some of the development cost.
Aspect Oriented Programming
I learned about AOP in the 90s when I was looking for PhD projects, and it became popular in the early 2000s. Now, it too has faded away.
Methodologies/UML
Agile methodologies have killed a number of methodologies like Rational Unified Process (RUP), Catalyst, ICONIX, UML modeling, etc. I never liked heavyweight processes, but I do see value in some high level architecture and modeling.
May 20, 2008
Rebooting philosophy in Erlang
I just read “Let It Crash” Programming, which talks about how Erlang is designed as a fault tolerant language from the ground up. I have been learning Erlang since Joe Armstrong’s book came out and have heard Joe talk about fault tolerance a few times. Steve Vinoski has also talked about it in Erlang: It’s About Reliability, during the flame war between him and Ted Neward. For me, Erlang is reminiscent of Microsoft Windows: when Windows stops working, I just reboot the machine. Erlang does the same thing: when a process fails, it just restarts the process. About sixteen years ago, I started my career in old VAX, mainframe and UNIX environments, and my manager used to say that he never had to restart a mainframe when something failed, but somehow bugs on Windows got fixed after a reboot. When I worked at Fermilab in the mid 90s, we had server farms of hundreds of machines and fault tolerance was quite important. Though Google didn’t invent server farms, it scaled them to a new level, where the failure of individual machines doesn’t stop the entire application. Erlang brings the same philosophy to the programming language. Obviously, in order to make a truly fault tolerant application, the Erlang processes need to be spawned on separate machines. Erlang’s support for CSP style communication and distributed computing, along with OTP, makes this trivial. You can further increase fault tolerance and high availability by using machines on separate racks, networks, power sources or data centers. No wonder Facebook is using Erlang in its Chat application.
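The restart idea can be shown in a few lines. This is only a rough sketch in Python, not Erlang and not how OTP supervisors are implemented; `supervise` and `make_flaky_worker` are hypothetical names, and the flaky worker exists purely to simulate crashes.

```python
def supervise(worker, max_restarts=3):
    """Run worker(); if it raises, start it again, up to max_restarts times.

    Returns (result, restarts). Once the restart budget is spent, the last
    error is re-raised so that a higher level can decide what to do.
    """
    restarts = 0
    while True:
        try:
            return worker(), restarts
        except Exception:
            if restarts >= max_restarts:
                raise
            restarts += 1


def make_flaky_worker(failures):
    """Hypothetical worker that crashes `failures` times, then succeeds."""
    state = {"remaining": failures}

    def worker():
        if state["remaining"] > 0:
            state["remaining"] -= 1
            raise RuntimeError("crashed")
        return "ok"

    return worker
```

The worker contains no defensive error handling of its own; it simply crashes, and the supervising code decides whether to restart it. Giving up after a fixed number of restarts keeps a permanently broken worker from looping forever.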
May 19, 2008
Sprint like Hell!
I think Sprint is the wrong metaphor for an iteration, because managers seem to think it means developers will run like hell to finish the work. Though my project has adopted Scrum recently, it is far from the true practices, principles and values of agility. For the past many months, we have been developing like hell, and as a result our quality has suffered.

