Shahzad Bhatti Welcome to my ramblings and rants!

October 8, 2012

Comparing Server side Websockets using Atmosphere, Netty, Node.js and Vertx.io

Filed under: Javascript — admin @ 1:31 pm

My last two bogs (Pub/sub in Node.js and Websockets and Scaling Node.js) described how I have been evaluating Node.js and Websockets for streaming data. I would describe results of that evaluation along with a few other frameworks that I have been testing.

Use Case

The primary use case for my application is pub/sub where clients subscribe to some channel and receive updates such as subscribing for stock quotes or notifications about orders.

Node.js

In my last blog, I described all the issues I had with socket.io library for Node.js and after a lot of frustration I gave up on it. Instead, I used WS library for WebSockets and though it doesn’t provide fallback options for other protocols but’s it much more stable and efficient.

Pub/Sub Server

Here is the pub/sub server that provides methods to subscribe some channel identifier and sends update to a random channel continuously.

var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 5;
 var MAX_THREADS = process.env.MAX_THREADS || 5;
 var MAX_ROWS = process.env.MAX_ROWS || 10;
 var SEND_MESSAGE_INTERVAL = SEND_MESSAGE_INTERVAL || 1;
 
 var nextClientId = 0;
 var errors = 0;
 //
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')(OUTLIER_SIZE);
 var cluster = require('cluster');
 hwUsage.start();
 
 if (cluster.isMaster) {
     for (var i = 0; i < MAX_THREADS; i++) {
         cluster.fork();
     }
     cluster.on('exit', function(worker, code, signal) {
     });
     console.log('clients, errors, ' + metrics.heading() + ',' + hwUsage.heading());
 } else {
     var pubsubController = require('pubsub_controller')(cluster.worker.id, MAX_ROWS, hwUsage, metrics);
     var WebSocketServer = require('ws').Server
     , wss = new WebSocketServer({port: 8124});
     wss.on('connection', function(ws) {
         ws.key = ++nextClientId;
         ws.on('close', function() {
             pubsubController.unsubscribeAll(ws);
         });
         ws.on('error', function() {
             errors++;
         });
         ws.on('message', function(text) {
             var msg = JSON.parse(text);
             if (msg.action === 'subscribe') {
                 pubsubController.subscribe(ws, msg.identifier);
             } else if (msg.action === 'unsubscribe') {
                 pubsubController.subscribe(ws, msg.identifier);
             }
         });
     });
 
     setInterval(function() {
         pubsubController.push();
     }, SEND_MESSAGE_INTERVAL);
     setInterval(function() {
         hwUsage.stop(function(usage) {
             console.log(nextClientId+ ',' + errors + metrics.summary().toString() + ',' + usage.toString());
         });
     }, 15000);
 }
 
 
 process.on('uncaughtException', function(err) {
     console.log("GLOBAL " + err + "\n" + err.stack);
 });
 

Subscription management

Following module defines APIs for managing subscriptions and as I mentioned when number of messages received by client reaches maxMessages, it unsubscribes from the server.

var helper = require('helper');
 
 module.exports = function(workerId, numElements, hwUsage, metrics) {
     var channelSubscribers = {};
     var nextRequestId = 0;
     var allKeys = {};
     allKeys[workerId] = [];
     var generatePayload = function(identifier) {
         var elements = [];
         var date = new Date();
         for (var i=0; i<numElements; i++) {            elements.push({sequence:i, price: helper.random(100), identifierId:identifier, symbol:"QQQ", transaction:"STO", qty: helper.random(200)});
         }
         var reqId = Number(++nextRequestId + '.' + workerId);
 
         var payload = {request: reqId, rows:numElements, timestamp : date.getTime(), identifier:Number(identifier), elements: elements};
         return payload;
     };
     var makeKey = function(socket) {
         return workerId + '_' + socket.key;
     };
     var getSubscriber = function(socket) {
         return channelSubscribers[makeKey(socket)];
     };
     var setSubscriber = function(socket, rec) {
         channelSubscribers[makeKey(socket)] = rec;
     };  
     var removeSubscribers = function(socket) {
         delete channelSubscribers[makeKey(socket)];
     };  
     return {
         subscribe: function(socket, identifier) {
             var rec = getSubscriber(socket);
             if (typeof rec === "undefined") {
                 rec = {socket : socket, identifiers : []};
             }
             rec.identifiers.push(identifier);
             allKeys[workerId].push(makeKey(socket)); 
             setSubscriber(socket, rec);
         },  
         unsubscribe: function(socket, identifier) {
             var rec = getSubscriber(socket);
             if (typeof rec === "undefined") {
                 return false;
             }
             var ndx = rec.identifiers.indexOf(identifier);
             rec.identifiers.splice(ndx, 1);
             if (rec.identifiers.length == 0) {
                 removeSubscribers(socket);
             } else {
                 setSubscriber(socket, rec);
             }
         },
         unsubscribeAll: function(socket) {
             removeSubscribers(socket);
         },
         count: function() {
             return helper.size(channelSubscribers);
         },
         push: function() {
             if (allKeys[workerId].length == 0) return 0;
             var key = allKeys[workerId][helper.random(allKeys[workerId].length)];
             var rec = channelSubscribers[key];
             if (typeof rec === "undefined") {
                 return -1;
             }
             //
             for (var i=0; i<rec.identifiers.length; i++) {
                 var identifier = rec.identifiers[i];
                 try {
                     var request = generatePayload(identifier);
                     var text = JSON.stringify(request);
                     rec.socket.send(text);
                     metrics.update(nextRequestId, new Date().getTime(), text.length);
                 } catch (err) {
                     console.log('failed to send execution report' + err);
                     try {
                         rec.socket.disconnect();
                     } catch (ex) {
                         console.log('failed to close socket ' + ex);
                     }
                     delete channelSubscribers[key];
                 }
             }
         }
     };
 }
 

I am skipping some of helper modules, but you can find them from the source code.

Vertx.io

The Vertx.io framework follows Node.js design for single-thread event loop with asynchronous I/O but is written in Java. It also provides polyglot support for a number of languages such as Java, Javascript, Ruby, Groovy, and Python. Here is similar implementation of Pub/Sub server in Java:

 package com.plexobject.vertx;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 
 import org.vertx.java.core.Handler;
 import org.vertx.java.core.buffer.Buffer;
 import org.vertx.java.core.http.HttpServerRequest;
 import org.vertx.java.core.http.ServerWebSocket;
 import org.vertx.java.deploy.Verticle;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class VertxServer extends Verticle {
     private static final Logger logger = LoggerFactory.getLogger(VertxServer.class);
     private final ObjectMapper mapper = new ObjectMapper();
     private final JsonFactory jsonFactory = mapper.getJsonFactory();
     private Map<String, Set<ServerWebSocket>> subscriptions = new ConcurrentHashMap<String, Set<ServerWebSocket>>();
     private Timer sendTimer = new Timer(true);
     private Timer metricsTimer = new Timer(true);
     private final AtomicLong nextRequestId = new AtomicLong();
     private final Metrics metrics = new Metrics();
     private final StringBuilder elements = new StringBuilder();
     private final Random random = new Random();
     private int maxIdentifier;
 
     public VertxServer() {
         logger.info("identifiers," + Metrics.getHeading());
         for (int i=0; i<500; i++) {
             elements.append(String.valueOf(i));
         }
         metricsTimer.schedule(new TimerTask() {
             @Override
             public void run() {
                 logger.info(subscriptions.size() + metrics.getSummary().toString());
             }
         }, 5000, 15000);
         sendTimer.schedule(new TimerTask() {
             @Override
             public void run() {
                while (true) {
                   sendMessageForRandomIdentifier();
                }
             }
         }, 1000);
     }
     public void start() {
         vertx.createHttpServer().setAcceptBacklog(10000).websocketHandler(new Handler<ServerWebSocket>() {
             public void handle(final ServerWebSocket ws) {
                 if (ws.path.equals("/channels")) {
                     ws.dataHandler(new Handler<Buffer>() {
                         public void handle(Buffer data) {
                             try {
                                 JsonParser jp = jsonFactory.createJsonParser(data.toString());
                                 JsonNode jsonObj = mapper.readTree(jp);
                                 String action = jsonObj.get("action").asText();
                                 String identifier = jsonObj.get("identifier").asText();
                                 if (action == null || identifier == null) {
                                     return;
                                 }
                                 if ("subscribe".equals(action)) {
                                     maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
                                     logger.info("Max " + maxIdentifier);
                                     Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                     if (sockets == null) {
                                         sockets = new HashSet<ServerWebSocket>();
                                         subscriptions.put(identifier, sockets);
                                     }
                                     sockets.add(ws);
                                 } else if ("unsubscribe".equals(action)) {
                                     Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                     if (sockets == null) {
                                         return;
                                     }
                                     sockets.remove(ws);
                                     if (sockets.size() == 0) {
                                         subscriptions.remove(identifier);
                                     }
                                 }
                             } catch (IOException e) {
                                 logger.error("Failed to handle " + data, e);
                             }
                         }
                     });
                 } else {
                     ws.reject();
                 }
             }
         }).requestHandler(new Handler<HttpServerRequest>() {
             public void handle(HttpServerRequest req) {
                 //if (req.path.equals("/")) req.response.sendFile("websockets/ws.html"); // Serve the html
             }
         }).listen(8080);
     }
 
     //
     private String getRandomIdentifier() {
         return String.valueOf(Math.abs(random.nextInt(maxIdentifier)) + 1);
     }
 
     public void sendMessageForRandomIdentifier() {
         if (subscriptions.size() == 0) {
             return;
         }
         String identifier = getRandomIdentifier();
         if (identifier == null) {
             return;
         }
         Set<ServerWebSocket> sockets = subscriptions.get(identifier);
         if (sockets == null || sockets.size() == 0) {
             return;
         }
         final long t = System.currentTimeMillis();
         String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
         for (ServerWebSocket ws : sockets) {
             try {
                 ws.writeTextFrame(msg);
             } catch (Exception e) {
                 subscriptions.remove(ws);
                 logger.error("Failed to send " + msg, e);
             }
         }
         metrics.update(msg);
     }
 }
 

Note that Vertx.io requires JDK 7 and here is how you can start the server:

 export PATH=/home/sbhatti/jdk1.7.0_10/bin:$PATH
 export JAVA_HOME=/home/sbhatti/jdk1.7.0_10
 mvn package
 bin/vertx run com.plexobject.vertx.VrtxServer -cp target/classes/ -instances=5

Note that I started Vertx.io with 5 instances of the server to match Node.js’ clustering options.

Netty

Netty.io is a lightweight application server that provides asynchronous events and I/O so I also tested its Websocket’s support in its latest 4.0-beta version.

Bootstrap

Here is the main server implementation that bootstraps web-socket server:

 package com.plexobject.netty.server;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.socket.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 
 public class WebSocketServer {
 
         private final int port;
 
         public WebSocketServer(int port) {
                 this.port = port;
         }
 
         public void run() throws Exception {
                 ServerBootstrap b = new ServerBootstrap();
                 try {
                         b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                                         .channel(NioServerSocketChannel.class).localAddress(port)
                                         .childHandler(new WebSocketServerInitializer());
 
                         Channel ch = b.bind().sync().channel();
 
                         ch.closeFuture().sync();
                 } finally {
                         b.shutdown();
                 }
         }
 
         public static void main(String[] args) throws Exception {
                 int port;
                 if (args.length > 0) {
                         port = Integer.parseInt(args[0]);
                 } else {
                         port = 8124;
                 }
                 new WebSocketServer(port).run();
         }
 }
 

Pub/Sub Server

Here is implementation of websocket handler that provides pub/sub functionality:

 package com.plexobject.netty.server;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.math.BigInteger;
 import java.security.SecureRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
 import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
 import static io.netty.handler.codec.http.HttpMethod.GET;
 import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundMessageHandlerAdapter;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
 import io.netty.logging.InternalLogger;
 import io.netty.logging.InternalLoggerFactory;
 import io.netty.util.CharsetUtil;
 
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
 
 public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
     @SuppressWarnings("unused")
     private static final InternalLogger LOGGER = InternalLoggerFactory
             .getInstance(WebSocketServerHandler.class);
     private static final ObjectMapper jsonMapper = new ObjectMapper();
     private static final JsonFactory jsonFactory = jsonMapper.getJsonFactory();
 
     private static final String WEBSOCKET_PATH = "/";
     private WebSocketServerHandshaker handshaker;
     private static Map<String, Set<ChannelHandlerContext>> subscriptions = new ConcurrentHashMap<String, Set<ChannelHandlerContext>>();
     private static Timer timer = new Timer(true);
     private static final AtomicLong REQUEST_ID = new AtomicLong();
     private static final Metrics METRICS = new Metrics();
 
     static {
         timer.schedule(new TimerTask() {
             @Override
             public void run() {
                while (true) {
                   sendMessageForRandomIdentifier();
                }
             }
         }, 1000);
     }
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, Object msg)
             throws Exception {
         if (msg instanceof HttpRequest) {
             handleHttpRequest(ctx, (HttpRequest) msg);
         } else if (msg instanceof WebSocketFrame) {
             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
         }
     }
 
     private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req)
             throws Exception {
         // Allow only GET methods.
         if (req.getMethod() != GET) {
             sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1,
                     FORBIDDEN));
             return;
         }
         if (req.getUri().equals("/favicon.ico")) {
             HttpResponse res = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
             sendHttpResponse(ctx, req, res);
             return;
         }
 
         // Handshake
         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                 getWebSocketLocation(req), null, false);
         handshaker = wsFactory.newHandshaker(req);
         if (handshaker == null) {
             wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
         } else {
             handshaker.handshake(ctx.channel(), req);
         }
     }
 
     private void handleWebSocketFrame(ChannelHandlerContext ctx,
             WebSocketFrame frame) throws IOException {
         // Check for closing frame
         if (frame instanceof CloseWebSocketFrame) {
             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
             return;
         } else if (frame instanceof PingWebSocketFrame) {
             ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
             return;
         } else if (!(frame instanceof TextWebSocketFrame)) {
             throw new UnsupportedOperationException(String.format(
                     "%s frame types not supported", frame.getClass().getName()));
         }
 
         String jsonText = ((TextWebSocketFrame) frame).getText();
         JsonParser jp = jsonFactory.createJsonParser(jsonText);
         JsonNode actualObj = jsonMapper.readTree(jp);
         String action = actualObj.get("action").getTextValue();
         String identifier = actualObj.get("identifier").getTextValue();
         if (action == null || identifier == null) {
               return;
         }
         if ("subscribe".equals(action)) {
             Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
             if (contexts == null) {
                 contexts = new HashSet<ChannelHandlerContext>();
                 subscriptions.put(identifier, contexts);
             }
             contexts.add(ctx);
         } else if ("unsubscribe".equals(action)) {
             Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
             if (contexts == null) {
                 return;
             }
             contexts.remove(ctx);
             if (contexts.size() == 0) {
                 subscriptions.remove(identifier);
             }
         }
     }
 
     private static String getRandomIdentifier() {
         List<String> identifiers = new ArrayList<String>(subscriptions.keySet());
         if (identifiers.size() == 0) {
             return null;
         }
         int n = new Random().nextInt(identifiers.size());
         return identifiers.get(n);
     }
 
 
     public static void sendMessageForRandomIdentifier() {
         String identifier = getRandomIdentifier();
         if (identifier == null) {
             return;
         }
         Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
         if (contexts == null || contexts.size() == 0) {
             return;
         }
         SecureRandom random = new SecureRandom();
         String elements = new BigInteger(500, random).toString(32);
         String json = "{\"request\":" + REQUEST_ID.incrementAndGet() + ", \"timestamp\":" + System.currentTimeMillis() + ", \"identifier\":\"" + identifier + "\", \"elements\": \"" + elements + "\"}";
         TextWebSocketFrame frame = new TextWebSocketFrame(json);
         METRICS.update(json);
         for (ChannelHandlerContext ctx : contexts) {
             try {
                 ctx.channel().write(frame);
             } catch (Exception e) {
                 subscriptions.remove(ctx);
             }
         }
         if (System.currentTimeMillis() % 1000 == 0) {
             System.out.println("identifiers," + Metrics.getHeading());
             System.out.println(subscriptions.size() + METRICS.getSummary().toString());
         }
     }
 
     private static void sendHttpResponse(ChannelHandlerContext ctx,
             HttpRequest req, HttpResponse res) {
         // Generate an error page if response status code is not OK (200).
         if (res.getStatus().getCode() != 200) {
             res.setContent(Unpooled.copiedBuffer(res.getStatus().toString(),
                     CharsetUtil.UTF_8));
             setContentLength(res, res.getContent().readableBytes());
         }
 
         // Send the response and close the connection if necessary.
         ChannelFuture f = ctx.channel().write(res);
         if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
             f.addListener(ChannelFutureListener.CLOSE);
         }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
             throws Exception {
         if (cause instanceof java.nio.channels.ClosedChannelException == false) {
             //cause.printStackTrace();
         }
         ctx.close();
     }
 
     private static String getWebSocketLocation(HttpRequest req) {
         return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
     }
 }
 

As, you can see its implementation is a bit verbose and I am skipping some of other boiler code.

Atmosphere

Atmosphere is another Java framework that provides support for Websockets so I tried it as well and here is its implementation:

 package com.plexobject;
 
 import org.atmosphere.config.service.WebSocketHandlerService;
 import org.atmosphere.cpr.Broadcaster;
 import org.atmosphere.cpr.BroadcasterFactory;
 import org.atmosphere.websocket.WebSocket;
 import org.atmosphere.websocket.WebSocketHandler;
 import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 
 import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.atmosphere.cpr.AtmosphereServlet;
 import org.atmosphere.websocket.WebSocketHandler;
 
 @WebSocketHandlerService
 public class ChannelResponseHandler extends WebSocketHandler {
     private static final Logger logger = LoggerFactory.getLogger(ChannelResponseHandler.class);
 
     private static final Metrics METRICS = new Metrics();
     private static final String PATH_PREFIX = "/channels";
     private boolean started;
     private int maxIdentifier;
     private final Random random = new Random();
     private Timer timer = new Timer(true);
     private final ObjectMapper mapper = new ObjectMapper();
     private final JsonFactory jsonFactory = mapper.getJsonFactory();
     private final AtomicLong opened = new AtomicLong();
     private final AtomicLong closed = new AtomicLong();
     private final AtomicLong nextRequestId = new AtomicLong();
     private final StringBuilder elements = new StringBuilder();
 
     public ChannelResponseHandler() {
         for (int i=0; i<500; i++) {
             elements.append(String.valueOf(i));
         }
     }
 
     @Override
     public void onTextMessage(WebSocket webSocket, String msg) {
         try {
             JsonParser jp = jsonFactory.createJsonParser(msg);
             JsonNode jsonObj = mapper.readTree(jp);
             String action = jsonObj.get("action").asText();
             String identifier = jsonObj.get("identifier").asText();
             logger.debug("onMessage " + action + " - " + identifier);
             if (action == null || identifier == null) {
                   return;
             }
             if ("subscribe".equals(action)) {
                 getBroadcaster(identifier).addAtmosphereResource(webSocket.resource());
                 maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
             } else if ("unsubscribe".equals(action)) {
                 getBroadcaster(identifier).removeAtmosphereResource(webSocket.resource());
             }
             start();
         } catch (Exception e) {
             logger.error("Failed to handle message " + msg, e);
         }
     }
 
     @Override
     public void onOpen(WebSocket webSocket) {
         logger.trace("onOpen {}", webSocket.resource().getRequest());
         webSocket.resource().suspend();
         opened.incrementAndGet();
     }
 
     @Override
     public void onClose(WebSocket webSocket) {
         logger.trace("onClose");
         getBroadcaster().removeAtmosphereResource(webSocket.resource());
         super.onClose(webSocket);
         closed.incrementAndGet();
     }
 
     @Override
     public void onError(WebSocket webSocket, WebSocketException t) {
         System.err.println("error " + t);
         logger.trace("onError {}", t.getStackTrace());
         super.onError(webSocket, t);
     }
 
     public void sendMessageForRandomIdentifier() {
         try {
             if (opened.get() <= closed.get()) {
                 return;
             }
             String identifier = getRandomIdentifier();
             Broadcaster bc = getBroadcaster(identifier);
             if (bc == null) {
                 logger.debug("Failed to find broadcaster for " + identifier);
                 return;
             }
             long t = System.currentTimeMillis();
             String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
             bc.broadcast(msg);
             METRICS.update(msg);
             if (t % 1000 == 0) {
                 logger.info(METRICS.getSummary().toString());
             }
         } catch (Exception e) {
             logger.error("Failed to send message", e);
         }
     }
     private Broadcaster getBroadcaster() {
         return getBroadcaster(null);
     }
     //
     private Broadcaster getBroadcaster(String identifier) {
         String path = identifier == null ? PATH_PREFIX : PATH_PREFIX + "/" + identifier;
         BroadcasterFactory factory = BroadcasterFactory.getDefault();
         return factory != null ? factory.lookup(path, true) : null;
     }
 
     synchronized void start() {
         if (started) return;
         started = true;
         timer.schedule(new TimerTask() {
                 @Override
                 public void run() {
                   while (started) {
                      sendMessageForRandomIdentifier();
                   }
                 }
             }, 1000);
     }
 
     synchronized void stop() {
         timer.cancel();
     }
 
     private String getRandomIdentifier() {
         return String.valueOf(random.nextInt(maxIdentifier) + 1);
     }
 
     public static void main(String[] arguments) throws Exception {
         try {
             Server server = new Server();
             Connector con = new SelectChannelConnector();
             con.setPort(8124);
             server.addConnector(con);
             ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
             context.setContextPath("/");
             server.setHandler(context);
             context.addServlet(new ServletHolder(new AtmosphereServlet()), "/*");
             server.start();//WebSocketHandler
             server.join();
         } catch (Exception ex) {
             System.err.println(ex);
         }
     }
 }
 

Note that I had to deploy Atmosphere in Jetty 8.1.7 as a war file and here is web.xml

 <?xml version="1.0" encoding="UTF-8"?>
 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
          id="WebApp_ID" version="2.5">
 
     <display-name>WebSocket Load Test</display-name>
     <servlet>
         <description>AtmosphereServlet</description>
         <servlet-name>AtmosphereServlet</servlet-name>
         <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
         <load-on-startup>1</load-on-startup>
     </servlet>
     <servlet-mapping>
         <servlet-name>AtmosphereServlet</servlet-name>
         <url-pattern>/channels/*</url-pattern>
     </servlet-mapping>
 </web-app>
 

Client

I previously wrote client in Javascript but I rewrote in Java so that I can use Java’s concurrent library to manage counters.

WebSocket Client

I tried Netty and weberknecht libraries and settled on weberknecht. Here is its implementation:

 package com.plexobject.websocket.client;
 
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Map;
 import de.roderick.weberknecht.*;
 
 public class WebSocketClient implements ClientProtocol {
     private WebSocket webSocket;
         private final String identifier;
 
         public WebSocketClient(final String identifier, final URI server, final ClientEventListener listener) throws Exception {
                 this.identifier = identifier;
 
         webSocket = new WebSocketConnection(server);
         webSocket.setEventHandler(new WebSocketEventHandler() {
 
             @Override
             public void onClose() {
                         listener.onClose();
             }
 
             @Override
             public void onMessage(WebSocketMessage msg) {
                             listener.onMessage(identifier, msg.getText());
             }
 
             @Override
             public void onOpen() {
                         listener.onOpen();
             }
         });
         webSocket.connect();
         }
 
     @Override
     public void close() throws Exception {
         webSocket.close();
     }
     @Override
     public void send(String text) throws Exception {
         webSocket.send(text);
     }
 
     @Override
     public String getIdentifier() {
         return identifier;
     }
 }
 

Kaazing

I also tested Kaazing a bit but it is a commercial software and their trial license limited connections so I gave up on it.

Load Tester

The load tester takes arguments about how many connections to open and how long to run the test. It generates identifiers from 1 to 10,000 and sends subscription message to the server for each of those channel identifiers. It then keeps track of messages received in a helper class Metrics and logs that information on regular interval, i.e.,

 package com.plexobject.websocket.client;
 
 
 import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.net.URI;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 public class LoadTester implements ClientEventListener {
     private static final AtomicLong OPEN_CONNECTIONS = new AtomicLong();
     private static final AtomicLong CLOSE_CONNECTIONS = new AtomicLong();
     private static final AtomicLong REQUEST_ID = new AtomicLong();
     private static final Metrics METRICS = new Metrics();
     private List<ClientProtocol> clients = new ArrayList<ClientProtocol>();
     private static final List<Integer> identifiers = new ArrayList<Integer>();
     static {
         for (int i=1; i<=10000; i++) {
             identifiers.add(i);
         }
     }
 
     public LoadTester(String url, int maxConnections) throws Exception {
         try {
             URI uri = new URI(url);
             for (int i=0; i<maxConnections; i++) {
                 String identifier = String.valueOf(identifiers.get(i%identifiers.size()));
                 ClientProtocol client = new WebSocketClient(identifier, uri, this);
                 String json = "{\"action\":\"subscribe\", \"identifier\":\"" + identifier + "\"}";
                 client.send(json);
                 clients.add(client);
                 try {
                     Thread.sleep(1);
                 } catch (InterruptedException e) {
                 }
             }
         } catch (OutOfMemoryError e) {
             System.err.println(e);
             System.exit(0);
         }
     }
 
     //
     public void close() {
         for (ClientProtocol client : clients) {
             try {
                 client.close();
             } catch (Exception e) {
             }
         }
         System.exit(0);
     }
 
     @Override
     public void onError(Throwable e) {
         System.err.println(e);
     }  
 
     @Override
     public void onMessage(String identifier, String message) {
         String act = METRICS.update(message);
         if (!identifier.equals(act)) {
             System.out.println("Expected " + identifier + ", but received " + act + " for " + message);
         }
     }
 
     @Override
     public void onClose() {
         CLOSE_CONNECTIONS.incrementAndGet();
     }
 
     @Override
     public void onOpen() {
         OPEN_CONNECTIONS.incrementAndGet();
     }
     public static void main(String[] args) throws Exception {
         if (args.length == 0) {
             System.err.println("Usage client.LoadTester url connections max-runtime-in-minutes");
             System.exit(1);
         }
         final String url = args[0]; // "ws://localhost:8124/"
         final int maxConnections = Integer.parseInt(args[1]);
         final int maxRuntimeMins = Integer.parseInt(args[2]);
         System.out.println("connections,connected," + Metrics.getHeading());
         final long started = System.currentTimeMillis();
         final Timer timer = new Timer(true);
         timer.schedule(new TimerTask() {
                 @Override
                 public void run() {
                     System.out.println(maxConnections + "," + (OPEN_CONNECTIONS.get()-CLOSE_CONNECTIONS.get()) + "," + METRICS.getSummary());
                     long elapsed = System.currentTimeMillis() - started;
                     if (elapsed > (maxRuntimeMins * 1000 * 60)) {
                         System.out.println("time done");
                         System.exit(0);
                     }
                 }
             }, 5000, 15000);
         //
         LoadTester runner = new LoadTester(url, maxConnections);
         while (true) {
             try {
                 Thread.sleep(5000);
             } catch (InterruptedException e) {
             }
             //
             if (CLOSE_CONNECTIONS.get() == maxConnections) {
                 System.out.println("all clients closed");
                 runner.close();
             }
         }
     }
 
 }
  

I am skipping some of helper classes and interfaces, but you can find them from the source code.

Test Script

I created a simple shell script to launch load test for 5 minutes and given number of connections and collected results in a comma delimited file, i.e,

 #!/bin/bash 
 mvn package
 export MAVEN_OPTS="-Xmx1000m -Xss128k"
 export url=ws://localhost:8080/atmosphere-1.0/channels
 #export url=ws://localhost:8080/channels
 #export url=ws://localhost:8124/
 clients=500
 while [ $clients -lt 100000 ]
 do
    mvn exec:java -Dexec.mainClass="com.plexobject.websocket.client.LoadTester" -Dexec.classpathScope=runtime -Dexec.args="$url $clients 5"
    clients=`expr $clients + 500|awk '{print $1}'`
 done
 

Note that I specified stack size of thread to be 128k as default stack size per thread is 2MB, which takes a lot of memory.

TCP Settings

As I mentioned in my last blog, I made some changes to my Linux machine to improve TCP settings, i.e.,

 net.core.rmem_max = 33554432
 net.core.wmem_max = 33554432
 net.ipv4.tcp_rmem = 4096 16384 33554432
 net.ipv4.tcp_wmem = 4096 16384 33554432
 net.ipv4.tcp_mem = 786432 1048576 26777216
 net.ipv4.tcp_max_tw_buckets = 360000
 net.core.netdev_max_backlog = 2500
 vm.min_free_kbytes = 65536
 vm.swappiness = 0
 net.ipv4.ip_local_port_range = 1024 65535

I then applied above changes using:

 sudo sysctl -w net.core.somaxconn=10000
 sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000 
 sudo sysctl -p

Results

Here are results of testing up to 5000 connections, where server sends a message to a random channel continuously:

Atmosphere

connections connected average response (ms) average size messages msg/sec walltime(secs)
500 500 1 1462 282431 943.317 301
1000 1000 1 1462 282100 940.967 302
1500 1500 1 1463 274156 920.233 304
2000 2000 1 1463 268180 936.733 309
2500 2500 1 1463 263443 926 307
3000 2985 1 1463 280809 932.233 332
3500 3500 2 1463 278412 934.333 384

Netty

connections connected average response (ms) average size messages msg/sec walltime(secs)
500 500 0.000 172.000 284165 937.833 305.000
1000 1000 0.000 172.000 279883 921.717 305.000
1500 1500 0.000 173.000 290317 959.983 305.000
2000 2000 0.000 173.000 286110 946.467 305.000
2500 2500 0.000 174.000 285122 949.550 305.000
3000 3000 0.000 174.000 281650 939.150 305.000
3500 3500 0.000 174.000 282833 942.700 305.000
4000 4000 0.000 174.000 283829 948.700 305.000
4500 4500 0.000 174.000 273293 921.583 305.000
5000 5000 0.000 174.000 264433 881.733 305.000
1000 1000 0.000 172.000 279883 921.717 305.000
1500 1500 0.000 173.000 290317 959.983 305.000
2000 2000 0.000 173.000 286110 946.467 305.000
2500 2500 0.000 174.000 285122 949.550 305.000
3000 3000 0.000 174.000 281650 939.150 305.000
3500 3500 0.000 174.000 282833 942.700 305.000
4000 4000 0.000 174.000 283829 948.700 305.000
4500 4500 0.000 174.000 273293 921.583 305.000
5000 5000 0.000 174.000 264433 881.733 305.000
5500 5500 0.000 174.000 289230 977.300 305.000
6000 6000 0.000 174.000 277692 934.800 305.000
6500 6500 0.000 174.000 148223 847.517 185.000

Vertx.io

connections average response (ms) average size messages msg/sec walltime(secs)
1000 1 1462 1487141 4056.433 365
2000 1 1464 1607431 5003.033 365
3000 1 1464 1663777 5061.717 365
4000 1 1464 1607104 5109.65 365
5000 1 1464 1706898 5374.433 365
6000 2 1465 1750757 5341.05 365
7000 3 1465 1723124 5263.283 365
8000 3 1465 1758174 5224.85 365
9000 2 1465 1717950 5038.45 365
10000 3 1465 1789671 5160.517 365
11000 3 1465 1789760 5140.8 365
12000 3 1465 1792477 5097.5 365
13000 4 1465 1778748 5128.917 365
14000 4 1465 1746830 4895.433 365
15000 3 1465 1760734 4960.367 365
16000 3 1465 1739786 4933.583 365
17000 4 1465 1727759 4902.883 365
18000 3 1465 1730165 5158.7 365
19000 4 1465 1725660 4904.2 367
20000 4 1465 1704129 4852.567 365
21000 4 1465 1703201 4849.833 365
22000 3 1465 1705387 5077.417 366
23000 3 1465 1671165 4861.233 365
24000 4 1465 1670471 4942.217 365
25000 4 1465 1639296 5108.317 365

Node.js

connections average response (ms) average size messages msg/sec walltime(secs)
1000 2 1363 1550155 4252.95 365
2000 1 1368 863997 2384.8 365
3000 1 1368 644806 1776.717 365
4000 1 1369 555496 1541.683 365
5000 1 1370 483624 1338 365
6000 1 1370 426728 1185.883 365
7000 1 1371 358331 994.017 365
8000 1 1371 322960 898.55 365
9000 1 1371 281227 787.633 365
10000 1 1371 260502 732.3 365
11000 1 1370 262209 733.633 365
12000 1 1371 243954 690.067 365
13000 2 1370 230117 647.833 365
14000 2 1371 235591 668.983 365
15000 2 1371 240632 685.35 365
16000 2 1371 192929 551.083 365
17000 2 1371 166705 474.217 365
18000 2 1371 142557 406.4 365
19000 2 1371 110873 319.467 365
20000 2 1371 106047 305.967 365
21000 2 1371 107947 307.633 365
22000 1 1371 89779 257.883 365
23000 2 1371 89890 259.367 365
24000 2 1371 34414 240.833 155

Summary

I tried to push limits for each framework, however Atmosphere could not handle more than 3500 connections and it slowed my machine to crawl. I was using 4.0-beta version of Netty, which also crashed after 6500 connections. Vertx.io was able to scale for upto 25,000 connections, but then server ran out of memory on my machine. Similarly, node.js kept going for about 24,000 but it became very slow and my load test client could not allocate more memory for connections. Both Vertx.io and Node.js proved to be leading contenders but Vertx.io was able to scale much better and maintain throughput better than Node.js. As a result, I picked Vertx.io for my project. You can download the source code and compare results yourself.

Powered by WordPress