Shahzad Bhatti

September 24, 2012

Scaling Node.js

Filed under: Javascript — admin @ 12:22 pm

As I described in my earlier blog, I have been testing Node.js and Websockets for streaming data to large number of clients. I am evaluating Node.js and WebSockets in both pub/sub model such as clients connecting to receive streaming quotes and request/reply model where clients are invoking an API and waiting for reply. Below are some of my experience with those technologies so far:

Pub/Sub Server

The server code uses cluster module to provide multiprocessing support and each process starts a Websocket server and listens for incoming connections. I used socket.io library for WebSockets. The server keeps track of all clients that have subscribed and then send random data every 10-100 milli-seconds.
Here is basic implementation of the server:

 var MAX_THREADS = process.env.MAX_THREADS || 10;
 var MAX_ROWS = process.env.MAX_ROWS || 2;
 var nextClientId = 0;
 //
 var helper = require('helper')
 
 var pubsubController = require('pubsub_controller')(MAX_ROWS)
 var cluster = require('cluster');
 
 if (cluster.isMaster) {
    for (var i = 0; i < MAX_THREADS; i++) {
       cluster.fork();
    }
    cluster.on('exit', function(worker, code, signal) {
       console.log('worker ' + worker.process.pid + ' died');
    });
 } else {
    var socketsByAddr = {};
    var io = require('socket.io').listen(8124);
    io.set('log level', 1);
    io.set('transports', ['websocket']);
    io.set('force new connection', true);
    io.enable('browser client gzip');
 
    io.sockets.on('connection', function(socket) {
       socket.address = socket.handshake.address.address + ':' + socket.handshake.address.port;
       socket.key = socket.handshake.address.address + ':' + socket.handshake.address.port + ':' + (++nextClientId);
       socketsByAddr[socket.address] = true;
       pubsubController.subscribe(socket);
       socket.on('disconnect', function() {
          pubsubController.unsubscribe(socket);
          delete socketsByAddr[socket.address];
       });
 
       socket.on('end', function() {
       });
    });
 
    setInterval(function() {
       pubsubController.pushUpdates();
    }, helper.random(100) + 10);
 }
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
 });
 
 

Pub/Sub Client

The pub/sub client subscribes for data and receies data every 10-100 milli-seconds. The client code builds up 100 more connections every 15 seconds and it logs the response time information. On client side, I used socket.io-client library. The client calls 'subscribe-execs' to subscribe and then receive messages under 'receive-execs'. Upon receiving message, the client tracks various metrics such as response time, CPU, memory usage, etc. As, I am running both client and server on the same machine, it gives accurate representation of latency without network hops or clocks mismatch. Another thing to note is that the client passes 'force new connection' flag to force new socket for each client as by default all clients share same socket.

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 10;
 
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 hwUsage.start();
       
 var client = require('pubsub_client')(metrics, hwUsage);
 var clients = [];
 
 // create connections for this client
 var buildClients = function() {
    for (var i=0; i<100; i++) {
       var client = client.newClient(i+0);
       clients.push(client);
    }
    hwUsage.stop(function(usage) {
       console.log('clients,connected,', hwUsage.heading() + ',' + metrics.heading());
       console.log(clients.length + ',' + client.totalConnected() + ',' + usage.toString() + ',' + metrics.summary().toString());
    });
 };
 
 
 setInterval(buildClients, 15000);
 
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
 });
 
 

The actual client is defined as a module, e.g.:

 var io = require('socket.io-client');
 var helper = require('helper');
 var totalConnected = 0;
 
 module.exports = function(metrics, hwUsage) {
    var ExecClient = function(id) {
       this.id = id;
       this.client = io.connect('localhost', { port: 8124,
                               transports: ['websocket'],
                               'force new connection': true,
                               'sync on disconnect' : true,
                               'connect timeout': 500,
                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
    };
    
    // 
    ExecClient.prototype.setupConnectListener = function() {
       if (typeof this.client === "undefined") return;
       var that = this;
       this.client.on('connect_failed', function(error) {
       });
 
       this.client.on('connect', function() {
          totalConnected++;
       });
 
       this.client.on('disconnect', function() {
          totalConnected--;
       });
 
       this.client.on('reconnect_failed', function() {
          //process.exit(1);
       });
 
       this.client.on('end', function() {
          //process.exit(1);
       });
    };
 
    ExecClient.prototype.setupReceiveListener = function() {
       if (typeof this.client === "undefined") return;
       var that = this;
       this.client.on('receive-execs', function(msg) {
          console.log('receive');
          var metric = metrics.update(msg.address, msg.request, msg.timestamp, JSON.stringify(msg).length);
        });
    };
 
    return {
       totalConnected : function() {
          return totalConnected;
       },
       newClient: function(id) {
          var client = new ExecClient(id);
          client.setupConnectListener();
          client.setupReceiveListener();
          return client;
       }
    };
 }
 
 

Subscription management

Following module defines APIs for managing subscriptions and as I mentioned when number of messages received by client reaches maxMessages, it unsubscribes from the server.

 var helper = require('helper');
 var emitVolatile = typeof process.env.EMIT_VOLATILE !== "undefined" && process.env.EMIT_VOLATILE === 'true';
 
 module.exports = function(numExecs) {
    var execSubscribers = {};
    var counter = 0; 
    var generatePayload = function(address) {
       var execs = [];
       var date = new Date();
       for (var i=0; i<numExecs; i++) {
          execs.push({sequence:i, activityDateStr:date.toString(), com: helper.random(5),price: helper.random(100), accountId:1772139, symbol:"QQQ", transaction:"STO", description:"QQQ Stock", qty: helper.random(200), key: "QQQ:::S", netAmount: helper.random(1000)});
       }
       var payload = {request: counter, rows:numExecs, timestamp : date.getTime(), address:address, execs: execs};
       return payload;
    }
 
    return {
       subscribe: function(socket) {
          execSubscribers[socket.key] = socket;
       },
       unsubscribe: function(socket) {
          delete execSubscribers[socket.key];
       },
       count: function() {
          return helper.size(execSubscribers);
       },
       pushUpdates: function() {
          var count = 0;
          for (var addr in execSubscribers) {
             count++;
             try {
                var socket = execSubscribers[addr];
                //
                if (emitVolatile) {
                   socket.volatile.emit('receive-execs', generatePayload(addr));
                } else {
                   socket.emit('receive-execs', generatePayload(addr));
                }
                //
                if (typeof socket.numMessages === "undefined") {
                   socket.numMessages = 1;
                } else {
                   ++socket.numMessages;
                   //delete execSubscribers[socket.key];
                }
             } catch (err) {
                console.log('failed to send execution report' + err);
                try {
                   socket.disconnect();
                } catch (ex) {
                   console.log('failed to close socket ' + ex);
                }
                delete execSubscribers[socket.key];
             }
          }
          return count;
       }
    };
 } 
 

Hardware Measurement

Following module defines API for measuring CPU time, load average, memory, etc:

 var fs = require('fs');
 var os = require('os');
 var helper = require('helper');
 
 var HardwareUsage = function() {
    this.startCpuTime = 0;
    this.startLoadAvg = os.loadavg();
    this.startMemory = process.memoryUsage();
    this.endCpuTime = 0;
    this.endLoadAvg = 0;
    this.endMemory = 0;
    this.started = new Date().getTime();
    var that = this;
    this.cpuUsage (function(totalCpu) {
       that.startCpuTime = totalCpu;
    });
 }
 
 HardwareUsage.prototype.cpuUsage = function(callback) {
    if (!fs.existsSync("/proc/" + process.pid + "/stat")) {
       callback();
       return;
    }
    var that = this;
 
    fs.readFile("/proc/" + process.pid + "/stat", function(err, data){
       var elems = data.toString().split(' ');
       var utime = parseInt(elems[13]);
       var stime = parseInt(elems[14]);
       var totalCpu = utime + stime;
       callback(totalCpu);
    });
 };
 
 
 HardwareUsage.prototype.percCpuChange = function() {
    return helper.percentChange(this.startCpuTime, this.endCpuTime);
 }
 
 HardwareUsage.prototype.percLoadChange = function() {
    return helper.percentChange(this.startLoadAvg[0], this.endLoadAvg[0]);
 }
 
 HardwareUsage.prototype.percMemoryChange = function() {
    return helper.percentChange(this.startMemory.heapTotal, this.endMemory.heapUsed);
 }
 
 
 HardwareUsage.prototype.toString = function() {
    return helper.round(this.endCpuTime) + ',' + helper.round(this.endLoadAvg[0]) + ',' + helper.round(this.startMemory.heapTotal/1024.0/1024.0) + ',' + helper.round(this.endMemory.heapUsed/1024.0/1024.0);
 };
 
 HardwareUsage.prototype.stop = function(callback) {
    var that = this;
    this.cpuUsage (function(totalCpu) {
       that.endCpuTime = totalCpu;
       that.endMemory = process.memoryUsage();
       that.endLoadAvg = os.loadavg();
       that.elapsed = new Date().getTime() - that.started;
       callback(that);
    });
 };
 
 var usage = new HardwareUsage();
 
 exports.heading = function() {
    return 'cpu time, load avg, total memory (M), used memory(M)';
 };
 exports.start = function() {
    usage = new HardwareUsage();
 };
 
 exports.stop = function(callback) {
    usage.stop(callback);
 };
 

Response time Measurement

Following module defines API for measuring response time:

 var helper = require('helper');
    
 module.exports = function(store, outlierSize) {
    var MetricResultSummary = function(metric) {
       this.averageResponseTime = helper.round(metric.totalTime / metric.totalRequests);
       this.averageHighResponseTime = helper.round(helper.sum(metric.topTenHigh)/ metric.topTenHigh.length);
       this.averageLowResponseTime = helper.round(helper.sum(metric.topTenLow)/ metric.topTenLow.length);
       this.averageSize = helper.round(metric.totalPayloadSize / metric.totalRequests);
       this.messagesReceived = metric.messagesReceived;
       this.count = 0;
    }
    MetricResultSummary.prototype.toString = function() {
       return this.count + ',' + this.averageResponseTime + ',' + this.averageHighResponseTime + ',' + this.averageLowResponseTime + ',' + this.averageSize + ',' + this.messagesReceived;
    };
 
    var Metric = function() {
       this.totalTime = 0;
       this.totalRequests = 0;
       this.topTenHigh = [];
       this.topTenLow = [];
       this.totalPayloadSize = 0;
       this.messagesReceived = 0;
    }
 
    Metric.prototype.addTo = function(target) {
       target.totalTime += this.totalTime;
       target.totalRequests += this.totalRequests;
       for (var i=0; i<this.topTenHigh.length; i++) {
          target.topTenHigh.push(this.topTenHigh[i]);
       }
       for (var i=0; i<this.topTenLow.length; i++) {
          target.topTenLow.push(this.topTenLow[i]);
       }
       target.totalPayloadSize += this.totalPayloadSize;
       target.messagesReceived += this.messagesReceived;
    };
 
    Metric.prototype.update = function(id, sendTime, payloadSize) {
       var elapsed = new Date().getTime() - sendTime;
       for (var i=0; i<outlierSize; i++) {
          if (typeof this.topTenHigh[i] === "undefined" || elapsed > this.topTenHigh[i]) {
             this.topTenHigh[i] = elapsed;
             break;
          }
       }
       //
       for (var i=0; i<outlierSize; i++) {
          if (typeof this.topTenLow[i] === "undefined" || elapsed < this.topTenLow[i]) {
             this.topTenLow[i] = elapsed;
             break;
          }
       }
       this.messagesReceived++;
       this.totalTime += elapsed;
       this.totalPayloadSize += payloadSize;
       this.totalRequests++;
    };
    //
    Metric.prototype.averageResponse = function() {
       return helper.round(this.totalTime / this.totalRequests);
    };
    Metric.prototype.summary = function() {
       return new MetricResultSummary(this);
    };
    Metric.prototype.toString = function() {
       return new MetricResultSummary(this).toString();
    };
 
    var get = function(key) {
          var metric = store[key];
          if (typeof metric === "undefined") {
             metric = new Metric();
             store[key] = metric;
          }
          return metric;
    };
    return {
       get: function(key) {
          return get(key);
       },
       update: function(key, id, sendTime, payloadSize) {
          var metric = get(key);
          metric.update(id, sendTime, payloadSize);
          return metric;
       },
       summaryFor: function(key) {
          var metric = get(key);
          return metric.summary();
       },
       heading: function() {
          return 'count, average response (ms), average outlier high response (ms), average outlier low response (ms), average message size, messages received';
       },
       summary: function() {
          var result = new Metric();
          var count = 0;
          for (var key in store) {
             var metric = store[key];
             metric.addTo(result);
             count++;
          }
          var summary = result.summary();
          summary.count = count;
          return summary;
       }
    };
 }
 
 

Request/Reply Server

The request/reply server also uses cluster module and listens for Websocket port on each process. Upon receiving order-request API, it sends a reply to the client. Here is the implementation of server:

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 2;
 var MAX_THREADS = process.env.MAX_THREADS || 20;
 var helper = require('helper')
    
 var cluster = require('cluster');
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 hwUsage.start();
 var socketsByAddr = {};
 var totalClients = 0;
 
 
 if (cluster.isMaster) {
    for (var i = 0; i < MAX_THREADS; i++) {
       cluster.fork();
    }  
    cluster.on('exit', function(worker, code, signal) {
       console.log('worker ' + worker.process.pid + ' died');
    });
 } else {
    //
    var io = require('socket.io').listen(8124);
    io.set('log level', 1); 
    io.set('transports', ['websocket']);
    io.set('force new connection', true);
    io.enable('browser client gzip');
    
    io.sockets.on('connection', function(socket) {
       totalClients++;
       socket.address = socket.handshake.address.address + ':' + socket.handshake.address.port;
       socketsByAddr[socket.address] = true;
       socket.on('order-request', function(request) {
          var response = {request: request.counter, timestamp : request.timestamp, address:request.address};
          metrics.update(request.address, request.request, request.timestamp, JSON.stringify(request).length);
          socket.emit('order-response', response);
       });
       
       // disconnected
       socket.on('disconnect', function() {
          totalClients--;
          delete socketsByAddr[socket.address];
       });
       
       socket.on('end', function() {
       });
    });
    });
 }
 
 
 setInterval(function() {
    hwUsage.stop(function(usage) {
      console.log('sockets, clients, ' + hwUsage.heading() + ',' + metrics.heading());
      console.log(helper.size(socketsByAddr) + ',' + totalClients + ',' + usage.toString() + ',' + metrics.summary().toString());
    });
 }, 15000);
 
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
 });
 
 

Request/Reply Client

The request/reply client is similar to pub/sub model except it initiates order-request API every 50-250 milli-seconds. It also creates new connections every 15 seconds and logs response time continuously.

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 10;
 var INTERVAL_BETWEEN_NEW_CLIENTS_SECS = (process.env.INTERVAL_BETWEEN_NEW_CLIENTS_SECS || 15) * 1000;
 
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 hwUsage.start();
 
 var order_client = require('order_client')(metrics, hwUsage);
 // create connections for this client
 var totalConnections = 0;
 var buildClients = function() {
    for (var i=0; i<100; i++) {
       var client = order_client.newClient();
    }
 };
 
 
 setInterval(buildClients, INTERVAL_BETWEEN_NEW_CLIENTS_SECS);
 
 setInterval(function() {
     order_client.log();
 }, INTERVAL_BETWEEN_NEW_CLIENTS_SECS);
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
   console.trace('stack trace');
   //process.exit(1);
 });
 

The actual client is defined as module, e.g.

 var io = require('socket.io-client');
 var helper = require('helper');
 var MAX_ROWS = process.env.MAX_ROWS || 10;
 
 var nextClientId = 0;
 var nextRequestId = 0;
 var totalConnected = 0;
 module.exports = function(metrics, hwUsage) {
    var OrderClient = function() {
       this.id = ++nextClientId;
       this.connected = false;
       this.client = io.connect('localhost', { port: 8124,
                               transports: ['websocket'],
                               'force new connection': true,
                               'sync on disconnect' : true,
                               'connect timeout': 500,
                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
    };
 
    //
    OrderClient.prototype.setupConnectListener = function() {
       var that = this;
       this.client.on('connect_failed', function(error) {
          this.connected = false;
       });
    
       this.client.on('connect', function() {
          this.connected = true;
          totalConnected++;
       });
 
       this.client.on('disconnect', function() {
          this.connected = false;
          totalConnected--;
       });
    
       this.client.on('reconnect_failed', function() {
          this.connected = false;
       });
       
       this.client.on('end', function() {
          this.connected = false; 
       });
    };    
          
       
    OrderClient.prototype.sendRequest = function(max) {
       var execs = [];
       var date = new Date();
       for (var i=0; i<max; i++) {
          execs.push({sequence:i, activityDateStr:date.toString(), com: helper.random(5),price: helper.random(100), accountId:1772139, symbol:"QQQ", transaction:"STO", description:"QQQ Stock", qty: helper.random(200), key: "QQQ:::S", netAmount: helper.random(1000)});
       }
       var request = {request: ++nextRequestId, rows:max, timestamp : date.getTime(), address:this.id, execs: execs};
       this.client.emit('order-request', request);
    }; 
    OrderClient.prototype.setupResponseListener = function() {
       var that = this;
       this.client.on('order-response', function(response) {
          var metric = metrics.update(response.address, response.request, response.timestamp, JSON.stringify(response).length);
        });
    };
 
    return {
       log: function() {
          hwUsage.stop(function(usage) {
             console.log('clients,connected,' + hwUsage.heading() + ',' + metrics.heading());
             console.log(nextClientId + ',' + totalConnected + ',' + usage.toString() + ',' + metrics.summary().toString());
          });
       },
       newClient: function() {
          var client = new OrderClient();
          client.setupConnectListener();
          client.setupResponseListener();
          setInterval(function() {
             client.sendRequest(helper.random(MAX_ROWS));
          }, helper.random(250) + 50);
          return client;
       }
    };
 }
 
 

First blocker

I started testing on my Linux machine and saw RangeError: Maximum call stack size exceeded after server reached about 2000 connections. It turned out JSON library on socket.io went into recursion when it received messages while it's parsing existing message. I found a patch at Fix infinite recursion in Websocket parsers and manually applied it as I didn't see it on the latest version socket.io library 0.8.9. After applying it, I was able to make some progress.

Second blocker

After reaching about 10,000 connections I started seeing Cannot call method 'packet' of null coming from the socket.js. At the time, I was using default configuration for Websockets and apparently it was falling back to xhr-polling under heavy load (See open bug for it). I changed configuration to explicitly defined websocket protocol without any fallback, e.g.

 io.set('transports', ['websocket']);
 

I also made some changes to network settings on my Linux machine as recommended by Node.js w/1M concurrent connections!.

 net.core.rmem_max = 33554432
 net.core.wmem_max = 33554432
 net.ipv4.tcp_rmem = 4096 16384 33554432
 net.ipv4.tcp_wmem = 4096 16384 33554432
 net.ipv4.tcp_mem = 786432 1048576 26777216
 net.ipv4.tcp_max_tw_buckets = 360000
 net.core.netdev_max_backlog = 2500
 vm.min_free_kbytes = 65536
 vm.swappiness = 0
 net.ipv4.ip_local_port_range = 1024 65535
 

I then applied above changes using:

 sudo sysctl -w net.core.somaxconn=10000
 sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000 
 sudo sysctl -p
 

Third blocker

Above changes got me to about 17,000 connections but then I started seeing connection timeout on the client side and warn: client not handshaken client should reconnect on the server side. are two open bugs: first and second for it. I added reconnect parameters to client connection, e.g.

                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
 

Fourth blocker

Under heavy load, I also saw "Error: not opened" coming from WebSocket library, e.g.

     at WebSocket.send (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/node_modules/ws/lib/WebSocket.js:175:16)
     at Transport.WS.send (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transports/websocket.js:107:22)
     at Transport.packet (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:178:10)
     at Transport.WS.payload (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transports/websocket.js:120:12)
     at Socket.flushBuffer (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/socket.js:327:20)
     at Socket.setBuffer (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/socket.js:314:14)
     at Socket.onConnect (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/socket.js:409:14)
     at Transport.onConnect (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:139:17)
     at Transport.onPacket (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:91:12)
     at Transport.onData (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:69:16)
 

I chose to ignore it as Websocket is attempting to send data when connection is closed and client will just reconnect.

Summary

I am still evaluating Node.js and I have not decided if I would recommend Node.js for streaming solution. I will be comparing these results against some Java solutions such as Atomosphere and Vertx.io and will post those results in future.


September 17, 2012

Tips from Unusually Excellent: The Necessary Nine Skills Required for the Practice of Great Leadership

Filed under: Business — admin @ 2:17 pm

I recently read Unusually Excellent: The Necessary Nine Skills Required for the Practice of Great Leadership. Here are a few tips I enjoyed from the book:

Credibility

Earning the Right to Lead Through Character

This book shows that in order to gain credibility, you need to be authentic, trustworthy, and have character traits such as courage, integrity, and commitment:

  • Being Authentic
  • Look at Life: Seeing Who You Are
  • Owning Your Past: The Sting of Failure – Adversity demands more of us than normal times do.
  • One Day at a time – An Unexpectedly Bad Day
  • Share the Shame – You can share a couple of your past disappointments with your team mates to connect with them on a personal level
  • Face Time – Meet face to face to build personal relationships
  • The Perception Gap – Get feedback on how others see you
  • The Courage to Listen
  • Honest Feedback – great leaders don’t avoid conflict and give honest feedback but at the same time be authentic and professional.

Being Trustworthy

The book shows that great leaders build a track record of honesty, fairness, and integrity that creates a leadership “equity” within their constituency. Trustworthiness takes precedence over heavyweight attributes like creativity and intelligence.

  • Safely Successful – physical, emotional and professional safety is primarl need.
  • Be honest – match your actions with your words and match those words with the truth we see in the world (no spin).
  • Be vulnerable – showing your weakness or raw emotion
  • Be fair
  • A better place for all – The book recommends building trusted interpersonal relationships that have commitments to work and loyality. On the other hand fear inspires defensive behavior, which leaders can eliminate fear by being transparent, crystal clear, and integrity.
  • A Culture of Trust Is a Culture of Truth – One reason people within enterprises fear telling the truth to each other and to their bosses is that they know the organization cannot properly distinguish between the message and the messenger.
  • Bad News Doesn’t Swim Upstream
  • A Culture of Trust Is a Culture of Innovation – Trust is the basis of safety. Create trust, and you’ll create a safe place to take risks and in turn build culture of innovation. The organizations should not punish “good failure”
  • A Culture of Trust Is a Culture of Performance – you should never punish a good person for delivering bad news—or even, on occasion, bad work.
  • Take Your Pain Quickly and Acutely—and Move On

Being Compelling – Commitment to Winning

The book shows that great leaders evoke the emotion and energy of being involved in a crusade. No one will sacrifice for a project if the leader hasn’t made a full and clear—and public—commitment. Great leaders don’t want to be merely an employee instead they want to be part of a team, working together to create something important.

  • Choice and Obligation – The best, most talented followers are really volunteers, and because of those very attributes they are often in considerable demand elsewhere.
  • Attracting the Best and Brightest – Great leaders engage and listen to people.
  • Keeping Your Best on Board
  • Cheerleader
  • Tell Me the Truth – the best people actually find reality, even if it is bad news, compelling.
  • Keep Me Challenged – Talented people want and need challenging work.
  • No Hard Feelings – leaders must be able to stand in their followers’ shoes and see themselves from that viewpoint.

Competence: Leading on the Field with Skill

  • Leading People Talent to Teams – Hiring great people is arguably the highest-leverage activity that leaders undertake.
  • Seating Chart – talent is useless if the person is not a good match with that role and responsibility and a specific place in the structure of the organization.
  • People First – hire the very best people; only then should you focus on building the right plan for the organization.
  • Engagement – engage people by setting realistic goals with them and fairly rewarding them for meeting or exceeding those expectations.
  • Enrollment
  • Expectations
  • Energy – functional, emotional and career energy.
  • Empowerment – delegate power to other people
  • Retreat to Attack
  • How Has the Nature of Your Enterprise Changed? – As a leader, if you have not prepared your people for that change or you resist that change, you have failed in your responsibilities.
  • Where Is Your Authority or Positional Power Best Used in Leading People? – By carefully setting performance expectations with your key team members, you move the whole game up a notch.
  • What Is Your Plan to Deal with Your Weakest Link? – being aware of poorly performing subordinate and acting on it instead of avoiding it.
  • Will You Distinguish the Bad Performer from a Bad Plan? – think like a venture capitalist, with your project leaders as the entrepreneurs and the project itself a new venture. Have a post-mortem and inquire why project failed.

Leading Strategy – ideas to plans

Leaders need to distinct between leading people, strategy and execution.

  • Process to Plans – plan shows what needs to be done, where as trategy is bigger than plan and includes how things are done and fallback options.
  • The Process: Inclusive and Collaborative – The process must include the best people and the best ideas, from both within and outside the company, and must foster collaborative thinking and constructive, rigorous discussion.
  • Winnowing Out a Plan – solicit ideas from others when you don’t know the domain
  • The Plan: Realistic and Compelling – The book shows that leaders need to be engaged throughout the process to make sure the process moves along with appropriate energy and that the team remains realistic in terms of time, resources, and goals.
  • Stickiness – commitment in the face of adversity

Leading Execution – actions to results

Execution is about results. Leaders need to distinct between leading people, strategy and execution. Execution provides feedback that can be measured against plans.

  • Solve the Hard Problems First – don’t distract yourself with second-tier tasks
  • At the Edges – In order to build high-reliability organizations (e.g. SWAT), you need zero tolerance team execution, which require:
    • reliable communications
    • continuous training
    • standardize and synchronize
    • mission-goal clarity and loyality
    • empower the front line
    • redundancy
  • Leadership Leverage in Execution – The book suggests leading the process and setting the standards for the right goals. This includes leading the design process to create the appropriate metrics, ensuring a winner’s commitment and making sure that attitude permeates the culture.
  • Curb Your Enthusiasm: Focus, Commit, and Deliver – don’t overcommit and follow the rule of “first things first.”
  • It Isn’t Real If You Don’t Measure It – Measuring what matters is an extremely high-leverage opportunity. Use management by objectives (MBO), “as measured by” (AMB) or a key performance indicator (KPI) processes for measureing factors that correlate very highly with winning.
  • Let the Dashboard Drive – Measuring what matters to naturally direct attention, focus, and commitment to the right activities
  • It’s Just Like Pinball: If You Win, You Get to Play Again
    • Winner’s mindset
    • Failing elgantly – No lame excuses
  • Sloppiness – HRO never allow sloppiness because they know it equals death. The book shows that leaders may
    feel like part of being a nice guy, succumbing to that temptation promotes a culture of mediocrity.

  • Performance Feedback – look for data coming back from the field.

Consequence: Creating a Culture, Leaving a Legacy of Values

Trust is the most fragile of assets; at a certain point, different in every situation.

Legacy = Culture + Reputation

A Leader’s Communication

  • Open, Honest Dialogue – The book shows that the ability of leaders to communicate effectively is highest leverage activity in their set of responsibilities and should include:
    • What are we doing? (Vision and mission.)
    • Why are we doing it? (Purpose and goals.)
    • What’s the plan to win?
    • (What’s the strategy here?)
    • How are we doing? (Results and status—health of the business.)
    • What is my part in the game? (What do you expect from me?)
    • What’s in it for me? (Why is this a compelling place for me to be?)
    • How am I doing? (Give me feedback, acknowledgment, appreciation.)

Talking Trust

In order to build trust, leaders not only need to focus on contents but also emotional content of that message and the
connection—the leader’s empathy with the audience.

Checklists and Guideposts

Here are some key points from the book:

  • communication is a core responsibility of leading
  • most of the important things in organizations are the result of the right conversation
  • starving followers from basic information will result in high cost

Here are five C’s for What question leader needs to communicate:

  • A compelling cause
  • Credibility & Competence
  • Character
  • Commitment
  • Contribution

Here are five E’s for Why question leader needs to communicate:

  • Engagement
  • Enrollment
  • Energy
  • Empowerment
  • Endorsement

Here are six C’s for When question leader needs to communicate:

  • Context
  • Confidence
  • Challenge
  • Collaboration
  • Culture
  • Coaching

Here are seven C’s for How question leader needs to communicate:

  • Clarity
  • Consistency
  • Carefulness
  • Courage
  • Conviction
  • Compassion
  • Completion

The Solitary Touch

The book shows that there is really no such thing as a “casual” conversation.

Your 24 × 7 Job

The book shows leader has three basic tasks:

  • Align the interests, energy, and commitment of the team.
  • Reduce fear, confusion, and anxiety.
  • Instill confidence and trust, while rallying support and contributions.

A Leader’s Decision Making Values-Based Choices

The book shows that leader does not need to make most of the decisions, but need to help followers make make better decisions.

Decision Structure

  • What Exactly Are We Deciding?
  • What Flavor Is This Decision? – decisions can be classified as either simple or complex. You need sufficient data to make the decision, otherwise you have to use intuition. Decisions can also be characterized as easy or difficult.
  • When Does This Decision Need to Be Made?

    Total Cycle Time = Time to Decide + Time to Commit + Time to Execute

  • Who Should Make This Decision? – who is best equipped—by skill, experience, proximity
  • Don’t Wait; Decide
  • Chasing Decisions – communicate with followers and empower them to make decisions

A Leader’s Impact The Transfer of Influence from Leader to Follower

Finally, the book shows how to build lasting legacy and reputation:

  • Leader Taking the High Ground
  • Whisper Campaign – use public forums to acknowledge accomplishments, sacrifices and courage. Also, appreciate them in private.
  • All You Leave Behind – using exit interviews to get feedback
  • Collective Memory
  • What to Do
  • Pay attention to change
  • Get More Curious, and Smarter, About Human
  • Nature – leaders tend to gravitate toward the objective and away from the subjective.
  • Give feedback
  • Celebrate success
  • Respect Life Outside of Work
  • Your Greatest Legacy


September 6, 2012

Building a streaming quote server using Node.js and Websockets

Filed under: Javascript — admin @ 5:32 pm

A couple of years ago, I implemented a quote server using XMPP, Bosh, Strophe and Ejabbberd at work and described an overview of the design and code in my blog. We have been looking at Node.js and Websockets recently for streaming data so I will describe a simple implementation of quote server here using those two technologies:

Quote Server

The quote server uses Web sockets and listens for subscribe-stock-quote message to subscribe for a particular quote and unsubscribe-stock-quote message to unsubscribe.
Here is a simple implementation of the Quote server:

 var express = require('express')
    , sio = require('socket.io')
    , http = require('http')
    , quoteSubscriptions = require('quote_subscriptions')
    , app = express();
 
 var server = http.createServer(app);
 var sequence = 0;
 app.configure(function () {
    app.use(express.static(__dirname + '/public'));
    app.use(app.router);
 })
 
 var io = sio.listen(server);
 io.set('log level', 1);
 server.listen(8080);
 
 io.sockets.on('connection', function(socket) {
    // subscribe to stock quotes 
    socket.on('subscribe-stock-quote', function(symbol) {
       console.log('subscribe-stock-quote received');
       quoteSubscriptions.subscribe(socket, symbol, function() {
          socket.emit('subscribed-stock-quote', 'You have been subscribed to ' + symbol);
       });
    });
 
    // unsubscribe to stock quotes 
    socket.on('unsubscribe-stock-quote', function (symbol) {
       console.log('unsubscribe-stock-quote received');
       quoteSubscriptions.unsubscribe(socket, symbol, function() {
          socket.emit('unsubscribed-stock-quote received', 'You have been unsubscribed to ' + symbol);
       });
    });
 
    // disconnected
    socket.on('disconnect', function() {
       quoteSubscriptions.unsubscribeAll(socket, function() {
          socket.emit('disconnected', socket.symbol + ' has been unsubscribed from stock quotes.');
       });
    });
 });
 
 // update clients every 500 milli-seconds with latest stock quotes
 setInterval(function () {
    quoteSubscriptions.pushUpdates();
    io.sockets.emit('sequence', ++sequence);
 }, 500);
 
 
 

In addition to sending quote updates, the server also sends a loop counter so that we know how many times quotes have been pushed to clients and that message is broadcasted to all clients.

The subscriptions are managed in another module quote_subscriptions, i.e.,:

 ////////////////////////////////////////////////////////////////////////////////////////////
 //
 // This module maintains quote subscriptions for all clients 
 //
 ////////////////////////////////////////////////////////////////////////////////////////////
 var yclient = require('y_client');
 var stocksSubscribers = {};
 var quoteCache = {};
 exports.subscribe = function(socket, symbol, callback) {
    if (typeof socket.stockSymbols === "undefined") {
       socket.stockSymbols = [];
    }
    if (socket.stockSymbols.indexOf(symbol) == -1) {
       socket.stockSymbols.push(symbol);
    }
    //
    var subscribers = stocksSubscribers[symbol];
    if (typeof subscribers === "undefined") {
       subscribers = [];
       stocksSubscribers[symbol] = subscribers;
    }
    if (subscribers.indexOf(socket) == -1) {
       subscribers.push(socket);
    }
    callback();
 }
 
 exports.unsubscribeAll = function(socket, callback) {
    if (typeof socket.stockSymbols === "undefined") {
       return;
    }
    for (var i=0; i<socket.stockSymbols.length; i++) {
       this.unsubscribe(socket, socket.stockSymbols[i], function() {});
    }
    callback();
 }
 
 exports.unsubscribe = function(socket, symbol, callback) {
    if (typeof socket.stockSymbols === "undefined") {
       return;
    }
    var ndx = socket.stockSymbols.indexOf(symbol);
    if (ndx !== -1) {
       socket.stockSymbols.splice(ndx, 1);
    }
    //
    var subscribers = stocksSubscribers[symbol];
    if (typeof subscribers === "undefined") {
       return;
    }
    ndx = subscribers.indexOf(socket);
    if (ndx !== -1) {
       subscribers.splice(ndx, 1);
    }
    callback();
 }
 exports.pushUpdates = function() {
    for (var symbol in stocksSubscribers) {
       var sockets = stocksSubscribers[symbol];
       if (sockets.length == 0) {
          continue;
       }
       quote = quoteCache[symbol];
       if (typeof quote === "undefined") {
          yclient.quote(symbol, function(quote) {
             quoteCache[symbol] = quote;
             sendQuote(quote, sockets);
          });
       } else {
          sendQuote(quote, sockets);
       }
    }
 }
 
 
 function sendQuote(quote, sockets) {
    var rnd = Math.random();
    quote.counter = quote.counter + 1;
    quote.bid = round(rnd % 2 == 0 ? quote.original_bid + rnd :  quote.original_bid - rnd, 2);
    quote.ask = round(rnd % 2 == 0 ? quote.original_ask + rnd :  quote.original_ask - rnd, 2);
    for (var i=0; i<sockets.length; i++) {
       var socket = sockets[i];
       var address = socket.handshake.address;
       console.log('Quote sending ' + quote.symbol + ' ' + quote.counter + ' to ' + address.address + ":" + address.port);
       socket.volatile.emit('receive-stock-quote', quote);
    }
 }
 
 function round(num, dec) {
    return Math.round(num*Math.pow(10,dec))/Math.pow(10,dec);
 }
 
 
 

The above module defines methods to subscribe, unsubscribe and push quote updates to all clients. Note that above module requests quotes from Yahoo if it doesn’t exist in the cache and then randomly changes bid/ask values.

Yahoo Finance Client

I used Yahoo finance to retrieve the quotes, however these quotes are cached and modified by the quote subscription module:

 ///////////////////////////////////////////////////////////////////////////////////////////
 //
 // This module provides quote/chain apis using yahoo finance
 //    
 ////////////////////////////////////////////////////////////////////////////////////////////
 var http = require('http')
   , libxmljs = require('libxmljs')
   , util = require('util')
       
          
 exports.quote = function(symbol, callback) {
    api('download.finance.yahoo.com', '/d/quotes.csv?s=' + symbol + '&f=nb2b3ra2yj3e7gopjkl1xm3m4h', function(text) {
       cols = text.split(',');
       callback({'symbol':cols[0], 'ask':cols[1], 'bid':cols[2], 'counter':0, 'original_ask':cols[1], 'original_bid':cols[2]});    
    });
 }  
 
 function api(host, path, callback) {
    var options = {
        host: host,
        port: 80,
        path: path, 
        method: 'GET',
        headers: {'User-Agent': 'Node'}
    };
    var req = http.request(options, function(res) {
      res.setEncoding('utf8');
      var text = '';
      res.on('data', function (chunk) {
        text += chunk;
      });
      res.on('end', function (e) {
         text = text.replace(/"/g, '');
         callback(text);
      });
      res.on('error', function (e) {
        console.log('Could not send api with response : ' + e.message);
      });
    });
 
    req.on('error', function(e) {
      console.log('Could not send api with response : ' + e.message);
    });
 
    // write data to request body
    req.end();
 }
 
 

Web Client

Here is an example of Web client that allows users to subscribe to different stock symbols and receive streaming quotes every 500 milliseconds.

 <!doctype html> 
 <html lang="en">
    <head>
       <meta charset="utf-8"> 
       <title>OptionsHouse Streaming</title>
       <script src="/socket.io/socket.io.js"></script>
       <script> 
          var socket = io.connect('ws://localhost:8080'); 
          var lastSymbol;
          var lastQuote;
          socket.on('connect', function() { 
             lastSymbol = 'AAPL';
             socket.emit('subscribe-stock-quote', lastSymbol);
          });         socket.on('disconnect', function() {
             alert('remote server died');
          });
          socket.on('sequence',function(seq) { 
             var seqDiv = document.getElementById('sequence');
             seqDiv.innerHTML = seq;
          }); 
          socket.on('receive-stock-quote',function(quote) { 
             var company = document.getElementById('company');
             var counter = document.getElementById('counter');
             var bid = document.getElementById('bid');
             var ask = document.getElementById('ask');
             var bidColor = '<font color="black">'
             var askColor = '<font color="black">'
             if (typeof lastQuote !== "undefined") {
                if (quote.bid > lastQuote.bid) {
                   bidColor = '<font color="green">'
                } else if (quote.bid < lastQuote.bid) {
                   bidColor = '<font color="red">'
                }
                if (quote.ask > lastQuote.ask) {
                   askColor = '<font color="green">'
                } else if (quote.ask < lastQuote.ask) {
                   askColor = '<font color="red">'
                }
             }
             company.innerHTML = quote.symbol;
             counter.innerHTML = quote.counter;
             bid.innerHTML = bidColor + quote.bid + '</font>';
             ask.innerHTML = askColor + quote.ask + '</font>';
             lastQuote = quote;
          });
    
          window.addEventListener('load',function() { 
             document.getElementById('start').addEventListener('click',
             function() { 
                if (typeof lastSymbol !== "undefined") {
                   socket.emit('unsubscribe-stock-quote', lastSymbol);
                }
                var symbol = document.getElementById('symbol').value; 
                socket.emit('subscribe-stock-quote', symbol);
                lastSymbol = symbol;
             }, false); 
             document.getElementById('stop').addEventListener('click',
             function() { 
                var symbol = document.getElementById('symbol').value; 
                socket.emit('unsubscribe-stock-quote', symbol);
             }, false); 
          }, false);
       </script> 
    </head> 
    <body>
       <div id="form">
          Symbol: <input type="text" id="symbol" size="10" value="AAPL" />
          <input type="button" id="start" value="Subscribe" />
          <input type="button" id="stop" value="Unsubscribe" />
       </div>
       <table width="500">
          <tr>
             <th>Company</th>
             <th>Bid</th>
             <th>Ask</th>
             <th>Stock Counter</th>
          </tr>
          <tr>
             <td align="center"> <div id="company"></div> </td>
             <td align="center"> <div id="bid"></div> </td>
             <td align="center"> <div id="ask"></div> </td>
             <td align="center"> <div id="counter"></div> </td>
          </tr>
       </table>
       <p></p>
       Quote Server Loop: <span id="sequence"/>
    </body>
 </html>
 
 

Running the example

You can download all the code from My github account, e.g.

 git clone https://github.com/bhatti/node-quote-server.git  
 cd node-quote-server 
 npm install
 node app
 

Then you can point your browser to http://localhost:8080 and start playing:

Summary

Node.js comes with good support for Websockets so it takes only few lines to build the quote server. I am still testing Node.js and Websockets with different browsers and simulating load with large number of clients and will post those results in future.


Powered by WordPress