Shahzad Bhatti

January 15, 2017

Review of “Whiplash: How to Survive Our Faster Future”

Filed under: Future,Technology — admin @ 4:38 pm

I read “Whiplash: How to Survive Our Faster Future” by Jai Ito and Jeff Howe over the holidays. Joi Ito is a director of the MIT Media Lab. The MIT Media Lab was created by Nicholas Negroponte in 1985 to build an environment where best ideas from schools of arts and science can be married to build next revolutionary discoveries.

This book narrates anecdotes of how technology revolutionized human development in past and how it continues to disrupt our lives today. As a consequence of Moor’s law and the Internet, technology is changing at an exponential speed. In such rapidly changing environments, this book provides key lessons that can be used to prepare us for uncertain future and paradigm shifts. In such exponential times, the invention of new technologies far outpaces the moral and ethical consequences of those breakthroughs. As technologies can be used for both good and bad, they offer both salvage and demise of humanity.

Here are primary principles that authors present to shape the new world:

Emergence over Authority:

The invention of Internet has facilitated communication and collaboration all over the world, where best ideas can be easily shared and exchanged. As a result, institutes that had central authorities are disintegrating. The authors present several examples of Emergence vs Authority such as Blogs vs Newspapers, Wikipedia vs Encyclopedia and central governments vs social networks based political revolutions.
In emergent systems, participants use simple rules and exchange information to build complex systems. Examples of emergent systems are ant colony, slime mold, brain, flocking birds, stock exchanges, biology, etc. The authoritarian systems enable incremental changes whereas emergent systems are more adoptive and foster non-linear progress.

Pull Over Push:

Push-based systems control their access whereas pull based use transparency and two-way communication and are able to cope with the crisis far better than push based systems. The authors recited an example of pull-based when meltdown of Fukushima nuclear plant occurred as a result of severe earthquake and fourteen feet tsunami. Joi and a team of volunteers across the world collaborate and built Geiger counters to take accurate readings of radiation. Other examples of pull based systems are crowdfunding and crowdsourcing.

Compasses over Maps:

A map has detailed knowledge and an optimal route whereas compass offers more autonomy and offer more flexibility in an unpredictable environment. The authors stated examples of the education system where standardized tests and curriculum deprive students of creativity and passion for learning. They used the culture of Media Lab as an illustration where the vision is based on compass heading. It provides a framework for individual progress leaving flexibility for interactions between groups.

Risk over Safety:

Traditional businesses are more risk averse where new ventures are thoroughly analyzed and they spend millions in studies. However, the cost of experimenting new ideas has drastically been reduced in today’s market and it offers much better return on investment.

Disobedience over Compliance:

Innovation requires creativity and breaking rules so a high-impact institutes require a culture of disobedience. It needs a culture where criticism and diverse ideas are embraced.

Practice over Theory:

Due to low-cost of launching new products, an innovating organization requires a culture where experiments are valued more than detailed planning.

Diversity over Ability:

The authors provided lessons from biochemistry companies that used gamers to design protein molecules. They used gamers with diverse background and those gamers had better pattern recognition than the biochemists with PhD. Most organizations believe in diversity, but most organizations lack diversity especially in high-tech companies such as Facebook, Yahoo and Google.

Resilience over Strength:

Resilient organizations are like the immune system that can successfully recover from failures. The authors gave examples of cyber-security where there are threats from various sources and successful defense requires treating security systems as biological systems and building strong immune systems against those security risks.

Systems over Objects:

Systems over objects emphasize understanding the connections between people, communities, and the environment. Instead of optimizing an individual or an organization, we need to optimize the impact of innovations on an entire natural system.

Conclusion:

In this final chapter, authors gave examples AI and machine learning where deep learning and reinforced learning has allowed machines to beat human experts in Chess and Go. The authors cite “The Singularity is Near” by Ray Kuzweil, who predicts that we will have intelligent explosion by 2045. In this world, we will have to think about how humans and machines will work together.

November 15, 2016

Tips from “Algorithms to Live By”

Filed under: Algorithms,Computing — admin @ 10:51 pm

The “Algorithms to Live By” by Brian Christian and Tom Griffiths describes computer algorithms from several domains and illustrates practical examples for applying those algorithms in real-life problems. Here is a list of some of those algorithms that I found very useful:

1. Optimal Stopping

This class of problems determines the optimal time to stop further processing when searching or selecting an option. Here are a few examples:

Secretary Hiring Problem
This is a famous math problem, which was defined by a mathematician named Merril Flood based on “Look-Then-Leap-Rule” to find the best candidate by waiting until you review 37% of the candidates and then hiring the candidate who is better than all of the past candidates. There are several other applications of this algorithm such as finding a life partner or apartment hunting. This problem assumes that you cannot go back to the previous candidate once you reject but there are other variations of this algorithm that allow it in case the selected candidate rejects your offer.

Selling a House
When selling a house, you need to determine the range of expected offers and cost of waiting for the best offer.

Finding a Parking Spot
Given a percentage of parking spots available, you determine the number of vacant spots that can be passed before a certain distance until you take the first spot.

2. Explore/Exploit

In this chapter, authors describe several algorithms for exploring available paths and then using the optimal path. Here is a sampling of the approaches based on explore/exploit:
Multi-armed bandit
Given expected value of a slot machine (winnings/# of pulls), you need to maximize winnings. There are several approaches such as:

  • Win-Stay
    You keep using a slot machine as long as you are winning and then switch to a different machine when you lose.
  • Gittins Index
    It is named after Gittins, who was a professor at Oxford. It tries to maximize payoffs for future by calculating a Gittins index for all slot machines and then selecting slot machine with the highest Gittins index.
  • Regret and optimism
    Many problems in life can be defined in terms of regrets and optimism by imagining being at the deathbed and thinking of decisions that you could have made differently.
  • Upper Confidence Bound
    It is also referred as optimism in the face of uncertainty, where you choose your actions as if the environment is as nice as is plausibly possible. Given a range of plausible values and you pick the option with the highest confidence interval.
  • A/B Testing
    It is often used to test new features by offering the new features to a subset of the customers.

One of insight the authors present is that people often explore longer by favoring new over the best older option.

3. Sorting

In this chapter, authors describe several algorithms for sorting and their computing cost in terms of O-notation. The O-notation is generally used to indicate algorithm’s worst performance such as:

  • O(1): Constant cost
  • O(N): Linear cost
  • O(N^2): Quadratic cost
  • O(2^N): Exponential cost
  • O(N!): Factorial cost

Merge-Sort
This algorithm breaks data recursively into smaller sets until there is a single element. It then merges those subsets to create a new sorted list.

Bucket-Sort
A group of n items can be grouped into m buckets in O(nm) time and this insight is used by bucket sorting where items are grouped into a number of sorted buckets. For example, you can use this approach to load returned books into carts based on the shelf numbers.

Sorting is a pre-requisite for searching and there are a lot of practical applications for sorting such as creating matchups between teams. For example, teams can use round-robin based matchup where each team plays each other team but it would result in a lot of matches (O(N^2)). Instead, competitions such as March Madness uses Merge-Sort to move from 64 teams to 32, 16, 8, 4 and finals. However, it doesn’t use full sort as there are only 63 games in the season instead of 192.

4. Caching

In computer design, John Von Neumann designed memory hierarchy to improve lookup performance. It was first used in IBM 360 mainframes. Other computer researchers such as Belady designed algorithms for page faults to load data from disk to memory. There are several algorithms for cache eviction such as First-In, First-Out, Least-Recently-Used, etc.

5. Scheduling

Here are a few of the scheduling algorithms described in this chapter:
Earliest Due Date Strategy
It minimizes maximum lateness by choosing task with the earliest due date first.

Moore’s algorithm
It is similar to Earliest Due Date but it throws out biggest task if the new job can’t be completed by due date.

The authors give an example of Getting Things Done (GTD) technique for time management where small tasks are handled first. The tasks can also have a weight or priority and then the scheduler minimizes the sum of weighted completion time by dividing weight by length of the task and selecting the task with the highest density.

Here are a few issues that can arise with priority based tasks:

  • Priority Inversion – when a low priority task possesses a resource and scheduler executes a higher priority task, which cannot make any progress. One way to address this issue is by allowing the low-priority task to inherit the priority of higher priority task and let it complete.
  • Thrashing – it occurs when system grinds to halt because work cannot be completed due to lack of resources.
  • Context switching – Modern operating system uses context switching to work on multiple tasks but each slice of time needs to be big enough so that the task can make progress. One technique to minimize context switching is interrupt coalescing, which delays hardware interrupt. Similar techniques can be used by batching small tasks, e.g. Getting Things Done technique encourages creating a chunk of time to handle similar tasks such as checking emails, making phone calls, etc.

6. Bayes’s Rule

Reverand Thomas Bayes postulated Bayes’s rule by looking at winning and losing tickets to determine overall ticket pool. It was later proved by Pierre-Simon Laplace, which is commonly referred as Laplace’s law. Laplace worked out Bayes’s Rule to use prior knowledge in prediction problems.

Copernican Principle
Richard Gott hypothesized that the moment you observe something, it is likely to be in the middle of its lifetime.

Normal or Gaussian distribution
It has a bell curve and can be used to predict average life span.

Power-law distribution
It uses range over many scales such as the population of cities or income of people.

Multiplicative Rule
It multiplies quantity observed with some constant factor.

Average Rule
It uses the distribution’s natural average.

Additive Rule
It predicts that the things that will go on just a constant amount longer such as a five more minute rule.

7. Overfitting

In machine learning, overfitting occurs when training data fits tightly with key factors so that it doesn’t accurately predict the outcome for the data that it has not observed.

Cross Validation
Overfitting can be solved with cross-validation by assessing model not just against training data but also against unseen data.

Regularization
It uses contents to penalize complexity.

Lasso
It uses penalty of the total weight of different factors to minimize complexity.

8. Relaxation

In constraint optimization problems, you need to find the best arrangement of a set of variables given a set of rules and scoring mechanism such as traveling salesman problem (O(N!)). Using constraint relaxation, you remove some of the problem constraints, e.g. you can create a minimum spanning tree that connects all nodes in O(N^2) amount of time. Techniques such as Lagrangian Relaxation removes some of the constraints and add them to the scoring system.

9. Randomness

This chapter describes examples of algorithms that are based on random numbers such as:

Monte Carlo Method
It uses random samples to handle qualitatively unmanageable problems.

Hill Climbing
It takes a solution and tries to improve it by permuting some of the factors. It only accepts changes if it results in improvements. However, it may not find the globally optimal solution.

Jitter
It makes random small changes and accepts them even if they don’t improve in order to find the better solution.

Metropolis algorithm
It uses Monte Carlo Method and accepts bad and good tweaks in trying different solutions.

Simulated Annealing
It optimizes problems like annealing by heating up and slowly cooling off.

10. Networking

This chapter describes algorithms used in the computer network such as:

Packet switching
One of key idea of Internet was to use packet switching where TCP/IP sends data packets over a number of connections as opposed to dedicated lines or circuit switching which were used by phone companies.

Acknowledgement
It is used to let the sender know that packet is received. TCP/IP uses the triple handshake to establish a connection and sender resends packets if ACK is not received.

Exponential Backoff
It increases average delay after successive failure.

Flow Control
TCP/IP uses Additive Increase Multiplicative Decrease to increase the number of packets sent and cut the transmission rate in half and ACK is not received.

Bufferbloat
A buffer is a queue that stores outgoing packets, but when the queue length is large, it can add a delay in sending ACK, which would result in redelivery. Explicit Congestion Notification can be used to address those issues.

11. Game Theory

In this chapter, authors discuss several problems from game theory such as:

Halting problem
This problem was first posed by Alan Turing who asserted that a computer program can never tell whether another program that it uses would take forever to compute something.

Prisoner’s dilemma
It is based on two prisoners who are caught and have to either cooperate or work against each other. In general, defection is the dominant strategy.

Nash Equilibrium
It is one of strategy where neither player changes their own play based on the opponent’s strategy.

The Tragedy of the Commons
It involves a shared-resource system where an individual can act independently in a selfish manner that is contrary to the common good of all participants, e.g. voluntary environmental laws where companies are not required to obey emission levels.

Information cascade
Information cascade occurs where an individual abandons their own information in favor of other people’s action. One application of this class of problems is auction systems. Here are a few variations of the auction systems:

  • Sealed-bid – where bidders are unaware of other bid prices so they would have to predict price that other bidders would use.
  • Dutch or descending auction – where bids start at a high price and is slowly lowered until someone accepts it.
  • English or ascending auction – where bid starts at a low price and is then increased.
  • Vickrey auction – it is similar to sealed-bid but winners pay second-place bid. It results in better valuation as bidders are incentivized to bid based on the true value.

Summary

This book presents several domains of algorithms and encourages computational kindness by applying these algorithms in real-life. For example, we can add constraints or reduce the number of available options when making a decision, which would lower the mental labor.

February 6, 2016

Building a Generic Data Service

Filed under: Web Services — admin @ 10:44 pm

As REST based Micro-Services have become prevalent, I often find that web and mobile clients have to connect to different services for gathering data. You may have to call dozens of services to display data on a single screen or page. Also, you may only need subset of data from each service but you still have to pay for the bandwidth and parsing cost.

I created a new Java framework PlexDataProviders for aggregating and querying data from various underlying sources, which can be used to build a general-purpose data service. PlexDataProviders is a light-weight Java framework that abstract access to various data providers such as databases, files, web services, etc. It allows aggregation of data from various data providers.

The PlexDataProviders framework is divided into two components:

  • Data Provider – This component defines interfaces that are implemented to access data sources such as database or web services.
  • Query Engine – This component is used for querying and aggregating data.

The query engine can determine dependency between providers and it also allow you to use output of one of the data provider as input to another data provider. For example, let’s assume:

  • data-provider A requires input-a1, input-a2 and produces output-a1, output-a2
  • data-provider B requires input-b1 and output-a1 and produces output-b1, output-b2

Then you can pass input-a1, input-a2 to the query engine and request output-a1, output-a2, output-b1, output-b2 output data fields.

Benefits

PlexDataProviders provides offers following benefits:

  • It provides a unified way to search data and abstracts integration to underlying data sources.
  • It helps simplifying client side logic as they can use a single data service to query all data instead of using multiple data services.
  • This also help with managing end-points as you only a single end-point instead of connecting to multiple web services.
  • As clients can specify the data they need, this helps with payload size and network bandwidth.
  • The clients only need to create a single data parser so it keeps JSON parsing logic simple.
  • As PlexDataProviders supports multi-threading, it also helps with latency of the data fetch requests.
  • It partial failure so that a failure in a single data provider doesn’t effect other data providers and the data service can still return partial results. User
  • It supports timeout so that clients can receive available data that completes in given timeout interval

Data Structure

Following are primary data structures:

  • MetaField – This class defines meta information for each data field such as name, kind, type, etc.
  • MetaFieldType – This enum class supports primitive data types supported, i.e.
    • SCALAR_TEXT – simple text
    • SCALAR_INTEGER – integer numbers
    • SCALAR_DECIMAL – decimal numbers
    • SCALAR_DATE – dates
    • SCALAR_BOOLEAN – boolean
    • VECTOR_TEXT – array of text
    • VECTOR_INTEGER – array of integers
    • VECTOR_DECIMAL – array of decimals
    • VECTOR_DATE – array of dates
    • VECTOR_BOOLEAN – array of boolean
    • BINARY – binary data
    • ROWSET – nested data rowsets
  • Metadata – This class defines a set of MetaFields used in DataRow/DataRowSet
  • DataRow – This class abstracts a row of data fields
  • DataRowSet – This class abstracts a set of rows

PlexDataProviders also supports nested structures where a data field in DataRow can be instance of DataRowSet.

Adding a Data Provider

The data provider implements following two interfaces

public interface DataProducer {
    void produce(DataRowSet requestFields, DataRowSet responseFields,
            QueryConfiguration config) throws DataProviderException;
}

Note that QueryConfiguration defines additional parameters such as:

  • pagination parameters
  • ordering/grouping
  • filtering parameters
  • timeout parameters

The timeout parameter can be used to return all available data within defined time, e.g. query engine may invoke underlying data providers in multiple threads and if underlying query takes a long time then it would return available data.

public interface DataProvider extends DataProducer, Comparable<DataProvider> {
    String getName();
 
    int getRank();
 
    Metadata getMandatoryRequestMetadata();
 
    Metadata getOptionalRequestMetadata();
 
    Metadata getResponseMetadata();
 
    TaskGranularity getTaskGranularity();
}

Each provider defines name, rank (or priority when matching for best provider), set of mandatory/optional input and output data fields. The data provider can also define granularity as coarse grain or fine grain and the implementation may execute those providers on different threads.

PlexDataProviders also provides interfaces for converting data from domain objects to DataRowSet. Here is an example of provider implementation:

public class SecuritiesBySymbolsProvider extends BaseProvider {
    private static Metadata parameterMeta = Metadata.from(SharedMeta.symbol);
    private static Metadata optionalMeta = Metadata.from();
    private static SecurityMarshaller marshaller = new SecurityMarshaller();
 
    public SecuritiesBySymbolsProvider() {
        super("SecuritiesBySymbolsProvider", parameterMeta, optionalMeta,
                marshaller.getMetadata());
    }
 
    @Override
    public void produce(DataRowSet parameter, DataRowSet response,
            QueryConfiguration config) throws DataProviderException {
        final String id = parameter.getValueAsText(SharedMeta.symbol, 0);
        Map<String, Object> criteria = new HashMap<>();
        criteria.put("symbol", id.toUpperCase());
        Collection<Security> securities = DaoLocator.securityDao.query(criteria);
        DataRowSet rowset = marshaller.marshal(securities);
        addRowSet(response, rowset, 0);
    }
}

Typically, you will create data-provider for each different kind of query that you want to support. Each data provider specifies set of required and optional data fields that can be used to generate output data fields.

Here is an example of marshalling data from Securty domain objects to DataRowSet:

public DataRowSet marshal(Security security) {
    DataRowSet rowset = new DataRowSet(responseMeta);
    marshal(rowset, security, 0);
    return rowset;
}
 
public DataRowSet marshal(Collection<Security> securities) {
    DataRowSet rowset = new DataRowSet(responseMeta);
    for (Security security : securities) {
        marshal(rowset, security, rowset.size());
    }
    return rowset;
}
...

PlexDataProviders provides DataProviderLocator interface for registering and looking up provider, e.g.

public interface DataProviderLocator {
    void register(DataProvider provider);
 
    Collection<DataProvider> locate(Metadata requestFields, Metadata responseFields);
...
}

PlexDataProviders comes with a small application that provides data services by implementing various data providers. It uses PlexService framework for defining the service, e.g.

@WebService
@Path("/data")
public class DataServiceImpl implements DataService {
    private DataProviderLocator dataProviderLocator = new DataProviderLocatorImpl();
    private QueryEngine queryEngine = new QueryEngineImpl(dataProviderLocator);
 
    public DataServiceImpl() {
        dataProviderLocator.register(new AccountsByIdsProvider());
        dataProviderLocator.register(new AccountsByUseridProvider());
        dataProviderLocator.register(new CompaniesBySymbolsProvider());
        dataProviderLocator.register(new OrdersByAccountIdsProvider());
        dataProviderLocator.register(new PositionGroupsBySymbolsProvider());
        dataProviderLocator.register(new PositionsBySymbolsProvider());
        dataProviderLocator.register(new QuotesBySymbolsProvider());
        dataProviderLocator.register(new SecuritiesBySymbolsProvider());
        dataProviderLocator.register(new UsersByIdsProvider());
        dataProviderLocator.register(new WatchlistByUserProvider());
        dataProviderLocator.register(new SymbolsProvider());
        dataProviderLocator.register(new UsersProvider());
        dataProviderLocator.register(new SymbolSearchProvider());
    }
 
    @Override
    @GET
    public DataResponse query(Request webRequest) {
        final DataRequest dataRequest = DataRequest.from(webRequest .getProperties());
        return queryEngine.query(dataRequest);
    }
}

As you can see the data service simply builds DataRequest with input data fields and sends back response back to clients.

Here is an example client that passes a search query data field and requests quote data fields with company details

public void testGetQuoteBySearch() throws Throwable {
    String jsonResp = TestWebUtils.httpGet("http://localhost:" + DEFAULT_PORT
                    + "/data?responseFields=exchange,symbol,quote.bidPrice,quote.askPrice,quote.sales,company.name&symbolQuery=AAPL");
    ...

Note that above request will use three data providers, first it uses SymbolSearchProvider provider to search for matching symbols with given query. It then uses the symbol data field to request company and quote data fields from QuotesBySymbolsProvider and CompaniesBySymbolsProvider. The PlexDataProviders framework will take care of all dependency management for providers.

Here is an example JSON response from the data service:

{
    "queryResponse": {
        "fields": [
            [{
                "symbol": "AAPL_X"
            }, {
                "quote.sales": [
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 56
                    }, {
                        "timeOfSale.exchange": "DOW"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 69.49132317180353
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 54
                    }, {
                        "timeOfSale.exchange": "NYSE"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 16.677774132458076
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 99
                    }, {
                        "timeOfSale.exchange": "NASDAQ"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 42.17891320885568
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 49
                    }, {
                        "timeOfSale.exchange": "DOW"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 69.61680149649729
                    }],
                    [{
                        "symbol": "AAPL_X"
                    }, {
                        "timeOfSale.volume": 69
                    }, {
                        "timeOfSale.exchange": "NYSE"
                    }, {
                        "timeOfSale.date": 1455426008762
                    }, {
                        "timeOfSale.price": 25.353316897552833
                    }]
                ]
            }, {
                "quote.askPrice": 54.99300665695502
            }, {
                "quote.bidPrice": 26.935682182171643
            }, {
                "exchange": "DOW"
            }, {
                "company.name": "AAPL - name"
            }],
            [{
                "symbol": "AAPL"
            }, {
                "exchange": "NASDAQ"
            }]
        ],
        "errorsByProviderName": {},
        "providers": ["QuotesBySymbolsProvider", "SymbolSearchProvider", "CompaniesBySymbolsProvider"]
    }
}
PlexDataProviders is available from github and is licensed under liberal MIT license. It also comes with a small sample application for demo purpose. Feel free to send me your suggestions.

 

August 17, 2014

PlexService Overview – a Micro-service framework for defining HTTP/Websockets and JMS based Services

Filed under: Uncategorized — admin @ 9:19 pm

I recently created a new framework PlexService for serving micro-services. which can be accessed by HTTP, Websockets or JMS interfaces. You can choose these different access mechanism by needs of your services. For example, as JMS services are inherently asynchronous, they provide good foundation for building scalable and reactive services. You may choose http stack for implementing REST services or choose websockets for implementing interactive services.

PlexService framework provides provides basic support for encoding POJO objects into JSON for service consumption. The developers define service configuration via annoations to specify gateway types, encoding scheme, end-points, etc.

PlexService provides support of role-based security, where you can specify list of roles who can access each service. The service providers implement how to verify roles, which are then enforced by PlexService framework.

If you implement all services in JMS, you can easily expose them via HTTP or Websockets by configuring web-to-jms bridge. The bridge routes all requests from HTTP/Websockets to JMS and listen for incoming messages, which are then routed back to web clients.

PlexService provides basic metrics such as latency, invocations, errors, etc., which are exposed via JMX interface. PlexService uses jetty for serving web services. The developers provide JMS containers at runtime if required.

Building/Installing

Checkout code using

 git clone git@github.com:bhatti/PlexService.git
 

Compile and build jar file using

 ./gradlew jar
 

Copy and add jar file manually in your application.

Defining role-based security

PlexService allows developers to define role-based security, which is invoked when accessing services, e.g.

 
 public class BuggerRoleAuthorizer implements RoleAuthorizer {
     private final UserRepository userRepository;
 
     public BuggerRoleAuthorizer(UserRepository userRepository) {
       this.userRepository = userRepository;
     }
 
     @Override
       public void authorize(Request request, String[] roles) throws AuthException {
         String sessionId = request.getSessionId();
         User user = userRepository.getUserBySessionId(sessionId);
         if (user == null) {
           throw new AuthException(Constants.SC_UNAUTHORIZED,
               request.getSessionId(), request.getRemoteAddress(),
               "failed to validate session-id");
         }
         for (String role : roles) {
           if (!user.getRoles().contains(role)) {
             throw new AuthException(Constants.SC_UNAUTHORIZED,
                 request.getSessionId(), request.getRemoteAddress(),
                 "failed to match role");
           }
         }
       }
 }
 

Typically, login-service will store session-id, which is then passed to the implementation of RoleAuthorizer, e.g.

 
 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = Void.class, endpoint = "/login", method = Method.POST, codec = CodecType.JSON)
 public class LoginService extends AbstractUserService implements RequestHandler {
   public LoginService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
   public void handle(Request request) {
     String username = request.getStringProperty("username");
     String password = request.getStringProperty("password");
 
     User user = userRepository.authenticate(username, password);
     AbstractResponseBuilder responseBuilder = request.getResponseBuilder();
     if (user == null) {
       throw new AuthException(Constants.SC_UNAUTHORIZED,
               request.getSessionId(), request.getRemoteAddress(),
               "failed to authenticate");
     } else {
       responseBuilder.addSessionId(userRepository.getSessionId(user));
       responseBuilder.send(user);
     }   
   }
 }
 

In above example the session-id is added to response upon successful login, which is then passed for future requests. For http services, you may use cookies to store session-ids, otherwise you would need to pass session-id as a parameter.

Here is how you can invoke login-service from curl:

 
 curl --cookie-jar cookies.txt -v -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/login?username=erica&password=pass"
 

which would return:

 
 Content-Type: application/json
 Set-Cookie: PlexSessionID=5 Expires: Thu, 01 Jan 1970 00:00:00 GMT
 {"id":5,"username":"erica","email":"erica@plexobject.com","roles":["Employee"]}
 

Defining Services

Defining a REST service for creating a user

Here is how you can a REST service:

 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = User.class, 
     rolesAllowed = "Administrator", endpoint = "/users", method = Method.POST, 
     codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements
 RequestHandler {
   public CreateUserService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }
 

The ServiceConfig annotation defines that this service can be accessed via HTTP at “/users” URI. PlexService will provide encoding from JSON to User object and will ensure that service can be accessed by user who has Administrator role.

Here is how you can invoke this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/users" -d "{\"username\":\"david\",\"password\":\"pass\",\"email\":\"david@plexobject.com\",\"roles\":[\"Employee\"]}"
 

Defining a Web service over Websockets for creating a user

Here is how you can a Websocket based service:

 
 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = User.class, 
     rolesAllowed = "Administrator", endpoint = "/users", method = Method.POST, 
     codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements
 RequestHandler {
   public CreateUserService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }
 

The ServiceConfig annotation defines that this service can be accessed via Websocketat “/users” endpoint. However, as opposed to HTTP based service, this endpoint is not enforced in HTTP request and can be in any format as long it’s unique for a service.

Here is how you can access websocket service from javascript:

script 
 var ws = new WebSocket("ws://127.0.0.1:8181/users");
 ws.onopen = function() {
   var req = {"payload":"", "endpoint":"/login", "method":"POST", "username":"scott", "password":"pass"};
   ws.send(JSON.stringify(req));
 };
 
 ws.onmessage = function (evt) {
   alert("Message: " + evt.data);
 };
 
 ws.onclose = function() {
 };
 
 ws.onerror = function(err) {
 };
 

Note that websockets are not supported by all browsers and above code will work only supported browsers such as IE 11+, FF 31+, Chrome 36+, etc.

Defining a JMS service for creating a user

Here is how you can create JMS service:

 
 @ServiceConfig(gateway = GatewayType.JMS, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "queue:{scope}-create-user-service-queue", 
       method = Method.MESSAGE, 
       codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements RequestHandler {
     public CreateUserService(UserRepository userRepository) {
     super(userRepository);
     }
 
     @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }
 

Note that the only difference is type of gateway. PlexService also support variables in end-points, which are populated from configurations. For example, you may create scope variable to create different queues/topics for different developers/environments. PlexService will serialize POJO classes into JSON when delivering messages over JMS.

Defining a REST service with parameterized URLs

PlexService allows developers to define URIs for services, that contains variables. These variables are then populated actual requests. These can be used for implementing REST services, e.g.

 
 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = BugReport.class, 
       rolesAllowed = "Employee", endpoint = "/projects/{projectId}/bugreports", 
       method = Method.POST, 
       codec = CodecType.JSON)
 public class CreateBugReportService extends AbstractBugReportService implements RequestHandler {
     public CreateBugReportService(BugReportRepository bugReportRepository,
         UserRepository userRepository) {
       super(bugReportRepository, userRepository);
     }
 
     @Override
       public void handle(Request request) {
         BugReport report = request.getPayload();
         report.validate();
         BugReport saved = bugReportRepository.save(report);
         request.getResponseBuilder().send(saved);
       }
 }
 

Here is an example of invoking this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/projects/2/bugreports" -d "{\"title\":\"As an administrator, I would like to assign roles to users so that they can perform required actions.\",\"description\":\"As an administrator, I would like to assign roles to users so that they can perform required actions.\",\"bugNumber\":\"story-201\",\"assignedTo\":\"mike\",\"developedBy\":\"mike\"}"
 
 

Using variables with Websocket based service

You can also create variables for websocket’s endpoints similar to JMS, which are initialized from parameters.

 
 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = BugReport.class, 
       rolesAllowed = "Employee", endpoint = "{variable}-create-bugreport-service-channel", 
       method = Method.MESSAGE, codec = CodecType.JSON)
 public class CreateBugReportService extends AbstractBugReportService implements
         RequestHandler {
     public CreateBugReportService(BugReportRepository bugReportRepository,
             UserRepository userRepository) {
         super(bugReportRepository, userRepository);
     }
 
     @Override
     public void handle(Request request) {
         BugReport report = request.getPayload();
         report.validate();
         BugReport saved = bugReportRepository.save(report);
         request.getResponseBuilder().send(saved);
     }
 
 }
 

Here is another example of consuming websocket based service from javascript:

  
 var ws = new WebSocket("ws://127.0.0.1:8181/users");
 ws.onopen = function() {
   var req = {"payload":{"title":"my title", "description":"my description","bugNumber":"story-201", "assignedTo":"mike", "developedBy":"mike"},"PlexSessionID":"4", "endpoint":"/projects/2/bugreports/2/assign", "method":"POST"};
   ws.send(JSON.stringify(req));
 };
 
 ws.onmessage = function (evt) {
   alert("Message: " + evt.data);
 };
 
 ws.onclose = function() {
 };
 
 ws.onerror = function(err) {
 };
 

Defining a REST service for querying users

Here is an example REST service, which uses GET request to query users:

 
   @ServiceConfig(gateway = GatewayType.HTTP, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "/users", method = Method.GET, 
       codec = CodecType.JSON)
   public class QueryUserService extends AbstractUserService implements
   RequestHandler {
     public QueryUserService(UserRepository userRepository) {
       super(userRepository);
     }
     @Override
       public void handle(Request request) {
         Collection<User> users = userRepository.getAll(new Predicate<User>() {
             @Override
             public boolean accept(User u) {
             return true;
             }
             });
         request.getResponseBuilder().send(users);
       }
   }
 

Here is how you can invoke this service from curl

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" "http://127.0.0.1:8181/users"   
 

which would return json array such as:

 
 [{"id":2,"username":"alex","email":"alex@plexobject.com","roles":["Employee"]},{"id":3,"username":"jeff","email":"jeff@plexobject.com","roles":["Employee","Manager"]},{"id":4,"username":"scott","email":"scott@plexobject.com","roles":["Employee","Administrator","Manager"]},{"id":5,"username":"erica","email":"erica@plexobject.com","roles":["Employee"]}]
 

Defining a JMS service for querying users

Here is an example of defining query users via JMS service:

 
 @ServiceConfig(gateway = GatewayType.JMS, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "queue:{scope}-query-user-service-queue", 
       method = Method.MESSAGE, 
       codec = CodecType.JSON)
 public class QueryUserService extends AbstractUserService implements RequestHandler {
     public QueryUserService(UserRepository userRepository) {
       super(userRepository);
     }
     @Override
       public void handle(Request request) {
         Collection<User> users = userRepository.getAll(new Predicate<User>() {
             @Override
             public boolean accept(User u) {
             return true;
             }
             });
         request.getResponseBuilder().send(users);
       }
 }
 

The end-point can contain variables such as scope that are initialized from configuration.

Registering services and starting service container

You will need to register services with ServiceRegistry at runtime, which would initialize and start those services, e.g.

 
 Collection<RequestHandler> services = new HashSet<>();
 services.add(new CreateUserService(userRepository));
 services.add(new UpdateUserService(userRepository));
 services.add(new QueryUserService(userRepository));
 services.add(new DeleteUserService(userRepository));
 services.add(new LoginService(userRepository));
 services.add(new CreateProjectService(projectRepository, userRepository));
 services.add(new UpdateProjectService(projectRepository, userRepository));
 services.add(new QueryProjectService(projectRepository, userRepository));
 services.add(new AddProjectMemberService(projectRepository, userRepository));
 services.add(new RemoveProjectMemberService(projectRepository, userRepository));
 services.add(new CreateBugReportService(bugreportRepository, userRepository));
 services.add(new UpdateBugReportService(bugreportRepository, userRepository));
 services.add(new QueryBugReportService(bugreportRepository, userRepository));
 services.add(new QueryProjectBugReportService(bugreportRepository, userRepository));
 
 services.add(new AssignBugReportService(bugreportRepository, userRepository));
 serviceRegistry = new ServiceRegistry(config, services, new BuggerRoleAuthorizer(userRepository));
 serviceRegistry.start();
 
 

Creating Http to JMS bridge

You may choose to write all services as JMS and then expose them via HTTP using bridge provided by PlexService, e.g.

 
   final String mappingJson = IOUtils.toString(new FileInputStream( args[1]));
 Collection<HttpToJmsEntry> entries = new JsonObjectCodec().decode(
     mappingJson, new TypeReference<List<HttpToJmsEntry>>() {
     });
 WebToJmsBridge bridge = new WebToJmsBridge(new Configuration(args[0]), entries, GatewayType.HTTP);
 bridge.startBridge();
 

Creating Websocket to JMS bridge

Similarly, you may expose JMS services via websockets based transport using the bridge:

 
   final String mappingJson = IOUtils.toString(new FileInputStream( args[1]));
 Collection<HttpToJmsEntry> entries = new JsonObjectCodec().decode(
     mappingJson, new TypeReference<List<HttpToJmsEntry>>() {
     });
 WebToJmsBridge bridge = new WebToJmsBridge(new Configuration(args[0]), entries, GatewayType.WEBSOCKET);
 bridge.startBridge();
 

Here is JSON configuration for bridge:

   [
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports/{id}/assign","method":"POST",
     "destination":"queue:{scope}-assign-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports","method":"GET",
     "destination":"queue:{scope}-query-project-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users","method":"GET",
     "destination":"queue:{scope}-query-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects","method":"GET",
     "destination":"queue:{scope}-query-projects-service","timeoutSecs":30},
   {"codecType":"JSON","path":"/bugreports","method":"GET",
     "destination":"queue:{scope}-bugreports-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}/membership/add","method":"POST",
     "destination":"queue:{scope}-add-project-member-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}/membership/remove","method":"POST",
     "destination":"queue:{scope}-remove-project-member-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports","method":"POST",
     "destination":"queue:{scope}-create-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users","method":"POST",
     "destination":"queue:{scope}-create-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects","method":"POST",
     "destination":"queue:{scope}-create-projects-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users/{id}","method":"POST",
     "destination":"queue:{scope}-update-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users/{id}/delete","method":"POST",
     "destination":"queue:{scope}-delete-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}","method":"POST",
     "destination":"queue:{scope}-update-project-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports/{id}","method":"POST",
     "destination":"queue:{scope}-update-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/login","method":"POST",
     "destination":"queue:{scope}-login-service-queue","timeoutSecs":30}]
 

Defining a Streaming Quotes Service over Websockets

Suppose you are building a high performance streaming quote service for providing real-time stock quotes, you can easily build it using PlexService framework, e.g.

 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = Void.class, endpoint = "/quotes", method = Method.MESSAGE, codec = CodecType.JSON)
 public class QuoteServer implements RequestHandler {
     public enum Action {
         SUBSCRIBE, UNSUBSCRIBE
     }
 
     static final Logger log = LoggerFactory.getLogger(QuoteServer.class);
 
     private QuoteStreamer quoteStreamer = new QuoteStreamer();
 
     @Override
     public void handle(Request request) {
         String symbol = request.getProperty("symbol");
         String actionVal = request.getProperty("action");
         log.info("Received " + request);
         ValidationException
                 .builder()
                 .assertNonNull(symbol, "undefined_symbol", "symbol",
                         "symbol not specified")
                 .assertNonNull(actionVal, "undefined_action", "action",
                         "action not specified").end();
         Action action = Action.valueOf(actionVal.toUpperCase());
         if (action == Action.SUBSCRIBE) {
             quoteStreamer.add(symbol, request.getResponseBuilder());
         } else {
             quoteStreamer.remove(symbol, request.getResponseBuilder());
         }
     }
 
     public static void main(String[] args) throws Exception {
         Configuration config = new Configuration(args[0]);
         QuoteServer service = new QuoteServer();
         Collection<RequestHandler> services = new ArrayList<>();
         services.add(new QuoteServer());
         //
         ServiceRegistry serviceRegistry = new ServiceRegistry(config, services, null);
         serviceRegistry.start();
         Thread.currentThread().join();
     }
 }
 

Above example defines a service that listen to websockets and responds to subscribe or unsubscribe requests from web clients.

You can define mock QuoteStreamer as follows, which periodically sends quotes to all subscribers:

 public class QuoteStreamer extends TimerTask {
     private int delay = 1000;
     private Map<String, Collection<ResponseDispatcher>> subscribers = new ConcurrentHashMap<>();
     private QuoteCache quoteCache = new QuoteCache();
     private final Timer timer = new Timer(true);
 
     public QuoteStreamer() {
         timer.schedule(this, delay, delay);
     }
 
     public void add(String symbol, ResponseDispatcher dispatcher) {
         symbol = symbol.toUpperCase();
         synchronized (symbol.intern()) {
             Collection<ResponseDispatcher> dispatchers = subscribers
                     .get(symbol);
             if (dispatchers == null) {
                 dispatchers = new HashSet<ResponseDispatcher>();
                 subscribers.put(symbol, dispatchers);
             }
             dispatchers.add(dispatcher);
         }
     }
 
     public void remove(String symbol, ResponseDispatcher dispatcher) {
         symbol = symbol.toUpperCase();
         synchronized (symbol.intern()) {
             Collection<ResponseDispatcher> dispatchers = subscribers
                     .get(symbol);
             if (dispatchers != null) {
                 dispatchers.remove(dispatcher);
             }
         }
     }
 
     @Override
     public void run() {
         for (Map.Entry<String, Collection<ResponseDispatcher>> e : subscribers
                 .entrySet()) {
             Quote q = quoteCache.getLatestQuote(e.getKey());
             Collection<ResponseDispatcher> dispatchers = new ArrayList<>(
                     e.getValue());
             for (ResponseDispatcher d : dispatchers) {
                 try {
                     d.send(q);
                 } catch (Exception ex) {
                     remove(e.getKey(), d);
                 }
             }
         }
     }
 }
 

Here is a sample javascript/html client, which allows users to subscribe to different stock symbols:

    <script>
       var ws = new WebSocket("ws://127.0.0.1:8181/quotes");
       ws.onopen = function() {
       };
       var lasts = {};
       ws.onmessage = function (evt) {
         //console.log(evt.data);
         var quote = JSON.parse(evt.data).payload;
         var d = new Date(quote.timestamp);
         $('#time').text(d.toString());
         $('#company').text(quote.company);
         $('#last').text(quote.last.toFixed(2));
         var prev = lasts[quote.company];
         if (prev != undefined) {
           var change = quote.last - prev;
           if (change >= 0) {
             $('#change').css({'background-color':'green'});
           } else {
             $('#change').css({'background-color':'red'});
           }
           $('#change').text(change.toFixed(2));
         } else {
           $('#change').text('N/A');
         }
         lasts[quote.company] = quote.last;
       };
 
       ws.onclose = function() {
       };
 
       ws.onerror = function(err) {
       };
       function send(payload) {
         $('#input').text(payload);
         ws.send(payload);
       }
       $(document).ready(function() {
         $("#subscribe").click(function() {
           var symbol = $("#symbol").val();
           var req = {"endpoint":"/quotes", "symbol":symbol, "action":"subscribe"};
           send(JSON.stringify(req));
         });
       });
       $(document).ready(function() {
         $("#unsubscribe").click(function() {
           var symbol = $("#symbol").val();                                                                                            
           var req = {"endpoint":"/quotes", "symbol":symbol, "action":"unsubscribe"};
           send(JSON.stringify(req));
         });
       });
    <script>
 
   <body>
     <form>
       Symbol:<input type="text" id="symbol" value="AAPL" size="4" />
       <input type="button" id="subscribe" value="Subscribe"/>
       <input type="button" id="unsubscribe" value="Unsubscribe"/>
     </form>
 
     <br>
 
     <table id="quotes" class="quote" width="600" border="2" cellpadding="0" cellspacing="3">
       <thead>
         <tr>
           <th>Time</th>
           <th>Company</th>
           <th>Last</th>
           <th>Change</th>
         </tr>
       </thead>
       <tbody>
         <tr>
           <td id="time"></td>
           <td id="company"></td>
           <td id="last"></td>
           <td id="change"></td>
         </tr>
       </tbody>
     </table>
   </body>
 

PlexService includes this sample code, where you can start streaming quote server by running “quote.sh” command and then open quote.html file in your browser.

Using JMX

PlexService uses JMX to expose key metrics and lifecycle methods to start or stop services. You can use jconsole to access the JMX controls, e.g.

 
 jconsole localhost:9191
 



Summary

PlexService comes a full-fledged sample application under plexsvc-sample folder and you browse JavaDocs to view APIs.


April 23, 2014

Implementing Reactive Extensions (RX) using Java 8

Filed under: Java — admin @ 11:52 pm

In my last blog, I described new lambda support in Java 8. In order to try new Java features in more depth, I implemented Reactive extensions in Java 8. In short, reactive extensions allows processing synchronous and asynchronous in data uniform manner. It provides unified interfaces that can be used as an iterator or callback method for asynchronous processing. Though, Microsoft RX library is huge but I only implemented core features and focused on Observable API. Here is brief overview of implementation:

Creating Observable from Collection

Here is how you can create Observable from a collection:

    List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribe(System.out::println, 
       Throwable::printStackTrace, () -> System.out.println("done"));
 

In Microsoft’s version of RX, Observable takes an Observer for subscription, which defines three methods: onNext, onError and onCompleted. onNext is invoked to push next element of data, onError is used to notify errors and onCompleted is called when data is all processed. In my implementation, the Observable interface defines two overloaded subscribe method, first takes callback functions for onNext and onError and second method takes three callback functions including onCompleted. I chose to use separate function parameters instead of a single interface so that caller can pass inline lambda functions instead of passing implementation of Observer interface.

Creating Observable from Array of objects

Here is how you can create Observable from stream:

    Observable.from("Erica", "Matt", "John", "Mike").subscribe(System.out::println, 
         Throwable::printStackTrace, () -> System.out.println("done"));
 

Creating Observable from Stream

Here is how you can create Observable from stream:

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    // note third argument for onComplete is optional
    Observable.from(names).subscribe(name -> System.out.println(name), 
       error -> error.printStackTrace());
 

Creating Observable from Iterator

Here is how you can create Observable from iterator:

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names.iterator()).subscribe(name -> System.out.println(name), 
       error -> error.printStackTrace());
 

Creating Observable from Spliterator

Here is how you can create Observable from spliterator:

    List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names.spliterator()).subscribe(System.out::println, 
       Throwable::printStackTrace);
 

Creating Observable from a single object

Here is how you can create Observable from a single object:

    Observable.just("value").subscribe(v -> System.out.println(v), 
       error -> error.printStackTrace());
    // if a single object is collection, it would be treated as a single entity, e.g.
    Observable.just(Arrays.asList(1, 2, 3)).subscribe( num -> System.out.println(num), 
       error -> error.printStackTrace());
 

Creating Observable for an error

Here is how you can create Observable that would return an error:

    Observable.throwing(new Error("test error")).subscribe(System.out::println, 
       error -> System.err.println(error));
    // this will print error 
 

Creating Observable from a consumer function

Here is how you can create Observable that takes user function for invoking onNext, onError and onCompleted function:

    Observable.create(observer -> {
       for (String name : names) {
          observer.onNext(name);
       }
       observer.onCompleted();
    }).subscribe(System.out::println, Throwable::printStackTrace);
 

Creating Observable from range

Here is how you can create Observable from stream that would create numbers from start to end range exclusively.

    // Creates range of numbers starting at from until it reaches to exclusively
    Observable.range(4, 8).subscribe(num -> System.out.println(num), 
       error -> error.printStackTrace());
    // will print 4, 5, 6, 7
 

Creating empty Observable

It would call onCompleted right away:

    Observable.empty().subscribe(System.out::println, 
       Throwable::printStackTrace, () -> System.out.println("Completed"));
 

Creating never Observable

It would not call any of call back methods:

    Observable.never().subscribe(System.out::println, Throwable::printStackTrace);
 
 

Changing Scheduler

By default Observable notifies observer asynchronously using thread-pool scheduler but you can change default scheduler as follows:

Using thread-pool scheduler

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getThreadPoolScheduler()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Using new-thread scheduler

It will create new thread

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getNewThreadScheduler()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Using timer thread with interval

It will notify at each interval

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getTimerSchedulerWithMilliInterval(1000)).
       subscribe(System.out::println, Throwable::printStackTrace);
    // this will print each name every second
 

Using immediate scheduler

This scheduler calls callback functions right away on the same thread. You can use this if you synchronous data and don’t want to create another thread. On the downside, you cannot unsubscribe with this scheduler.

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getImmediateScheduler()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Transforming

Observables keep sequence of items as streams and they support map/flatMap operation as supported by standard Stream class, e.g.

Map

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).map(name -> name.hashCode()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

FlatMap

    Stream integerListStream = Stream.of( Arrays.asList(1, 2), 
       Arrays.asList(3, 4), Arrays.asList(5));
    Observable.from(integerListStream).flatMap(integerList -> integerList.stream()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Filtering

Observables supports basic filtering support as provided by Java Streams, e.g.

Filter

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", 
       "Five"); 
    Observable.from(names).filter(name -> name.startsWith("T")).
       subscribe(System.out::println, Throwable::printStackTrace);
    // This will only print Two and Three
 

Skip

skips given number of elements

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).skip(2).subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will skip One and Two
 

Limit

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).limit(2).subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will only print first two strings
 

Distinct

    Stream<String> names = Stream.of("One", "Two", "Three", "One");
    Observable.from(names).distinct.subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will print One only once
 

Merge

This concates two observable data:

    Observable<Integer> observable2 = Observable.from(Stream.of(4, 5, 6));
    observable1.merge(observable2).subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will print 1, 2, 3, 4, 5, 6
 

Summary

In summary, as Java 8 already supported a lot of functional primitives, adding support for reactive extensions was quite straight forward. For example, Nextflix’s implementation of reactive extensions in Java consists of over 80K lines of code but it took few hundred lines to implement core features with Java 8. You can download or fork the code from https://github.com/bhatti/RxJava8.


April 17, 2014

Introduction to Java 8 Lambda and Stream Syntax

Filed under: Java — admin @ 11:10 pm

Introduction

Java 8 was released in March 2014 with most language-level enhancements since Java 5 back in 2004. The biggest new feature is introduction to Lambda. Lambda or Closur is a block of code that you can pass to other methods or return from methods. Previously, Java supported a form of closure via anonymous class syntax, e.g.

 import java.awt.event.ActionEvent;
 import java.awt.event.ActionListener;
 import java.awt.Dimension;
 import java.awt.FlowLayout;
 import javax.swing.JButton;
 import javax.swing.JFrame;
 
 public class SwingExample extends JFrame {
   public SwingExample() {
     this.getContentPane().setLayout(new FlowLayout());
     final JButton btn = new JButton("Click Me");
     btn.setPreferredSize(new Dimension(400,200));
     add(btn);
     btn.addActionListener(new ActionListener() {
       public void actionPerformed(ActionEvent e) {
         btn.setText("Clicked");
         btn.setEnabled(false);
       }
     });
   }
 
   private static void createAndShowGUI() {
     JFrame frame = new SwingExample();
     frame.pack();
     frame.setVisible(true);
     frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
 
   }
 
   public static void main(String[] args) {
     javax.swing.SwingUtilities.invokeLater(new Runnable() {
       public void run() {
         createAndShowGUI(); 
       }
     });
   }
 }
 

In above example, using anonymous class could use locally defined data in method that declares it as long as it is defined with final. Here is how the example looks like with Java 8 syntax:

 import java.awt.event.ActionEvent;
 import java.awt.event.ActionListener;
 import java.awt.Dimension;
 import java.awt.FlowLayout;
 import javax.swing.JButton;
 import javax.swing.JFrame;
 
 public class SwingExample8 extends JFrame {
   public SwingExample8() {
     this.getContentPane().setLayout(new FlowLayout());
     JButton btn = new JButton("Click Me");
     btn.setPreferredSize(new Dimension(400,200));
     add(btn);
     btn.addActionListener(e -> {
         btn.setText("Clicked");
         btn.setEnabled(false);
       }
     );
   }
 
   private static void createAndShowGUI() {
     JFrame frame = new SwingExample8();
     frame.pack();
     frame.setVisible(true);
     frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
 
   }
 
   public static void main(String[] args) {
     javax.swing.SwingUtilities.invokeLater(new Runnable() {
       public void run() {
         createAndShowGUI(); 
       }
     });
   }
 }
 

As you can see, lambda syntax is very minimal. In addition, lambda syntax doesn’t require that you declare externally accessible data as final, though it cannot be changed. Java lambda also adds type inferencing so that you don’t have to define types of arguments. The lambda features are implemented using “invokedynamic” instruction to dispatch method calls, which was added in Java 7 to support dynamic languages. For example, let’s take a simple example:

 public class Run8 {
   public static void main(String[] args) {
     Runnable r = () -> System.out.println("hello there");
     r.run();
   }
 }
 

If you decompile it using

 javap -p Run8
 

You will see, it generated lambda$main$0 method, e.g.

 public class Run8 {
   public Run8();
   public static void main(java.lang.String[]);
   private static void lambda$main$0();
 }
 

You can see real byte code using

 javap -p -c Run8
 

and you will see new byte codes:

 public class Run8 {
   public Run8();
     Code:
        0: aload_0       
        1: invokespecial #1                  // Method java/lang/Object
        4: return        
 
   public static void main(java.lang.String[]);
     Code:
        0: invokedynamic #2,  0              // InvokeDynamic #0:run:()Ljava/lang/Runnable;
        5: astore_1      
        6: aload_1       
        7: invokeinterface #3,  1            // InterfaceMethod java/lang/Runnable.run:()V
       12: return        
 
   private static void lambda$main$0();
     Code:
        0: getstatic     #4                  // Field java/lang/System.out:Ljava/io/PrintStream;
        3: ldc           #5                  // String hello there
        5: invokevirtual #6                  // Method java/io/PrintStream.println:(Ljava/lang/String;)V
        8: return        
 }
 

This means lambdas don’t have to keep reference of enclosing class and “this” inside lambda does not create new scope.

Types of Functions

Java 8 provides predefined functions (See http://docs.oracle.com/javase/8/docs/api/java/util/function/package-summary.html), but there are four major types:

                 Supplier: () -> T
                 Consumer: T -> ()
                 Predicate: T -> boolean
                 Function: T -> R
 

The supplier method takes not arguments and produces an object, the consumer takes an argument for consumption, predicate evaluates given argument by returning true/false and function maps an argument of type T and returns an object of type R.

Supplier example

 import java.util.function.*;
 
 public class Supply8 {
   public static void main(String[] args) {
     Supplier<Double>> random1 = Math::random;
     System.out.println(random1.get());
     //
     DoubleSupplier random2 = Math::random;
     System.out.println(random2.getAsDouble());
   }
 }
 

Note: Java 8 provides special functions for primitive types that you can use instead of using wrapper classes for primitive types. Here is another example that shows how you can write efficient log messages:

 import java.util.function.*;
 
 public class SupplyLog {
   private static boolean debugEnabled; 
   public static void debug(Supplier<String> msg) {
     if (debugEnabled) {
       System.out.println(msg.get());
     }
   }
   public static void main(String[] args) {
     debug(() -> "this will not be printed");
     debugEnabled = true;
     debug(() -> "this will be printed");
   }
 }
 

Consumer example

 import java.util.function.*;
 
 public class Consume8 {
   public static void main(String[] args) {
     Consumer<String> consumer = s -> System.out.println(s);
     consumer.accept("hello there");
     consumer.andThen(consumer).accept("this will be printed twice");
   }
 }
 

Predicate example

 import java.util.function.*;
 
 public class Predicate8 {
   public static void main(String[] args) {
     Predicate<Integer> gradeA = score -> score >= 90;
     System.out.println(gradeA.test(80));
     System.out.println(gradeA.test(90));
   }
 }
 

In addition to test method, you can also use and, negate, or method to combine other predicates.

Function example

 import java.util.function.*;
 
 public class Function8 {
   public static void main(String[] args) {
     BinaryOperator<Integer> adder = (n1, n2) -> n1 + n2;
     System.out.println("sum " + adder.apply(4, 5));
     Function<Double>,Double> square = x -> x * x;
     System.out.println("square " + square.apply(5.0));
   }
 }
 

Custom Functions

In addition to predefined functions, you can define your own interface for functions as long as there is a single method is declared. You can optionally declare interface with @FunctionalInterface annotation so that compiler can verify it, e.g.

 public class CustomFunction8 {
   @FunctionalInterface
   interface Command<T> {
     void execute(T obj);
   }
 
   private static <T> void invoke(Command<T> cmd, T arg) {
     cmd.execute(arg);
   }
 
 
   public static void main(String[] args) {
     Command<Integer> cmd = arg -> System.out.println(arg);
     invoke(cmd, 5);
   }
 }
 

Method Reference

In addition to passing lambda, you can also pass instance or static methods as closures using method reference. There are four kinds of method references:

  • Reference to a static method ContainingClass::staticMethodName
  • Reference to an instance method of a particular object ContainingObject::instanceMethodName
  • Reference to an instance method of an arbitrary object of a particular type ContainingType::methodName
  • Reference to a constructor ClassName::new

Streams

In addition to lambda support, Java 8 has updated collection classes to support streams. Streams don’t really store anything but they behave as pipes for computation lazily. Though, collections have limited size, but streams can be unlimited and they can be only consumed once. Streams can be accessed from collections using stream() and parallelStream() methods or from an array via Arrays.stream(Object[]). There are also static factory methods on the stream classes, such as Stream.of(Object[]), IntStream.range(int, int), etc. Common intermediate methods using as pipes that you can invoke on streams:

  • filter()
  • distinct()
  • limit()
  • map()
  • peek()
  • sorted()
  • unsorted()

In above examples sorted, distinct, unsorted are stateful, whereas filter, map, limit are stateless. And here are terminal operations that trigger evaluation on streams:

  • findFirst()
  • min()
  • max()
  • reduce()
  • sum()

You can implement your own streams by using helper methods in StreamSupport class.

Iterating

Java 8 streams support forEach method for iterating, which can optionally take a consumer function, e.g.

 import java.util.stream.Stream;
 
 public class StreamForEach {
   public static void main(String[] args) {
     Stream<String> symbols = Stream.of("AAPL", "MSFT", "ORCL", "NFLX", "TSLA");
     symbols.forEach(System.out::println);
   }
 }
 

Parallel iteration:

 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Stream;
 
 public class ParStreamForEach {
   public static void main(String[] args) {
     List<String> symbols = Arrays.asList("AAPL", "MSFT", "ORCL", "NFLX", "TSLA");
     System.out.println("unordered");
     symbols.parallelStream().forEach(System.out::println);
     System.out.println("ordered");
     symbols.parallelStream().forEachOrdered(System.out::println);
   }
 }
 

Note that by default iterating parallel stream would be unordered but you can force ordered iteration using forEachOrdered method instead of forEach.

Filtering

We already saw predicate functions and filtering support in streams allow extract elements of collections that evaluates true to given predicate. Let’s create a couple of classes that we will use later:

 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.function.IntPredicate;
 
 
 public class Game implements IntPredicate {
   enum Type {
     AGE_ALL,
     AGE_13_OR_ABOVE,
     AGE_18_OR_ABOVE
   }
 
   public final String name;
   public final Type type;
   public final Collection<Player> players = new ArrayList<>();
 
 
   public Game(String name, Type type) {
     this.name = name;
     this.type = type;
   }
 
   public boolean suitableForAll() {
     return type == Type.AGE_ALL;
   }
   public void add(Player player) {
     if (test(player.age)) {
       this.players.add(player);
     }
   }
 
   @Override
   public boolean test(int age) {
     switch (type) {
       case AGE_18_OR_ABOVE:
         return age >= 18;
       case AGE_13_OR_ABOVE:
         return age >= 13;
       default:
         return true;
     }
   }
   @Override 
   public String toString() {
     return name;
   }
 }
 
 public class Player {
   public final String name;
   public final int age;
   public Player(String name, int age) {
     this.name = name;
     this.age = age;
   }
   @Override 
   public String toString() {
     return name;
   }
 }
 

Now let’s create a class that will filter games by types:

 import java.util.Arrays;
 import java.util.Collection;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 
 
 public class GameFilter {
   public static void main(String[] args) {
     Collection<Game>> games = Arrays.asList(new Game("Birdie", Game.Type.AGE_ALL), new Game("Draw", Game.Type.AGE_ALL), new Game("Poker", Game.Type.AGE_18_OR_ABOVE), new Game("Torpedo", Game.Type.AGE_13_OR_ABOVE));
     Collection<Game>> suitableForAll = games.stream().filter(Game::suitableForAll).collect(toList());
     System.out.println("suitable for all");
     suitableForAll.stream().forEach(System.out::println);
     Collection<Game>> adultOnly = games.stream().filter(game -> game.type == Game.Type.AGE_18_OR_ABOVE).limit(10).collect(toList());
     System.out.println("suitable for adults only");
     adultOnly.stream().forEach(System.out::println);
   }
 }
 

As you can see, filter can accept lambda or method reference.

Map

Map operation on streams applies a given function to transform each element in stream and produces another stream with transformed elements.

 import java.util.Arrays;
 import java.util.Collection;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 
 
 public class GameMap {
   public static void main(String[] args) {
     Collection<Game>> games = Arrays.asList(new Game("Birdie", Game.Type.AGE_ALL), new Game("Draw", Game.Type.AGE_ALL), new Game("Poker", Game.Type.AGE_18_OR_ABOVE), new Game("Torpedo", Game.Type.AGE_13_OR_ABOVE));
     Collection<Player> players = Arrays.asList(new Player("John", 10), new Player("David", 15), new Player("Matt", 20), new Player("Dan", 30), new Player("Erica", 5));
     for (Game game : games) {
       for (Player player : players) {
         game.add(player);
       }
     }
     //
     Collection<Game>.Type> types = games.stream().map(game -> game.type).collect(toList());
     System.out.println("types:");
     types.stream().forEach(System.out::println);
     Collection<Player> allPlayers = games.stream().flatMap(game -> game.players.stream()).collect(toList());
     System.out.println("\nplayers:");
     players.stream().forEach(System.out::println);
   }
 }
 
 

Note that flatMap takes collection of objects for each input and flattens it and produces a single collection. Java streams also produces map methods for primitive types such as mapToLong, mapToDouble, etc.

Sorting

Previously, you had to implement Comparable interface or provide Comparator for sorting but you can now pass lambda for comparison, e.g.

 import java.util.Arrays;
 import java.util.List;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 import java.util.Comparator;
 
 
 public class GameSort {
   public static void main(String[] args) {
     List<Player> players = Arrays.asList(new Player("John", 10), new Player("David", 15), new Player("Matt", 20), new Player("Dan", 30), new Player("Erica", 5));
     players.sort(Comparator.comparing(player -> player.age));
     System.out.println(players);
   }
 }
 

Min/Max

Java streams provide helper methods for calculating min/max, e.g.

 import java.util.Arrays;
 import java.util.Collection;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 import java.util.Comparator;
 
 
 public class GameMinMax {
   public static void main(String[] args) {
     Collection<Player> players = Arrays.asList(new Player("John", 10), new Player("David", 15), new Player("Matt", 20), new Player("Dan", 30), new Player("Erica", 5));
     Player min = players.stream().min(Comparator.comparing(player -> player.age)).get();
     Player max = players.stream().max(Comparator.comparing(player -> player.age)).get();
     System.out.println("min " + min + ", max " + max);
   }
 }
 
 

Reduce/Fold

Reduce or fold generalizes the problem where we compuate a single value from collection, e.g.

 import java.util.Arrays;
 import java.util.Collection;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 
 
 public class GameReduce {
   public static void main(String[] args) {
     Collection<Player> players = Arrays.asList(new Player("John", 10), new Player("David", 15), new Player("Matt", 20), new Player("Dan", 30), new Player("Erica", 5));
     double averageAge1 = players.stream().mapToInt(player -> player.age).average().getAsDouble();
     double averageAge2 = players.stream().mapToInt(player -> player.age).reduce(0, Integer::sum) / players.size();
     double averageAge3 = players.stream().mapToInt(player -> player.age).reduce(0, (sum, age) -> sum + age) / players.size();
     System.out.println("average age " + averageAge1 + ", " + averageAge2 + ", or " + averageAge3);
   }
 }
 

Grouping/Partitioning

groupingBy method of Collectors allows grouping collection, e.g.

 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 
 
 public class GameGrouping {
   public static void main(String[] args) {
     Collection<Player> players = Arrays.asList(new Player("John", 10), new Player("David", 15), new Player("Matt", 20), new Player("Dan", 30), new Player("Erica", 5));
     Map<Integer, List<Player>> playersByAge = players.stream().collect(groupingBy(player -> player.age));
     System.out.println(playersByAge);
   }
 }
 

partitioningBy groups collection into two collection, e.g.

 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 
 
 public class GamePartition {
   public static void main(String[] args) {
     Collection<Player> players = Arrays.asList(new Player("John", 10), new Player("David", 15), new Player("Matt", 20), new Player("Dan", 30), new Player("Erica", 5));
     Map<Boolean, List<Player>> playersByAge = players.stream().collect(groupingBy(player -> player.age >= 18));
     System.out.println(playersByAge);
   }
 }
 

String joining

Here is an example of creating a string from collection:

 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 
 
 public class GameJoin {
   public static void main(String[] args) {
     Stream<Player> players = Stream.of(new Player("John", 10), new Player("David", 15), new Player("Matt", 20), new Player("Dan", 30), new Player("Erica", 5));
     System.out.println(players.map(Player::toString).collect(joining(",", "[", "]")));
   }
 }
 

Lazy Evaluation

Java Map interface now supports lazy evaluation by adding object to Map if the key is not present:

 import java.util.HashMap;
 import java.util.Map;
 
 public class Fib {
   private final static Map<Integer,Long> cache = new HashMap<Integer, Long>() {{
     put(0,0L);
     put(1,1L);
   }};
   public static long fib(int x) {
     return cache.computeIfAbsent(x, n -> fib(n-1) + fib(n-2));
   }
 
   public static void main(String[] args) {
     System.out.println(fib(10));
   }
 }
 

Parallel Streams

Though, streams processes elements serially but you can change stream() method of collection to parallelStream to take advantage of parallel processing of stream. Parallel stream use ForkJoinPool by default and use as many threads as you have processors (Runtime.getRuntime().availableProcessors()), e.g.

 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 public class ParStreamPrime {
   private static boolean isPrime(int n) {
     if (n%2==0) return false;
     for(int i=3;i*i<=n;i+=2) {
       if(n%i==0) return false;
     }
     return true;
   }
   private static long serialTest(int max) {
     long started = System.currentTimeMillis();
     IntStream.rangeClosed(1, max).forEach(num -> isPrime(num));
     return System.currentTimeMillis() - started;
   }
   private static long parallelTest(int max) {
     long started = System.currentTimeMillis();
     IntStream.rangeClosed(1, max).parallel().forEach(num -> isPrime(num));
     return System.currentTimeMillis() - started;
   }
   //
   public static void main(String[] args) {
     int max = 1000000;
     System.out.println("Serial " + serialTest(max));
     System.out.println("Parallel " + parallelTest(max));
   }
 }
 

If you need to customize thread pool size, you can create parallel stream inside fork-join-pool, e.g.

   private static void parallelTest(final int max) {
     ForkJoinPool forkJoinPool = new ForkJoinPool(2);
     forkJoinPool.submit(() ->
        IntStream.rangeClosed(1, max).parallel().forEach(num -> isPrime(num));
     ).get();
   }
 

Default methods and Mixins

For anyone who has to support interfaces for multiple clients knows the frustration of adding new methods because it requires updating all clients. You can now add default methods on interfaces and add static methods, e.g.

 interface Vehicle {
   float getMaxSpeedMPH();
   public static String getType(Vehicle v) {
     return v.getClass().getSimpleName();
   }
 }
 
 interface Car extends Vehicle {
   void drive();
   public default float getMaxSpeedMPH() {
     return 200;
   }
 }
 interface Boat extends Vehicle {
   void row();
   public default float getMaxSpeedMPH() {
     return 100;
   }
 }
 
 interface Plane extends Vehicle {
   void fly();
   public default float getMaxSpeedMPH() {
     return 500;
   }
 }
 
 public class AmphiFlyCar implements Car, Boat, Plane {
   @Override
   public void drive() {
     System.out.println("drive");
   }
   @Override
   public void row() {
     System.out.println("row");
   }
   @Override
   public void fly() {
     System.out.println("fly");
   }
   public float getMaxSpeedMPH() {
     return Plane.super.getMaxSpeedMPH();
   }
   public static void main(String[] args) {
     AmphiFlyCar v = new AmphiFlyCar();
     System.out.println(Vehicle.getType(v) + ": " + v.getMaxSpeedMPH());
   }
 }
 

Optional

One of biggest bane of Java applications is NullPointerException and you can get rid of those using Optional, which acts as Maybe monads in other languages, e.g.

 import java.util.HashMap;
 import java.util.Map;
 import static java.util.stream.Collectors.*;
 import java.util.stream.Stream;
 import java.util.Optional;
 
 
 public class OptionalExample {
   private static Map<String, Player> players = new HashMap<String, Player>() {{
     put("John", new Player("John", 10));
     put("David", new Player("David", 15));
     put("Matt", new Player("Matt", 20));
     put("Erica", new Player("Erica", 25));
   }};
 
   private static Optional<Player> findPlayerByName(String name) {
     Player player = players.get(name);
     return player == null ? Optional.empty() : Optional.of(player);
   }
 
   private static Integer getAge(Player player) {
     return player.age;
   }
 
 
   public static void main(String[] args) {
     findPlayerByName("John").ifPresent(System.out::println);
     Player player = findPlayerByName("Jeff").orElse(new Player("Jeff", 40));
     System.out.println("orElse " + player);
     Integer age = findPlayerByName("Jeff").map(OptionalExample::getAge).orElse(-1);
     System.out.println("Jeff age " + age);
   }
 }
 

CompletableFuture

Java has long supported future, but previously you had to call blocking get() to retrieve the result. With Java 8, you can use CompletableFuture to define the behavior when asynchronous processing is completed, e.g.

 private static final ExecutorService executor = Executors.newFixedThreadPool(2);
 public static CompletableFuture getPlayer(String name) {
    return CompletableFuture.supplyAsync(() -> new Player(), executor);
 }
 getQuote("name").thenAccept(player -> System.out.println(player));
 
 

Summary

Over the past few years, functional programming languages have become mainstream and Java 8 brings many of those capabilites despite being late. These new features help write better and more concise code. You will have to change existing code to make more use of immutable objects and use streams instead of objects when possible. As far as stability, I found java 8 compiler on Linux environment a bit buggy that crashed often so it may take a little while before Java 8 can be used in production.


September 12, 2013

NFJS Seattle 2013 Review

Filed under: Computing — admin @ 7:38 pm

I attended NFJS conference over the weekend. It was a short conference from Friday, Sep 6 to Sunday, Sep 8. It had a number of sessions on Java 8, Javascript, Mobile, and functional areas. Here are some of the sections that I enjoyed:

Java 8 Language Capabilities

This session gave a brief overview of new features of Java 8 mainly new closure/lambda syntax. Venkat is a great speaker and he was coding live throughout the session.

Concurrency without Pain in Pure Java

This was related to first talk by Venkat Subramaniam and covered a number of patterns such as Actors and STM for concurrency.

Functional SOLID

In this talk Matt Stine gave overview of SOLID principles and how functional programming make it easier to apply these patterns. This was more abstract talk and didn’t go into examples of those patterns.

Programming with Immutability

This was more practical talk by Matt Stine and he gave examples of immutability and functional programming in Java and Groovy with live coding. He mentioned a number of tools that can make it easier to build mutable and immutable pair of classes and how immutable classes can be used in other frameworks such as Hibernate.

Rich Web Apps with Angular

This was a short introduction to Angular by Raju Gandhi.

Vagrant: Virtualized Development Environments Made Simple

This was a great introduction to Vagrant and how to setup a complete development, testing and production environments on your local desktop.

Simulation Testing with Simulant

This was a short introduction to Simulant testing library that Stuart Halloway has been using for testing Datomic database. This testing library can be used for functional testing and load testing. It saves all data in datomic database and can be populated from existing data.

Generative Testing

This was another session of testing framework by Stuart Halloway. This framework provides great support for generating test data and is somewhat similar to QuickCheck, though it doesn’t offer reduction.

Summary

I didn’t go to OSCON this year and enjoyed smaller pavilion that NFJS provided. There were a couple of dud sessions, but overall I enjoyed it.


March 6, 2013

Tour of iOS game – ‘Passion Investment Portfolio’

Filed under: iPhone development — admin @ 9:25 pm

I recently finished an iOS (iPhone/iPad/iPod Touch) game called “Passion Investment Portfolio”, which provides a virtual stock trading for learning purpose. A unique feature of this application is that it chooses stocks for you based on your lifestyle. When you join this game, you receive $10,000 virtual cash money and then you are asked to add your life passions, interests, hobbies, products you like, stores where you shop, etc. The Passion Investment Portfolio game then finds companies that match your passions. It keeps updating those companies on daily basis and gives away free stocks from your matching companies. You can watch which companies grow with time and customize your favorite companies. The Passion Investment Portfolio game also provides analysts ratings, charts, fundamental data and news for your companies. Here is a short tour of the app:

Adding your passions

On iPhone/iPod touch, you can just select ‘My Passions’ from the menu, then choose category of passion. You can add new passions by touching (+) icon. You can also edit your existing passions by touching them and rewriting your passions. If you need to delete any passion, then select Edit option and touch (-) button.



On iPad, you can see categories and list of passions on the same screen but the process of adding or removing passions is same as iPhone, e.g.


Your portfolio

When you initially join “Passion Investment Portfolio”, you don’t have any existing stocks, but as you play the game, you will see how your portoflio grows over time, e.g.




Your Passion Companies

You can view your passion companies by selecting “My Passion Quotes” from the menu. You will be able to see all companies next to each passion along with delayed quotes. Also, you will be able to see thumbs up sign if the analysts rating is buy and thumbs down sign if analysts rating is sell.



Your Stocks

You can view all the stocks you own by selecting “My Stocks” from the menu. You will also see latest quotes for those companies and gain/loss for your stocks based on market prices. If you choose to dump the stocks, then just touch “Sell” button.



Your Capital Gains/Loss

When you sell stocks, the difference between initial purchase and final sell price is calculated and is recorded as your capital gain/loss. You can view history of all your capital gains by selecting “My Capital Gains”:

Orders History

When you buy or sell stocks or the game buys stocks for free giveaways, an order is created. You can browse past orders by selecting “My Orders”.



Quote Lookup

You can lookup any stock by symbol, name or description using “Quote Lookup” option. You can view delayed quotes, fundamental data, analysts rating and news.



Badges and Leaderboard

The “Passion Investment Portfolio” game gives away badges based on performance of your portfolio and top users are listed on the leaderboard.

Help and Feedback

The “Passion Investment Portfolio” also comes with help tips and a quick way to send any feedback, suggestions and comments.

Screencasts

You can also watch at for iPhone and for iPad respectively.

Download

You can download the app from Apple Appstore. Here are a few promotion codes that you can use to try the game (first come first serve):

RRTWFNNJRTAY
3WM96W9NLYTK
KLR7PE963K7E
NWXWJ7ATY6R7
7NHREJHWJMTH
XJPFHLAAXJLW
PTJXP6XXFRP3
FHLNATLKNRR3
X3KHXXT9LAY6
KKMJWA76MW7W
FHYR6XFLXJR3
3F6HHETJWR36
K6XJLMARMMET
H63RRP44KWWH
3P7KH763TEMA
X7NA4R7YN3H7
MX43TPNEJLL

Twitter

Follow the app at http://twitter.com/#!/passionportfol.


October 8, 2012

Comparing Server side Websockets using Atmosphere, Netty, Node.js and Vertx.io

Filed under: Javascript — admin @ 1:31 pm

My last two bogs (Pub/sub in Node.js and Websockets and Scaling Node.js) described how I have been evaluating Node.js and Websockets for streaming data. I would describe results of that evaluation along with a few other frameworks that I have been testing.

Use Case

The primary use case for my application is pub/sub where clients subscribe to some channel and receive updates such as subscribing for stock quotes or notifications about orders.

Node.js

In my last blog, I described all the issues I had with socket.io library for Node.js and after a lot of frustration I gave up on it. Instead, I used WS library for WebSockets and though it doesn’t provide fallback options for other protocols but’s it much more stable and efficient.

Pub/Sub Server

Here is the pub/sub server that provides methods to subscribe some channel identifier and sends update to a random channel continuously.

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 5;
 var MAX_THREADS = process.env.MAX_THREADS || 5;
 var MAX_ROWS = process.env.MAX_ROWS || 10;
 var SEND_MESSAGE_INTERVAL = SEND_MESSAGE_INTERVAL || 1;
 
 var nextClientId = 0;
 var errors = 0;
 //
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')(OUTLIER_SIZE);
 var cluster = require('cluster');
 hwUsage.start();
 
 if (cluster.isMaster) {
     for (var i = 0; i < MAX_THREADS; i++) {
         cluster.fork();
     }
     cluster.on('exit', function(worker, code, signal) {
     });
     console.log('clients, errors, ' + metrics.heading() + ',' + hwUsage.heading());
 } else {
     var pubsubController = require('pubsub_controller')(cluster.worker.id, MAX_ROWS, hwUsage, metrics);
     var WebSocketServer = require('ws').Server
     , wss = new WebSocketServer({port: 8124});
     wss.on('connection', function(ws) {
         ws.key = ++nextClientId;
         ws.on('close', function() {
             pubsubController.unsubscribeAll(ws);
         });
         ws.on('error', function() {
             errors++;
         });
         ws.on('message', function(text) {
             var msg = JSON.parse(text);
             if (msg.action === 'subscribe') {
                 pubsubController.subscribe(ws, msg.identifier);
             } else if (msg.action === 'unsubscribe') {
                 pubsubController.subscribe(ws, msg.identifier);
             }
         });
     });
 
     setInterval(function() {
         pubsubController.push();
     }, SEND_MESSAGE_INTERVAL);
     setInterval(function() {
         hwUsage.stop(function(usage) {
             console.log(nextClientId+ ',' + errors + metrics.summary().toString() + ',' + usage.toString());
         });
     }, 15000);
 }
 
 
 process.on('uncaughtException', function(err) {
     console.log("GLOBAL " + err + "\n" + err.stack);
 });
 
 

Subscription management

Following module defines APIs for managing subscriptions and as I mentioned when number of messages received by client reaches maxMessages, it unsubscribes from the server.

 var helper = require('helper');
 
 module.exports = function(workerId, numElements, hwUsage, metrics) {
     var channelSubscribers = {};
     var nextRequestId = 0;
     var allKeys = {};
     allKeys[workerId] = [];
     var generatePayload = function(identifier) {
         var elements = [];
         var date = new Date();
         for (var i=0; i<numElements; i++) {            elements.push({sequence:i, price: helper.random(100), identifierId:identifier, symbol:"QQQ", transaction:"STO", qty: helper.random(200)});
         }
         var reqId = Number(++nextRequestId + '.' + workerId);
 
         var payload = {request: reqId, rows:numElements, timestamp : date.getTime(), identifier:Number(identifier), elements: elements};
         return payload;
     };
     var makeKey = function(socket) {
         return workerId + '_' + socket.key;
     };
     var getSubscriber = function(socket) {
         return channelSubscribers[makeKey(socket)];
     };
     var setSubscriber = function(socket, rec) {
         channelSubscribers[makeKey(socket)] = rec;
     };  
     var removeSubscribers = function(socket) {
         delete channelSubscribers[makeKey(socket)];
     };  
     return {
         subscribe: function(socket, identifier) {
             var rec = getSubscriber(socket);
             if (typeof rec === "undefined") {
                 rec = {socket : socket, identifiers : []};
             }
             rec.identifiers.push(identifier);
             allKeys[workerId].push(makeKey(socket)); 
             setSubscriber(socket, rec);
         },  
         unsubscribe: function(socket, identifier) {
             var rec = getSubscriber(socket);
             if (typeof rec === "undefined") {
                 return false;
             }
             var ndx = rec.identifiers.indexOf(identifier);
             rec.identifiers.splice(ndx, 1);
             if (rec.identifiers.length == 0) {
                 removeSubscribers(socket);
             } else {
                 setSubscriber(socket, rec);
             }
         },
         unsubscribeAll: function(socket) {
             removeSubscribers(socket);
         },
         count: function() {
             return helper.size(channelSubscribers);
         },
         push: function() {
             if (allKeys[workerId].length == 0) return 0;
             var key = allKeys[workerId][helper.random(allKeys[workerId].length)];
             var rec = channelSubscribers[key];
             if (typeof rec === "undefined") {
                 return -1;
             }
             //
             for (var i=0; i<rec.identifiers.length; i++) {
                 var identifier = rec.identifiers[i];
                 try {
                     var request = generatePayload(identifier);
                     var text = JSON.stringify(request);
                     rec.socket.send(text);
                     metrics.update(nextRequestId, new Date().getTime(), text.length);
                 } catch (err) {
                     console.log('failed to send execution report' + err);
                     try {
                         rec.socket.disconnect();
                     } catch (ex) {
                         console.log('failed to close socket ' + ex);
                     }
                     delete channelSubscribers[key];
                 }
             }
         }
     };
 }
 
 

I am skipping some of helper modules, but you can find them from the source code.

Vertx.io

The Vertx.io framework follows Node.js design for single-thread event loop with asynchronous I/O but is written in Java. It also provides polyglot support for a number of languages such as Java, Javascript, Ruby, Groovy, and Python. Here is similar implementation of Pub/Sub server in Java:

 package com.plexobject.vertx;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 
 import org.vertx.java.core.Handler;
 import org.vertx.java.core.buffer.Buffer;
 import org.vertx.java.core.http.HttpServerRequest;
 import org.vertx.java.core.http.ServerWebSocket;
 import org.vertx.java.deploy.Verticle;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class VertxServer extends Verticle {
     private static final Logger logger = LoggerFactory.getLogger(VertxServer.class);
     private final ObjectMapper mapper = new ObjectMapper();
     private final JsonFactory jsonFactory = mapper.getJsonFactory();
     private Map<String, Set<ServerWebSocket>> subscriptions = new ConcurrentHashMap<String, Set<ServerWebSocket>>();
     private Timer sendTimer = new Timer(true);
     private Timer metricsTimer = new Timer(true);
     private final AtomicLong nextRequestId = new AtomicLong();
     private final Metrics metrics = new Metrics();
     private final StringBuilder elements = new StringBuilder();
     private final Random random = new Random();
     private int maxIdentifier;
 
     public VertxServer() {
         logger.info("identifiers," + Metrics.getHeading());
         for (int i=0; i<500; i++) {
             elements.append(String.valueOf(i));
         }
         metricsTimer.schedule(new TimerTask() {
             @Override
             public void run() {
                 logger.info(subscriptions.size() + metrics.getSummary().toString());
             }
         }, 5000, 15000);
         sendTimer.schedule(new TimerTask() {
             @Override
             public void run() {
                while (true) {
                   sendMessageForRandomIdentifier();
                }
             }
         }, 1000);
     }
     public void start() {
         vertx.createHttpServer().setAcceptBacklog(10000).websocketHandler(new Handler<ServerWebSocket>() {
             public void handle(final ServerWebSocket ws) {
                 if (ws.path.equals("/channels")) {
                     ws.dataHandler(new Handler<Buffer>() {
                         public void handle(Buffer data) {
                             try {
                                 JsonParser jp = jsonFactory.createJsonParser(data.toString());
                                 JsonNode jsonObj = mapper.readTree(jp);
                                 String action = jsonObj.get("action").asText();
                                 String identifier = jsonObj.get("identifier").asText();
                                 if (action == null || identifier == null) {
                                     return;
                                 }
                                 if ("subscribe".equals(action)) {
                                     maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
                                     logger.info("Max " + maxIdentifier);
                                     Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                     if (sockets == null) {
                                         sockets = new HashSet<ServerWebSocket>();
                                         subscriptions.put(identifier, sockets);
                                     }
                                     sockets.add(ws);
                                 } else if ("unsubscribe".equals(action)) {
                                     Set<ServerWebSocket> sockets = subscriptions.get(identifier);
                                     if (sockets == null) {
                                         return;
                                     }
                                     sockets.remove(ws);
                                     if (sockets.size() == 0) {
                                         subscriptions.remove(identifier);
                                     }
                                 }
                             } catch (IOException e) {
                                 logger.error("Failed to handle " + data, e);
                             }
                         }
                     });
                 } else {
                     ws.reject();
                 }
             }
         }).requestHandler(new Handler<HttpServerRequest>() {
             public void handle(HttpServerRequest req) {
                 //if (req.path.equals("/")) req.response.sendFile("websockets/ws.html"); // Serve the html
             }
         }).listen(8080);
     }
 
     //
     private String getRandomIdentifier() {
         return String.valueOf(Math.abs(random.nextInt(maxIdentifier)) + 1);
     }
 
     public void sendMessageForRandomIdentifier() {
         if (subscriptions.size() == 0) {
             return;
         }
         String identifier = getRandomIdentifier();
         if (identifier == null) {
             return;
         }
         Set<ServerWebSocket> sockets = subscriptions.get(identifier);
         if (sockets == null || sockets.size() == 0) {
             return;
         }
         final long t = System.currentTimeMillis();
         String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
         for (ServerWebSocket ws : sockets) {
             try {
                 ws.writeTextFrame(msg);
             } catch (Exception e) {
                 subscriptions.remove(ws);
                 logger.error("Failed to send " + msg, e);
             }
         }
         metrics.update(msg);
     }
 }
 
 

Note that Vertx.io requires JDK 7 and here is how you can start the server:

 export PATH=/home/sbhatti/jdk1.7.0_10/bin:$PATH
 export JAVA_HOME=/home/sbhatti/jdk1.7.0_10
 mvn package
 bin/vertx run com.plexobject.vertx.VrtxServer -cp target/classes/ -instances=5
 

Note that I started Vertx.io with 5 instances of the server to match Node.js’ clustering options.

Netty

Netty.io is a lightweight application server that provides asynchronous events and I/O so I also tested its Websocket’s support in its latest 4.0-beta version.

Bootstrap

Here is the main server implementation that bootstraps web-socket server:

 package com.plexobject.netty.server;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.socket.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 
 public class WebSocketServer {
 
         private final int port;
 
         public WebSocketServer(int port) {
                 this.port = port;
         }
 
         public void run() throws Exception {
                 ServerBootstrap b = new ServerBootstrap();
                 try {
                         b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                                         .channel(NioServerSocketChannel.class).localAddress(port)
                                         .childHandler(new WebSocketServerInitializer());
 
                         Channel ch = b.bind().sync().channel();
 
                         ch.closeFuture().sync();
                 } finally {
                         b.shutdown();
                 }
         }
 
         public static void main(String[] args) throws Exception {
                 int port;
                 if (args.length > 0) {
                         port = Integer.parseInt(args[0]);
                 } else {
                         port = 8124;
                 }
                 new WebSocketServer(port).run();
         }
 }
 
 

Pub/Sub Server

Here is implementation of websocket handler that provides pub/sub functionality:

 package com.plexobject.netty.server;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.math.BigInteger;
 import java.security.SecureRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
 import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
 import static io.netty.handler.codec.http.HttpMethod.GET;
 import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundMessageHandlerAdapter;
 import io.netty.handler.codec.http.DefaultHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
 import io.netty.logging.InternalLogger;
 import io.netty.logging.InternalLoggerFactory;
 import io.netty.util.CharsetUtil;
 
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.ObjectMapper;
 
 public class WebSocketServerHandler extends ChannelInboundMessageHandlerAdapter<Object> {
     @SuppressWarnings("unused")
     private static final InternalLogger LOGGER = InternalLoggerFactory
             .getInstance(WebSocketServerHandler.class);
     private static final ObjectMapper jsonMapper = new ObjectMapper();
     private static final JsonFactory jsonFactory = jsonMapper.getJsonFactory();
 
     private static final String WEBSOCKET_PATH = "/";
     private WebSocketServerHandshaker handshaker;
     private static Map<String, Set<ChannelHandlerContext>> subscriptions = new ConcurrentHashMap<String, Set<ChannelHandlerContext>>();
     private static Timer timer = new Timer(true);
     private static final AtomicLong REQUEST_ID = new AtomicLong();
     private static final Metrics METRICS = new Metrics();
 
     static {
         timer.schedule(new TimerTask() {
             @Override
             public void run() {
                while (true) {
                   sendMessageForRandomIdentifier();
                }
             }
         }, 1000);
     }
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, Object msg)
             throws Exception {
         if (msg instanceof HttpRequest) {
             handleHttpRequest(ctx, (HttpRequest) msg);
         } else if (msg instanceof WebSocketFrame) {
             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
         }
     }
 
     private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req)
             throws Exception {
         // Allow only GET methods.
         if (req.getMethod() != GET) {
             sendHttpResponse(ctx, req, new DefaultHttpResponse(HTTP_1_1,
                     FORBIDDEN));
             return;
         }
         if (req.getUri().equals("/favicon.ico")) {
             HttpResponse res = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
             sendHttpResponse(ctx, req, res);
             return;
         }
 
         // Handshake
         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                 getWebSocketLocation(req), null, false);
         handshaker = wsFactory.newHandshaker(req);
         if (handshaker == null) {
             wsFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
         } else {
             handshaker.handshake(ctx.channel(), req);
         }
     }
 
     private void handleWebSocketFrame(ChannelHandlerContext ctx,
             WebSocketFrame frame) throws IOException {
         // Check for closing frame
         if (frame instanceof CloseWebSocketFrame) {
             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame);
             return;
         } else if (frame instanceof PingWebSocketFrame) {
             ctx.channel().write(new PongWebSocketFrame(frame.getBinaryData()));
             return;
         } else if (!(frame instanceof TextWebSocketFrame)) {
             throw new UnsupportedOperationException(String.format(
                     "%s frame types not supported", frame.getClass().getName()));
         }
 
         String jsonText = ((TextWebSocketFrame) frame).getText();
         JsonParser jp = jsonFactory.createJsonParser(jsonText);
         JsonNode actualObj = jsonMapper.readTree(jp);
         String action = actualObj.get("action").getTextValue();
         String identifier = actualObj.get("identifier").getTextValue();
         if (action == null || identifier == null) {
               return;
         }
         if ("subscribe".equals(action)) {
             Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
             if (contexts == null) {
                 contexts = new HashSet<ChannelHandlerContext>();
                 subscriptions.put(identifier, contexts);
             }
             contexts.add(ctx);
         } else if ("unsubscribe".equals(action)) {
             Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
             if (contexts == null) {
                 return;
             }
             contexts.remove(ctx);
             if (contexts.size() == 0) {
                 subscriptions.remove(identifier);
             }
         }
     }
 
     private static String getRandomIdentifier() {
         List<String> identifiers = new ArrayList<String>(subscriptions.keySet());
         if (identifiers.size() == 0) {
             return null;
         }
         int n = new Random().nextInt(identifiers.size());
         return identifiers.get(n);
     }
 
 
     public static void sendMessageForRandomIdentifier() {
         String identifier = getRandomIdentifier();
         if (identifier == null) {
             return;
         }
         Set<ChannelHandlerContext> contexts = subscriptions.get(identifier);
         if (contexts == null || contexts.size() == 0) {
             return;
         }
         SecureRandom random = new SecureRandom();
         String elements = new BigInteger(500, random).toString(32);
         String json = "{\"request\":" + REQUEST_ID.incrementAndGet() + ", \"timestamp\":" + System.currentTimeMillis() + ", \"identifier\":\"" + identifier + "\", \"elements\": \"" + elements + "\"}";
         TextWebSocketFrame frame = new TextWebSocketFrame(json);
         METRICS.update(json);
         for (ChannelHandlerContext ctx : contexts) {
             try {
                 ctx.channel().write(frame);
             } catch (Exception e) {
                 subscriptions.remove(ctx);
             }
         }
         if (System.currentTimeMillis() % 1000 == 0) {
             System.out.println("identifiers," + Metrics.getHeading());
             System.out.println(subscriptions.size() + METRICS.getSummary().toString());
         }
     }
 
     private static void sendHttpResponse(ChannelHandlerContext ctx,
             HttpRequest req, HttpResponse res) {
         // Generate an error page if response status code is not OK (200).
         if (res.getStatus().getCode() != 200) {
             res.setContent(Unpooled.copiedBuffer(res.getStatus().toString(),
                     CharsetUtil.UTF_8));
             setContentLength(res, res.getContent().readableBytes());
         }
 
         // Send the response and close the connection if necessary.
         ChannelFuture f = ctx.channel().write(res);
         if (!isKeepAlive(req) || res.getStatus().getCode() != 200) {
             f.addListener(ChannelFutureListener.CLOSE);
         }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
             throws Exception {
         if (cause instanceof java.nio.channels.ClosedChannelException == false) {
             //cause.printStackTrace();
         }
         ctx.close();
     }
 
     private static String getWebSocketLocation(HttpRequest req) {
         return "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
     }
 }
 
 

As, you can see its implementation is a bit verbose and I am skipping some of other boiler code.

Atmosphere

Atmosphere is another Java framework that provides support for Websockets so I tried it as well and here is its implementation:

 package com.plexobject;
 
 import org.atmosphere.config.service.WebSocketHandlerService;
 import org.atmosphere.cpr.Broadcaster;
 import org.atmosphere.cpr.BroadcasterFactory;
 import org.atmosphere.websocket.WebSocket;
 import org.atmosphere.websocket.WebSocketHandler;
 import org.atmosphere.websocket.WebSocketProcessor.WebSocketException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 
 import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.atmosphere.cpr.AtmosphereServlet;
 import org.atmosphere.websocket.WebSocketHandler;
 
 @WebSocketHandlerService
 public class ChannelResponseHandler extends WebSocketHandler {
     private static final Logger logger = LoggerFactory.getLogger(ChannelResponseHandler.class);
 
     private static final Metrics METRICS = new Metrics();
     private static final String PATH_PREFIX = "/channels";
     private boolean started;
     private int maxIdentifier;
     private final Random random = new Random();
     private Timer timer = new Timer(true);
     private final ObjectMapper mapper = new ObjectMapper();
     private final JsonFactory jsonFactory = mapper.getJsonFactory();
     private final AtomicLong opened = new AtomicLong();
     private final AtomicLong closed = new AtomicLong();
     private final AtomicLong nextRequestId = new AtomicLong();
     private final StringBuilder elements = new StringBuilder();
 
     public ChannelResponseHandler() {
         for (int i=0; i<500; i++) {
             elements.append(String.valueOf(i));
         }
     }
 
     @Override
     public void onTextMessage(WebSocket webSocket, String msg) {
         try {
             JsonParser jp = jsonFactory.createJsonParser(msg);
             JsonNode jsonObj = mapper.readTree(jp);
             String action = jsonObj.get("action").asText();
             String identifier = jsonObj.get("identifier").asText();
             logger.debug("onMessage " + action + " - " + identifier);
             if (action == null || identifier == null) {
                   return;
             }
             if ("subscribe".equals(action)) {
                 getBroadcaster(identifier).addAtmosphereResource(webSocket.resource());
                 maxIdentifier = Math.max(Integer.parseInt(identifier), maxIdentifier);
             } else if ("unsubscribe".equals(action)) {
                 getBroadcaster(identifier).removeAtmosphereResource(webSocket.resource());
             }
             start();
         } catch (Exception e) {
             logger.error("Failed to handle message " + msg, e);
         }
     }
 
     @Override
     public void onOpen(WebSocket webSocket) {
         logger.trace("onOpen {}", webSocket.resource().getRequest());
         webSocket.resource().suspend();
         opened.incrementAndGet();
     }
 
     @Override
     public void onClose(WebSocket webSocket) {
         logger.trace("onClose");
         getBroadcaster().removeAtmosphereResource(webSocket.resource());
         super.onClose(webSocket);
         closed.incrementAndGet();
     }
 
     @Override
     public void onError(WebSocket webSocket, WebSocketException t) {
         System.err.println("error " + t);
         logger.trace("onError {}", t.getStackTrace());
         super.onError(webSocket, t);
     }
 
     public void sendMessageForRandomIdentifier() {
         try {
             if (opened.get() <= closed.get()) {
                 return;
             }
             String identifier = getRandomIdentifier();
             Broadcaster bc = getBroadcaster(identifier);
             if (bc == null) {
                 logger.debug("Failed to find broadcaster for " + identifier);
                 return;
             }
             long t = System.currentTimeMillis();
             String msg = "{\"request\":" + nextRequestId.incrementAndGet() + ", \"timestamp\":" + t + ", \"identifier\":" + identifier + ", \"elements\": \"" + elements + "\"}";
             bc.broadcast(msg);
             METRICS.update(msg);
             if (t % 1000 == 0) {
                 logger.info(METRICS.getSummary().toString());
             }
         } catch (Exception e) {
             logger.error("Failed to send message", e);
         }
     }
     private Broadcaster getBroadcaster() {
         return getBroadcaster(null);
     }
     //
     private Broadcaster getBroadcaster(String identifier) {
         String path = identifier == null ? PATH_PREFIX : PATH_PREFIX + "/" + identifier;
         BroadcasterFactory factory = BroadcasterFactory.getDefault();
         return factory != null ? factory.lookup(path, true) : null;
     }
 
     synchronized void start() {
         if (started) return;
         started = true;
         timer.schedule(new TimerTask() {
                 @Override
                 public void run() {
                   while (started) {
                      sendMessageForRandomIdentifier();
                   }
                 }
             }, 1000);
     }
 
     synchronized void stop() {
         timer.cancel();
     }
 
     private String getRandomIdentifier() {
         return String.valueOf(random.nextInt(maxIdentifier) + 1);
     }
 
     public static void main(String[] arguments) throws Exception {
         try {
             Server server = new Server();
             Connector con = new SelectChannelConnector();
             con.setPort(8124);
             server.addConnector(con);
             ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
             context.setContextPath("/");
             server.setHandler(context);
             context.addServlet(new ServletHolder(new AtmosphereServlet()), "/*");
             server.start();//WebSocketHandler
             server.join();
         } catch (Exception ex) {
             System.err.println(ex);
         }
     }
 }
 
 

Note that I had to deploy Atmosphere in Jetty 8.1.7 as a war file and here is web.xml

 <?xml version="1.0" encoding="UTF-8"?>
 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
          id="WebApp_ID" version="2.5">
 
     <display-name>WebSocket Load Test</display-name>
     <servlet>
         <description>AtmosphereServlet</description>
         <servlet-name>AtmosphereServlet</servlet-name>
         <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
         <load-on-startup>1</load-on-startup>
     </servlet>
     <servlet-mapping>
         <servlet-name>AtmosphereServlet</servlet-name>
         <url-pattern>/channels/*</url-pattern>
     </servlet-mapping>
 </web-app>
 
 

Client

I previously wrote client in Javascript but I rewrote in Java so that I can use Java’s concurrent library to manage counters.

WebSocket Client

I tried Netty and weberknecht libraries and settled on weberknecht. Here is its implementation:

 package com.plexobject.websocket.client;
 
 import java.io.BufferedReader;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Map;
 import de.roderick.weberknecht.*;
 
 public class WebSocketClient implements ClientProtocol {
     private WebSocket webSocket;
         private final String identifier;
 
         public WebSocketClient(final String identifier, final URI server, final ClientEventListener listener) throws Exception {
                 this.identifier = identifier;
 
         webSocket = new WebSocketConnection(server);
         webSocket.setEventHandler(new WebSocketEventHandler() {
 
             @Override
             public void onClose() {
                         listener.onClose();
             }
 
             @Override
             public void onMessage(WebSocketMessage msg) {
                             listener.onMessage(identifier, msg.getText());
             }
 
             @Override
             public void onOpen() {
                         listener.onOpen();
             }
         });
         webSocket.connect();
         }
 
     @Override
     public void close() throws Exception {
         webSocket.close();
     }
     @Override
     public void send(String text) throws Exception {
         webSocket.send(text);
     }
 
     @Override
     public String getIdentifier() {
         return identifier;
     }
 }
 
 

Kaazing

I also tested Kaazing a bit but it is a commercial software and their trial license limited connections so I gave up on it.

Load Tester

The load tester takes arguments about how many connections to open and how long to run the test. It generates identifiers from 1 to 10,000 and sends subscription message to the server for each of those channel identifiers. It then keeps track of messages received in a helper class Metrics and logs that information on regular interval, i.e.,

 package com.plexobject.websocket.client;
 
 
 import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.net.URI;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
 import java.util.concurrent.atomic.AtomicLong;
 
 public class LoadTester implements ClientEventListener {
     private static final AtomicLong OPEN_CONNECTIONS = new AtomicLong();
     private static final AtomicLong CLOSE_CONNECTIONS = new AtomicLong();
     private static final AtomicLong REQUEST_ID = new AtomicLong();
     private static final Metrics METRICS = new Metrics();
     private List<ClientProtocol> clients = new ArrayList<ClientProtocol>();
     private static final List<Integer> identifiers = new ArrayList<Integer>();
     static {
         for (int i=1; i<=10000; i++) {
             identifiers.add(i);
         }
     }
 
     public LoadTester(String url, int maxConnections) throws Exception {
         try {
             URI uri = new URI(url);
             for (int i=0; i<maxConnections; i++) {
                 String identifier = String.valueOf(identifiers.get(i%identifiers.size()));
                 ClientProtocol client = new WebSocketClient(identifier, uri, this);
                 String json = "{\"action\":\"subscribe\", \"identifier\":\"" + identifier + "\"}";
                 client.send(json);
                 clients.add(client);
                 try {
                     Thread.sleep(1);
                 } catch (InterruptedException e) {
                 }
             }
         } catch (OutOfMemoryError e) {
             System.err.println(e);
             System.exit(0);
         }
     }
 
     //
     public void close() {
         for (ClientProtocol client : clients) {
             try {
                 client.close();
             } catch (Exception e) {
             }
         }
         System.exit(0);
     }
 
     @Override
     public void onError(Throwable e) {
         System.err.println(e);
     }  
 
     @Override
     public void onMessage(String identifier, String message) {
         String act = METRICS.update(message);
         if (!identifier.equals(act)) {
             System.out.println("Expected " + identifier + ", but received " + act + " for " + message);
         }
     }
 
     @Override
     public void onClose() {
         CLOSE_CONNECTIONS.incrementAndGet();
     }
 
     @Override
     public void onOpen() {
         OPEN_CONNECTIONS.incrementAndGet();
     }
     public static void main(String[] args) throws Exception {
         if (args.length == 0) {
             System.err.println("Usage client.LoadTester url connections max-runtime-in-minutes");
             System.exit(1);
         }
         final String url = args[0]; // "ws://localhost:8124/"
         final int maxConnections = Integer.parseInt(args[1]);
         final int maxRuntimeMins = Integer.parseInt(args[2]);
         System.out.println("connections,connected," + Metrics.getHeading());
         final long started = System.currentTimeMillis();
         final Timer timer = new Timer(true);
         timer.schedule(new TimerTask() {
                 @Override
                 public void run() {
                     System.out.println(maxConnections + "," + (OPEN_CONNECTIONS.get()-CLOSE_CONNECTIONS.get()) + "," + METRICS.getSummary());
                     long elapsed = System.currentTimeMillis() - started;
                     if (elapsed > (maxRuntimeMins * 1000 * 60)) {
                         System.out.println("time done");
                         System.exit(0);
                     }
                 }
             }, 5000, 15000);
         //
         LoadTester runner = new LoadTester(url, maxConnections);
         while (true) {
             try {
                 Thread.sleep(5000);
             } catch (InterruptedException e) {
             }
             //
             if (CLOSE_CONNECTIONS.get() == maxConnections) {
                 System.out.println("all clients closed");
                 runner.close();
             }
         }
     }
 
 }
 
 
 

I am skipping some of helper classes and interfaces, but you can find them from the source code.

Test Script

I created a simple shell script to launch load test for 5 minutes and given number of connections and collected results in a comma delimited file, i.e,

 #!/bin/bash 
 mvn package
 export MAVEN_OPTS="-Xmx1000m -Xss128k"
 export url=ws://localhost:8080/atmosphere-1.0/channels
 #export url=ws://localhost:8080/channels
 #export url=ws://localhost:8124/
 clients=500
 while [ $clients -lt 100000 ]
 do
    mvn exec:java -Dexec.mainClass="com.plexobject.websocket.client.LoadTester" -Dexec.classpathScope=runtime -Dexec.args="$url $clients 5"
    clients=`expr $clients + 500|awk '{print $1}'`
 done
 
 

Note that I specified stack size of thread to be 128k as default stack size per thread is 2MB, which takes a lot of memory.

TCP Settings

As I mentioned in my last blog, I made some changes to my Linux machine to improve TCP settings, i.e.,

 net.core.rmem_max = 33554432
 net.core.wmem_max = 33554432
 net.ipv4.tcp_rmem = 4096 16384 33554432
 net.ipv4.tcp_wmem = 4096 16384 33554432
 net.ipv4.tcp_mem = 786432 1048576 26777216
 net.ipv4.tcp_max_tw_buckets = 360000
 net.core.netdev_max_backlog = 2500
 vm.min_free_kbytes = 65536
 vm.swappiness = 0
 net.ipv4.ip_local_port_range = 1024 65535
 

I then applied above changes using:

 sudo sysctl -w net.core.somaxconn=10000
 sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000 
 sudo sysctl -p
 

Results

Here are results of testing up to 5000 connections, where server sends a message to a random channel continuously:

Atmosphere

connections connected average response (ms) average size messages msg/sec walltime(secs)
500 500 1 1462 282431 943.317 301
1000 1000 1 1462 282100 940.967 302
1500 1500 1 1463 274156 920.233 304
2000 2000 1 1463 268180 936.733 309
2500 2500 1 1463 263443 926 307
3000 2985 1 1463 280809 932.233 332
3500 3500 2 1463 278412 934.333 384

 

 

 

Netty

connections connected average response (ms) average size messages msg/sec walltime(secs)
500 500 0.000 172.000 284165 937.833 305.000
1000 1000 0.000 172.000 279883 921.717 305.000
1500 1500 0.000 173.000 290317 959.983 305.000
2000 2000 0.000 173.000 286110 946.467 305.000
2500 2500 0.000 174.000 285122 949.550 305.000
3000 3000 0.000 174.000 281650 939.150 305.000
3500 3500 0.000 174.000 282833 942.700 305.000
4000 4000 0.000 174.000 283829 948.700 305.000
4500 4500 0.000 174.000 273293 921.583 305.000
5000 5000 0.000 174.000 264433 881.733 305.000
1000 1000 0.000 172.000 279883 921.717 305.000
1500 1500 0.000 173.000 290317 959.983 305.000
2000 2000 0.000 173.000 286110 946.467 305.000
2500 2500 0.000 174.000 285122 949.550 305.000
3000 3000 0.000 174.000 281650 939.150 305.000
3500 3500 0.000 174.000 282833 942.700 305.000
4000 4000 0.000 174.000 283829 948.700 305.000
4500 4500 0.000 174.000 273293 921.583 305.000
5000 5000 0.000 174.000 264433 881.733 305.000
5500 5500 0.000 174.000 289230 977.300 305.000
6000 6000 0.000 174.000 277692 934.800 305.000
6500 6500 0.000 174.000 148223 847.517 185.000

 

 

 

Vertx.io

connections average response (ms) average size messages msg/sec walltime(secs)
1000 1 1462 1487141 4056.433 365
2000 1 1464 1607431 5003.033 365
3000 1 1464 1663777 5061.717 365
4000 1 1464 1607104 5109.65 365
5000 1 1464 1706898 5374.433 365
6000 2 1465 1750757 5341.05 365
7000 3 1465 1723124 5263.283 365
8000 3 1465 1758174 5224.85 365
9000 2 1465 1717950 5038.45 365
10000 3 1465 1789671 5160.517 365
11000 3 1465 1789760 5140.8 365
12000 3 1465 1792477 5097.5 365
13000 4 1465 1778748 5128.917 365
14000 4 1465 1746830 4895.433 365
15000 3 1465 1760734 4960.367 365
16000 3 1465 1739786 4933.583 365
17000 4 1465 1727759 4902.883 365
18000 3 1465 1730165 5158.7 365
19000 4 1465 1725660 4904.2 367
20000 4 1465 1704129 4852.567 365
21000 4 1465 1703201 4849.833 365
22000 3 1465 1705387 5077.417 366
23000 3 1465 1671165 4861.233 365
24000 4 1465 1670471 4942.217 365
25000 4 1465 1639296 5108.317 365

 

 

 

Node.js

connections average response (ms) average size messages msg/sec walltime(secs)
1000 2 1363 1550155 4252.95 365
2000 1 1368 863997 2384.8 365
3000 1 1368 644806 1776.717 365
4000 1 1369 555496 1541.683 365
5000 1 1370 483624 1338 365
6000 1 1370 426728 1185.883 365
7000 1 1371 358331 994.017 365
8000 1 1371 322960 898.55 365
9000 1 1371 281227 787.633 365
10000 1 1371 260502 732.3 365
11000 1 1370 262209 733.633 365
12000 1 1371 243954 690.067 365
13000 2 1370 230117 647.833 365
14000 2 1371 235591 668.983 365
15000 2 1371 240632 685.35 365
16000 2 1371 192929 551.083 365
17000 2 1371 166705 474.217 365
18000 2 1371 142557 406.4 365
19000 2 1371 110873 319.467 365
20000 2 1371 106047 305.967 365
21000 2 1371 107947 307.633 365
22000 1 1371 89779 257.883 365
23000 2 1371 89890 259.367 365
24000 2 1371 34414 240.833 155

 

 

 

Summary

I tried to push limits for each framework, however Atmosphere could not handle more than 3500 connections and it slowed my machine to crawl. I was using 4.0-beta version of Netty, which also crashed after 6500 connections. Vertx.io was able to scale for upto 25,000 connections, but then server ran out of memory on my machine. Similarly, node.js kept going for about 24,000 but it became very slow and my load test client could not allocate more memory for connections. Both Vertx.io and Node.js proved to be leading contenders but Vertx.io was able to scale much better and maintain throughput better than Node.js. As a result, I picked Vertx.io for my project. You can download the source code and compare results yourself.


September 24, 2012

Scaling Node.js

Filed under: Javascript — admin @ 12:22 pm

As I described in my earlier blog, I have been testing Node.js and Websockets for streaming data to large number of clients. I am evaluating Node.js and WebSockets in both pub/sub model such as clients connecting to receive streaming quotes and request/reply model where clients are invoking an API and waiting for reply. Below are some of my experience with those technologies so far:

Pub/Sub Server

The server code uses cluster module to provide multiprocessing support and each process starts a Websocket server and listens for incoming connections. I used socket.io library for WebSockets. The server keeps track of all clients that have subscribed and then send random data every 10-100 milli-seconds.
Here is basic implementation of the server:

 var MAX_THREADS = process.env.MAX_THREADS || 10;
 var MAX_ROWS = process.env.MAX_ROWS || 2;
 var nextClientId = 0;
 //
 var helper = require('helper')
 
 var pubsubController = require('pubsub_controller')(MAX_ROWS)
 var cluster = require('cluster');
 
 if (cluster.isMaster) {
    for (var i = 0; i < MAX_THREADS; i++) {
       cluster.fork();
    }
    cluster.on('exit', function(worker, code, signal) {
       console.log('worker ' + worker.process.pid + ' died');
    });
 } else {
    var socketsByAddr = {};
    var io = require('socket.io').listen(8124);
    io.set('log level', 1);
    io.set('transports', ['websocket']);
    io.set('force new connection', true);
    io.enable('browser client gzip');
 
    io.sockets.on('connection', function(socket) {
       socket.address = socket.handshake.address.address + ':' + socket.handshake.address.port;
       socket.key = socket.handshake.address.address + ':' + socket.handshake.address.port + ':' + (++nextClientId);
       socketsByAddr[socket.address] = true;
       pubsubController.subscribe(socket);
       socket.on('disconnect', function() {
          pubsubController.unsubscribe(socket);
          delete socketsByAddr[socket.address];
       });
 
       socket.on('end', function() {
       });
    });
 
    setInterval(function() {
       pubsubController.pushUpdates();
    }, helper.random(100) + 10);
 }
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
 });
 
 

Pub/Sub Client

The pub/sub client subscribes for data and receies data every 10-100 milli-seconds. The client code builds up 100 more connections every 15 seconds and it logs the response time information. On client side, I used socket.io-client library. The client calls 'subscribe-execs' to subscribe and then receive messages under 'receive-execs'. Upon receiving message, the client tracks various metrics such as response time, CPU, memory usage, etc. As, I am running both client and server on the same machine, it gives accurate representation of latency without network hops or clocks mismatch. Another thing to note is that the client passes 'force new connection' flag to force new socket for each client as by default all clients share same socket.

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 10;
 
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 hwUsage.start();
       
 var client = require('pubsub_client')(metrics, hwUsage);
 var clients = [];
 
 // create connections for this client
 var buildClients = function() {
    for (var i=0; i<100; i++) {
       var client = client.newClient(i+0);
       clients.push(client);
    }
    hwUsage.stop(function(usage) {
       console.log('clients,connected,', hwUsage.heading() + ',' + metrics.heading());
       console.log(clients.length + ',' + client.totalConnected() + ',' + usage.toString() + ',' + metrics.summary().toString());
    });
 };
 
 
 setInterval(buildClients, 15000);
 
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
 });
 
 

The actual client is defined as a module, e.g.:

 var io = require('socket.io-client');
 var helper = require('helper');
 var totalConnected = 0;
 
 module.exports = function(metrics, hwUsage) {
    var ExecClient = function(id) {
       this.id = id;
       this.client = io.connect('localhost', { port: 8124,
                               transports: ['websocket'],
                               'force new connection': true,
                               'sync on disconnect' : true,
                               'connect timeout': 500,
                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
    };
    
    // 
    ExecClient.prototype.setupConnectListener = function() {
       if (typeof this.client === "undefined") return;
       var that = this;
       this.client.on('connect_failed', function(error) {
       });
 
       this.client.on('connect', function() {
          totalConnected++;
       });
 
       this.client.on('disconnect', function() {
          totalConnected--;
       });
 
       this.client.on('reconnect_failed', function() {
          //process.exit(1);
       });
 
       this.client.on('end', function() {
          //process.exit(1);
       });
    };
 
    ExecClient.prototype.setupReceiveListener = function() {
       if (typeof this.client === "undefined") return;
       var that = this;
       this.client.on('receive-execs', function(msg) {
          console.log('receive');
          var metric = metrics.update(msg.address, msg.request, msg.timestamp, JSON.stringify(msg).length);
        });
    };
 
    return {
       totalConnected : function() {
          return totalConnected;
       },
       newClient: function(id) {
          var client = new ExecClient(id);
          client.setupConnectListener();
          client.setupReceiveListener();
          return client;
       }
    };
 }
 
 

Subscription management

Following module defines APIs for managing subscriptions and as I mentioned when number of messages received by client reaches maxMessages, it unsubscribes from the server.

 var helper = require('helper');
 var emitVolatile = typeof process.env.EMIT_VOLATILE !== "undefined" && process.env.EMIT_VOLATILE === 'true';
 
 module.exports = function(numExecs) {
    var execSubscribers = {};
    var counter = 0; 
    var generatePayload = function(address) {
       var execs = [];
       var date = new Date();
       for (var i=0; i<numExecs; i++) {
          execs.push({sequence:i, activityDateStr:date.toString(), com: helper.random(5),price: helper.random(100), accountId:1772139, symbol:"QQQ", transaction:"STO", description:"QQQ Stock", qty: helper.random(200), key: "QQQ:::S", netAmount: helper.random(1000)});
       }
       var payload = {request: counter, rows:numExecs, timestamp : date.getTime(), address:address, execs: execs};
       return payload;
    }
 
    return {
       subscribe: function(socket) {
          execSubscribers[socket.key] = socket;
       },
       unsubscribe: function(socket) {
          delete execSubscribers[socket.key];
       },
       count: function() {
          return helper.size(execSubscribers);
       },
       pushUpdates: function() {
          var count = 0;
          for (var addr in execSubscribers) {
             count++;
             try {
                var socket = execSubscribers[addr];
                //
                if (emitVolatile) {
                   socket.volatile.emit('receive-execs', generatePayload(addr));
                } else {
                   socket.emit('receive-execs', generatePayload(addr));
                }
                //
                if (typeof socket.numMessages === "undefined") {
                   socket.numMessages = 1;
                } else {
                   ++socket.numMessages;
                   //delete execSubscribers[socket.key];
                }
             } catch (err) {
                console.log('failed to send execution report' + err);
                try {
                   socket.disconnect();
                } catch (ex) {
                   console.log('failed to close socket ' + ex);
                }
                delete execSubscribers[socket.key];
             }
          }
          return count;
       }
    };
 } 
 

Hardware Measurement

Following module defines API for measuring CPU time, load average, memory, etc:

 var fs = require('fs');
 var os = require('os');
 var helper = require('helper');
 
 var HardwareUsage = function() {
    this.startCpuTime = 0;
    this.startLoadAvg = os.loadavg();
    this.startMemory = process.memoryUsage();
    this.endCpuTime = 0;
    this.endLoadAvg = 0;
    this.endMemory = 0;
    this.started = new Date().getTime();
    var that = this;
    this.cpuUsage (function(totalCpu) {
       that.startCpuTime = totalCpu;
    });
 }
 
 HardwareUsage.prototype.cpuUsage = function(callback) {
    if (!fs.existsSync("/proc/" + process.pid + "/stat")) {
       callback();
       return;
    }
    var that = this;
 
    fs.readFile("/proc/" + process.pid + "/stat", function(err, data){
       var elems = data.toString().split(' ');
       var utime = parseInt(elems[13]);
       var stime = parseInt(elems[14]);
       var totalCpu = utime + stime;
       callback(totalCpu);
    });
 };
 
 
 HardwareUsage.prototype.percCpuChange = function() {
    return helper.percentChange(this.startCpuTime, this.endCpuTime);
 }
 
 HardwareUsage.prototype.percLoadChange = function() {
    return helper.percentChange(this.startLoadAvg[0], this.endLoadAvg[0]);
 }
 
 HardwareUsage.prototype.percMemoryChange = function() {
    return helper.percentChange(this.startMemory.heapTotal, this.endMemory.heapUsed);
 }
 
 
 HardwareUsage.prototype.toString = function() {
    return helper.round(this.endCpuTime) + ',' + helper.round(this.endLoadAvg[0]) + ',' + helper.round(this.startMemory.heapTotal/1024.0/1024.0) + ',' + helper.round(this.endMemory.heapUsed/1024.0/1024.0);
 };
 
 HardwareUsage.prototype.stop = function(callback) {
    var that = this;
    this.cpuUsage (function(totalCpu) {
       that.endCpuTime = totalCpu;
       that.endMemory = process.memoryUsage();
       that.endLoadAvg = os.loadavg();
       that.elapsed = new Date().getTime() - that.started;
       callback(that);
    });
 };
 
 var usage = new HardwareUsage();
 
 exports.heading = function() {
    return 'cpu time, load avg, total memory (M), used memory(M)';
 };
 exports.start = function() {
    usage = new HardwareUsage();
 };
 
 exports.stop = function(callback) {
    usage.stop(callback);
 };
 

Response time Measurement

Following module defines API for measuring response time:

 var helper = require('helper');
    
 module.exports = function(store, outlierSize) {
    var MetricResultSummary = function(metric) {
       this.averageResponseTime = helper.round(metric.totalTime / metric.totalRequests);
       this.averageHighResponseTime = helper.round(helper.sum(metric.topTenHigh)/ metric.topTenHigh.length);
       this.averageLowResponseTime = helper.round(helper.sum(metric.topTenLow)/ metric.topTenLow.length);
       this.averageSize = helper.round(metric.totalPayloadSize / metric.totalRequests);
       this.messagesReceived = metric.messagesReceived;
       this.count = 0;
    }
    MetricResultSummary.prototype.toString = function() {
       return this.count + ',' + this.averageResponseTime + ',' + this.averageHighResponseTime + ',' + this.averageLowResponseTime + ',' + this.averageSize + ',' + this.messagesReceived;
    };
 
    var Metric = function() {
       this.totalTime = 0;
       this.totalRequests = 0;
       this.topTenHigh = [];
       this.topTenLow = [];
       this.totalPayloadSize = 0;
       this.messagesReceived = 0;
    }
 
    Metric.prototype.addTo = function(target) {
       target.totalTime += this.totalTime;
       target.totalRequests += this.totalRequests;
       for (var i=0; i<this.topTenHigh.length; i++) {
          target.topTenHigh.push(this.topTenHigh[i]);
       }
       for (var i=0; i<this.topTenLow.length; i++) {
          target.topTenLow.push(this.topTenLow[i]);
       }
       target.totalPayloadSize += this.totalPayloadSize;
       target.messagesReceived += this.messagesReceived;
    };
 
    Metric.prototype.update = function(id, sendTime, payloadSize) {
       var elapsed = new Date().getTime() - sendTime;
       for (var i=0; i<outlierSize; i++) {
          if (typeof this.topTenHigh[i] === "undefined" || elapsed > this.topTenHigh[i]) {
             this.topTenHigh[i] = elapsed;
             break;
          }
       }
       //
       for (var i=0; i<outlierSize; i++) {
          if (typeof this.topTenLow[i] === "undefined" || elapsed < this.topTenLow[i]) {
             this.topTenLow[i] = elapsed;
             break;
          }
       }
       this.messagesReceived++;
       this.totalTime += elapsed;
       this.totalPayloadSize += payloadSize;
       this.totalRequests++;
    };
    //
    Metric.prototype.averageResponse = function() {
       return helper.round(this.totalTime / this.totalRequests);
    };
    Metric.prototype.summary = function() {
       return new MetricResultSummary(this);
    };
    Metric.prototype.toString = function() {
       return new MetricResultSummary(this).toString();
    };
 
    var get = function(key) {
          var metric = store[key];
          if (typeof metric === "undefined") {
             metric = new Metric();
             store[key] = metric;
          }
          return metric;
    };
    return {
       get: function(key) {
          return get(key);
       },
       update: function(key, id, sendTime, payloadSize) {
          var metric = get(key);
          metric.update(id, sendTime, payloadSize);
          return metric;
       },
       summaryFor: function(key) {
          var metric = get(key);
          return metric.summary();
       },
       heading: function() {
          return 'count, average response (ms), average outlier high response (ms), average outlier low response (ms), average message size, messages received';
       },
       summary: function() {
          var result = new Metric();
          var count = 0;
          for (var key in store) {
             var metric = store[key];
             metric.addTo(result);
             count++;
          }
          var summary = result.summary();
          summary.count = count;
          return summary;
       }
    };
 }
 
 

Request/Reply Server

The request/reply server also uses cluster module and listens for Websocket port on each process. Upon receiving order-request API, it sends a reply to the client. Here is the implementation of server:

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 2;
 var MAX_THREADS = process.env.MAX_THREADS || 20;
 var helper = require('helper')
    
 var cluster = require('cluster');
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 hwUsage.start();
 var socketsByAddr = {};
 var totalClients = 0;
 
 
 if (cluster.isMaster) {
    for (var i = 0; i < MAX_THREADS; i++) {
       cluster.fork();
    }  
    cluster.on('exit', function(worker, code, signal) {
       console.log('worker ' + worker.process.pid + ' died');
    });
 } else {
    //
    var io = require('socket.io').listen(8124);
    io.set('log level', 1); 
    io.set('transports', ['websocket']);
    io.set('force new connection', true);
    io.enable('browser client gzip');
    
    io.sockets.on('connection', function(socket) {
       totalClients++;
       socket.address = socket.handshake.address.address + ':' + socket.handshake.address.port;
       socketsByAddr[socket.address] = true;
       socket.on('order-request', function(request) {
          var response = {request: request.counter, timestamp : request.timestamp, address:request.address};
          metrics.update(request.address, request.request, request.timestamp, JSON.stringify(request).length);
          socket.emit('order-response', response);
       });
       
       // disconnected
       socket.on('disconnect', function() {
          totalClients--;
          delete socketsByAddr[socket.address];
       });
       
       socket.on('end', function() {
       });
    });
    });
 }
 
 
 setInterval(function() {
    hwUsage.stop(function(usage) {
      console.log('sockets, clients, ' + hwUsage.heading() + ',' + metrics.heading());
      console.log(helper.size(socketsByAddr) + ',' + totalClients + ',' + usage.toString() + ',' + metrics.summary().toString());
    });
 }, 15000);
 
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
 });
 
 

Request/Reply Client

The request/reply client is similar to pub/sub model except it initiates order-request API every 50-250 milli-seconds. It also creates new connections every 15 seconds and logs response time continuously.

 var OUTLIER_SIZE = process.env.OUTLIER_SIZE || 10;
 var INTERVAL_BETWEEN_NEW_CLIENTS_SECS = (process.env.INTERVAL_BETWEEN_NEW_CLIENTS_SECS || 15) * 1000;
 
 var hwUsage = require('hardware_usage');
 var metrics = require('metrics')({}, OUTLIER_SIZE);
 hwUsage.start();
 
 var order_client = require('order_client')(metrics, hwUsage);
 // create connections for this client
 var totalConnections = 0;
 var buildClients = function() {
    for (var i=0; i<100; i++) {
       var client = order_client.newClient();
    }
 };
 
 
 setInterval(buildClients, INTERVAL_BETWEEN_NEW_CLIENTS_SECS);
 
 setInterval(function() {
     order_client.log();
 }, INTERVAL_BETWEEN_NEW_CLIENTS_SECS);
 
 process.on('uncaughtException', function(err) {
   console.log("GLOBAL " + err + "\n" + err.stack);
   console.trace('stack trace');
   //process.exit(1);
 });
 

The actual client is defined as module, e.g.

 var io = require('socket.io-client');
 var helper = require('helper');
 var MAX_ROWS = process.env.MAX_ROWS || 10;
 
 var nextClientId = 0;
 var nextRequestId = 0;
 var totalConnected = 0;
 module.exports = function(metrics, hwUsage) {
    var OrderClient = function() {
       this.id = ++nextClientId;
       this.connected = false;
       this.client = io.connect('localhost', { port: 8124,
                               transports: ['websocket'],
                               'force new connection': true,
                               'sync on disconnect' : true,
                               'connect timeout': 500,
                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
    };
 
    //
    OrderClient.prototype.setupConnectListener = function() {
       var that = this;
       this.client.on('connect_failed', function(error) {
          this.connected = false;
       });
    
       this.client.on('connect', function() {
          this.connected = true;
          totalConnected++;
       });
 
       this.client.on('disconnect', function() {
          this.connected = false;
          totalConnected--;
       });
    
       this.client.on('reconnect_failed', function() {
          this.connected = false;
       });
       
       this.client.on('end', function() {
          this.connected = false; 
       });
    };    
          
       
    OrderClient.prototype.sendRequest = function(max) {
       var execs = [];
       var date = new Date();
       for (var i=0; i<max; i++) {
          execs.push({sequence:i, activityDateStr:date.toString(), com: helper.random(5),price: helper.random(100), accountId:1772139, symbol:"QQQ", transaction:"STO", description:"QQQ Stock", qty: helper.random(200), key: "QQQ:::S", netAmount: helper.random(1000)});
       }
       var request = {request: ++nextRequestId, rows:max, timestamp : date.getTime(), address:this.id, execs: execs};
       this.client.emit('order-request', request);
    }; 
    OrderClient.prototype.setupResponseListener = function() {
       var that = this;
       this.client.on('order-response', function(response) {
          var metric = metrics.update(response.address, response.request, response.timestamp, JSON.stringify(response).length);
        });
    };
 
    return {
       log: function() {
          hwUsage.stop(function(usage) {
             console.log('clients,connected,' + hwUsage.heading() + ',' + metrics.heading());
             console.log(nextClientId + ',' + totalConnected + ',' + usage.toString() + ',' + metrics.summary().toString());
          });
       },
       newClient: function() {
          var client = new OrderClient();
          client.setupConnectListener();
          client.setupResponseListener();
          setInterval(function() {
             client.sendRequest(helper.random(MAX_ROWS));
          }, helper.random(250) + 50);
          return client;
       }
    };
 }
 
 

First blocker

I started testing on my Linux machine and saw RangeError: Maximum call stack size exceeded after server reached about 2000 connections. It turned out JSON library on socket.io went into recursion when it received messages while it's parsing existing message. I found a patch at Fix infinite recursion in Websocket parsers and manually applied it as I didn't see it on the latest version socket.io library 0.8.9. After applying it, I was able to make some progress.

Second blocker

After reaching about 10,000 connections I started seeing Cannot call method 'packet' of null coming from the socket.js. At the time, I was using default configuration for Websockets and apparently it was falling back to xhr-polling under heavy load (See open bug for it). I changed configuration to explicitly defined websocket protocol without any fallback, e.g.

 io.set('transports', ['websocket']);
 

I also made some changes to network settings on my Linux machine as recommended by Node.js w/1M concurrent connections!.

 net.core.rmem_max = 33554432
 net.core.wmem_max = 33554432
 net.ipv4.tcp_rmem = 4096 16384 33554432
 net.ipv4.tcp_wmem = 4096 16384 33554432
 net.ipv4.tcp_mem = 786432 1048576 26777216
 net.ipv4.tcp_max_tw_buckets = 360000
 net.core.netdev_max_backlog = 2500
 vm.min_free_kbytes = 65536
 vm.swappiness = 0
 net.ipv4.ip_local_port_range = 1024 65535
 

I then applied above changes using:

 sudo sysctl -w net.core.somaxconn=10000
 sudo sysctl -w net.ipv4.tcp_max_syn_backlog=10000 
 sudo sysctl -p
 

Third blocker

Above changes got me to about 17,000 connections but then I started seeing connection timeout on the client side and warn: client not handshaken client should reconnect on the server side. are two open bugs: first and second for it. I added reconnect parameters to client connection, e.g.

                               'reconnect': true,
                               'reconnection delay': 500,
                               'reopen delay': 500,
                               'max reconnection attempts': 5});
 

Fourth blocker

Under heavy load, I also saw "Error: not opened" coming from WebSocket library, e.g.

     at WebSocket.send (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/node_modules/ws/lib/WebSocket.js:175:16)
     at Transport.WS.send (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transports/websocket.js:107:22)
     at Transport.packet (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:178:10)
     at Transport.WS.payload (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transports/websocket.js:120:12)
     at Socket.flushBuffer (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/socket.js:327:20)
     at Socket.setBuffer (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/socket.js:314:14)
     at Socket.onConnect (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/socket.js:409:14)
     at Transport.onConnect (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:139:17)
     at Transport.onPacket (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:91:12)
     at Transport.onData (/alldata/home/sbhatti/prototype-nodejs/node_modules/socket.io-client/lib/transport.js:69:16)
 

I chose to ignore it as Websocket is attempting to send data when connection is closed and client will just reconnect.

Summary

I am still evaluating Node.js and I have not decided if I would recommend Node.js for streaming solution. I will be comparing these results against some Java solutions such as Atomosphere and Vertx.io and will post those results in future.


Newer Posts »

Powered by WordPress