Shahzad Bhatti Welcome to my ramblings and rants!

September 6, 2012

Building a streaming quote server using Node.js and Websockets

Filed under: Javascript — admin @ 5:32 pm

A couple of years ago, I implemented a quote server at work using XMPP, BOSH, Strophe and ejabberd, and described an overview of the design and code in my blog. We have recently been looking at Node.js and WebSockets for streaming data, so here I will describe a simple implementation of a quote server using those two technologies:

Quote Server

The quote server uses WebSockets: it listens for a subscribe-stock-quote message to subscribe a client to a particular quote, and for an unsubscribe-stock-quote message to unsubscribe it.
Here is a simple implementation of the Quote server:

 var express = require('express')
    , sio = require('socket.io')
    , http = require('http')
    , quoteSubscriptions = require('quote_subscriptions')
    , app = express();
 
 // HTTP server that wraps the Express app; socket.io is attached to it below.
 var server = http.createServer(app);
 // Loop counter: incremented and broadcast to all clients on every push cycle.
 var sequence = 0;
 app.configure(function () {
    // Serve the files under ./public (relative to this script) as static content.
    app.use(express.static(__dirname + '/public'));
    app.use(app.router);
 })
 
 // Attach socket.io to the same HTTP server so both share port 8080.
 var io = sio.listen(server);
 // NOTE(review): presumably lowers socket.io's logging verbosity -- confirm
 // the meaning of level 1 against the socket.io version in use.
 io.set('log level', 1);
 server.listen(8080);
 
 // Wire up the per-client message handlers as soon as a client connects.
 //
 // Messages handled for each socket:
 //   'subscribe-stock-quote'   (symbol) -> replies 'subscribed-stock-quote'
 //   'unsubscribe-stock-quote' (symbol) -> replies 'unsubscribed-stock-quote'
 //   'disconnect'                       -> drops every subscription of the socket
 io.sockets.on('connection', function(socket) {
    // subscribe to stock quotes
    socket.on('subscribe-stock-quote', function(symbol) {
       console.log('subscribe-stock-quote received');
       quoteSubscriptions.subscribe(socket, symbol, function() {
          socket.emit('subscribed-stock-quote', 'You have been subscribed to ' + symbol);
       });
    });

    // unsubscribe from stock quotes
    socket.on('unsubscribe-stock-quote', function (symbol) {
       console.log('unsubscribe-stock-quote received');
       quoteSubscriptions.unsubscribe(socket, symbol, function() {
          // The event name mirrors 'subscribed-stock-quote'; it previously
          // carried a stray " received" suffix copied from the log message.
          socket.emit('unsubscribed-stock-quote', 'You have been unsubscribed to ' + symbol);
       });
    });

    // disconnected
    socket.on('disconnect', function() {
       // Snapshot the symbols first: unsubscribing empties socket.stockSymbols
       // (the list maintained by the subscriptions module), and the socket has
       // no `symbol` property to report.
       var symbols = (socket.stockSymbols || []).slice();
       quoteSubscriptions.unsubscribeAll(socket, function() {
          // Best effort only: the client has already gone away at this point.
          socket.emit('disconnected', symbols.join(', ') + ' has been unsubscribed from stock quotes.');
       });
    });
 });
 
 // Push cycle, run every 500 milliseconds: send the latest quotes to their
 // subscribers, then broadcast the incremented loop counter to every client.
 var PUSH_INTERVAL_MS = 500;
 function pushCycle() {
    quoteSubscriptions.pushUpdates();
    sequence += 1;
    io.sockets.emit('sequence', sequence);
 }
 setInterval(pushCycle, PUSH_INTERVAL_MS);
 
 
 

In addition to sending quote updates, the server also broadcasts a loop counter to all clients, so that we know how many times quotes have been pushed.

The subscriptions are managed in another module quote_subscriptions, i.e.,:

 ////////////////////////////////////////////////////////////////////////////////////////////
 //
 // This module maintains quote subscriptions for all clients 
 //
 ////////////////////////////////////////////////////////////////////////////////////////////
 var yclient = require('y_client');
 // Both dictionaries are keyed by stock symbol, which comes straight from the
 // client. They are created without a prototype so that a symbol such as
 // "constructor" or "toString" cannot resolve to an inherited Object member.
 //
 // symbol -> array of sockets currently subscribed to that symbol
 var stocksSubscribers = Object.create(null);
 // symbol -> quote object fetched for that symbol
 var quoteCache = Object.create(null);
 exports.subscribe = function(socket, symbol, callback) {
    if (typeof socket.stockSymbols === "undefined") {
       socket.stockSymbols = [];
    }
    if (socket.stockSymbols.indexOf(symbol) == -1) {
       socket.stockSymbols.push(symbol);
    }
    //
    var subscribers = stocksSubscribers[symbol];
    if (typeof subscribers === "undefined") {
       subscribers = [];
       stocksSubscribers[symbol] = subscribers;
    }
    if (subscribers.indexOf(socket) == -1) {
       subscribers.push(socket);
    }
    callback();
 }
 
 exports.unsubscribeAll = function(socket, callback) {
    if (typeof socket.stockSymbols === "undefined") {
       return;
    }
    for (var i=0; i<socket.stockSymbols.length; i++) {
       this.unsubscribe(socket, socket.stockSymbols[i], function() {});
    }
    callback();
 }
 
 exports.unsubscribe = function(socket, symbol, callback) {
    if (typeof socket.stockSymbols === "undefined") {
       return;
    }
    var ndx = socket.stockSymbols.indexOf(symbol);
    if (ndx !== -1) {
       socket.stockSymbols.splice(ndx, 1);
    }
    //
    var subscribers = stocksSubscribers[symbol];
    if (typeof subscribers === "undefined") {
       return;
    }
    ndx = subscribers.indexOf(socket);
    if (ndx !== -1) {
       subscribers.splice(ndx, 1);
    }
    callback();
 }
 exports.pushUpdates = function() {
    for (var symbol in stocksSubscribers) {
       var sockets = stocksSubscribers[symbol];
       if (sockets.length == 0) {
          continue;
       }
       quote = quoteCache[symbol];
       if (typeof quote === "undefined") {
          yclient.quote(symbol, function(quote) {
             quoteCache[symbol] = quote;
             sendQuote(quote, sockets);
          });
       } else {
          sendQuote(quote, sockets);
       }
    }
 }
 
 
 // Simulates a price move on `quote` and pushes it to every socket in
 // `sockets` as a 'receive-stock-quote' message.
 //
 // The quote object is updated in place: `counter` is incremented, and `bid`
 // and `ask` are set to the original prices shifted by one shared random
 // offset, rounded to two decimals.
 function sendQuote(quote, sockets) {
    // Offset in [-1, 1): prices move up or down with equal probability. The
    // previous `rnd % 2 == 0` test was never true for a fraction, so prices
    // could only go down.
    var offset = Math.random() * 2 - 1;
    quote.counter = quote.counter + 1;
    // The original prices arrive as CSV text; convert them explicitly so that
    // `+` adds numbers instead of concatenating strings.
    quote.bid = round(Number(quote.original_bid) + offset, 2);
    quote.ask = round(Number(quote.original_ask) + offset, 2);
    for (var i=0; i<sockets.length; i++) {
       var socket = sockets[i];
       var address = socket.handshake.address;
       console.log('Quote sending ' + quote.symbol + ' ' + quote.counter + ' to ' + address.address + ":" + address.port);
       socket.volatile.emit('receive-stock-quote', quote);
    }
 }
 
 // Rounds `num` to `dec` decimal places by scaling, applying Math.round and
 // scaling back.
 function round(num, dec) {
    var scale = Math.pow(10, dec);
    return Math.round(num * scale) / scale;
 }
 
 
 

The above module defines methods to subscribe, unsubscribe and push quote updates to all clients. Note that the module requests a quote from Yahoo if it doesn’t exist in the cache, and then randomly changes the bid/ask values on every push.

Yahoo Finance Client

I used Yahoo finance to retrieve the quotes, however these quotes are cached and modified by the quote subscription module:

 ///////////////////////////////////////////////////////////////////////////////////////////
 //
 // This module provides quote/chain apis using yahoo finance
 //    
 ////////////////////////////////////////////////////////////////////////////////////////////
 var http = require('http')
   , libxmljs = require('libxmljs')
   , util = require('util')
       
          
 exports.quote = function(symbol, callback) {
    api('download.finance.yahoo.com', '/d/quotes.csv?s=' + symbol + '&f=nb2b3ra2yj3e7gopjkl1xm3m4h', function(text) {
       cols = text.split(',');
       callback({'symbol':cols[0], 'ask':cols[1], 'bid':cols[2], 'counter':0, 'original_ask':cols[1], 'original_bid':cols[2]});    
    });
 }  
 
 // Issues an HTTP GET to `host` + `path` on port 80 and hands the complete
 // response body, with every double quote removed, to `callback`.
 //
 // Request and response errors are only logged; `callback` is not invoked
 // in that case.
 function api(host, path, callback) {
    var logFailure = function(err) {
       console.log('Could not send api with response : ' + err.message);
    };

    var req = http.request({
       host: host,
       port: 80,
       path: path,
       method: 'GET',
       headers: {'User-Agent': 'Node'}
    }, function(res) {
       var chunks = [];
       res.setEncoding('utf8');
       res.on('data', function(chunk) {
          chunks.push(chunk);
       });
       res.on('end', function() {
          callback(chunks.join('').replace(/"/g, ''));
       });
       res.on('error', logFailure);
    });

    req.on('error', logFailure);

    // A GET has no body: finish the request right away.
    req.end();
 }
 
 

Web Client

Here is an example of a Web client that allows users to subscribe to different stock symbols and receive streaming quotes every 500 milliseconds.

 <!doctype html> 
 <html lang="en">
    <head>
       <meta charset="utf-8"> 
       <title>OptionsHouse Streaming</title>
       <script src="/socket.io/socket.io.js"></script>
       <script> 
          // Browser side of the quote server: subscribes to one symbol at a
          // time and renders every quote pushed by the server.
          var socket = io.connect('ws://localhost:8080');
          var lastSymbol;   // symbol of the most recent subscription request
          var lastQuote;    // previously received quote, used to colour changes

          // Colour for a value compared with its previous reading:
          // green when it rose, red when it fell, black otherwise.
          function trendColor(current, previous) {
             if (current > previous) {
                return 'green';
             }
             if (current < previous) {
                return 'red';
             }
             return 'black';
          }

          // Writes `value` into `element` as plain text. textContent is used
          // instead of innerHTML so that data received from the server can
          // never be interpreted as markup.
          function showValue(element, value, color) {
             element.textContent = value;
             if (color) {
                element.style.color = color;
             }
          }

          socket.on('connect', function() {
             lastSymbol = 'AAPL';
             socket.emit('subscribe-stock-quote', lastSymbol);
          });
          socket.on('disconnect', function() {
             alert('remote server died');
          });
          socket.on('sequence',function(seq) {
             showValue(document.getElementById('sequence'), seq);
          });
          socket.on('receive-stock-quote',function(quote) {
             var bidColor = 'black';
             var askColor = 'black';
             if (typeof lastQuote !== "undefined") {
                bidColor = trendColor(quote.bid, lastQuote.bid);
                askColor = trendColor(quote.ask, lastQuote.ask);
             }
             showValue(document.getElementById('company'), quote.symbol);
             showValue(document.getElementById('counter'), quote.counter);
             showValue(document.getElementById('bid'), quote.bid, bidColor);
             showValue(document.getElementById('ask'), quote.ask, askColor);
             lastQuote = quote;
          });

          window.addEventListener('load',function() {
             // Subscribe: drop the previous subscription, then follow the
             // symbol typed into the input box.
             document.getElementById('start').addEventListener('click',
             function() {
                if (typeof lastSymbol !== "undefined") {
                   socket.emit('unsubscribe-stock-quote', lastSymbol);
                }
                var symbol = document.getElementById('symbol').value;
                socket.emit('subscribe-stock-quote', symbol);
                lastSymbol = symbol;
             }, false);
             // Unsubscribe: stop following the symbol typed into the input box.
             document.getElementById('stop').addEventListener('click',
             function() {
                var symbol = document.getElementById('symbol').value;
                socket.emit('unsubscribe-stock-quote', symbol);
             }, false);
          }, false);
       </script> 
    </head> 
    <body>
       <div id="form">
          Symbol: <input type="text" id="symbol" size="10" value="AAPL" />
          <input type="button" id="start" value="Subscribe" />
          <input type="button" id="stop" value="Unsubscribe" />
       </div>
       <table width="500">
          <tr>
             <th>Company</th>
             <th>Bid</th>
             <th>Ask</th>
             <th>Stock Counter</th>
          </tr>
          <tr>
             <td align="center"> <div id="company"></div> </td>
             <td align="center"> <div id="bid"></div> </td>
             <td align="center"> <div id="ask"></div> </td>
             <td align="center"> <div id="counter"></div> </td>
          </tr>
       </table>
       <p></p>
       Quote Server Loop: <span id="sequence"></span>
    </body>
 </html>
 
 

Running the example

You can download all the code from my GitHub account, e.g.

 git clone https://github.com/bhatti/node-quote-server.git  
 cd node-quote-server 
 npm install
 node app
 

Then you can point your browser to http://localhost:8080 and start playing:

Summary

Node.js comes with good support for WebSockets, so it takes only a few lines to build the quote server. I am still testing Node.js and WebSockets with different browsers and simulating load with a large number of clients, and I will post those results in the future.


No Comments »

No comments yet.

RSS feed for comments on this post. TrackBack URL

Leave a comment

You must be logged in to post a comment.

Powered by WordPress