A couple of years ago, I implemented a quote server at work using XMPP, BOSH, Strophe and Ejabberd, and described an overview of the design and code in my blog. We have recently been looking at Node.js and WebSockets for streaming data, so here I will describe a simple implementation of a quote server using those two technologies:
Quote Server
The quote server uses WebSockets and listens for a subscribe-stock-quote message to subscribe to a particular quote and an unsubscribe-stock-quote message to unsubscribe from it.
Here is a simple implementation of the Quote server:
// Quote server entry point: serves the static web client from ./public and
// streams stock quotes to subscribed browsers over Socket.IO.
var express = require('express');
var sio = require('socket.io');
var http = require('http');
var quoteSubscriptions = require('quote_subscriptions');

var app = express();
var server = http.createServer(app);

// Number of push cycles completed so far; broadcast to every connected client.
var sequence = 0;

app.configure(function () {
  app.use(express.static(__dirname + '/public'));
  app.use(app.router);
});

var io = sio.listen(server);
io.set('log level', 1);
server.listen(8080);

io.sockets.on('connection', function (socket) {
  // subscribe to stock quotes
  socket.on('subscribe-stock-quote', function (symbol) {
    console.log('subscribe-stock-quote received');
    quoteSubscriptions.subscribe(socket, symbol, function () {
      socket.emit('subscribed-stock-quote', 'You have been subscribed to ' + symbol);
    });
  });

  // unsubscribe from stock quotes
  socket.on('unsubscribe-stock-quote', function (symbol) {
    console.log('unsubscribe-stock-quote received');
    quoteSubscriptions.unsubscribe(socket, symbol, function () {
      // Event name mirrors 'subscribed-stock-quote'; it previously carried a
      // stray " received" suffix that no client could reasonably listen for.
      socket.emit('unsubscribed-stock-quote', 'You have been unsubscribed to ' + symbol);
    });
  });

  // disconnected
  socket.on('disconnect', function () {
    // Snapshot the symbols first: unsubscribing removes them from
    // socket.stockSymbols, and socket.symbol (used previously) is never set.
    var symbols = (socket.stockSymbols || []).slice();
    quoteSubscriptions.unsubscribeAll(socket, function () {
      // NOTE(review): the socket is already disconnected at this point, so this
      // emit presumably never reaches the client - confirm whether it is needed.
      socket.emit('disconnected', symbols.join(', ') + ' has been unsubscribed from stock quotes.');
    });
  });
});

// update clients every 500 milli-seconds with latest stock quotes
setInterval(function () {
  quoteSubscriptions.pushUpdates();
  io.sockets.emit('sequence', ++sequence);
}, 500);
In addition to sending quote updates, the server also sends a loop counter so that we know how many times quotes have been pushed to clients; that message is broadcast to all clients.
The subscriptions are managed in another module, quote_subscriptions:
//////////////////////////////////////////////////////////////////////////////////////////// // // This module maintains quote subscriptions for all clients // //////////////////////////////////////////////////////////////////////////////////////////// var yclient = require('y_client'); var stocksSubscribers = {}; var quoteCache = {}; exports.subscribe = function(socket, symbol, callback) { if (typeof socket.stockSymbols === "undefined") { socket.stockSymbols = []; } if (socket.stockSymbols.indexOf(symbol) == -1) { socket.stockSymbols.push(symbol); } // var subscribers = stocksSubscribers[symbol]; if (typeof subscribers === "undefined") { subscribers = []; stocksSubscribers[symbol] = subscribers; } if (subscribers.indexOf(socket) == -1) { subscribers.push(socket); } callback(); } exports.unsubscribeAll = function(socket, callback) { if (typeof socket.stockSymbols === "undefined") { return; } for (var i=0; i<socket.stockSymbols.length; i++) { this.unsubscribe(socket, socket.stockSymbols[i], function() {}); } callback(); } exports.unsubscribe = function(socket, symbol, callback) { if (typeof socket.stockSymbols === "undefined") { return; } var ndx = socket.stockSymbols.indexOf(symbol); if (ndx !== -1) { socket.stockSymbols.splice(ndx, 1); } // var subscribers = stocksSubscribers[symbol]; if (typeof subscribers === "undefined") { return; } ndx = subscribers.indexOf(socket); if (ndx !== -1) { subscribers.splice(ndx, 1); } callback(); } exports.pushUpdates = function() { for (var symbol in stocksSubscribers) { var sockets = stocksSubscribers[symbol]; if (sockets.length == 0) { continue; } quote = quoteCache[symbol]; if (typeof quote === "undefined") { yclient.quote(symbol, function(quote) { quoteCache[symbol] = quote; sendQuote(quote, sockets); }); } else { sendQuote(quote, sockets); } } } function sendQuote(quote, sockets) { var rnd = Math.random(); quote.counter = quote.counter + 1; quote.bid = round(rnd % 2 == 0 ? 
quote.original_bid + rnd : quote.original_bid - rnd, 2); quote.ask = round(rnd % 2 == 0 ? quote.original_ask + rnd : quote.original_ask - rnd, 2); for (var i=0; i<sockets.length; i++) { var socket = sockets[i]; var address = socket.handshake.address; console.log('Quote sending ' + quote.symbol + ' ' + quote.counter + ' to ' + address.address + ":" + address.port); socket.volatile.emit('receive-stock-quote', quote); } } function round(num, dec) { return Math.round(num*Math.pow(10,dec))/Math.pow(10,dec); }
The above module defines methods to subscribe, unsubscribe and push quote updates to all clients. Note that the module requests a quote from Yahoo if it doesn’t exist in the cache and then randomly changes the bid/ask values.
Yahoo Finance Client
I used Yahoo Finance to retrieve the quotes; however, these quotes are cached and modified by the quote subscription module:
/////////////////////////////////////////////////////////////////////////////////////////// // // This module provides quote/chain apis using yahoo finance // //////////////////////////////////////////////////////////////////////////////////////////// var http = require('http') , libxmljs = require('libxmljs') , util = require('util') exports.quote = function(symbol, callback) { api('download.finance.yahoo.com', '/d/quotes.csv?s=' + symbol + '&f=nb2b3ra2yj3e7gopjkl1xm3m4h', function(text) { cols = text.split(','); callback({'symbol':cols[0], 'ask':cols[1], 'bid':cols[2], 'counter':0, 'original_ask':cols[1], 'original_bid':cols[2]}); }); } function api(host, path, callback) { var options = { host: host, port: 80, path: path, method: 'GET', headers: {'User-Agent': 'Node'} }; var req = http.request(options, function(res) { res.setEncoding('utf8'); var text = ''; res.on('data', function (chunk) { text += chunk; }); res.on('end', function (e) { text = text.replace(/"/g, ''); callback(text); }); res.on('error', function (e) { console.log('Could not send api with response : ' + e.message); }); }); req.on('error', function(e) { console.log('Could not send api with response : ' + e.message); }); // write data to request body req.end(); }
Web Client
Here is an example of a Web client that allows users to subscribe to different stock symbols and receive streaming quotes every 500 milliseconds.
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>OptionsHouse Streaming</title>
  <script src="/socket.io/socket.io.js"></script>
  <script>
    // Streaming quote client: subscribes to one symbol at a time and renders
    // each pushed quote, colouring bid/ask by direction of change.
    var socket = io.connect('ws://localhost:8080');

    // Symbol currently subscribed to (undefined after an explicit unsubscribe).
    var lastSymbol = 'AAPL';
    // Previously rendered quote, used to colour price movements.
    var lastQuote;

    // Writes a price into a cell as plain text and colours it green/red when
    // it rose/fell relative to `previous` (black when unchanged or unknown).
    function showPrice(cell, value, previous) {
      var color = 'black';
      if (typeof previous !== 'undefined') {
        if (value > previous) {
          color = 'green';
        } else if (value < previous) {
          color = 'red';
        }
      }
      // textContent keeps server-supplied data from being parsed as markup.
      cell.textContent = value;
      cell.style.color = color;
    }

    socket.on('connect', function() {
      // Fires on reconnects too: resubscribe to whatever the user last chose
      // instead of always falling back to AAPL.
      if (typeof lastSymbol !== "undefined") {
        socket.emit('subscribe-stock-quote', lastSymbol);
      }
    });

    socket.on('disconnect', function() {
      alert('remote server died');
    });

    socket.on('sequence', function(seq) {
      document.getElementById('sequence').textContent = seq;
    });

    socket.on('receive-stock-quote', function(quote) {
      // Only compare against the previous quote when it is for the same
      // instrument; otherwise the colours would reflect a symbol switch.
      var sameInstrument = typeof lastQuote !== "undefined" && lastQuote.symbol === quote.symbol;
      var previousBid = sameInstrument ? lastQuote.bid : undefined;
      var previousAsk = sameInstrument ? lastQuote.ask : undefined;

      document.getElementById('company').textContent = quote.symbol;
      document.getElementById('counter').textContent = quote.counter;
      showPrice(document.getElementById('bid'), quote.bid, previousBid);
      showPrice(document.getElementById('ask'), quote.ask, previousAsk);
      lastQuote = quote;
    });

    window.addEventListener('load', function() {
      document.getElementById('start').addEventListener('click', function() {
        if (typeof lastSymbol !== "undefined") {
          socket.emit('unsubscribe-stock-quote', lastSymbol);
        }
        var symbol = document.getElementById('symbol').value;
        socket.emit('subscribe-stock-quote', symbol);
        lastSymbol = symbol;
        lastQuote = undefined;
      }, false);

      document.getElementById('stop').addEventListener('click', function() {
        var symbol = document.getElementById('symbol').value;
        socket.emit('unsubscribe-stock-quote', symbol);
        if (symbol === lastSymbol) {
          // Nothing is subscribed any more; do not resubscribe on reconnect.
          lastSymbol = undefined;
          lastQuote = undefined;
        }
      }, false);
    }, false);
  </script>
</head>
<body>
  <div id="form">
    Symbol: <input type="text" id="symbol" size="10" value="AAPL" />
    <input type="button" id="start" value="Subscribe" />
    <input type="button" id="stop" value="Unsubscribe" />
  </div>
  <table width="500">
    <tr>
      <th>Company</th>
      <th>Bid</th>
      <th>Ask</th>
      <th>Stock Counter</th>
    </tr>
    <tr>
      <td align="center"> <div id="company"></div> </td>
      <td align="center"> <div id="bid"></div> </td>
      <td align="center"> <div id="ask"></div> </td>
      <td align="center"> <div id="counter"></div> </td>
    </tr>
  </table>
  <p></p>
  <!-- span is not a void element, so it needs an explicit closing tag -->
  Quote Server Loop: <span id="sequence"></span>
</body>
</html>
Running the example
You can download all the code from my GitHub account, e.g.
# Fetch the example project
git clone https://github.com/bhatti/node-quote-server.git
cd node-quote-server
# Install the dependencies, then start the quote server
npm install
node app
Then you can point your browser to http://localhost:8080 and start playing:
Summary
Node.js comes with good support for WebSockets, so it takes only a few lines to build the quote server. I am still testing Node.js and WebSockets with different browsers and simulating load with a large number of clients, and will post those results in the future.