My last two blogs (Pub/sub in Node.js and Websockets and Scaling Node.js) described how I have been evaluating Node.js and WebSockets for streaming data. In this post, I describe the results of that evaluation along with a few other frameworks that I have been testing.
Use Case
The primary use case for my application is pub/sub, where clients subscribe to a channel and receive updates, such as stock quotes or notifications about orders.
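All of the implementations below speak the same minimal JSON protocol over a WebSocket: the client sends subscribe and unsubscribe requests for a channel identifier, and the server pushes update payloads for the subscribed channels. With illustrative values, the messages look like this:

```
client -> server:
  {"action": "subscribe", "identifier": "123"}
  {"action": "unsubscribe", "identifier": "123"}

server -> client (Node.js payload shown; the Java servers send one opaque "elements" string):
  {"request": 1, "rows": 10, "timestamp": 1357257500000, "identifier": 123,
   "elements": [{"sequence": 0, "price": 42, "identifierId": 123,
                 "symbol": "QQQ", "transaction": "STO", "qty": 100}]}
```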
Node.js
In my last blog, I described all the issues I had with the socket.io library for Node.js; after a lot of frustration I gave up on it. Instead, I used the ws library for WebSockets, and though it doesn't provide fallback options for other protocols, it is much more stable and efficient.
Pub/Sub Server
Here is the pub/sub server, which provides methods for subscribing to a channel identifier and continuously sends updates to randomly selected channels.
```javascript
var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 5;
var MAX_THREADS = process.env.MAX_THREADS || 5;
var MAX_ROWS = process.env.MAX_ROWS || 10;
var SEND_MESSAGE_INTERVAL = process.env.SEND_MESSAGE_INTERVAL || 1;
var nextClientId = 0;
var errors = 0;
//
var hwUsage = require('hardware_usage');
var metrics = require('metrics')(OUTLIER_SIZE);
var cluster = require('cluster');
hwUsage.start();

if (cluster.isMaster) {
  for (var i = 0; i < MAX_THREADS; i++) {
    cluster.fork();
  }
  cluster.on('exit', function(worker, code, signal) {
  });
  console.log('clients, errors, ' + metrics.heading() + ',' + hwUsage.heading());
} else {
  var pubsubController = require('pubsub_controller')(cluster.worker.id, MAX_ROWS, hwUsage, metrics);
  var WebSocketServer = require('ws').Server,
      wss = new WebSocketServer({port: 8124});
  wss.on('connection', function(ws) {
    ws.key = ++nextClientId;
    ws.on('close', function() {
      pubsubController.unsubscribeAll(ws);
    });
    ws.on('error', function() {
      errors++;
    });
    ws.on('message', function(text) {
      var msg = JSON.parse(text);
      if (msg.action === 'subscribe') {
        pubsubController.subscribe(ws, msg.identifier);
      } else if (msg.action === 'unsubscribe') {
        pubsubController.unsubscribe(ws, msg.identifier);
      }
    });
  });
  // push an update to a random channel on each tick
  setInterval(function() {
    pubsubController.push();
  }, SEND_MESSAGE_INTERVAL);
  // log metrics and hardware usage every 15 seconds
  setInterval(function() {
    hwUsage.stop(function(usage) {
      console.log(nextClientId + ',' + errors + ',' + metrics.summary().toString() + ',' + usage.toString());
    });
  }, 15000);
}

process.on('uncaughtException', function(err) {
  console.log("GLOBAL " + err + "\n" + err.stack);
});
```
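To start it, something like the following should work; server.js is my placeholder name for the script above, and all of the tunables come from the environment:

```bash
MAX_THREADS=5 MAX_ROWS=10 SEND_MESSAGE_INTERVAL=1 node server.js
```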
Subscription management
The following module defines the APIs for managing subscriptions; as I mentioned, when the number of messages received by a client reaches maxMessages, the client unsubscribes from the server.
```javascript
var helper = require('helper');

module.exports = function(workerId, numElements, hwUsage, metrics) {
  var channelSubscribers = {};
  var nextRequestId = 0;
  var allKeys = {};
  allKeys[workerId] = [];

  var generatePayload = function(identifier) {
    var elements = [];
    var date = new Date();
    for (var i = 0; i < numElements; i++) {
      elements.push({sequence: i, price: helper.random(100), identifierId: identifier,
          symbol: "QQQ", transaction: "STO", qty: helper.random(200)});
    }
    var reqId = Number(++nextRequestId + '.' + workerId);
    var payload = {request: reqId, rows: numElements, timestamp: date.getTime(),
        identifier: Number(identifier), elements: elements};
    return payload;
  };
  var makeKey = function(socket) {
    return workerId + '_' + socket.key;
  };
  var getSubscriber = function(socket) {
    return channelSubscribers[makeKey(socket)];
  };
  var setSubscriber = function(socket, rec) {
    channelSubscribers[makeKey(socket)] = rec;
  };
  var removeSubscribers = function(socket) {
    delete channelSubscribers[makeKey(socket)];
  };

  return {
    subscribe: function(socket, identifier) {
      var rec = getSubscriber(socket);
      if (typeof rec === "undefined") {
        rec = {socket: socket, identifiers: []};
      }
      rec.identifiers.push(identifier);
      allKeys[workerId].push(makeKey(socket));
      setSubscriber(socket, rec);
    },
    unsubscribe: function(socket, identifier) {
      var rec = getSubscriber(socket);
      if (typeof rec === "undefined") {
        return false;
      }
      var ndx = rec.identifiers.indexOf(identifier);
      rec.identifiers.splice(ndx, 1);
      if (rec.identifiers.length == 0) {
        removeSubscribers(socket);
      } else {
        setSubscriber(socket, rec);
      }
    },
    unsubscribeAll: function(socket) {
      removeSubscribers(socket);
    },
    count: function() {
      return helper.size(channelSubscribers);
    },
    // pick a random subscriber and send a payload for each of its identifiers
    push: function() {
      if (allKeys[workerId].length == 0) return 0;
      var key = allKeys[workerId][helper.random(allKeys[workerId].length)];
      var rec = channelSubscribers[key];
      if (typeof rec === "undefined") {
        return -1;
      }
      //
      for (var i = 0; i < rec.identifiers.length; i++) {
        var identifier = rec.identifiers[i];
        try {
          var request = generatePayload(identifier);
          var text = JSON.stringify(request);
          rec.socket.send(text);
          metrics.update(nextRequestId, new Date().getTime(), text.length);
        } catch (err) {
          console.log('failed to send execution report ' + err);
          try {
            rec.socket.close(); // the ws library exposes close(), not disconnect()
          } catch (ex) {
            console.log('failed to close socket ' + ex);
          }
          delete channelSubscribers[key];
        }
      }
    }
  };
};
```
I am skipping some of the helper modules, but you can find them in the source code.
Vertx.io
The Vertx.io framework follows the Node.js design of a single-threaded event loop with asynchronous I/O, but is written in Java. It also provides polyglot support for a number of languages such as Java, JavaScript, Ruby, Groovy, and Python. Here is a similar implementation of the pub/sub server in Java:
```java
package com.plexobject.vertx;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.http.ServerWebSocket;
import org.vertx.java.deploy.Verticle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class VertxServer extends Verticle {
    private static final Logger logger = LoggerFactory.getLogger(VertxServer.class);
    private final ObjectMapper mapper = new ObjectMapper();
    private final JsonFactory jsonFactory = mapper.getJsonFactory();
    private Map<String, Set<ServerWebSocket>> subscriptions = new ConcurrentHashMap<String, Set<ServerWebSocket>>();
    private Timer sendTimer = new Timer(true);
    private Timer metricsTimer = new Timer(true);
    private final AtomicLong nextRequestId = new AtomicLong();
    private final Metrics metrics = new Metrics();
    private final StringBuilder elements = new StringBuilder();
    private final Random random = new Random();
    private int maxIdentifier;

    public VertxServer() {
        logger.info("identifiers," + Metrics.getHeading());
        for (int i = 0; i < 500; i++) {
            elements.append(String.valueOf(i));
        }
        // log metrics every 15 seconds
        metricsTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                logger.info(subscriptions.size() + metrics.getSummary().toString());
            }
        }, 5000, 15000);
        // continuously push messages to random channels
        sendTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                while (true) {
                    sendMessageForRandomIdentifier();
                }
            }
        }, 1000);
    }

    public void start() {
        vertx.createHttpServer().setAcceptBacklog(10000).websocketHandler(new Handler<ServerWebSocket>() {
            public void handle(final ServerWebSocket ws) {
                if (ws.path.equals("/channels")) {
                    ws.dataHandler(new Handler<Buffer>() {
                        public void handle(Buffer data) {
                            try {
                                JsonParser jp = jsonFactory.createJsonParser(data.toString());
                                JsonNode jsonObj = mapper.readTree(jp);
                                String action = jsonObj.get("action").asText();
                                String identifier = jsonObj.get("identifier").asText();
                                if (action == null || identifier == null) {
                                    return;
                                }
                                if ("subscribe".equals(action)) {
                                    maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
                                    logger.info("Max " + maxIdentifier);
                                    Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                    if (sockets == null) {
                                        sockets = new HashSet<ServerWebSocket>();
                                        subscriptions.put(identifier, sockets);
                                    }
                                    sockets.add(ws);
                                } else if ("unsubscribe".equals(action)) {
                                    Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                    if (sockets == null) {
                                        return;
                                    }
                                    sockets.remove(ws);
                                    if (sockets.size() == 0) {
                                        subscriptions.remove(identifier);
                                    }
                                }
                            } catch (IOException e) {
                                logger.error("Failed to handle " + data, e);
                            }
                        }
                    });
                } else {
                    ws.reject();
                }
            }
        }).requestHandler(new Handler<HttpServerRequest>() {
            public void handle(HttpServerRequest req) {
                //if (req.path.equals("/")) req.response.sendFile("websockets/ws.html"); // Serve the html
            }
        }).listen(8080);
    }

    //
    private String getRandomIdentifier() {
        return String.valueOf(Math.abs(random.nextInt(maxIdentifier)) + 1);
    }

    public void sendMessageForRandomIdentifier() {
        if (subscriptions.size() == 0) {
            return;
        }
        String identifier = getRandomIdentifier();
        if (identifier == null) {
            return;
        }
        Set<ServerWebSocket> sockets = subscriptions.get(identifier);
        if (sockets == null || sockets.size() == 0) {
            return;
        }
        final long t = System.currentTimeMillis();
        String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t
                + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
        for (Iterator<ServerWebSocket> it = sockets.iterator(); it.hasNext();) {
            ServerWebSocket ws = it.next();
            try {
                ws.writeTextFrame(msg);
            } catch (Exception e) {
                // drop the dead socket via the iterator to avoid ConcurrentModificationException
                it.remove();
                logger.error("Failed to send " + msg, e);
            }
        }
        metrics.update(msg);
    }
}
```
Note that Vertx.io requires JDK 7; here is how you can start the server:
```bash
export PATH=/home/sbhatti/jdk1.7.0_10/bin:$PATH
export JAVA_HOME=/home/sbhatti/jdk1.7.0_10
mvn package
bin/vertx run com.plexobject.vertx.VertxServer -cp target/classes/ -instances=5
```
Note that I started Vertx.io with 5 instances of the server to match Node.js’ clustering options.
Netty
Netty.io is a lightweight network-application framework that provides asynchronous, event-driven I/O, so I also tested its WebSocket support in the latest 4.0-beta version.
Bootstrap
Here is the main server class that bootstraps the WebSocket server:
```java
package com.plexobject.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class WebSocketServer {
    private final int port;

    public WebSocketServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        ServerBootstrap b = new ServerBootstrap();
        try {
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class).localAddress(port)
                .childHandler(new WebSocketServerInitializer());
            Channel ch = b.bind().sync().channel();
            ch.closeFuture().sync();
        } finally {
            b.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8124;
        }
        new WebSocketServer(port).run();
    }
}
```
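The childHandler above installs a WebSocketServerInitializer, which I am skipping as boilerplate below. As a rough sketch (assuming the HTTP codec class names of the Netty 4.0 pre-releases, which changed between betas), it wires up the pipeline roughly like this:

```java
package com.plexobject.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpChunkAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

// Sketch of the skipped initializer: install HTTP codecs so the WebSocket
// handshake request can be parsed, then hand off to the pub/sub handler below.
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decoder", new HttpRequestDecoder());
        pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast("handler", new WebSocketServerHandler());
    }
}
```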
Pub/Sub Server
Here is the implementation of the WebSocket handler that provides the pub/sub functionality:
```java
package com.plexobject.netty.server;

import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.CharsetUtil;

import java.io.IOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;

public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
    @SuppressWarnings("unused")
    private static final InternalLogger LOGGER = InternalLoggerFactory
            .getInstance(WebSocketServerHandler.class);
    private static final ObjectMapper jsonMapper = new ObjectMapper();
    private static final JsonFactory jsonFactory = jsonMapper.getJsonFactory();
    private static final String WEBSOCKET_PATH = "/";
    private WebSocketServerHandshaker handshaker;
    private static Map<String, Set<ChannelHandlerContext>> subscriptions = new ConcurrentHashMap<String, Set<ChannelHandlerContext>>();
    private static Timer timer = new Timer(true);
    private static final AtomicLong REQUEST_ID = new AtomicLong();
    private static final Metrics METRICS = new Metrics();

    static {
        // continuously push messages to random channels
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                while (true) {
                    sendMessageForRandomIdentifier();
                }
            }
        }, 1000);
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpRequest) {
            handleHttpRequest(ctx, (HttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) throws Exception {
        // Allow only GET methods.
        if (req.getMethod() != GET) {
            sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN));
            return;
        }
        if (req.getUri().equals("/favicon.ico")) {
            HttpResponse res = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
            sendHttpResponse(ctx, req, res);
            return;
        }
        // Handshake
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                getWebSocketLocation(req), null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws IOException {
        // Check for closing frame
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
            return;
        } else if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
            return;
        } else if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(String.format(
                    "%s frame types not supported", frame.getClass().getName()));
        }
        String jsonText = ((TextWebSocketFrame) frame).getText();
        JsonParser jp = jsonFactory.createJsonParser(jsonText);
        JsonNode actualObj = jsonMapper.readTree(jp);
        String action = actualObj.get("action").getTextValue();
        String identifier = actualObj.get("identifier").getTextValue();
        if (action == null || identifier == null) {
            return;
        }
        if ("subscribe".equals(action)) {
            Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
            if (contexts == null) {
                contexts = new HashSet<ChannelHandlerContext>();
                subscriptions.put(identifier, contexts);
            }
            contexts.add(ctx);
        } else if ("unsubscribe".equals(action)) {
            Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
            if (contexts == null) {
                return;
            }
            contexts.remove(ctx);
            if (contexts.size() == 0) {
                subscriptions.remove(identifier);
            }
        }
    }

    private static String getRandomIdentifier() {
        List<String> identifiers = new ArrayList<String>(subscriptions.keySet());
        if (identifiers.size() == 0) {
            return null;
        }
        int n = new Random().nextInt(identifiers.size());
        return identifiers.get(n);
    }

    public static void sendMessageForRandomIdentifier() {
        String identifier = getRandomIdentifier();
        if (identifier == null) {
            return;
        }
        Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
        if (contexts == null || contexts.size() == 0) {
            return;
        }
        SecureRandom random = new SecureRandom();
        String elements = new BigInteger(500, random).toString(32);
        String json = "{\"request\":" + REQUEST_ID.incrementAndGet() + ", \"timestamp\":"
                + System.currentTimeMillis() + ", \"identifier\":\"" + identifier
                + "\", \"elements\": \"" + elements + "\"}";
        TextWebSocketFrame frame = new TextWebSocketFrame(json);
        METRICS.update(json);
        // collect failed contexts and remove them after the loop; removing
        // mid-iteration would throw ConcurrentModificationException
        Set<ChannelHandlerContext> dead = new HashSet<ChannelHandlerContext>();
        for (ChannelHandlerContext ctx : contexts) {
            try {
                ctx.channel().write(frame);
            } catch (Exception e) {
                dead.add(ctx);
            }
        }
        contexts.removeAll(dead);
        if (System.currentTimeMillis() % 1000 == 0) {
            System.out.println("identifiers," + Metrics.getHeading());
            System.out.println(subscriptions.size() + METRICS.getSummary().toString());
        }
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
        // Generate an error page if response status code is not OK (200).
        if (res.getStatus().getCode() != 200) {
            res.setContent(Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8));
            setContentLength(res, res.getContent().readableBytes());
        }
        // Send the response and close the connection if necessary.
        ChannelFuture f = ctx.channel().write(res);
        if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (!(cause instanceof java.nio.channels.ClosedChannelException)) {
            //cause.printStackTrace();
        }
        ctx.close();
    }

    private static String getWebSocketLocation(HttpRequest req) {
        return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
    }
}
```
As you can see, the implementation is a bit verbose, and I am skipping some of the other boilerplate code.
Atmosphere

The Atmosphere framework runs on top of a servlet container and uses its Broadcaster abstraction for pub/sub. Here is the WebSocket handler:
```java
package com.plexobject;

import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

import org.atmosphere.config.service.WebSocketHandlerService;
import org.atmosphere.cpr.AtmosphereServlet;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterFactory;
import org.atmosphere.websocket.WebSocket;
import org.atmosphere.websocket.WebSocketHandler;
import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

@WebSocketHandlerService
public class ChannelResponseHandler extends WebSocketHandler {
    private static final Logger logger = LoggerFactory.getLogger(ChannelResponseHandler.class);
    private static final Metrics METRICS = new Metrics();
    private static final String PATH_PREFIX = "/channels";
    private boolean started;
    private int maxIdentifier;
    private final Random random = new Random();
    private Timer timer = new Timer(true);
    private final ObjectMapper mapper = new ObjectMapper();
    private final JsonFactory jsonFactory = mapper.getJsonFactory();
    private final AtomicLong opened = new AtomicLong();
    private final AtomicLong closed = new AtomicLong();
    private final AtomicLong nextRequestId = new AtomicLong();
    private final StringBuilder elements = new StringBuilder();

    public ChannelResponseHandler() {
        for (int i = 0; i < 500; i++) {
            elements.append(String.valueOf(i));
        }
    }

    @Override
    public void onTextMessage(WebSocket webSocket, String msg) {
        try {
            JsonParser jp = jsonFactory.createJsonParser(msg);
            JsonNode jsonObj = mapper.readTree(jp);
            String action = jsonObj.get("action").asText();
            String identifier = jsonObj.get("identifier").asText();
            logger.debug("onMessage " + action + " - " + identifier);
            if (action == null || identifier == null) {
                return;
            }
            if ("subscribe".equals(action)) {
                getBroadcaster(identifier).addAtmosphereResource(webSocket.resource());
                maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
            } else if ("unsubscribe".equals(action)) {
                getBroadcaster(identifier).removeAtmosphereResource(webSocket.resource());
            }
            start();
        } catch (Exception e) {
            logger.error("Failed to handle message " + msg, e);
        }
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        logger.trace("onOpen {}", webSocket.resource().getRequest());
        webSocket.resource().suspend();
        opened.incrementAndGet();
    }

    @Override
    public void onClose(WebSocket webSocket) {
        logger.trace("onClose");
        getBroadcaster().removeAtmosphereResource(webSocket.resource());
        super.onClose(webSocket);
        closed.incrementAndGet();
    }

    @Override
    public void onError(WebSocket webSocket, WebSocketException t) {
        System.err.println("error " + t);
        logger.trace("onError {}", t.getStackTrace());
        super.onError(webSocket, t);
    }

    public void sendMessageForRandomIdentifier() {
        try {
            if (opened.get() <= closed.get()) {
                return;
            }
            String identifier = getRandomIdentifier();
            Broadcaster bc = getBroadcaster(identifier);
            if (bc == null) {
                logger.debug("Failed to find broadcaster for " + identifier);
                return;
            }
            long t = System.currentTimeMillis();
            String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t
                    + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
            bc.broadcast(msg);
            METRICS.update(msg);
            if (t % 1000 == 0) {
                logger.info(METRICS.getSummary().toString());
            }
        } catch (Exception e) {
            logger.error("Failed to send message", e);
        }
    }

    private Broadcaster getBroadcaster() {
        return getBroadcaster(null);
    }

    //
    private Broadcaster getBroadcaster(String identifier) {
        String path = identifier == null ? PATH_PREFIX : PATH_PREFIX + "/" + identifier;
        BroadcasterFactory factory = BroadcasterFactory.getDefault();
        return factory != null ? factory.lookup(path, true) : null;
    }

    synchronized void start() {
        if (started) return;
        started = true;
        // continuously push messages to random channels
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                while (started) {
                    sendMessageForRandomIdentifier();
                }
            }
        }, 1000);
    }

    synchronized void stop() {
        timer.cancel();
    }

    private String getRandomIdentifier() {
        return String.valueOf(random.nextInt(maxIdentifier) + 1);
    }

    public static void main(String[] arguments) throws Exception {
        try {
            Server server = new Server();
            Connector con = new SelectChannelConnector();
            con.setPort(8124);
            server.addConnector(con);
            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
            context.setContextPath("/");
            server.setHandler(context);
            context.addServlet(new ServletHolder(new AtmosphereServlet()), "/*");
            server.start();
            server.join();
        } catch (Exception ex) {
            System.err.println(ex);
        }
    }
}
```
Note that I had to deploy Atmosphere in Jetty 8.1.7 as a WAR file; here is the web.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
    id="WebApp_ID" version="2.5">
  <display-name>WebSocket Load Test</display-name>
  <servlet>
    <description>AtmosphereServlet</description>
    <servlet-name>AtmosphereServlet</servlet-name>
    <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>AtmosphereServlet</servlet-name>
    <url-pattern>/channels/*</url-pattern>
  </servlet-mapping>
</web-app>
```
Client
I previously wrote the client in JavaScript, but I rewrote it in Java so that I could use Java's concurrency library to manage counters.
WebSocket Client
I tried the Netty and weberknecht client libraries and settled on weberknecht. Here is the client implementation:
```java
package com.plexobject.websocket.client;

import java.net.URI;

import de.roderick.weberknecht.*;

public class WebSocketClient implements ClientProtocol {
    private WebSocket webSocket;
    private final String identifier;

    public WebSocketClient(final String identifier, final URI server,
            final ClientEventListener listener) throws Exception {
        this.identifier = identifier;
        webSocket = new WebSocketConnection(server);
        // forward the weberknecht callbacks to the application listener
        webSocket.setEventHandler(new WebSocketEventHandler() {
            @Override
            public void onClose() {
                listener.onClose();
            }

            @Override
            public void onMessage(WebSocketMessage msg) {
                listener.onMessage(identifier, msg.getText());
            }

            @Override
            public void onOpen() {
                listener.onOpen();
            }
        });
        webSocket.connect();
    }

    @Override
    public void close() throws Exception {
        webSocket.close();
    }

    @Override
    public void send(String text) throws Exception {
        webSocket.send(text);
    }

    @Override
    public String getIdentifier() {
        return identifier;
    }
}
```
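The ClientProtocol and ClientEventListener interfaces used above are among the helpers I skip later, but since their shape can be read directly off WebSocketClient and the load tester below, here is a minimal sketch:

```java
package com.plexobject.websocket.client;

// Sketch of the skipped interfaces, inferred from their usage; the actual
// definitions are in the source code.
public interface ClientProtocol {
    void send(String text) throws Exception;

    void close() throws Exception;

    String getIdentifier();
}

interface ClientEventListener {
    void onOpen();

    void onClose();

    void onMessage(String identifier, String message);

    void onError(Throwable e);
}
```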
Kaazing
I also tested Kaazing briefly, but it is commercial software and its trial license limited the number of connections, so I gave up on it.
Load Tester
The load tester takes arguments for how many connections to open and how long to run the test. It generates identifiers from 1 to 10,000 and sends a subscription message to the server for each of those channel identifiers. It then tracks the messages received in a helper class, Metrics, and logs that information at regular intervals:
```java
package com.plexobject.websocket.client;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

public class LoadTester implements ClientEventListener {
    private static final AtomicLong OPEN_CONNECTIONS = new AtomicLong();
    private static final AtomicLong CLOSE_CONNECTIONS = new AtomicLong();
    private static final AtomicLong REQUEST_ID = new AtomicLong();
    private static final Metrics METRICS = new Metrics();
    private List<ClientProtocol> clients = new ArrayList<ClientProtocol>();
    private static final List<Integer> identifiers = new ArrayList<Integer>();

    static {
        for (int i = 1; i <= 10000; i++) {
            identifiers.add(i);
        }
    }

    public LoadTester(String url, int maxConnections) throws Exception {
        try {
            URI uri = new URI(url);
            // open the connections and subscribe each one to a channel
            for (int i = 0; i < maxConnections; i++) {
                String identifier = String.valueOf(identifiers.get(i % identifiers.size()));
                ClientProtocol client = new WebSocketClient(identifier, uri, this);
                String json = "{\"action\":\"subscribe\", \"identifier\":\"" + identifier + "\"}";
                client.send(json);
                clients.add(client);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            }
        } catch (OutOfMemoryError e) {
            System.err.println(e);
            System.exit(0);
        }
    }

    //
    public void close() {
        for (ClientProtocol client : clients) {
            try {
                client.close();
            } catch (Exception e) {
            }
        }
        System.exit(0);
    }

    @Override
    public void onError(Throwable e) {
        System.err.println(e);
    }

    @Override
    public void onMessage(String identifier, String message) {
        String act = METRICS.update(message);
        if (!identifier.equals(act)) {
            System.out.println("Expected " + identifier + ", but received " + act + " for " + message);
        }
    }

    @Override
    public void onClose() {
        CLOSE_CONNECTIONS.incrementAndGet();
    }

    @Override
    public void onOpen() {
        OPEN_CONNECTIONS.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.err.println("Usage client.LoadTester url connections max-runtime-in-minutes");
            System.exit(1);
        }
        final String url = args[0]; // "ws://localhost:8124/"
        final int maxConnections = Integer.parseInt(args[1]);
        final int maxRuntimeMins = Integer.parseInt(args[2]);
        System.out.println("connections,connected," + Metrics.getHeading());
        final long started = System.currentTimeMillis();
        final Timer timer = new Timer(true);
        // log a summary every 15 seconds and stop after the allotted runtime
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println(maxConnections + ","
                        + (OPEN_CONNECTIONS.get() - CLOSE_CONNECTIONS.get()) + ","
                        + METRICS.getSummary());
                long elapsed = System.currentTimeMillis() - started;
                if (elapsed > (maxRuntimeMins * 1000 * 60)) {
                    System.out.println("time done");
                    System.exit(0);
                }
            }
        }, 5000, 15000);
        //
        LoadTester runner = new LoadTester(url, maxConnections);
        while (true) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            }
            //
            if (CLOSE_CONNECTIONS.get() == maxConnections) {
                System.out.println("all clients closed");
                runner.close();
            }
        }
    }
}
```
I am skipping some of the helper classes and interfaces, but you can find them in the source code.
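For orientation, here is a rough sketch of the Metrics helper, inferred only from how it is called above; the field layout and exact output formatting are assumptions, and the real class in the source code is what produced the result tables below:

```java
package com.plexobject.websocket.client;

import java.util.concurrent.atomic.AtomicLong;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical sketch of the skipped Metrics helper: parses each payload,
// records latency against the embedded timestamp, and accumulates counts/sizes.
public class Metrics {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private final AtomicLong messages = new AtomicLong();
    private final AtomicLong totalBytes = new AtomicLong();
    private final AtomicLong totalLatencyMillis = new AtomicLong();
    private final long started = System.currentTimeMillis();

    public static String getHeading() {
        return "average response (ms),average size (bytes),messages,msg/sec,walltime (secs)";
    }

    // Records one message and returns its channel identifier so the caller
    // can verify it arrived on the right subscription.
    public String update(String json) {
        try {
            JsonNode node = MAPPER.readTree(json);
            long sent = node.get("timestamp").asLong();
            totalLatencyMillis.addAndGet(System.currentTimeMillis() - sent);
            totalBytes.addAndGet(json.length());
            messages.incrementAndGet();
            return node.get("identifier").asText();
        } catch (Exception e) {
            return null;
        }
    }

    public String getSummary() {
        long n = Math.max(messages.get(), 1);
        long wallSecs = Math.max((System.currentTimeMillis() - started) / 1000, 1);
        return (totalLatencyMillis.get() / n) + "," + (totalBytes.get() / n) + ","
                + messages.get() + "," + (messages.get() / (double) wallSecs) + "," + wallSecs;
    }
}
```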
Test Script
I created a simple shell script that launches the load test for 5 minutes with a given number of connections and collects the results in a comma-delimited file:
```bash
#!/bin/bash
mvn package
export MAVEN_OPTS="-Xmx1000m -Xss128k"
export url=ws://localhost:8080/atmosphere-1.0/channels
#export url=ws://localhost:8080/channels
#export url=ws://localhost:8124/
clients=500
while [ $clients -lt 100000 ]
do
  mvn exec:java -Dexec.mainClass="com.plexobject.websocket.client.LoadTester" \
      -Dexec.classpathScope=runtime -Dexec.args="$url $clients 5"
  clients=`expr $clients + 500 | awk '{print $1}'`
done
```
Note that I set the thread stack size to 128k because the default stack size per thread is 2MB, which adds up quickly with one thread per connection: at 20,000 connections, 2MB stacks would need roughly 40GB of memory, versus about 2.5GB at 128k.
TCP Settings
As I mentioned in my last blog, I made some changes to my Linux machine to improve its TCP settings:
```
net.core.rmem_max = 33554432
net.core.wmem_max = 33554432
net.ipv4.tcp_rmem = 4096 16384 33554432
net.ipv4.tcp_wmem = 4096 16384 33554432
net.ipv4.tcp_mem = 786432 1048576 26777216
net.ipv4.tcp_max_tw_buckets = 360000
net.core.netdev_max_backlog = 2500
vm.min_free_kbytes = 65536
vm.swappiness = 0
net.ipv4.ip_local_port_range = 1024 65535
```
I then applied the above changes (saved in /etc/sysctl.conf, which sysctl -p reloads) using:
```bash
sudo sysctl -w net.core.somaxconn=10000
sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000
sudo sysctl -p
```
Results
Here are the results of the tests, where the server continuously sends messages to randomly selected channels:
Atmosphere
connections | connected | average response (ms) | average size (bytes) | messages | msg/sec | walltime (secs) |
---|---|---|---|---|---|---|
500 | 500 | 1 | 1462 | 282431 | 943.317 | 301 |
1000 | 1000 | 1 | 1462 | 282100 | 940.967 | 302 |
1500 | 1500 | 1 | 1463 | 274156 | 920.233 | 304 |
2000 | 2000 | 1 | 1463 | 268180 | 936.733 | 309 |
2500 | 2500 | 1 | 1463 | 263443 | 926 | 307 |
3000 | 2985 | 1 | 1463 | 280809 | 932.233 | 332 |
3500 | 3500 | 2 | 1463 | 278412 | 934.333 | 384 |
Netty
connections | connected | average response (ms) | average size (bytes) | messages | msg/sec | walltime (secs) |
---|---|---|---|---|---|---|
500 | 500 | 0.000 | 172.000 | 284165 | 937.833 | 305.000 |
1000 | 1000 | 0.000 | 172.000 | 279883 | 921.717 | 305.000 |
1500 | 1500 | 0.000 | 173.000 | 290317 | 959.983 | 305.000 |
2000 | 2000 | 0.000 | 173.000 | 286110 | 946.467 | 305.000 |
2500 | 2500 | 0.000 | 174.000 | 285122 | 949.550 | 305.000 |
3000 | 3000 | 0.000 | 174.000 | 281650 | 939.150 | 305.000 |
3500 | 3500 | 0.000 | 174.000 | 282833 | 942.700 | 305.000 |
4000 | 4000 | 0.000 | 174.000 | 283829 | 948.700 | 305.000 |
4500 | 4500 | 0.000 | 174.000 | 273293 | 921.583 | 305.000 |
5000 | 5000 | 0.000 | 174.000 | 264433 | 881.733 | 305.000 |
5500 | 5500 | 0.000 | 174.000 | 289230 | 977.300 | 305.000 |
6000 | 6000 | 0.000 | 174.000 | 277692 | 934.800 | 305.000 |
6500 | 6500 | 0.000 | 174.000 | 148223 | 847.517 | 185.000 |
Vertx.io
connections | average response (ms) | average size (bytes) | messages | msg/sec | walltime (secs) |
---|---|---|---|---|---|
1000 | 1 | 1462 | 1487141 | 4056.433 | 365 |
2000 | 1 | 1464 | 1607431 | 5003.033 | 365 |
3000 | 1 | 1464 | 1663777 | 5061.717 | 365 |
4000 | 1 | 1464 | 1607104 | 5109.65 | 365 |
5000 | 1 | 1464 | 1706898 | 5374.433 | 365 |
6000 | 2 | 1465 | 1750757 | 5341.05 | 365 |
7000 | 3 | 1465 | 1723124 | 5263.283 | 365 |
8000 | 3 | 1465 | 1758174 | 5224.85 | 365 |
9000 | 2 | 1465 | 1717950 | 5038.45 | 365 |
10000 | 3 | 1465 | 1789671 | 5160.517 | 365 |
11000 | 3 | 1465 | 1789760 | 5140.8 | 365 |
12000 | 3 | 1465 | 1792477 | 5097.5 | 365 |
13000 | 4 | 1465 | 1778748 | 5128.917 | 365 |
14000 | 4 | 1465 | 1746830 | 4895.433 | 365 |
15000 | 3 | 1465 | 1760734 | 4960.367 | 365 |
16000 | 3 | 1465 | 1739786 | 4933.583 | 365 |
17000 | 4 | 1465 | 1727759 | 4902.883 | 365 |
18000 | 3 | 1465 | 1730165 | 5158.7 | 365 |
19000 | 4 | 1465 | 1725660 | 4904.2 | 367 |
20000 | 4 | 1465 | 1704129 | 4852.567 | 365 |
21000 | 4 | 1465 | 1703201 | 4849.833 | 365 |
22000 | 3 | 1465 | 1705387 | 5077.417 | 366 |
23000 | 3 | 1465 | 1671165 | 4861.233 | 365 |
24000 | 4 | 1465 | 1670471 | 4942.217 | 365 |
25000 | 4 | 1465 | 1639296 | 5108.317 | 365 |
Node.js
connections | average response (ms) | average size (bytes) | messages | msg/sec | walltime (secs) |
---|---|---|---|---|---|
1000 | 2 | 1363 | 1550155 | 4252.95 | 365 |
2000 | 1 | 1368 | 863997 | 2384.8 | 365 |
3000 | 1 | 1368 | 644806 | 1776.717 | 365 |
4000 | 1 | 1369 | 555496 | 1541.683 | 365 |
5000 | 1 | 1370 | 483624 | 1338 | 365 |
6000 | 1 | 1370 | 426728 | 1185.883 | 365 |
7000 | 1 | 1371 | 358331 | 994.017 | 365 |
8000 | 1 | 1371 | 322960 | 898.55 | 365 |
9000 | 1 | 1371 | 281227 | 787.633 | 365 |
10000 | 1 | 1371 | 260502 | 732.3 | 365 |
11000 | 1 | 1370 | 262209 | 733.633 | 365 |
12000 | 1 | 1371 | 243954 | 690.067 | 365 |
13000 | 2 | 1370 | 230117 | 647.833 | 365 |
14000 | 2 | 1371 | 235591 | 668.983 | 365 |
15000 | 2 | 1371 | 240632 | 685.35 | 365 |
16000 | 2 | 1371 | 192929 | 551.083 | 365 |
17000 | 2 | 1371 | 166705 | 474.217 | 365 |
18000 | 2 | 1371 | 142557 | 406.4 | 365 |
19000 | 2 | 1371 | 110873 | 319.467 | 365 |
20000 | 2 | 1371 | 106047 | 305.967 | 365 |
21000 | 2 | 1371 | 107947 | 307.633 | 365 |
22000 | 1 | 1371 | 89779 | 257.883 | 365 |
23000 | 2 | 1371 | 89890 | 259.367 | 365 |
24000 | 2 | 1371 | 34414 | 240.833 | 155 |
Summary
I tried to push the limits of each framework. Atmosphere could not handle more than 3,500 connections and slowed my machine to a crawl. I was using the 4.0-beta version of Netty, which also crashed after 6,500 connections. Vertx.io was able to scale up to 25,000 connections, but then the server ran out of memory on my machine. Similarly, Node.js kept going until about 24,000 connections, but it became very slow and my load-test client could not allocate more memory for connections. Vertx.io and Node.js proved to be the leading contenders, but Vertx.io scaled and maintained its throughput much better than Node.js. As a result, I picked Vertx.io for my project. You can download the source code and compare the results yourself.