Shahzad Bhatti Welcome to my ramblings and rants!

August 17, 2014

PlexService Overview – a Micro-service framework for defining HTTP/Websockets and JMS based Services


I recently created a new framework, PlexService, for serving micro-services that can be accessed via HTTP, Websockets or JMS interfaces. You can choose among these access mechanisms according to the needs of your services. For example, as JMS services are inherently asynchronous, they provide a good foundation for building scalable and reactive services. You may choose the HTTP stack for implementing REST services or choose Websockets for implementing interactive services.

The PlexService framework provides basic support for encoding POJO objects into JSON for service consumption. Developers define service configuration via annotations that specify gateway types, encoding scheme, end-points, etc.

PlexService supports role-based security, where you can specify the list of roles that can access each service. The service providers implement how roles are verified, and the PlexService framework then enforces them.

If you implement all services in JMS, you can easily expose them via HTTP or Websockets by configuring the web-to-jms bridge. The bridge routes all requests from HTTP/Websockets to JMS and listens for incoming messages, which are then routed back to web clients.

PlexService provides basic metrics such as latency, invocations, errors, etc., which are exposed via a JMX interface. PlexService uses Jetty for serving web services. Developers provide the JMS container at runtime if one is required.

Building/Installing

Check out the code using

 git clone git@github.com:bhatti/PlexService.git

Compile and build the jar file using

 ./gradlew jar

Copy the jar file and add it manually to your application.

Defining role-based security

PlexService allows developers to define role-based security, which is invoked when accessing services, e.g.

 public class BuggerRoleAuthorizer implements RoleAuthorizer {
     private final UserRepository userRepository;
 
     public BuggerRoleAuthorizer(UserRepository userRepository) {
       this.userRepository = userRepository;
     }
 
     @Override
       public void authorize(Request request, String[] roles) throws AuthException {
         String sessionId = request.getSessionId();
         User user = userRepository.getUserBySessionId(sessionId);
         if (user == null) {
           throw new AuthException(Constants.SC_UNAUTHORIZED,
               request.getSessionId(), request.getRemoteAddress(),
               "failed to validate session-id");
         }
         for (String role : roles) {
           if (!user.getRoles().contains(role)) {
             throw new AuthException(Constants.SC_UNAUTHORIZED,
                 request.getSessionId(), request.getRemoteAddress(),
                 "failed to match role");
           }
         }
       }
 }
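
The loop in the authorizer above rejects a request as soon as one listed role is missing, so a user must hold every role in the list. A minimal stand-alone sketch of just that check follows. RoleCheckSketch and firstMissingRole are illustrative names of my own and are not part of PlexService:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-alone version of the role check; not a PlexService class.
class RoleCheckSketch {
    // Returns the first required role the user lacks, or null when all are present.
    static String firstMissingRole(Set<String> userRoles, String[] requiredRoles) {
        for (String role : requiredRoles) {
            if (!userRoles.contains(role)) {
                return role;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Set<String> erica = new HashSet<>(Arrays.asList("Employee"));
        // An Employee passes an Employee-only check but fails an Administrator check.
        System.out.println(firstMissingRole(erica, new String[] {"Employee"}));
        System.out.println(firstMissingRole(erica, new String[] {"Administrator"}));
    }
}
```

With this all-of semantics, an empty role list admits any user who has a valid session.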

Typically, a login service stores the session-id, which is later passed to the RoleAuthorizer implementation, e.g.

 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = Void.class, endpoint = "/login", method = Method.POST, codec = CodecType.JSON)
 public class LoginService extends AbstractUserService implements RequestHandler {
   public LoginService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
   public void handle(Request request) {
     String username = request.getStringProperty("username");
     String password = request.getStringProperty("password");
 
     User user = userRepository.authenticate(username, password);
     AbstractResponseBuilder responseBuilder = request.getResponseBuilder();
     if (user == null) {
       throw new AuthException(Constants.SC_UNAUTHORIZED,
               request.getSessionId(), request.getRemoteAddress(),
               "failed to authenticate");
     } else {
       responseBuilder.addSessionId(userRepository.getSessionId(user));
       responseBuilder.send(user);
     }   
   }
 }

In the above example, the session-id is added to the response upon successful login and is then passed with future requests. For HTTP services, you may use cookies to store session-ids; otherwise you would need to pass the session-id as a parameter.

Here is how you can invoke login-service from curl:

 
 curl --cookie-jar cookies.txt -v -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/login?username=erica&password=pass"

which would return:

 
 Content-Type: application/json
 Set-Cookie: PlexSessionID=5 Expires: Thu, 01 Jan 1970 00:00:00 GMT
 {"id":5,"username":"erica","email":"erica@plexobject.com","roles":["Employee"]}

Defining Services

Defining a REST service for creating a user

Here is how you can define a REST service:

@ServiceConfig(gateway = GatewayType.HTTP, requestClass = User.class, 
     rolesAllowed = "Administrator", endpoint = "/users", method = Method.POST, 
     codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements
 RequestHandler {
   public CreateUserService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }

The ServiceConfig annotation specifies that this service can be accessed via HTTP at the “/users” URI. PlexService will decode the JSON request into a User object and will ensure that the service can be accessed only by a user who has the Administrator role.

Here is how you can invoke this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/users" -d "{\"username\":\"david\",\"password\":\"pass\",\"email\":\"david@plexobject.com\",\"roles\":[\"Employee\"]}"

Defining a Web service over Websockets for creating a user

Here is how you can define a Websocket based service:

 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = User.class, 
     rolesAllowed = "Administrator", endpoint = "/users", method = Method.POST, 
     codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements
 RequestHandler {
   public CreateUserService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }

The ServiceConfig annotation specifies that this service can be accessed via Websocket at the “/users” endpoint. However, as opposed to an HTTP based service, this endpoint is not enforced in the HTTP request and can be in any format as long as it’s unique for a service.

Here is how you can access websocket service from javascript:

 var ws = new WebSocket("ws://127.0.0.1:8181/users");
 ws.onopen = function() {
   var req = {"payload":"", "endpoint":"/login", "method":"POST", "username":"scott", "password":"pass"};
   ws.send(JSON.stringify(req));
 };
 
 ws.onmessage = function (evt) {
   alert("Message: " + evt.data);
 };
 
 ws.onclose = function() {
 };
 
 ws.onerror = function(err) {
 };

Note that websockets are not supported by all browsers, and the above code will work only on supported browsers such as IE 11+, FF 31+, Chrome 36+, etc.

Defining a JMS service for creating a user

Here is how you can create a JMS service:

 @ServiceConfig(gateway = GatewayType.JMS, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "queue:{scope}-create-user-service-queue", 
       method = Method.MESSAGE, 
       codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements RequestHandler {
     public CreateUserService(UserRepository userRepository) {
     super(userRepository);
     }
 
     @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }

Note that the handler code is identical; only the ServiceConfig annotation changes (gateway type, endpoint and method). PlexService also supports variables in end-points, which are populated from configuration. For example, you may define a scope variable to create different queues/topics for different developers/environments. PlexService will serialize POJO classes into JSON when delivering messages over JMS.
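
To make the idea of end-point variables concrete, here is a rough sketch of how a placeholder such as {scope} could be filled in from configuration. It is only an illustration under my own assumptions: EndpointTemplateSketch and expand are hypothetical names, a plain java.util.Properties stands in for the configuration, and PlexService's own substitution logic may differ:

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of end-point variable substitution; not PlexService code.
class EndpointTemplateSketch {
    private static final Pattern VARIABLE = Pattern.compile("\\{(\\w+)\\}");

    // Replaces each {name} with the matching property; unknown names are left untouched.
    static String expand(String template, Properties config) {
        Matcher m = VARIABLE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = config.getProperty(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("scope", "dev");
        System.out.println(expand("queue:{scope}-create-user-service-queue", config));
    }
}
```

Giving each developer or environment its own scope value keeps their queues separate without touching the service code.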

Defining a REST service with parameterized URLs

PlexService allows developers to define URIs for services that contain variables. These variables are then populated from actual requests. They can be used for implementing REST services, e.g.

 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = BugReport.class, 
       rolesAllowed = "Employee", endpoint = "/projects/{projectId}/bugreports", 
       method = Method.POST, 
       codec = CodecType.JSON)
 public class CreateBugReportService extends AbstractBugReportService implements RequestHandler {
     public CreateBugReportService(BugReportRepository bugReportRepository,
         UserRepository userRepository) {
       super(bugReportRepository, userRepository);
     }
 
     @Override
       public void handle(Request request) {
         BugReport report = request.getPayload();
         report.validate();
         BugReport saved = bugReportRepository.save(report);
         request.getResponseBuilder().send(saved);
       }
 }

Here is an example of invoking this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/projects/2/bugreports" -d "{\"title\":\"As an administrator, I would like to assign roles to users so that they can perform required actions.\",\"description\":\"As an administrator, I would like to assign roles to users so that they can perform required actions.\",\"bugNumber\":\"story-201\",\"assignedTo\":\"mike\",\"developedBy\":\"mike\"}"
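
For readers curious how a URI template such as /projects/{projectId}/bugreports can be matched against a request path, here is a small sketch. PathTemplateSketch is a hypothetical class written for this post, not the matcher PlexService uses; it turns the template into a regular expression and returns the captured variables by name:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of URI template matching; not PlexService code.
class PathTemplateSketch {
    private final Pattern pattern;
    private final List<String> names = new ArrayList<>();

    PathTemplateSketch(String template) {
        StringBuilder regex = new StringBuilder();
        for (String segment : template.split("/")) {
            if (segment.isEmpty()) {
                continue; // skip the empty piece before the leading slash
            }
            regex.append('/');
            if (segment.startsWith("{") && segment.endsWith("}")) {
                // A variable segment captures everything up to the next slash.
                names.add(segment.substring(1, segment.length() - 1));
                regex.append("([^/]+)");
            } else {
                regex.append(Pattern.quote(segment));
            }
        }
        pattern = Pattern.compile(regex.toString());
    }

    // Returns variable values by name, or null when the path does not match.
    Map<String, String> match(String path) {
        Matcher m = pattern.matcher(path);
        if (!m.matches()) {
            return null;
        }
        Map<String, String> values = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            values.put(names.get(i), m.group(i + 1));
        }
        return values;
    }

    public static void main(String[] args) {
        PathTemplateSketch template = new PathTemplateSketch("/projects/{projectId}/bugreports");
        System.out.println(template.match("/projects/2/bugreports"));
    }
}
```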
 

Using variables with Websocket based service

You can also use variables in websocket endpoints, similar to JMS; these are initialized from parameters.

 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = BugReport.class, 
       rolesAllowed = "Employee", endpoint = "{variable}-create-bugreport-service-channel", 
       method = Method.MESSAGE, codec = CodecType.JSON)
 public class CreateBugReportService extends AbstractBugReportService implements
         RequestHandler {
     public CreateBugReportService(BugReportRepository bugReportRepository,
             UserRepository userRepository) {
         super(bugReportRepository, userRepository);
     }
 
     @Override
     public void handle(Request request) {
         BugReport report = request.getPayload();
         report.validate();
         BugReport saved = bugReportRepository.save(report);
         request.getResponseBuilder().send(saved);
     }
 
 }

Here is another example of consuming websocket based service from javascript:

 var ws = new WebSocket("ws://127.0.0.1:8181/users");
 ws.onopen = function() {
   var req = {"payload":{"title":"my title", "description":"my description","bugNumber":"story-201", "assignedTo":"mike", "developedBy":"mike"},"PlexSessionID":"4", "endpoint":"/projects/2/bugreports/2/assign", "method":"POST"};
   ws.send(JSON.stringify(req));
 };
 
 ws.onmessage = function (evt) {
   alert("Message: " + evt.data);
 };
 
 ws.onclose = function() {
 };
 
 ws.onerror = function(err) {
 };

Defining a REST service for querying users

Here is an example REST service, which uses GET request to query users:

   @ServiceConfig(gateway = GatewayType.HTTP, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "/users", method = Method.GET, 
       codec = CodecType.JSON)
   public class QueryUserService extends AbstractUserService implements
   RequestHandler {
     public QueryUserService(UserRepository userRepository) {
       super(userRepository);
     }
     @Override
     public void handle(Request request) {
       Collection<User> users = userRepository.getAll(new Predicate<User>() {
         @Override
         public boolean accept(User u) {
           return true;
         }
       });
       request.getResponseBuilder().send(users);
     }
   }

Here is how you can invoke this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" "http://127.0.0.1:8181/users"   

which would return a JSON array such as:

 
 [{"id":2,"username":"alex","email":"alex@plexobject.com","roles":["Employee"]},{"id":3,"username":"jeff","email":"jeff@plexobject.com","roles":["Employee","Manager"]},{"id":4,"username":"scott","email":"scott@plexobject.com","roles":["Employee","Administrator","Manager"]},{"id":5,"username":"erica","email":"erica@plexobject.com","roles":["Employee"]}]

Defining a JMS service for querying users

Here is an example of defining a JMS service for querying users:

 @ServiceConfig(gateway = GatewayType.JMS, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "queue:{scope}-query-user-service-queue", 
       method = Method.MESSAGE, 
       codec = CodecType.JSON)
 public class QueryUserService extends AbstractUserService implements RequestHandler {
     public QueryUserService(UserRepository userRepository) {
       super(userRepository);
     }
     @Override
     public void handle(Request request) {
       Collection<User> users = userRepository.getAll(new Predicate<User>() {
         @Override
         public boolean accept(User u) {
           return true;
         }
       });
       request.getResponseBuilder().send(users);
     }
 }

The end-point can contain variables such as scope that are initialized from configuration.

Registering services and starting service container

You will need to register services with ServiceRegistry at runtime, which would initialize and start those services, e.g.

 Collection<RequestHandler> services = new HashSet<>();
 services.add(new CreateUserService(userRepository));
 services.add(new UpdateUserService(userRepository));
 services.add(new QueryUserService(userRepository));
 services.add(new DeleteUserService(userRepository));
 services.add(new LoginService(userRepository));
 services.add(new CreateProjectService(projectRepository, userRepository));
 services.add(new UpdateProjectService(projectRepository, userRepository));
 services.add(new QueryProjectService(projectRepository, userRepository));
 services.add(new AddProjectMemberService(projectRepository, userRepository));
 services.add(new RemoveProjectMemberService(projectRepository, userRepository));
 services.add(new CreateBugReportService(bugreportRepository, userRepository));
 services.add(new UpdateBugReportService(bugreportRepository, userRepository));
 services.add(new QueryBugReportService(bugreportRepository, userRepository));
 services.add(new QueryProjectBugReportService(bugreportRepository, userRepository));
 
 services.add(new AssignBugReportService(bugreportRepository, userRepository));
 serviceRegistry = new ServiceRegistry(config, services, new BuggerRoleAuthorizer(userRepository));
 serviceRegistry.start();
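
If you wonder how a registry can learn where to route requests purely from annotations, the following sketch shows the general technique with runtime reflection. Everything in it is hypothetical (the Route annotation, the Handler interface and RegistrySketch are simplified stand-ins I made up), so treat it as an illustration of the idea rather than how ServiceRegistry is implemented:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of annotation-driven registration; not PlexService's ServiceRegistry.
class RegistrySketch {
    // Simplified stand-in for a service configuration annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Route {
        String endpoint();
        String method();
    }

    interface Handler {
        String handle(String payload);
    }

    @Route(endpoint = "/users", method = "POST")
    static class CreateUser implements Handler {
        @Override
        public String handle(String payload) {
            return "created " + payload;
        }
    }

    private final Map<String, Handler> routes = new HashMap<>();

    // Reads the annotation from the handler's class and indexes the handler by it.
    void register(Handler handler) {
        Route route = handler.getClass().getAnnotation(Route.class);
        if (route == null) {
            throw new IllegalArgumentException("handler has no @Route annotation");
        }
        String key = route.method() + " " + route.endpoint();
        if (routes.putIfAbsent(key, handler) != null) {
            throw new IllegalStateException("duplicate route " + key);
        }
    }

    String dispatch(String method, String endpoint, String payload) {
        Handler handler = routes.get(method + " " + endpoint);
        if (handler == null) {
            throw new IllegalArgumentException("no handler for " + method + " " + endpoint);
        }
        return handler.handle(payload);
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register(new CreateUser());
        System.out.println(registry.dispatch("POST", "/users", "david"));
    }
}
```

Rejecting duplicate routes at registration time surfaces configuration mistakes at startup instead of at the first request.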
 

Creating Http to JMS bridge

You may choose to write all services as JMS services and then expose them via HTTP using the bridge provided by PlexService, e.g.

 final String mappingJson = IOUtils.toString(new FileInputStream(args[1]));
 Collection<HttpToJmsEntry> entries = new JsonObjectCodec().decode(
     mappingJson, new TypeReference<List<HttpToJmsEntry>>() {
     });
 WebToJmsBridge bridge = new WebToJmsBridge(new Configuration(args[0]), entries, GatewayType.HTTP);
 bridge.startBridge();

Creating Websocket to JMS bridge

Similarly, you may expose JMS services via websockets based transport using the bridge:

 
 final String mappingJson = IOUtils.toString(new FileInputStream(args[1]));
 Collection<HttpToJmsEntry> entries = new JsonObjectCodec().decode(
     mappingJson, new TypeReference<List<HttpToJmsEntry>>() {
     });
 WebToJmsBridge bridge = new WebToJmsBridge(new Configuration(args[0]), entries, GatewayType.WEBSOCKET);
 bridge.startBridge();
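
A bridge of this kind has to pair each reply coming back from the messaging side with the web request that is waiting for it, and give up after a timeout. The sketch below shows one common way to do that with correlation ids. It is my own simplified illustration, not the WebToJmsBridge source: BridgeSketch, request and onReply are hypothetical names, and a plain callback stands in for the JMS producer:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical sketch of request/reply correlation; not PlexService's WebToJmsBridge.
class BridgeSketch {
    // Replies that web requests are still waiting for, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Stand-in for a JMS producer: receives (correlationId, payload).
    private final BiConsumer<String, String> queueSender;

    BridgeSketch(BiConsumer<String, String> queueSender) {
        this.queueSender = queueSender;
    }

    // Sends the payload and blocks until the matching reply arrives or the
    // timeout expires. Returns null on timeout.
    String request(String payload, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        try {
            queueSender.accept(correlationId, payload);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pending.remove(correlationId); // never leak entries for late replies
        }
    }

    // Called by the reply listener; returns false when no request waits for this id.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.get(correlationId);
        return reply != null && reply.complete(payload);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BridgeSketch[] bridge = new BridgeSketch[1];
        // The "service" here replies immediately with the upper-cased payload.
        bridge[0] = new BridgeSketch((id, body) -> bridge[0].onReply(id, body.toUpperCase()));
        System.out.println(bridge[0].request("ping", 1000));
    }
}
```

A reply that arrives after the timeout finds no pending entry and is simply dropped, which keeps the map from growing when a backend service is slow or down.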

Here is the JSON configuration for the bridge:

[
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports/{id}/assign","method":"POST",
     "destination":"queue:{scope}-assign-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports","method":"GET",
     "destination":"queue:{scope}-query-project-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users","method":"GET",
     "destination":"queue:{scope}-query-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects","method":"GET",
     "destination":"queue:{scope}-query-projects-service","timeoutSecs":30},
   {"codecType":"JSON","path":"/bugreports","method":"GET",
     "destination":"queue:{scope}-bugreports-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}/membership/add","method":"POST",
     "destination":"queue:{scope}-add-project-member-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}/membership/remove","method":"POST",
     "destination":"queue:{scope}-remove-project-member-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports","method":"POST",
     "destination":"queue:{scope}-create-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users","method":"POST",
     "destination":"queue:{scope}-create-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects","method":"POST",
     "destination":"queue:{scope}-create-projects-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users/{id}","method":"POST",
     "destination":"queue:{scope}-update-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users/{id}/delete","method":"POST",
     "destination":"queue:{scope}-delete-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}","method":"POST",
     "destination":"queue:{scope}-update-project-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports/{id}","method":"POST",
     "destination":"queue:{scope}-update-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/login","method":"POST",
     "destination":"queue:{scope}-login-service-queue","timeoutSecs":30}]

Defining a Streaming Quotes Service over Websockets

Suppose you are building a high-performance streaming quote service that provides real-time stock quotes. You can easily build it using the PlexService framework, e.g.

@ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = Void.class, endpoint = "/quotes", method = Method.MESSAGE, codec = CodecType.JSON)
 public class QuoteServer implements RequestHandler {
     public enum Action {
         SUBSCRIBE, UNSUBSCRIBE
     }
 
     static final Logger log = LoggerFactory.getLogger(QuoteServer.class);
 
     private QuoteStreamer quoteStreamer = new QuoteStreamer();
 
     @Override
     public void handle(Request request) {
         String symbol = request.getProperty("symbol");
         String actionVal = request.getProperty("action");
         log.info("Received " + request);
         ValidationException
                 .builder()
                 .assertNonNull(symbol, "undefined_symbol", "symbol",
                         "symbol not specified")
                 .assertNonNull(actionVal, "undefined_action", "action",
                         "action not specified").end();
         Action action = Action.valueOf(actionVal.toUpperCase());
         if (action == Action.SUBSCRIBE) {
             quoteStreamer.add(symbol, request.getResponseBuilder());
         } else {
             quoteStreamer.remove(symbol, request.getResponseBuilder());
         }
     }
 
     public static void main(String[] args) throws Exception {
         Configuration config = new Configuration(args[0]);
         QuoteServer service = new QuoteServer();
         Collection<RequestHandler> services = new ArrayList<>();
         // Register the instance created above instead of constructing a second one.
         services.add(service);
         //
         ServiceRegistry serviceRegistry = new ServiceRegistry(config, services, null);
         serviceRegistry.start();
         Thread.currentThread().join();
     }
 }

The above example defines a service that listens on websockets and responds to subscribe or unsubscribe requests from web clients.

You can define a mock QuoteStreamer as follows, which periodically sends quotes to all subscribers:

public class QuoteStreamer extends TimerTask {
     private int delay = 1000;
     private Map<String, Collection<ResponseDispatcher>> subscribers = new ConcurrentHashMap<>();
     private QuoteCache quoteCache = new QuoteCache();
     private final Timer timer = new Timer(true);
 
     public QuoteStreamer() {
         timer.schedule(this, delay, delay);
     }
 
     public void add(String symbol, ResponseDispatcher dispatcher) {
         symbol = symbol.toUpperCase();
         synchronized (symbol.intern()) {
             Collection<ResponseDispatcher> dispatchers = subscribers
                     .get(symbol);
             if (dispatchers == null) {
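                 // Copy-on-write set (java.util.concurrent), because run() copies it without holding the lock.
                 dispatchers = new CopyOnWriteArraySet<ResponseDispatcher>();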
                 subscribers.put(symbol, dispatchers);
             }
             dispatchers.add(dispatcher);
         }
     }
 
     public void remove(String symbol, ResponseDispatcher dispatcher) {
         symbol = symbol.toUpperCase();
         synchronized (symbol.intern()) {
             Collection<ResponseDispatcher> dispatchers = subscribers
                     .get(symbol);
             if (dispatchers != null) {
                 dispatchers.remove(dispatcher);
             }
         }
     }
 
     @Override
     public void run() {
         for (Map.Entry<String, Collection<ResponseDispatcher>> e : subscribers
                 .entrySet()) {
             Quote q = quoteCache.getLatestQuote(e.getKey());
             Collection<ResponseDispatcher> dispatchers = new ArrayList<>(
                     e.getValue());
             for (ResponseDispatcher d : dispatchers) {
                 try {
                     d.send(q);
                 } catch (Exception ex) {
                     remove(e.getKey(), d);
                 }
             }
         }
     }
 }
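
The streamer above guards each symbol's subscriber set by locking on the interned symbol string. As an alternative, here is a sketch that relies only on concurrent collections. It is a simplified variation of my own, not code shipped with PlexService: SubscriberRegistrySketch is a hypothetical class, and java.util.function.Consumer stands in for ResponseDispatcher:

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical lock-free variant of the subscriber bookkeeping; not PlexService code.
class SubscriberRegistrySketch<T> {
    private final Map<String, Set<Consumer<T>>> subscribers = new ConcurrentHashMap<>();

    private static String key(String symbol) {
        return symbol.toUpperCase(Locale.ROOT);
    }

    void add(String symbol, Consumer<T> subscriber) {
        // computeIfAbsent creates the per-symbol set atomically on first use.
        subscribers.computeIfAbsent(key(symbol), k -> ConcurrentHashMap.newKeySet())
                .add(subscriber);
    }

    void remove(String symbol, Consumer<T> subscriber) {
        Set<Consumer<T>> set = subscribers.get(key(symbol));
        if (set != null) {
            set.remove(subscriber);
        }
    }

    // Sends the quote to every subscriber of the symbol and drops subscribers
    // whose delivery fails. Returns the number of successful deliveries.
    int publish(String symbol, T quote) {
        Set<Consumer<T>> set = subscribers.get(key(symbol));
        if (set == null) {
            return 0;
        }
        int delivered = 0;
        for (Consumer<T> subscriber : set) { // iteration tolerates concurrent updates
            try {
                subscriber.accept(quote);
                delivered++;
            } catch (RuntimeException ex) {
                set.remove(subscriber);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        SubscriberRegistrySketch<String> registry = new SubscriberRegistrySketch<>();
        registry.add("aapl", quote -> System.out.println("got " + quote));
        registry.publish("AAPL", "101.50");
    }
}
```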

Here is a sample javascript/html client, which allows users to subscribe to different stock symbols:

    <script>
       var ws = new WebSocket("ws://127.0.0.1:8181/quotes");
       ws.onopen = function() {
       };
       var lasts = {};
       ws.onmessage = function (evt) {
         //console.log(evt.data);
         var quote = JSON.parse(evt.data).payload;
         var d = new Date(quote.timestamp);
         $('#time').text(d.toString());
         $('#company').text(quote.company);
         $('#last').text(quote.last.toFixed(2));
         var prev = lasts[quote.company];
         if (prev != undefined) {
           var change = quote.last - prev;
           if (change >= 0) {
             $('#change').css({'background-color':'green'});
           } else {
             $('#change').css({'background-color':'red'});
           }
           $('#change').text(change.toFixed(2));
         } else {
           $('#change').text('N/A');
         }
         lasts[quote.company] = quote.last;
       };
 
       ws.onclose = function() {
       };
 
       ws.onerror = function(err) {
       };
       function send(payload) {
         $('#input').text(payload);
         ws.send(payload);
       }
       $(document).ready(function() {
         $("#subscribe").click(function() {
           var symbol = $("#symbol").val();
           var req = {"endpoint":"/quotes", "symbol":symbol, "action":"subscribe"};
           send(JSON.stringify(req));
         });
       });
       $(document).ready(function() {
         $("#unsubscribe").click(function() {
           var symbol = $("#symbol").val();
           var req = {"endpoint":"/quotes", "symbol":symbol, "action":"unsubscribe"};
           send(JSON.stringify(req));
         });
       });
    </script>
 
   <body>
     <form>
       Symbol:<input type="text" id="symbol" value="AAPL" size="4" />
       <input type="button" id="subscribe" value="Subscribe"/>
       <input type="button" id="unsubscribe" value="Unsubscribe"/>
     </form>
 
     <br>
 
     <table id="quotes" class="quote" width="600" border="2" cellpadding="0" cellspacing="3">
       <thead>
         <tr>
           <th>Time</th>
           <th>Company</th>
           <th>Last</th>
           <th>Change</th>
         </tr>
       </thead>
       <tbody>
         <tr>
           <td id="time"></td>
           <td id="company"></td>
           <td id="last"></td>
           <td id="change"></td>
         </tr>
       </tbody>
     </table>
   </body>

PlexService includes this sample code; you can start the streaming quote server by running the “quote.sh” command and then open the quote.html file in your browser.

Using JMX

PlexService uses JMX to expose key metrics and lifecycle methods to start or stop services. You can use jconsole to access the JMX controls, e.g.

 
 jconsole localhost:9191
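
To give a feel for what exposing metrics over JMX involves, here is a small sketch that registers invocation, error and latency counters with the platform MBean server using only the standard javax.management API. The names JmxMetricsSketch, ServiceMetricsMXBean and the sketch.metrics domain are made up for this example, and the MBeans PlexService registers may look quite different:

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical sketch of publishing service metrics over JMX; not PlexService code.
class JmxMetricsSketch {
    // JMX treats a public interface whose name ends in "MXBean" as a management interface.
    public interface ServiceMetricsMXBean {
        long getInvocations();
        long getErrors();
        double getAverageLatencyMillis();
    }

    public static class ServiceMetrics implements ServiceMetricsMXBean {
        private final AtomicLong invocations = new AtomicLong();
        private final AtomicLong errors = new AtomicLong();
        private final AtomicLong totalNanos = new AtomicLong();

        // Records one call with its elapsed time and outcome.
        public void record(long elapsedNanos, boolean failed) {
            invocations.incrementAndGet();
            totalNanos.addAndGet(elapsedNanos);
            if (failed) {
                errors.incrementAndGet();
            }
        }

        @Override
        public long getInvocations() {
            return invocations.get();
        }

        @Override
        public long getErrors() {
            return errors.get();
        }

        @Override
        public double getAverageLatencyMillis() {
            long n = invocations.get();
            return n == 0 ? 0.0 : totalNanos.get() / 1e6 / n;
        }
    }

    // Registers the metrics under an illustrative object name, replacing any earlier one.
    static ObjectName register(ServiceMetrics metrics, String serviceName) {
        try {
            ObjectName name = new ObjectName("sketch.metrics:type=Service,name=" + serviceName);
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            if (server.isRegistered(name)) {
                server.unregisterMBean(name);
            }
            server.registerMBean(metrics, name);
            return name;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads an attribute the same way a JMX client would.
    static Object read(ObjectName name, String attribute) {
        try {
            return ManagementFactory.getPlatformMBeanServer().getAttribute(name, attribute);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ServiceMetrics metrics = new ServiceMetrics();
        metrics.record(2_000_000L, false);
        ObjectName name = register(metrics, "CreateUserService");
        System.out.println(read(name, "Invocations"));
    }
}
```

Once registered, attributes such as Invocations, Errors and AverageLatencyMillis can be browsed in jconsole's MBeans tab.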

Summary

PlexService comes with a full-fledged sample application under the plexsvc-sample folder, and you can browse the JavaDocs to view the APIs.

 
