October 8, 2012

Comparing Server side Websockets using Atmosphere, Netty, Node.js and

My last two bogs (Pub/sub in Node.js and Websockets and Scaling Node.js) described how I have been evaluating Node.js and Websockets for streaming data. I would describe results of that evaluation along with a few other frameworks that I have been testing.

Use Case

The primary use case for my application is pub/sub where clients subscribe to some channel and receive updates such as subscribing for stock quotes or notifications about orders.


In my last blog, I described all the issues I had with library for Node.js and after a lot of frustration I gave up on it. Instead, I used WS library for WebSockets and though it doesn’t provide fallback options for other protocols but’s it much more stable and efficient.

Pub/Sub Server

Here is the pub/sub server that provides methods to subscribe some channel identifier and sends update to a random channel continuously.

var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 5;
 var MAX_THREADS = process.env.MAX_THREADS || 5;
 var MAX_ROWS = process.env.MAX_ROWS || 10;
 var nextClientId = 0;
 var errors = 0;
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')(OUTLIER_SIZE);
 var cluster = require('cluster');
 if (cluster.isMaster) {
     for (var i = 0; i < MAX_THREADS; i++) {
     cluster.on('exit', function(worker, code, signal) {
     console.log('clients, errors, ' + metrics.heading() + ',' + hwUsage.heading());
 } else {
     var pubsubController = require('pubsub_controller')(, MAX_ROWS, hwUsage, metrics);
     var WebSocketServer = require('ws').Server
     , wss = new WebSocketServer({port: 8124});
     wss.on('connection', function(ws) {
         ws.key = ++nextClientId;
         ws.on('close', function() {
         ws.on('error', function() {
         ws.on('message', function(text) {
             var msg = JSON.parse(text);
             if (msg.action === 'subscribe') {
                 pubsubController.subscribe(ws, msg.identifier);
             } else if (msg.action === 'unsubscribe') {
                 pubsubController.subscribe(ws, msg.identifier);
     setInterval(function() {
     setInterval(function() {
         hwUsage.stop(function(usage) {
             console.log(nextClientId+ ',' + errors + metrics.summary().toString() + ',' + usage.toString());
     }, 15000);
 process.on('uncaughtException', function(err) {
     console.log("GLOBAL " + err + "\n" + err.stack);

Subscription management

Following module defines APIs for managing subscriptions and as I mentioned when number of messages received by client reaches maxMessages, it unsubscribes from the server.

var helper = require('helper');
 module.exports = function(workerId, numElements, hwUsage, metrics) {
     var channelSubscribers = {};
     var nextRequestId = 0;
     var allKeys = {};
     allKeys[workerId] = [];
     var generatePayload = function(identifier) {
         var elements = [];
         var date = new Date();
         for (var i=0; i<numElements; i++) {            elements.push({sequence:i, price: helper.random(100), identifierId:identifier, symbol:"QQQ", transaction:"STO", qty: helper.random(200)});
         var reqId = Number(++nextRequestId + '.' + workerId);
         var payload = {request: reqId, rows:numElements, timestamp : date.getTime(), identifier:Number(identifier), elements: elements};
         return payload;
     var makeKey = function(socket) {
         return workerId + '_' + socket.key;
     var getSubscriber = function(socket) {
         return channelSubscribers[makeKey(socket)];
     var setSubscriber = function(socket, rec) {
         channelSubscribers[makeKey(socket)] = rec;
     var removeSubscribers = function(socket) {
         delete channelSubscribers[makeKey(socket)];
     return {
         subscribe: function(socket, identifier) {
             var rec = getSubscriber(socket);
             if (typeof rec === "undefined") {
                 rec = {socket : socket, identifiers : []};
             setSubscriber(socket, rec);
         unsubscribe: function(socket, identifier) {
             var rec = getSubscriber(socket);
             if (typeof rec === "undefined") {
                 return false;
             var ndx = rec.identifiers.indexOf(identifier);
             rec.identifiers.splice(ndx, 1);
             if (rec.identifiers.length == 0) {
             } else {
                 setSubscriber(socket, rec);
         unsubscribeAll: function(socket) {
         count: function() {
             return helper.size(channelSubscribers);
         push: function() {
             if (allKeys[workerId].length == 0) return 0;
             var key = allKeys[workerId][helper.random(allKeys[workerId].length)];
             var rec = channelSubscribers[key];
             if (typeof rec === "undefined") {
                 return -1;
             for (var i=0; i<rec.identifiers.length; i++) {
                 var identifier = rec.identifiers[i];
                 try {
                     var request = generatePayload(identifier);
                     var text = JSON.stringify(request);
                     metrics.update(nextRequestId, new Date().getTime(), text.length);
                 } catch (err) {
                     console.log('failed to send execution report' + err);
                     try {
                     } catch (ex) {
                         console.log('failed to close socket ' + ex);
                     delete channelSubscribers[key];

I am skipping some of helper modules, but you can find them from the source code.

The framework follows Node.js design for single-thread event loop with asynchronous I/O but is written in Java. It also provides polyglot support for a number of languages such as Java, Javascript, Ruby, Groovy, and Python. Here is similar implementation of Pub/Sub server in Java:

 package com.plexobject.vertx;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 public class VertxServer extends Verticle {
     private static final Logger logger = LoggerFactory.getLogger(VertxServer.class);
     private final ObjectMapper mapper = new ObjectMapper();
     private final JsonFactory jsonFactory = mapper.getJsonFactory();
     private Map<String, Set<ServerWebSocket>> subscriptions = new ConcurrentHashMap<String, Set<ServerWebSocket>>();
     private Timer sendTimer = new Timer(true);
     private Timer metricsTimer = new Timer(true);
     private final AtomicLong nextRequestId = new AtomicLong();
     private final Metrics metrics = new Metrics();
     private final StringBuilder elements = new StringBuilder();
     private final Random random = new Random();
     private int maxIdentifier;
     public VertxServer() {"identifiers," + Metrics.getHeading());
         for (int i=0; i<500; i++) {
         metricsTimer.schedule(new TimerTask() {
             public void run() {
        + metrics.getSummary().toString());
         }, 5000, 15000);
         sendTimer.schedule(new TimerTask() {
             public void run() {
                while (true) {
         }, 1000);
     public void start() {
         vertx.createHttpServer().setAcceptBacklog(10000).websocketHandler(new Handler<ServerWebSocket>() {
             public void handle(final ServerWebSocket ws) {
                 if (ws.path.equals("/channels")) {
                     ws.dataHandler(new Handler<Buffer>() {
                         public void handle(Buffer data) {
                             try {
                                 JsonParser jp = jsonFactory.createJsonParser(data.toString());
                                 JsonNode jsonObj = mapper.readTree(jp);
                                 String action = jsonObj.get("action").asText();
                                 String identifier = jsonObj.get("identifier").asText();
                                 if (action == null || identifier == null) {
                                 if ("subscribe".equals(action)) {
                                     maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
                           "Max " + maxIdentifier);
                                     Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                     if (sockets == null) {
                                         sockets = new HashSet<ServerWebSocket>();
                                         subscriptions.put(identifier, sockets);
                                 } else if ("unsubscribe".equals(action)) {
                                     Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                     if (sockets == null) {
                                     if (sockets.size() == 0) {
                             } catch (IOException e) {
                                 logger.error("Failed to handle " + data, e);
                 } else {
         }).requestHandler(new Handler<HttpServerRequest>() {
             public void handle(HttpServerRequest req) {
                 //if (req.path.equals("/")) req.response.sendFile("websockets/ws.html"); // Serve the html
     private String getRandomIdentifier() {
         return String.valueOf(Math.abs(random.nextInt(maxIdentifier)) + 1);
     public void sendMessageForRandomIdentifier() {
         if (subscriptions.size() == 0) {
         String identifier = getRandomIdentifier();
         if (identifier == null) {
         Set<ServerWebSocket> sockets = subscriptions.get(identifier);
         if (sockets == null || sockets.size() == 0) {
         final long t = System.currentTimeMillis();
         String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
         for (ServerWebSocket ws : sockets) {
             try {
             } catch (Exception e) {
                 logger.error("Failed to send " + msg, e);

Note that requires JDK 7 and here is how you can start the server:

 export PATH=/home/sbhatti/jdk1.7.0_10/bin:$PATH
 export JAVA_HOME=/home/sbhatti/jdk1.7.0_10
 mvn package
 bin/vertx run com.plexobject.vertx.VrtxServer -cp target/classes/ -instances=5

Note that I started with 5 instances of the server to match Node.js’ clustering options.

Netty is a lightweight application server that provides asynchronous events and I/O so I also tested its Websocket’s support in its latest 4.0-beta version.


Here is the main server implementation that bootstraps web-socket server:

 package com.plexobject.netty.server;
 import io.netty.bootstrap.ServerBootstrap;
 public class WebSocketServer {
         private final int port;
         public WebSocketServer(int port) {
                 this.port = port;
         public void run() throws Exception {
                 ServerBootstrap b = new ServerBootstrap();
                 try {
                NioEventLoopGroup(), new NioEventLoopGroup())
                                         .childHandler(new WebSocketServerInitializer());
                         Channel ch = b.bind().sync().channel();
                 } finally {
         public static void main(String[] args) throws Exception {
                 int port;
                 if (args.length > 0) {
                         port = Integer.parseInt(args[0]);
                 } else {
                         port = 8124;
                 new WebSocketServer(port).run();

Pub/Sub Server

Here is implementation of websocket handler that provides pub/sub functionality:

 package com.plexobject.netty.server;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.math.BigInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
 import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
 import static io.netty.handler.codec.http.HttpMethod.GET;
 import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
 import io.netty.logging.InternalLogger;
 import io.netty.logging.InternalLoggerFactory;
 import io.netty.util.CharsetUtil;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
 public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
     private static final InternalLogger LOGGER = InternalLoggerFactory
     private static final ObjectMapper jsonMapper = new ObjectMapper();
     private static final JsonFactory jsonFactory = jsonMapper.getJsonFactory();
     private static final String WEBSOCKET_PATH = "/";
     private WebSocketServerHandshaker handshaker;
     private static Map<String, Set<ChannelHandlerContext>> subscriptions = new ConcurrentHashMap<String, Set<ChannelHandlerContext>>();
     private static Timer timer = new Timer(true);
     private static final AtomicLong REQUEST_ID = new AtomicLong();
     private static final Metrics METRICS = new Metrics();
     static {
         timer.schedule(new TimerTask() {
             public void run() {
                while (true) {
         }, 1000);
     public void messageReceived(ChannelHandlerContext ctx, Object msg)
             throws Exception {
         if (msg instanceof HttpRequest) {
             handleHttpRequest(ctx, (HttpRequest) msg);
         } else if (msg instanceof WebSocketFrame) {
             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
     private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req)
             throws Exception {
         // Allow only GET methods.
         if (req.getMethod() != GET) {
             sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1,
         if (req.getUri().equals("/favicon.ico")) {
             HttpResponse res = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
             sendHttpResponse(ctx, req, res);
         // Handshake
         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                 getWebSocketLocation(req), null, false);
         handshaker = wsFactory.newHandshaker(req);
         if (handshaker == null) {
         } else {
             handshaker.handshake(, req);
     private void handleWebSocketFrame(ChannelHandlerContext ctx,
             WebSocketFrame frame) throws IOException {
         // Check for closing frame
         if (frame instanceof CloseWebSocketFrame) {
             handshaker.close(, (CloseWebSocketFrame) frame);
         } else if (frame instanceof PingWebSocketFrame) {
         } else if (!(frame instanceof TextWebSocketFrame)) {
             throw new UnsupportedOperationException(String.format(
                     "%s frame types not supported", frame.getClass().getName()));
         String jsonText = ((TextWebSocketFrame) frame).getText();
         JsonParser jp = jsonFactory.createJsonParser(jsonText);
         JsonNode actualObj = jsonMapper.readTree(jp);
         String action = actualObj.get("action").getTextValue();
         String identifier = actualObj.get("identifier").getTextValue();
         if (action == null || identifier == null) {
         if ("subscribe".equals(action)) {
             Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
             if (contexts == null) {
                 contexts = new HashSet<ChannelHandlerContext>();
                 subscriptions.put(identifier, contexts);
         } else if ("unsubscribe".equals(action)) {
             Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
             if (contexts == null) {
             if (contexts.size() == 0) {
     private static String getRandomIdentifier() {
         List<String> identifiers = new ArrayList<String>(subscriptions.keySet());
         if (identifiers.size() == 0) {
             return null;
         int n = new Random().nextInt(identifiers.size());
         return identifiers.get(n);
     public static void sendMessageForRandomIdentifier() {
         String identifier = getRandomIdentifier();
         if (identifier == null) {
         Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
         if (contexts == null || contexts.size() == 0) {
         SecureRandom random = new SecureRandom();
         String elements = new BigInteger(500, random).toString(32);
         String json = "{\"request\":" + REQUEST_ID.incrementAndGet() + ", \"timestamp\":" + System.currentTimeMillis() + ", \"identifier\":\"" + identifier + "\", \"elements\": \"" + elements + "\"}";
         TextWebSocketFrame frame = new TextWebSocketFrame(json);
         for (ChannelHandlerContext ctx : contexts) {
             try {
             } catch (Exception e) {
         if (System.currentTimeMillis() % 1000 == 0) {
             System.out.println("identifiers," + Metrics.getHeading());
             System.out.println(subscriptions.size() + METRICS.getSummary().toString());
     private static void sendHttpResponse(ChannelHandlerContext ctx,
             HttpRequest req, HttpResponse res) {
         // Generate an error page if response status code is not OK (200).
         if (res.getStatus().getCode() != 200) {
             setContentLength(res, res.getContent().readableBytes());
         // Send the response and close the connection if necessary.
         ChannelFuture f =;
         if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
             throws Exception {
         if (cause instanceof java.nio.channels.ClosedChannelException == false) {
     private static String getWebSocketLocation(HttpRequest req) {
         return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;

As, you can see its implementation is a bit verbose and I am skipping some of other boiler code.


Atmosphere is another Java framework that provides support for Websockets so I tried it as well and here is its implementation:

 package com.plexobject;
 import org.atmosphere.config.service.WebSocketHandlerService;
 import org.atmosphere.cpr.Broadcaster;
 import org.atmosphere.cpr.BroadcasterFactory;
 import org.atmosphere.websocket.WebSocket;
 import org.atmosphere.websocket.WebSocketHandler;
 import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.atmosphere.cpr.AtmosphereServlet;
 import org.atmosphere.websocket.WebSocketHandler;
 public class ChannelResponseHandler extends WebSocketHandler {
     private static final Logger logger = LoggerFactory.getLogger(ChannelResponseHandler.class);
     private static final Metrics METRICS = new Metrics();
     private static final String PATH_PREFIX = "/channels";
     private boolean started;
     private int maxIdentifier;
     private final Random random = new Random();
     private Timer timer = new Timer(true);
     private final ObjectMapper mapper = new ObjectMapper();
     private final JsonFactory jsonFactory = mapper.getJsonFactory();
     private final AtomicLong opened = new AtomicLong();
     private final AtomicLong closed = new AtomicLong();
     private final AtomicLong nextRequestId = new AtomicLong();
     private final StringBuilder elements = new StringBuilder();
     public ChannelResponseHandler() {
         for (int i=0; i<500; i++) {
     public void onTextMessage(WebSocket webSocket, String msg) {
         try {
             JsonParser jp = jsonFactory.createJsonParser(msg);
             JsonNode jsonObj = mapper.readTree(jp);
             String action = jsonObj.get("action").asText();
             String identifier = jsonObj.get("identifier").asText();
             logger.debug("onMessage " + action + " - " + identifier);
             if (action == null || identifier == null) {
             if ("subscribe".equals(action)) {
                 maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
             } else if ("unsubscribe".equals(action)) {
         } catch (Exception e) {
             logger.error("Failed to handle message " + msg, e);
     public void onOpen(WebSocket webSocket) {
         logger.trace("onOpen {}", webSocket.resource().getRequest());
     public void onClose(WebSocket webSocket) {
     public void onError(WebSocket webSocket, WebSocketException t) {
         System.err.println("error " + t);
         logger.trace("onError {}", t.getStackTrace());
         super.onError(webSocket, t);
     public void sendMessageForRandomIdentifier() {
         try {
             if (opened.get() <= closed.get()) {
             String identifier = getRandomIdentifier();
             Broadcaster bc = getBroadcaster(identifier);
             if (bc == null) {
                 logger.debug("Failed to find broadcaster for " + identifier);
             long t = System.currentTimeMillis();
             String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
             if (t % 1000 == 0) {
         } catch (Exception e) {
             logger.error("Failed to send message", e);
     private Broadcaster getBroadcaster() {
         return getBroadcaster(null);
     private Broadcaster getBroadcaster(String identifier) {
         String path = identifier == null ? PATH_PREFIX : PATH_PREFIX + "/" + identifier;
         BroadcasterFactory factory = BroadcasterFactory.getDefault();
         return factory != null ? factory.lookup(path, true) : null;
     synchronized void start() {
         if (started) return;
         started = true;
         timer.schedule(new TimerTask() {
                 public void run() {
                   while (started) {
             }, 1000);
     synchronized void stop() {
     private String getRandomIdentifier() {
         return String.valueOf(random.nextInt(maxIdentifier) + 1);
     public static void main(String[] arguments) throws Exception {
         try {
             Server server = new Server();
             Connector con = new SelectChannelConnector();
             ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
             context.addServlet(new ServletHolder(new AtmosphereServlet()), "/*");
         } catch (Exception ex) {

Note that I had to deploy Atmosphere in Jetty 8.1.7 as a war file and here is web.xml

 <?xml version="1.0" encoding="UTF-8"?>
 <web-app xmlns:xsi=""
          xmlns="" xmlns:web=""
          id="WebApp_ID" version="2.5">
     <display-name>WebSocket Load Test</display-name>


I previously wrote client in Javascript but I rewrote in Java so that I can use Java’s concurrent library to manage counters.

WebSocket Client

I tried Netty and weberknecht libraries and settled on weberknecht. Here is its implementation:

 package com.plexobject.websocket.client;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Map;
 import de.roderick.weberknecht.*;
 public class WebSocketClient implements ClientProtocol {
     private WebSocket webSocket;
         private final String identifier;
         public WebSocketClient(final String identifier, final URI server, final ClientEventListener listener) throws Exception {
                 this.identifier = identifier;
         webSocket = new WebSocketConnection(server);
         webSocket.setEventHandler(new WebSocketEventHandler() {
             public void onClose() {
             public void onMessage(WebSocketMessage msg) {
                             listener.onMessage(identifier, msg.getText());
             public void onOpen() {
     public void close() throws Exception {
     public void send(String text) throws Exception {
     public String getIdentifier() {
         return identifier;


I also tested Kaazing a bit but it is a commercial software and their trial license limited connections so I gave up on it.

Load Tester

The load tester takes arguments about how many connections to open and how long to run the test. It generates identifiers from 1 to 10,000 and sends subscription message to the server for each of those channel identifiers. It then keeps track of messages received in a helper class Metrics and logs that information on regular interval, i.e.,

 package com.plexobject.websocket.client;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 public class LoadTester implements ClientEventListener {
     private static final AtomicLong OPEN_CONNECTIONS = new AtomicLong();
     private static final AtomicLong CLOSE_CONNECTIONS = new AtomicLong();
     private static final AtomicLong REQUEST_ID = new AtomicLong();
     private static final Metrics METRICS = new Metrics();
     private List<ClientProtocol> clients = new ArrayList<ClientProtocol>();
     private static final List<Integer> identifiers = new ArrayList<Integer>();
     static {
         for (int i=1; i<=10000; i++) {
     public LoadTester(String url, int maxConnections) throws Exception {
         try {
             URI uri = new URI(url);
             for (int i=0; i<maxConnections; i++) {
                 String identifier = String.valueOf(identifiers.get(i%identifiers.size()));
                 ClientProtocol client = new WebSocketClient(identifier, uri, this);
                 String json = "{\"action\":\"subscribe\", \"identifier\":\"" + identifier + "\"}";
                 try {
                 } catch (InterruptedException e) {
         } catch (OutOfMemoryError e) {
     public void close() {
         for (ClientProtocol client : clients) {
             try {
             } catch (Exception e) {
     public void onError(Throwable e) {
     public void onMessage(String identifier, String message) {
         String act = METRICS.update(message);
         if (!identifier.equals(act)) {
             System.out.println("Expected " + identifier + ", but received " + act + " for " + message);
     public void onClose() {
     public void onOpen() {
     public static void main(String[] args) throws Exception {
         if (args.length == 0) {
             System.err.println("Usage client.LoadTester url connections max-runtime-in-minutes");
         final String url = args[0]; // "ws://localhost:8124/"
         final int maxConnections = Integer.parseInt(args[1]);
         final int maxRuntimeMins = Integer.parseInt(args[2]);
         System.out.println("connections,connected," + Metrics.getHeading());
         final long started = System.currentTimeMillis();
         final Timer timer = new Timer(true);
         timer.schedule(new TimerTask() {
                 public void run() {
                     System.out.println(maxConnections + "," + (OPEN_CONNECTIONS.get()-CLOSE_CONNECTIONS.get()) + "," + METRICS.getSummary());
                     long elapsed = System.currentTimeMillis() - started;
                     if (elapsed > (maxRuntimeMins * 1000 * 60)) {
                         System.out.println("time done");
             }, 5000, 15000);
         LoadTester runner = new LoadTester(url, maxConnections);
         while (true) {
             try {
             } catch (InterruptedException e) {
             if (CLOSE_CONNECTIONS.get() == maxConnections) {
                 System.out.println("all clients closed");

I am skipping some of helper classes and interfaces, but you can find them from the source code.

Test Script

I created a simple shell script to launch load test for 5 minutes and given number of connections and collected results in a comma delimited file, i.e,

 mvn package
 export MAVEN_OPTS="-Xmx1000m -Xss128k"
 export url=ws://localhost:8080/atmosphere-1.0/channels
 #export url=ws://localhost:8080/channels
 #export url=ws://localhost:8124/
 while [ $clients -lt 100000 ]
    mvn exec:java -Dexec.mainClass="com.plexobject.websocket.client.LoadTester" -Dexec.classpathScope=runtime -Dexec.args="$url $clients 5"
    clients=`expr $clients + 500|awk '{print $1}'`

Note that I specified stack size of thread to be 128k as default stack size per thread is 2MB, which takes a lot of memory.

TCP Settings

As I mentioned in my last blog, I made some changes to my Linux machine to improve TCP settings, i.e.,

 net.core.rmem_max = 33554432
 net.core.wmem_max = 33554432
 net.ipv4.tcp_rmem = 4096 16384 33554432
 net.ipv4.tcp_wmem = 4096 16384 33554432
 net.ipv4.tcp_mem = 786432 1048576 26777216
 net.ipv4.tcp_max_tw_buckets = 360000
 net.core.netdev_max_backlog = 2500
 vm.min_free_kbytes = 65536
 vm.swappiness = 0
 net.ipv4.ip_local_port_range = 1024 65535

I then applied above changes using:

 sudo sysctl -w net.core.somaxconn=10000
 sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000 
 sudo sysctl -p


Here are results of testing up to 5000 connections, where server sends a message to a random channel continuously:


connections connected average response (ms) average size messages msg/sec walltime(secs)
500 500 1 1462 282431 943.317 301
1000 1000 1 1462 282100 940.967 302
1500 1500 1 1463 274156 920.233 304
2000 2000 1 1463 268180 936.733 309
2500 2500 1 1463 263443 926 307
3000 2985 1 1463 280809 932.233 332
3500 3500 2 1463 278412 934.333 384


connections connected average response (ms) average size messages msg/sec walltime(secs)
500 500 0.000 172.000 284165 937.833 305.000
1000 1000 0.000 172.000 279883 921.717 305.000
1500 1500 0.000 173.000 290317 959.983 305.000
2000 2000 0.000 173.000 286110 946.467 305.000
2500 2500 0.000 174.000 285122 949.550 305.000
3000 3000 0.000 174.000 281650 939.150 305.000
3500 3500 0.000 174.000 282833 942.700 305.000
4000 4000 0.000 174.000 283829 948.700 305.000
4500 4500 0.000 174.000 273293 921.583 305.000
5000 5000 0.000 174.000 264433 881.733 305.000
1000 1000 0.000 172.000 279883 921.717 305.000
1500 1500 0.000 173.000 290317 959.983 305.000
2000 2000 0.000 173.000 286110 946.467 305.000
2500 2500 0.000 174.000 285122 949.550 305.000
3000 3000 0.000 174.000 281650 939.150 305.000
3500 3500 0.000 174.000 282833 942.700 305.000
4000 4000 0.000 174.000 283829 948.700 305.000
4500 4500 0.000 174.000 273293 921.583 305.000
5000 5000 0.000 174.000 264433 881.733 305.000
5500 5500 0.000 174.000 289230 977.300 305.000
6000 6000 0.000 174.000 277692 934.800 305.000
6500 6500 0.000 174.000 148223 847.517 185.000

connections average response (ms) average size messages msg/sec walltime(secs)
1000 1 1462 1487141 4056.433 365
2000 1 1464 1607431 5003.033 365
3000 1 1464 1663777 5061.717 365
4000 1 1464 1607104 5109.65 365
5000 1 1464 1706898 5374.433 365
6000 2 1465 1750757 5341.05 365
7000 3 1465 1723124 5263.283 365
8000 3 1465 1758174 5224.85 365
9000 2 1465 1717950 5038.45 365
10000 3 1465 1789671 5160.517 365
11000 3 1465 1789760 5140.8 365
12000 3 1465 1792477 5097.5 365
13000 4 1465 1778748 5128.917 365
14000 4 1465 1746830 4895.433 365
15000 3 1465 1760734 4960.367 365
16000 3 1465 1739786 4933.583 365
17000 4 1465 1727759 4902.883 365
18000 3 1465 1730165 5158.7 365
19000 4 1465 1725660 4904.2 367
20000 4 1465 1704129 4852.567 365
21000 4 1465 1703201 4849.833 365
22000 3 1465 1705387 5077.417 366
23000 3 1465 1671165 4861.233 365
24000 4 1465 1670471 4942.217 365
25000 4 1465 1639296 5108.317 365


connections average response (ms) average size messages msg/sec walltime(secs)
1000 2 1363 1550155 4252.95 365
2000 1 1368 863997 2384.8 365
3000 1 1368 644806 1776.717 365
4000 1 1369 555496 1541.683 365
5000 1 1370 483624 1338 365
6000 1 1370 426728 1185.883 365
7000 1 1371 358331 994.017 365
8000 1 1371 322960 898.55 365
9000 1 1371 281227 787.633 365
10000 1 1371 260502 732.3 365
11000 1 1370 262209 733.633 365
12000 1 1371 243954 690.067 365
13000 2 1370 230117 647.833 365
14000 2 1371 235591 668.983 365
15000 2 1371 240632 685.35 365
16000 2 1371 192929 551.083 365
17000 2 1371 166705 474.217 365
18000 2 1371 142557 406.4 365
19000 2 1371 110873 319.467 365
20000 2 1371 106047 305.967 365
21000 2 1371 107947 307.633 365
22000 1 1371 89779 257.883 365
23000 2 1371 89890 259.367 365
24000 2 1371 34414 240.833 155


I tried to push limits for each framework, however Atmosphere could not handle more than 3500 connections and it slowed my machine to crawl. I was using 4.0-beta version of Netty, which also crashed after 6500 connections. was able to scale for upto 25,000 connections, but then server ran out of memory on my machine. Similarly, node.js kept going for about 24,000 but it became very slow and my load test client could not allocate more memory for connections. Both and Node.js proved to be leading contenders but was able to scale much better and maintain throughput better than Node.js. As a result, I picked for my project. You can download the source code and compare results yourself.

September 24, 2012

Scaling Node.js

As I described in my earlier blog, I have been testing Node.js and Websockets for streaming data to large number of clients. I am evaluating Node.js and WebSockets in both pub/sub model such as clients connecting to receive streaming quotes and request/reply model where clients are invoking an API and waiting for reply. Below are some of my experience with those technologies so far:

Pub/Sub Server

The server code uses cluster module to provide multiprocessing support and each process starts a Websocket server and listens for incoming connections. I used library for WebSockets. The server keeps track of all clients that have subscribed and then send random data every 10-100 milli-seconds.
Here is basic implementation of the server:

 var MAX_THREADS = process.env.MAX_THREADS || 10;
 var MAX_ROWS = process.env.MAX_ROWS || 2;
 var nextClientId = 0;
 var helper = require('helper')
 var pubsubController = require('pubsub_controller')(MAX_ROWS)
 var cluster = require('cluster');
 if (cluster.isMaster) {
    for (var i = 0; i < MAX_THREADS; i++) {
    cluster.on('exit', function(worker, code, signal) {
       console.log('worker ' + + ' died');
 } else {
    var socketsByAddr = {};
    var io = require('').listen(8124);
    io.set('log level', 1);
    io.set('transports', ['websocket']);
    io.set('force new connection', true);
    io.enable('browser client gzip');
    io.sockets.on('connection', function(socket) {
       socket.address = socket.handshake.address.address + ':' + socket.handshake.address.port;
       socket.key = socket.handshake.address.address + ':' + socket.handshake.address.port + ':' + (++nextClientId);
       socketsByAddr[socket.address] = true;
       socket.on('disconnect', function() {
          delete socketsByAddr[socket.address];
       socket.on('end', function() {
    setInterval(function() {
    }, helper.random(100) + 10);
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);

Pub/Sub Client

The pub/sub client subscribes for data and receies data every 10-100 milli-seconds. The client code builds up 100 more connections every 15 seconds and it logs the response time information. On client side, I used library. The client calls 'subscribe-execs' to subscribe and then receive messages under 'receive-execs'. Upon receiving message, the client tracks various metrics such as response time, CPU, memory usage, etc. As, I am running both client and server on the same machine, it gives accurate representation of latency without network hops or clocks mismatch. Another thing to note is that the client passes 'force new connection' flag to force new socket for each client as by default all clients share same socket.

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 10;
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 var client = require('pubsub_client')(metrics, hwUsage);
 var clients = [];
 // create connections for this client
 var buildClients = function() {
    for (var i=0; i<100; i++) {
       var client = client.newClient(i+0);
    hwUsage.stop(function(usage) {
       console.log('clients,connected,', hwUsage.heading() + ',' + metrics.heading());
       console.log(clients.length + ',' + client.totalConnected() + ',' + usage.toString() + ',' + metrics.summary().toString());
 setInterval(buildClients, 15000);
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);

The actual client is defined as a module, e.g.:

 var io = require('');
 var helper = require('helper');
 var totalConnected = 0;
 module.exports = function(metrics, hwUsage) {
    var ExecClient = function(id) { = id;
       this.client = io.connect('localhost', { port: 8124,
                               transports: ['websocket'],
                               'force new connection': true,
                               'sync on disconnect' : true,
                               'connect timeout': 500,
                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
    ExecClient.prototype.setupConnectListener = function() {
       if (typeof this.client === "undefined") return;
       var that = this;
       this.client.on('connect_failed', function(error) {
       this.client.on('connect', function() {
       this.client.on('disconnect', function() {
       this.client.on('reconnect_failed', function() {
       this.client.on('end', function() {
    ExecClient.prototype.setupReceiveListener = function() {
       if (typeof this.client === "undefined") return;
       var that = this;
       this.client.on('receive-execs', function(msg) {
          var metric = metrics.update(msg.address, msg.request, msg.timestamp, JSON.stringify(msg).length);
    return {
       totalConnected : function() {
          return totalConnected;
       newClient: function(id) {
          var client = new ExecClient(id);
          return client;

Subscription management

Following module defines APIs for managing subscriptions and as I mentioned when number of messages received by client reaches maxMessages, it unsubscribes from the server.

 var helper = require('helper');
 var emitVolatile = typeof process.env.EMIT_VOLATILE !== "undefined" && process.env.EMIT_VOLATILE === 'true';
 module.exports = function(numExecs) {
    var execSubscribers = {};
    var counter = 0; 
    var generatePayload = function(address) {
       var execs = [];
       var date = new Date();
       for (var i=0; i<numExecs; i++) {
          execs.push({sequence:i, activityDateStr:date.toString(), com: helper.random(5),price: helper.random(100), accountId:1772139, symbol:"QQQ", transaction:"STO", description:"QQQ Stock", qty: helper.random(200), key: "QQQ:::S", netAmount: helper.random(1000)});
       var payload = {request: counter, rows:numExecs, timestamp : date.getTime(), address:address, execs: execs};
       return payload;
    return {
       subscribe: function(socket) {
          execSubscribers[socket.key] = socket;
       unsubscribe: function(socket) {
          delete execSubscribers[socket.key];
       count: function() {
          return helper.size(execSubscribers);
       pushUpdates: function() {
          var count = 0;
          for (var addr in execSubscribers) {
             try {
                var socket = execSubscribers[addr];
                if (emitVolatile) {
                   socket.volatile.emit('receive-execs', generatePayload(addr));
                } else {
                   socket.emit('receive-execs', generatePayload(addr));
                if (typeof socket.numMessages === "undefined") {
                   socket.numMessages = 1;
                } else {
                   //delete execSubscribers[socket.key];
             } catch (err) {
                console.log('failed to send execution report' + err);
                try {
                } catch (ex) {
                   console.log('failed to close socket ' + ex);
                delete execSubscribers[socket.key];
          return count;

Hardware Measurement

Following module defines API for measuring CPU time, load average, memory, etc:

 var fs = require('fs');
 var os = require('os');
 var helper = require('helper');
 var HardwareUsage = function() {
    this.startCpuTime = 0;
    this.startLoadAvg = os.loadavg();
    this.startMemory = process.memoryUsage();
    this.endCpuTime = 0;
    this.endLoadAvg = 0;
    this.endMemory = 0;
    this.started = new Date().getTime();
    var that = this;
    this.cpuUsage (function(totalCpu) {
       that.startCpuTime = totalCpu;
 HardwareUsage.prototype.cpuUsage = function(callback) {
    if (!fs.existsSync("/proc/" + + "/stat")) {
    var that = this;
    fs.readFile("/proc/" + + "/stat", function(err, data){
       var elems = data.toString().split(' ');
       var utime = parseInt(elems[13]);
       var stime = parseInt(elems[14]);
       var totalCpu = utime + stime;
 HardwareUsage.prototype.percCpuChange = function() {
    return helper.percentChange(this.startCpuTime, this.endCpuTime);
 HardwareUsage.prototype.percLoadChange = function() {
    return helper.percentChange(this.startLoadAvg[0], this.endLoadAvg[0]);
 HardwareUsage.prototype.percMemoryChange = function() {
    return helper.percentChange(this.startMemory.heapTotal, this.endMemory.heapUsed);
 HardwareUsage.prototype.toString = function() {
    return helper.round(this.endCpuTime) + ',' + helper.round(this.endLoadAvg[0]) + ',' + helper.round(this.startMemory.heapTotal/1024.0/1024.0) + ',' + helper.round(this.endMemory.heapUsed/1024.0/1024.0);
 HardwareUsage.prototype.stop = function(callback) {
    var that = this;
    this.cpuUsage (function(totalCpu) {
       that.endCpuTime = totalCpu;
       that.endMemory = process.memoryUsage();
       that.endLoadAvg = os.loadavg();
       that.elapsed = new Date().getTime() - that.started;
 var usage = new HardwareUsage();
 exports.heading = function() {
    return 'cpu time, load avg, total memory (M), used memory(M)';
 exports.start = function() {
    usage = new HardwareUsage();
 exports.stop = function(callback) {

Response time Measurement

Following module defines API for measuring response time:

 var helper = require('helper');
 module.exports = function(store, outlierSize) {
    var MetricResultSummary = function(metric) {
       this.averageResponseTime = helper.round(metric.totalTime / metric.totalRequests);
       this.averageHighResponseTime = helper.round(helper.sum(metric.topTenHigh)/ metric.topTenHigh.length);
       this.averageLowResponseTime = helper.round(helper.sum(metric.topTenLow)/ metric.topTenLow.length);
       this.averageSize = helper.round(metric.totalPayloadSize / metric.totalRequests);
       this.messagesReceived = metric.messagesReceived;
       this.count = 0;
    MetricResultSummary.prototype.toString = function() {
       return this.count + ',' + this.averageResponseTime + ',' + this.averageHighResponseTime + ',' + this.averageLowResponseTime + ',' + this.averageSize + ',' + this.messagesReceived;
    var Metric = function() {
       this.totalTime = 0;
       this.totalRequests = 0;
       this.topTenHigh = [];
       this.topTenLow = [];
       this.totalPayloadSize = 0;
       this.messagesReceived = 0;
    Metric.prototype.addTo = function(target) {
       target.totalTime += this.totalTime;
       target.totalRequests += this.totalRequests;
       for (var i=0; i<this.topTenHigh.length; i++) {
       for (var i=0; i<this.topTenLow.length; i++) {
       target.totalPayloadSize += this.totalPayloadSize;
       target.messagesReceived += this.messagesReceived;
    Metric.prototype.update = function(id, sendTime, payloadSize) {
       var elapsed = new Date().getTime() - sendTime;
       for (var i=0; i<outlierSize; i++) {
          if (typeof this.topTenHigh[i] === "undefined" || elapsed > this.topTenHigh[i]) {
             this.topTenHigh[i] = elapsed;
       for (var i=0; i<outlierSize; i++) {
          if (typeof this.topTenLow[i] === "undefined" || elapsed < this.topTenLow[i]) {
             this.topTenLow[i] = elapsed;
       this.totalTime += elapsed;
       this.totalPayloadSize += payloadSize;
    Metric.prototype.averageResponse = function() {
       return helper.round(this.totalTime / this.totalRequests);
    Metric.prototype.summary = function() {
       return new MetricResultSummary(this);
    Metric.prototype.toString = function() {
       return new MetricResultSummary(this).toString();
    var get = function(key) {
          var metric = store[key];
          if (typeof metric === "undefined") {
             metric = new Metric();
             store[key] = metric;
          return metric;
    return {
       get: function(key) {
          return get(key);
       update: function(key, id, sendTime, payloadSize) {
          var metric = get(key);
          metric.update(id, sendTime, payloadSize);
          return metric;
       summaryFor: function(key) {
          var metric = get(key);
          return metric.summary();
       heading: function() {
          return 'count, average response (ms), average outlier high response (ms), average outlier low response (ms), average message size, messages received';
       summary: function() {
          var result = new Metric();
          var count = 0;
          for (var key in store) {
             var metric = store[key];
          var summary = result.summary();
          summary.count = count;
          return summary;

Request/Reply Server

The request/reply server also uses cluster module and listens for Websocket port on each process. Upon receiving order-request API, it sends a reply to the client. Here is the implementation of server:

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 2;
 var MAX_THREADS = process.env.MAX_THREADS || 20;
 var helper = require('helper')
 var cluster = require('cluster');
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 var socketsByAddr = {};
 var totalClients = 0;
 if (cluster.isMaster) {
    for (var i = 0; i < MAX_THREADS; i++) {
    cluster.on('exit', function(worker, code, signal) {
       console.log('worker ' + + ' died');
 } else {
    var io = require('').listen(8124);
    io.set('log level', 1); 
    io.set('transports', ['websocket']);
    io.set('force new connection', true);
    io.enable('browser client gzip');
    io.sockets.on('connection', function(socket) {
       socket.address = socket.handshake.address.address + ':' + socket.handshake.address.port;
       socketsByAddr[socket.address] = true;
       socket.on('order-request', function(request) {
          var response = {request: request.counter, timestamp : request.timestamp, address:request.address};
          metrics.update(request.address, request.request, request.timestamp, JSON.stringify(request).length);
          socket.emit('order-response', response);
       // disconnected
       socket.on('disconnect', function() {
          delete socketsByAddr[socket.address];
       socket.on('end', function() {
 setInterval(function() {
    hwUsage.stop(function(usage) {
      console.log('sockets, clients, ' + hwUsage.heading() + ',' + metrics.heading());
      console.log(helper.size(socketsByAddr) + ',' + totalClients + ',' + usage.toString() + ',' + metrics.summary().toString());
 }, 15000);
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);

Request/Reply Client

The request/reply client is similar to pub/sub model except it initiates order-request API every 50-250 milli-seconds. It also creates new connections every 15 seconds and logs response time continuously.

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 10;
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 var order_client = require('order_client')(metrics, hwUsage);
 // create connections for this client
 var totalConnections = 0;
 var buildClients = function() {
    for (var i=0; i<100; i++) {
       var client = order_client.newClient();
 setInterval(buildClients, INTERVAL_BETWEEN_NEW_CLIENTS_SECS);
 setInterval(function() {
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
   console.trace('stack trace');

The actual client is defined as module, e.g.

 var io = require('');
 var helper = require('helper');
 var MAX_ROWS = process.env.MAX_ROWS || 10;
 var nextClientId = 0;
 var nextRequestId = 0;
 var totalConnected = 0;
 module.exports = function(metrics, hwUsage) {
    var OrderClient = function() { = ++nextClientId;
       this.connected = false;
       this.client = io.connect('localhost', { port: 8124,
                               transports: ['websocket'],
                               'force new connection': true,
                               'sync on disconnect' : true,
                               'connect timeout': 500,
                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
    OrderClient.prototype.setupConnectListener = function() {
       var that = this;
       this.client.on('connect_failed', function(error) {
          this.connected = false;
       this.client.on('connect', function() {
          this.connected = true;
       this.client.on('disconnect', function() {
          this.connected = false;
       this.client.on('reconnect_failed', function() {
          this.connected = false;
       this.client.on('end', function() {
          this.connected = false; 
    OrderClient.prototype.sendRequest = function(max) {
       var execs = [];
       var date = new Date();
       for (var i=0; i<max; i++) {
          execs.push({sequence:i, activityDateStr:date.toString(), com: helper.random(5),price: helper.random(100), accountId:1772139, symbol:"QQQ", transaction:"STO", description:"QQQ Stock", qty: helper.random(200), key: "QQQ:::S", netAmount: helper.random(1000)});
       var request = {request: ++nextRequestId, rows:max, timestamp : date.getTime(),, execs: execs};
       this.client.emit('order-request', request);
    OrderClient.prototype.setupResponseListener = function() {
       var that = this;
       this.client.on('order-response', function(response) {
          var metric = metrics.update(response.address, response.request, response.timestamp, JSON.stringify(response).length);
    return {
       log: function() {
          hwUsage.stop(function(usage) {
             console.log('clients,connected,' + hwUsage.heading() + ',' + metrics.heading());
             console.log(nextClientId + ',' + totalConnected + ',' + usage.toString() + ',' + metrics.summary().toString());
       newClient: function() {
          var client = new OrderClient();
          setInterval(function() {
          }, helper.random(250) + 50);
          return client;

First blocker

I started testing on my Linux machine and saw RangeError: Maximum call stack size exceeded after server reached about 2000 connections. It turned out JSON library on went into recursion when it received messages while it's parsing existing message. I found a patch at Fix infinite recursion in Websocket parsers and manually applied it as I didn't see it on the latest version library 0.8.9. After applying it, I was able to make some progress.

Second blocker

After reaching about 10,000 connections I started seeing Cannot call method 'packet' of null coming from the socket.js. At the time, I was using default configuration for Websockets and apparently it was falling back to xhr-polling under heavy load (See open bug for it). I changed configuration to explicitly defined websocket protocol without any fallback, e.g.

 io.set('transports', ['websocket']);

I also made some changes to network settings on my Linux machine as recommended by Node.js w/1M concurrent connections!.

 net.core.rmem_max = 33554432
 net.core.wmem_max = 33554432
 net.ipv4.tcp_rmem = 4096 16384 33554432
 net.ipv4.tcp_wmem = 4096 16384 33554432
 net.ipv4.tcp_mem = 786432 1048576 26777216
 net.ipv4.tcp_max_tw_buckets = 360000
 net.core.netdev_max_backlog = 2500
 vm.min_free_kbytes = 65536
 vm.swappiness = 0
 net.ipv4.ip_local_port_range = 1024 65535

I then applied above changes using:

 sudo sysctl -w net.core.somaxconn=10000
 sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000 
 sudo sysctl -p

Third blocker

Above changes got me to about 17,000 connections but then I started seeing connection timeout on the client side and warn: client not handshaken client should reconnect on the server side. are two open bugs: first and second for it. I added reconnect parameters to client connection, e.g.

                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});

Fourth blocker

Under heavy load, I also saw "Error: not opened" coming from WebSocket library, e.g.

     at WebSocket.send (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Transport.WS.send (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Transport.packet (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Transport.WS.payload (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Socket.flushBuffer (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Socket.setBuffer (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Socket.onConnect (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Transport.onConnect (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Transport.onPacket (/alldata/home/sbhatti/prototype-nodejs/node_modules/
     at Transport.onData (/alldata/home/sbhatti/prototype-nodejs/node_modules/

I chose to ignore it as Websocket is attempting to send data when connection is closed and client will just reconnect.


I am still evaluating Node.js and I have not decided if I would recommend Node.js for streaming solution. I will be comparing these results against some Java solutions such as Atomosphere and and will post those results in future.

September 6, 2012

Building a streaming quote server using Node.js and Websockets

A couple of years ago, I implemented a quote server using XMPP, Bosh, Strophe and Ejabbberd at work and described an overview of the design and code in my blog. We have been looking at Node.js and Websockets recently for streaming data so I will describe a simple implementation of quote server here using those two technologies:

Quote Server

The quote server uses Web sockets and listens for subscribe-stock-quote message to subscribe for a particular quote and unsubscribe-stock-quote message to unsubscribe.
Here is a simple implementation of the Quote server:

 var express = require('express')
    , sio = require('')
    , http = require('http')
    , quoteSubscriptions = require('quote_subscriptions')
    , app = express();
 var server = http.createServer(app);
 var sequence = 0;
 app.configure(function () {
    app.use(express.static(__dirname + '/public'));
 var io = sio.listen(server);
 io.set('log level', 1);
 io.sockets.on('connection', function(socket) {
    // subscribe to stock quotes 
    socket.on('subscribe-stock-quote', function(symbol) {
       console.log('subscribe-stock-quote received');
       quoteSubscriptions.subscribe(socket, symbol, function() {
          socket.emit('subscribed-stock-quote', 'You have been subscribed to ' + symbol);
    // unsubscribe to stock quotes 
    socket.on('unsubscribe-stock-quote', function (symbol) {
       console.log('unsubscribe-stock-quote received');
       quoteSubscriptions.unsubscribe(socket, symbol, function() {
          socket.emit('unsubscribed-stock-quote received', 'You have been unsubscribed to ' + symbol);
    // disconnected
    socket.on('disconnect', function() {
       quoteSubscriptions.unsubscribeAll(socket, function() {
          socket.emit('disconnected', socket.symbol + ' has been unsubscribed from stock quotes.');
 // update clients every 500 milli-seconds with latest stock quotes
 setInterval(function () {
    io.sockets.emit('sequence', ++sequence);
 }, 500);

In addition to sending quote updates, the server also sends a loop counter so that we know how many times quotes have been pushed to clients and that message is broadcasted to all clients.

The subscriptions are managed in another module quote_subscriptions, i.e.,:

 // This module maintains quote subscriptions for all clients 
 var yclient = require('y_client');
 var stocksSubscribers = {};
 var quoteCache = {};
 exports.subscribe = function(socket, symbol, callback) {
    if (typeof socket.stockSymbols === "undefined") {
       socket.stockSymbols = [];
    if (socket.stockSymbols.indexOf(symbol) == -1) {
    var subscribers = stocksSubscribers[symbol];
    if (typeof subscribers === "undefined") {
       subscribers = [];
       stocksSubscribers[symbol] = subscribers;
    if (subscribers.indexOf(socket) == -1) {
 exports.unsubscribeAll = function(socket, callback) {
    if (typeof socket.stockSymbols === "undefined") {
    for (var i=0; i<socket.stockSymbols.length; i++) {
       this.unsubscribe(socket, socket.stockSymbols[i], function() {});
 exports.unsubscribe = function(socket, symbol, callback) {
    if (typeof socket.stockSymbols === "undefined") {
    var ndx = socket.stockSymbols.indexOf(symbol);
    if (ndx !== -1) {
       socket.stockSymbols.splice(ndx, 1);
    var subscribers = stocksSubscribers[symbol];
    if (typeof subscribers === "undefined") {
    ndx = subscribers.indexOf(socket);
    if (ndx !== -1) {
       subscribers.splice(ndx, 1);
 exports.pushUpdates = function() {
    for (var symbol in stocksSubscribers) {
       var sockets = stocksSubscribers[symbol];
       if (sockets.length == 0) {
       quote = quoteCache[symbol];
       if (typeof quote === "undefined") {
          yclient.quote(symbol, function(quote) {
             quoteCache[symbol] = quote;
             sendQuote(quote, sockets);
       } else {
          sendQuote(quote, sockets);
 function sendQuote(quote, sockets) {
    var rnd = Math.random();
    quote.counter = quote.counter + 1; = round(rnd % 2 == 0 ? quote.original_bid + rnd :  quote.original_bid - rnd, 2);
    quote.ask = round(rnd % 2 == 0 ? quote.original_ask + rnd :  quote.original_ask - rnd, 2);
    for (var i=0; i<sockets.length; i++) {
       var socket = sockets[i];
       var address = socket.handshake.address;
       console.log('Quote sending ' + quote.symbol + ' ' + quote.counter + ' to ' + address.address + ":" + address.port);
       socket.volatile.emit('receive-stock-quote', quote);
 function round(num, dec) {
    return Math.round(num*Math.pow(10,dec))/Math.pow(10,dec);

The above module defines methods to subscribe, unsubscribe and push quote updates to all clients. Note that above module requests quotes from Yahoo if it doesn’t exist in the cache and then randomly changes bid/ask values.

Yahoo Finance Client

I used Yahoo finance to retrieve the quotes, however these quotes are cached and modified by the quote subscription module:

 // This module provides quote/chain apis using yahoo finance
 var http = require('http')
   , libxmljs = require('libxmljs')
   , util = require('util')
 exports.quote = function(symbol, callback) {
    api('', '/d/quotes.csv?s=' + symbol + '&f=nb2b3ra2yj3e7gopjkl1xm3m4h', function(text) {
       cols = text.split(',');
       callback({'symbol':cols[0], 'ask':cols[1], 'bid':cols[2], 'counter':0, 'original_ask':cols[1], 'original_bid':cols[2]});    
 function api(host, path, callback) {
    var options = {
        host: host,
        port: 80,
        path: path, 
        method: 'GET',
        headers: {'User-Agent': 'Node'}
    var req = http.request(options, function(res) {
      var text = '';
      res.on('data', function (chunk) {
        text += chunk;
      res.on('end', function (e) {
         text = text.replace(/"/g, '');
      res.on('error', function (e) {
        console.log('Could not send api with response : ' + e.message);
    req.on('error', function(e) {
      console.log('Could not send api with response : ' + e.message);
    // write data to request body

Web Client

Here is an example of Web client that allows users to subscribe to different stock symbols and receive streaming quotes every 500 milliseconds.

 <!doctype html> 
 <html lang="en">
       <meta charset="utf-8"> 
       <title>OptionsHouse Streaming</title>
       <script src="/"></script>
          var socket = io.connect('ws://localhost:8080'); 
          var lastSymbol;
          var lastQuote;
          socket.on('connect', function() { 
             lastSymbol = 'AAPL';
             socket.emit('subscribe-stock-quote', lastSymbol);
          });         socket.on('disconnect', function() {
             alert('remote server died');
          socket.on('sequence',function(seq) { 
             var seqDiv = document.getElementById('sequence');
             seqDiv.innerHTML = seq;
          socket.on('receive-stock-quote',function(quote) { 
             var company = document.getElementById('company');
             var counter = document.getElementById('counter');
             var bid = document.getElementById('bid');
             var ask = document.getElementById('ask');
             var bidColor = '<font color="black">'
             var askColor = '<font color="black">'
             if (typeof lastQuote !== "undefined") {
                if ( > {
                   bidColor = '<font color="green">'
                } else if ( < {
                   bidColor = '<font color="red">'
                if (quote.ask > lastQuote.ask) {
                   askColor = '<font color="green">'
                } else if (quote.ask < lastQuote.ask) {
                   askColor = '<font color="red">'
             company.innerHTML = quote.symbol;
             counter.innerHTML = quote.counter;
             bid.innerHTML = bidColor + + '</font>';
             ask.innerHTML = askColor + quote.ask + '</font>';
             lastQuote = quote;
          window.addEventListener('load',function() { 
             function() { 
                if (typeof lastSymbol !== "undefined") {
                   socket.emit('unsubscribe-stock-quote', lastSymbol);
                var symbol = document.getElementById('symbol').value; 
                socket.emit('subscribe-stock-quote', symbol);
                lastSymbol = symbol;
             }, false); 
             function() { 
                var symbol = document.getElementById('symbol').value; 
                socket.emit('unsubscribe-stock-quote', symbol);
             }, false); 
          }, false);
       <div id="form">
          Symbol: <input type="text" id="symbol" size="10" value="AAPL" />
          <input type="button" id="start" value="Subscribe" />
          <input type="button" id="stop" value="Unsubscribe" />
       <table width="500">
             <th>Stock Counter</th>
             <td align="center"> <div id="company"></div> </td>
             <td align="center"> <div id="bid"></div> </td>
             <td align="center"> <div id="ask"></div> </td>
             <td align="center"> <div id="counter"></div> </td>
       Quote Server Loop: <span id="sequence"/>

Running the example

You can download all the code from My github account, e.g.

 git clone  
 cd node-quote-server 
 npm install
 node app

Then you can point your browser to http://localhost:8080 and start playing:


Node.js comes with good support for Websockets so it takes only few lines to build the quote server. I am still testing Node.js and Websockets with different browsers and simulating load with large number of clients and will post those results in future.

