Shahzad Bhatti Welcome to my ramblings and rants!

May 21, 2023

Heuristics from “Code That Fits in Your Head”

Filed under: Methodologies,Technology,Uncategorized — admin @ 5:00 pm

Code maintenance and readability are important aspects of writing software systems, and “Code That Fits in Your Head” offers a lot of practical advice for writing maintainable code. Following are a few important heuristics from the book:

1. Art or Science

In the first chapter, the author compares software development with other fields such as civil engineering, which deals with the design, construction, and maintenance of structures. Software development has these phases as well, but its design and construction phases are intimately connected and require continuous iteration. Another metaphor discussed in the book is thinking of software as a living organism, like a garden: just as you prune weeds in a garden, you have to refactor the code base and manage technical debt. A further metaphor is software craftsmanship, in which a developer progresses from apprentice to journeyman to master. Though these perspectives help, software doesn’t quite fit the art metaphor, and the author instead suggests heuristics and guidelines for programming. He introduces software engineering as a structured framework for development activities.

2. Checklists

Professionals in many fields, such as airline pilots and doctors, follow checklists when accomplishing complex tasks. You can use a similar checklist for setting up a new code base: using Git, automating the build, enabling all compiler error messages, using linters and static analysis, etc. Though software engineering is more than following a checklist, these measures help make small improvements.

3. Tackling Complexity

This chapter defines sustainability and quality as the purpose of the book: software may exist for decades, and it needs to sustain the organization that builds it. Software exists to provide value, though in some cases the value may not be immediate. This means that at times worse technology succeeds because it provides a faster path to value, and companies that focus too much on perfection can go out of business; Richard Gabriel coined the aphorism that worse is better. Sustainability chooses a middle ground by moving in the right direction with checklists and balanced software development practices. The author compares the computer with the human brain, and though the comparison is not entirely fair, human working memory is much smaller and can hold only about four to seven pieces of information. This number matters when writing code, because you spend more time reading code than writing it, and code with a large number of variables or a lot of conditional logic is harder to understand. The author refers to the work of Daniel Kahneman, who suggested a model of thought comprising two systems: System 1 and System 2. When a programmer is in the zone, or in a flow, System 1 is always active and trying to understand the code. This means that modular code with fewer dependencies, variables, and decisions is easier to understand and maintain; if the code handles more than about seven things at once, it leads to complexity.

4. Vertical Slice and Walking Skeleton

This chapter recommends starting with and deploying a vertical slice of the application to get to working software. A vertical slice may consist of multiple layers, but it provides early feedback and is working software. A number of software development methodologies, such as test-driven development, behavior-driven development, domain-driven design, type-driven development, and property-based testing, help build fine-grained implementations with tests. If you don’t have tests, you can use characterization tests to describe the behavior of existing software. Tests generally follow the Arrange-Act-Assert phases, where the arrange phase prepares the test, the act phase invokes the operation under test, and the assert phase verifies the actual outcome. Documentation can further explain why decisions in the code were made. A walking skeleton supports the vertical slice by using acceptance-test-driven development or outside-in test-driven development. For example, you can pick the simplest feature that aims for the happy path to demonstrate that the system has a specific capability. The unit tests exercise this feature by using a Fake Object, data transfer objects (DTOs), and interfaces (e.g. a RepositoryInterface), and these dependencies are injected with the fake behavior. Real objects that are difficult to test can use the Humble Object pattern, which drains the object of branching logic. Making small improvements that are continuously delivered also keeps stakeholders updated so that they know when you will be done.
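To make this concrete, here is a minimal Rust sketch (all names are hypothetical, not from the book) of a unit test that injects a Fake Object behind a repository trait and follows the Arrange-Act-Assert phases:

// Hypothetical repository trait that the code under test depends on.
trait ReservationRepository {
    fn save(&mut self, name: String);
    fn count(&self) -> usize;
}

// Fake Object: an in-memory implementation used only by tests.
struct FakeReservationRepository {
    saved: Vec<String>,
}

impl ReservationRepository for FakeReservationRepository {
    fn save(&mut self, name: String) {
        self.saved.push(name);
    }
    fn count(&self) -> usize {
        self.saved.len()
    }
}

// Operation under test; the dependency is injected through the trait.
fn reserve(repo: &mut dyn ReservationRepository, name: &str) {
    repo.save(name.to_string());
}

#[test]
fn reserves_a_table() {
    // Arrange: prepare the fake dependency and the input
    let mut repo = FakeReservationRepository { saved: Vec::new() };

    // Act: invoke the operation under test
    reserve(&mut repo, "alice");

    // Assert: verify the actual outcome
    assert_eq!(repo.count(), 1);
}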

5. Encapsulation

Encapsulation hides details by defining a contract that describes the valid interactions between an object and its callers. Parameterized tests can capture the desired behavior and assert the invariants. Incremental changes can be added using test-driven development’s red-green-refactor cycle, where you first write a failing test, then make the test pass, and then refactor to improve the code. When using a contract to capture the interactions, you can apply Postel’s law to build resilient systems, i.e.,

Be conservative in what you send, be liberal in what you accept.

Encapsulation guarantees that an object is always valid; e.g., you can use a constructor to validate all invariants, including preconditions and postconditions.
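As a minimal illustration (hypothetical type, not from the book), a constructor can reject invalid input so that every instance that exists satisfies its invariants:

// Hypothetical value type whose invariant is "the value is always positive".
pub struct Quantity(u32);

impl Quantity {
    // The precondition is checked here; the postcondition is that any Quantity
    // handed back to the caller is valid by construction.
    pub fn new(value: u32) -> Result<Self, String> {
        if value == 0 {
            return Err("quantity must be greater than zero".to_string());
        }
        Ok(Quantity(value))
    }

    pub fn value(&self) -> u32 {
        self.0
    }
}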

6. Triangulation

As human working memory is very small, you have to decompose and compartmentalize the code structure into smaller chunks that can be easily understood. The author describes a devil’s advocate technique for validating behavior in unit tests: you try to pass the tests with a deliberately incomplete implementation, and if you succeed, it tells you that you need more test cases. This process can be treated as a kind of triangulation:

As the tests get more specific, the code gets more generic
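A small Rust sketch of the idea (hypothetical code, not from the book): a deliberately hard-coded implementation that passes a single test signals a weak test suite, and adding a second case forces the general implementation.

// A devil's advocate implementation like the following would pass only the
// first test below, revealing that the test suite is too weak:
//
//     fn total(_a: i32, _b: i32) -> i32 { 5 }
//
// Adding a second, different case triangulates toward the general code:
fn total(a: i32, b: i32) -> i32 {
    a + b
}

#[test]
fn totals_two_and_three() {
    assert_eq!(total(2, 3), 5);
}

#[test]
fn totals_one_and_one() {
    assert_eq!(total(1, 1), 2); // this case defeats the hard-coded version
}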

7. Decomposition

Code rot occurs because no one pays attention to the overall quality when making small changes. You can use metrics to track gradual decay, such as keeping cyclomatic complexity below seven. To improve code readability, the author suggests the 80/24 rule, which limits a method to no more than 24 lines and each line to no more than 80 characters. The author also suggests the hex flower rule:

No more than seven things should be going on in a single piece of code.

The author defines abstraction as capturing the essence of an object, i.e.,

Abstraction is the elimination of the irrelevant and the amplification of the essential.

Another factor that influences decomposition is cohesion: code that works on the same data structure, or on all of its attributes, should be defined in the same module or class. The author cautions against feature envy; to decrease complexity, you may need to move such code to another method or class. The author refers to the technique “parse, don’t validate” for validating input, where the parsing method takes less-structured input and produces more-structured output. Next, the author describes fractal architecture, where a large system is decomposed into smaller chunks; each chunk hides details but can be zoomed into to see its structure. Fractal architecture helps organize the code so that lower-level details are captured in a single abstract chunk that can easily fit in your brain.
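A minimal “parse, don’t validate” sketch in Rust (hypothetical types, not from the book): instead of a boolean validate check on a loosely structured string, parsing produces a more structured value that cannot represent the invalid state.

// Less-structured input in, more-structured output out.
struct Email(String);

impl Email {
    fn address(&self) -> &str {
        &self.0
    }
}

fn parse_email(input: &str) -> Result<Email, String> {
    let input = input.trim();
    // A deliberately simple check for illustration only.
    if input.contains('@') && !input.starts_with('@') && !input.ends_with('@') {
        Ok(Email(input.to_string()))
    } else {
        Err(format!("not an email address: {}", input))
    }
}

Downstream code that receives an Email value no longer needs to re-validate it.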

8. API Design

This chapter describes principles of API design such as affordance, which uses encapsulation to preserve the invariants of the objects involved in the API. Affordance allows a caller to invoke an API only when its preconditions are met. The author strengthens the affordance idea with a poka-yoke (mistake-proofing) analogy: a good interface design should be hard to misuse. Other techniques in the chapter include writing code for readers, favoring well-named code over comments, and X-ing out names. The latter replaces an API name with xxx and checks whether a reader can still guess what the API does. For example, you may identify APIs for command-query separation, where a method shaped like void xxx() can be assumed to be a command with a side effect. To communicate the intent of an API, the author describes a hierarchy of communication: using the API’s distinct types, helpful names, good comments, automated tests, helpful commit messages, and good documentation.
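A brief Rust sketch of command-query separation (hypothetical type, not from the book): the command mutates state and returns nothing, while the query returns data without side effects, so the signature alone already communicates intent.

struct Cart {
    items: Vec<String>,
}

impl Cart {
    // Command: has a side effect, returns no data.
    fn add_item(&mut self, item: &str) {
        self.items.push(item.to_string());
    }

    // Query: returns data, has no side effect.
    fn item_count(&self) -> usize {
        self.items.len()
    }
}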

9. Teamwork

In this chapter, the author provides tips for teamwork and communication with other team members, such as writing good Git commit messages using the 50/72 rule: first write a summary no wider than 50 characters, followed by a blank line, and then detailed text wrapped at 72 characters. Other techniques include continuous integration, which generally uses a trunk or main branch for all commits, with developers making small, frequently merged changes, optionally behind feature flags. Developers are encouraged to make small commits, and code ownership is collective to decrease the bus factor. The author refers to pair programming and mob programming for collaboration within the team. To facilitate collaboration, the author suggests reducing code review latency and rejecting any large change set. Reviewers should ask whether they can maintain the code, whether its intent is clear, and whether it could be further simplified. You can also pull down the code and test it locally to gain further insight.
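For example, a commit message following the 50/72 rule might look like this (a hypothetical change, not from the book):

Add lease renewal to the semaphore lock

Explain the motivation and any trade-offs in the body, wrapped at 72
characters and separated from the 50-character summary by a blank line.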

10. Augmenting Code

This chapter focuses on refactoring existing code to add new functionality, enhance existing behavior, and fix bugs. The author suggests using feature flags when deploying incomplete code. The author describes the strangler pattern for refactoring with incremental changes and suggests:

For any significant change, don’t make it in-place; make it side-by-side.

The strangler pattern can be applied at the method level, where you add a new method instead of making an in-place change to an existing method, and remove the original method once callers have moved over. Similarly, you can use a class-level strangler to introduce a new data structure and then remove the old references. The author suggests using semantic versioning so that you can signal backward-compatible and breaking changes.
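A method-level strangler might look like the following Rust sketch (hypothetical names): the new method is added side by side, callers are moved over incrementally, and the original is deleted only when nothing references it.

// Existing behavior is left untouched while callers migrate.
fn calculate_discount_legacy(total: f64) -> f64 {
    total * 0.05
}

// New implementation added next to the old one instead of editing it in place;
// once all callers use it, calculate_discount_legacy can be removed.
fn calculate_discount(total: f64, loyalty_years: u32) -> f64 {
    let rate = if loyalty_years >= 5 { 0.10 } else { 0.05 };
    total * rate
}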

11. Editing Unit Tests

Though an automated test suite lets you refactor production code safely, there is no safety net when making changes to the test code itself. You can add new tests, supplement existing tests with new assertions, or convert unit tests to parametrized tests without affecting existing behavior. Some programmers follow a single assertion per test and consider multiple assertions an Assertion Roulette, but the author suggests strengthening the postconditions in unit tests with additional assertions, which is somewhat similar to the Liskov Substitution Principle, which says that subtypes may weaken preconditions and strengthen postconditions. The author suggests separating refactoring of test code and production code, and using the IDE’s supported refactorings such as rename, extract, or move method when possible.
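For instance, near-duplicate unit tests can be folded into a parametrized test by iterating over a table of cases, as in this Rust sketch (hypothetical function under test):

fn is_leap_year(year: u32) -> bool {
    (year % 4 == 0 && year % 100 != 0) || year % 400 == 0
}

#[test]
fn leap_year_cases() {
    // Each tuple is one parametrized case: (input, expected outcome).
    let cases = [(2000, true), (1900, false), (2020, true), (2023, false)];
    for (year, expected) in cases {
        assert_eq!(is_leap_year(year), expected, "year {}", year);
    }
}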

12. Troubleshooting

When troubleshooting, you first have to understand what’s going on. This chapter suggests using the scientific method: make a hypothesis, perform the experiment, and compare the outcome to the prediction. The author also suggests simplifying by removing code to check whether the problem goes away. Other ways to simplify the code include composing an object graph in code instead of using complex dependency injection; using pure functions instead of mock objects; merging often instead of relying on complex diff tools; and learning SQL instead of using a convoluted object-relational mapping. Another powerful technique for troubleshooting is rubber ducking, where you try to explain the problem and gain new insight in the process. To build quality in, you should aim to reduce defects to zero. Tests also help with troubleshooting: writing an automated test to reproduce a defect before fixing it means the test then serves as a regression test. The author cautions against slow tests and non-deterministic defects due to race conditions. Finally, the author suggests bisection, which uses a binary search for the root cause: you reproduce the defect in half of the code and continue halving until you find the problem. You can also use Git’s bisect feature to find the commit that introduced the defect.
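A typical git bisect session looks roughly like this (the tag name is hypothetical):

git bisect start
git bisect bad                 # the current commit exhibits the defect
git bisect good v1.2.0         # a commit or tag known to be good
# git checks out the midpoint; run the reproducing test, then mark it:
git bisect good                # or: git bisect bad
# repeat until git reports the first bad commit, then clean up:
git bisect reset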

13. Separation of Concerns

The author describes Kent Beck’s aphorism:

Things that change at the same rate belong together. Things that change at different rates belong apart.

The principle of separation of concerns can be used to decompose working software into smaller parts, which can be decomposed further with nested composition. The author suggests using the command-query separation principle to keep side effects separate from query operations. Object-oriented composition tends to focus on composing side effects together, as in the Composite design pattern, which leads to complex code. The author describes sequential composition, which chains methods together, and referential transparency, which defines a deterministic method without side effects. Next, the author describes cross-cutting concerns such as logging, performance monitoring, auditing, metering, instrumentation, caching, fault tolerance, and security. The author finally describes the Decorator pattern for enhancing functionality; e.g., you can add logging to existing code without changing it and log actions from impure functions.
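The Decorator pattern might be sketched in Rust as follows (hypothetical trait and types, not from the book): logging is layered on top of an existing implementation without modifying it.

trait PaymentGateway {
    fn charge(&self, amount: f64) -> bool;
}

struct RealGateway;

impl PaymentGateway for RealGateway {
    fn charge(&self, amount: f64) -> bool {
        // the impure action (network call, etc.) would happen here
        amount > 0.0
    }
}

// Decorator: wraps any PaymentGateway and adds logging around the call.
struct LoggingGateway<T: PaymentGateway> {
    inner: T,
}

impl<T: PaymentGateway> PaymentGateway for LoggingGateway<T> {
    fn charge(&self, amount: f64) -> bool {
        println!("charging {}", amount);
        let ok = self.inner.charge(amount);
        println!("charge succeeded: {}", ok);
        ok
    }
}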

14. Rhythm

This chapter describes daily and recurring practices that software development teams follow, such as daily stand-ups. The personal rhythm includes time-boxing or using the Pomodoro technique, taking breaks, using time deliberately, and touch typing. The team rhythm includes updating dependencies regularly and scheduling recurring tasks such as renewing certificates. The author quotes Conway’s law:

Any organization that designs a system […] will inevitably produce a design whose structure is a copy of the organization’s communication structure.

You can use this law to organize the work that impacts the code base.

15. The Usual Suspects

This chapter covers the usual suspects of software engineering: architecture, algorithms, performance, security, and other concerns. For example, performance is often a critical aspect, but premature optimization can be wasteful; instead, correctness and an effort to reduce complexity and defects should be the priority. For security, the author suggests STRIDE threat modeling, which covers Spoofing, Tampering, Repudiation, Information disclosure, Denial of service, and Elevation of privilege. Other techniques include property-based testing, and behavioral code analysis can be used to extract information from Git history to identify patterns and problems.

16. Tour

In this last chapter, the author shares tips for understanding an unfamiliar code base, such as navigating to the main method and finding your way around from there. You can check whether the application uses broader patterns such as fractal architecture or Model-View-Controller, and understand authentication, authorization, routing, etc. The author provides a few suggestions about code structure and file organization, such as putting files in one directory, though this is contestable advice. The author refers to the hex flower and fractal architecture, where you can zoom in to see more details. In a monolithic architecture, the entire production code compiles to a single executable file, which makes it harder to reuse parts of the code in new ways. Another drawback of a monolithic architecture is that dependencies can be hard to manage, and abstractions can become coupled to implementations, which violates the Dependency Inversion Principle. Further, you will need to detect and prevent cyclic dependencies in order to follow the Acyclic Dependencies Principle. Finally, you can use the test suite to learn about the system.

Summary

The book is full of practical advice on writing maintainable code such as:

  • 50/72 Rule for Git commit messages
  • 80/24 Rule for writing small blocks of code
  • Tests based on Arrange-Act-Assert and Red-Green Refactor
  • Bisection for troubleshooting
  • Checklists for a new codebase
  • Command Query Separation
  • Cyclomatic Complexity and Counting the Variables
  • Decorators for cross-cutting concerns
  • Devil’s advocate for improving assertions
  • Feature flags
  • Functional core and imperative shell
  • Hierarchy of communication
  • Parse, don’t validate
  • Postel’s law to maintain invariants
  • Regularly update dependencies
  • Reproduce defects as Tests
  • Review code
  • Semantic Versioning
  • Separate refactoring of test and production code
  • Strangler pattern
  • Threat modeling using STRIDE
  • Transformation priority premise to make small changes while keeping the code in working condition
  • X-driven development by using unit-tests, static code analysis, etc.
  • X out Names

These heuristics help make the software development sustainable so that the team can make incremental changes to the code while maintaining high quality.

October 10, 2022

Implementing Distributed Locks (Mutex and Semaphore) with Databases

Filed under: Concurrency,Rust,Uncategorized — admin @ 10:55 pm

Overview

I recently needed a way to control access to shared resources in a distributed system for concurrency control. Though you may use Consul or Zookeeper (or low-level Raft / Paxos implementations if you are brave) for managing distributed locks, I wanted to reuse an existing database without adding another dependency to my tech stack. Most databases support transactions or conditional updates with varying degrees of transactional guarantees, but they can’t be used for distributed locks if the business logic you need to protect resides outside the database. I found a lock client library based on AWS databases, but it didn’t support semaphores. The library implementation was tightly coupled with the concerns of lock management and database access, which made it hard to extend. For example, the following diagram shows cyclic dependencies in its code:

class diagram

Due to the above deficiencies in existing solutions, I decided to implement my own distributed-locks library in Rust with the following capabilities:

  • Allow creating lease based locks that can either protect a single shared resource with a Mutex lock or protect a finite set of shared resources with a Semaphore lock.
  • Allow renewing leases based on periodic intervals so that stale locks can be acquired by other users.
  • Allow releasing Mutex and semaphore locks explicitly after the user performs a critical action.
  • CRUD APIs to manage Mutex and Semaphore entities in the database.
  • Multi-tenancy support for different clients in case the database is shared by multiple users.
  • Fair locks that grant access on a first-come, first-served basis when the same lock is acquired concurrently.
  • A scalable solution supporting tens of thousands of concurrent mutexes and semaphores.
  • Support for multiple data stores: relational databases such as MySQL, PostgreSQL, and SQLite, as well as NoSQL/cache data stores such as AWS Dynamo DB and Redis.

High-level Design

I chose Rust to build the library for managing distributed locks due to strict performance and correctness requirements. The following diagram shows the high-level components in the new library:

LockManager Interface

The client interacts with the LockManager, which defines the following operations to acquire, release, and renew lock leases and to manage the lifecycle of mutexes and semaphores:

#[async_trait]
pub trait LockManager {
    // Attempts to acquire a lock until it either acquires the lock, or a specified additional_time_to_wait_for_lock_ms is
    // reached. This method will poll database based on the refresh_period. If it does not see the lock in database, it
    // will immediately return the lock to the caller. If it does see the lock, it will note the lease expiration on the lock. If
    // the lock is deemed stale, (that is, there is no heartbeat on it for at least the length of its lease duration) then this
    // will acquire and return it. Otherwise, if it waits for as long as additional_time_to_wait_for_lock_ms without acquiring the
    // lock, then it will return LockError::NotGranted.
    //
    async fn acquire_lock(&self, opts: &AcquireLockOptions) -> LockResult<MutexLock>;

    // Releases the given lock if the current user still has it, returning true if the lock was
    // successfully released, and false if someone else already stole the lock. Deletes the
    // lock item if it is released and delete_lock_item_on_close is set.
    async fn release_lock(&self, opts: &ReleaseLockOptions) -> LockResult<bool>;

    // Sends a heartbeat to indicate that the given lock is still being worked on.
    // This method will also set the lease duration of the lock to the given value.
    // This will also either update or delete the data from the lock, as specified in the options
    async fn send_heartbeat(&self, opts: &SendHeartbeatOptions) -> LockResult<MutexLock>;

    // Creates mutex if doesn't exist
    async fn create_mutex(&self, mutex: &MutexLock) -> LockResult<usize>;

    // Deletes mutex lock if not locked
    async fn delete_mutex(&self,
                          other_key: &str,
                          other_version: &str,
                          other_semaphore_key: Option<String>) -> LockResult<usize>;

    // Finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the
    // given lock. If the client currently has the lock, it will return the lock, and operations such as release_lock will work.
    // However, if the client does not have the lock, then operations like releaseLock will not work (after calling get_lock, the
    // caller should check mutex.expired() to figure out if it currently has the lock.)
    async fn get_mutex(&self, mutex_key: &str) -> LockResult<MutexLock>;

    // Creates or updates semaphore with given max size
    async fn create_semaphore(&self, semaphore: &Semaphore) -> LockResult<usize>;

    // Returns semaphore for the key
    async fn get_semaphore(&self, semaphore_key: &str) -> LockResult<Semaphore>;

    // find locks by semaphore
    async fn get_semaphore_mutexes(&self,
                                   other_semaphore_key: &str,
    ) -> LockResult<Vec<MutexLock>>;

    // Deletes semaphore if all associated locks are not locked
    async fn delete_semaphore(&self,
                              other_key: &str,
                              other_version: &str,
    ) -> LockResult<usize>;
}

The LockManager interacts with the LockStore to access mutexes and semaphores, which delegates to implementations of the mutex and semaphore repositories for lock management. The library defines two implementations of LockStore: the DefaultLockStore, which supports mutexes and semaphores, where mutexes are used to acquire a singular lock and semaphores are used to acquire a lock from a finite set of shared resources; and the FairLockStore, which uses a Redis-specific implementation of fair semaphores for managing lease-based semaphores that support first-come, first-served order. The LockManager supports waiting for a lock that is not immediately available by periodically checking for the availability of the mutex or semaphore. Due to this periodic polling, the fair semaphore algorithm won’t maintain FIFO order if a new client requests a lock while a previous lock request is waiting for its next polling interval.

Create Lock Manager

You can instantiate a Lock Manager with relational database store as follows:

let config = LocksConfig::new("test_tenant");
let mutex_repo = factory::build_mutex_repository(RepositoryProvider::Rdb, &config)
	.await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Rdb, &config)
	.await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry()).expect("failed to initialize lock manager");

Alternatively, you can choose AWS Dynamo DB as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Or Redis based data-store as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Note: The AWS Dynamo DB implementation uses the strongly consistent reads feature, as reads are eventually consistent by default.

Acquiring a Mutex Lock

You will need to build options with the lock key and lease period and then acquire the lock:

let opts = AcquireLockOptionsBuilder::new("mylock")
	.with_lease_duration_secs(10).build();
let lock = lock_manager.acquire_lock(&opts)
	.await.expect("should acquire lock");

The acquire_lock operation will automatically create the mutex lock if it doesn’t exist; if the lock is not available, it will wait up to the lease time. It returns a mutex lock structure that includes:

{
  "mutex_key":"one",
  "tenant_id":"local-host-name",
  "version":"258d513e-bae4-4d91-8608-5d500be27593",
  "lease_duration_ms":15000,
  "locked":true,
  "expires_at":"2022-10-11T03:04:43.126542"
}

Renewing the Lease of a Lock

A lock is only held for the duration specified by lease_duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .with_lease_duration_secs(15)
                .build();

let updated_lock = lock_manager.send_heartbeat(&opts)
					.await.expect("should renew lock");

Note: The lease renewal also updates the version of the lock, so you will need to use the updated version to renew or release the lock.

Releasing the Lease of a Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .build();
lock_manager.release_lock(&opts)
				.await.expect("should release lock");

Acquiring a Semaphore based Lock

Semaphores allow you to define a set of locks for a resource with a maximum size. Acquiring a semaphore lock is similar to acquiring a regular lock except that you specify the semaphore’s maximum size, e.g.:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = lock_manager.acquire_lock(&opts)
				.await.expect("should acquire semaphore lock");

The acquire_lock operation will automatically create the semaphore if it doesn’t exist; it then checks for available locks and waits if all the locks are busy. It returns a lock structure that includes:

{
  "mutex_key":"one_0000000000",
  "tenant_id":"local-host-name",
  "version":"5ad557df-dbe6-439d-8a31-dc367e32eab9",
  "lease_duration_ms":15000,
  "semaphore_key":"one",
  "locked":true,
  "expires_at":"2022-10-11T04:03:33.662484"
}

The semaphore lock creates mutexes internally, numbered from 0 to max-size (exclusive). You can get the semaphore details using:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "4ff77432-ed84-48b5-9831-8e53f56c2620",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": false,
}

Or fetch the state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  {
    "mutex_key": "one_0000000000",
    "tenant_id": "local-host-name",
    "version": "ba5a62e5-80f1-474e-a895-c4a18d252cb9",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "749b4ded-e356-4ef5-a23b-73a4984130c8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...

Renewing the Lease of a Semaphore Lock

A lock is only held for the duration specified by lease_duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new(
  				"one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = lock_manager.send_heartbeat(&opts)
					.await.expect("should renew lock");

Note: The lease renewal also updates the version of the lock, so you will need to use the updated version to renew or release the lock.

Releasing the Lease of a Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new("one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_opt_semaphore_key("one")
                .build();

lock_manager.release_lock(&opts)
				.await.expect("should release lock");

Acquiring a Fair Semaphore

Fair semaphores are only available with Redis due to the internal implementation, and they require enabling the fair_semaphore configuration option; otherwise, the usage is similar to the operations above, e.g.:

let mut config = LocksConfig::new("test_tenant");
config.fair_semaphore = Some(fair_semaphore);

let fair_semaphore_repo = factory::build_fair_semaphore_repository(
  	RepositoryProvider::Redis, &config)
	.await.expect("failed to create fair semaphore");
let store = Box::new(FairLockStore::new(&config, fair_semaphore_repo));
let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry())
	.expect("failed to initialize lock manager");

Then acquire the lock using the same semaphore syntax as before:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = lock_manager.acquire_lock(&opts)
			.await.expect("should acquire semaphore lock");

The acquire_lock operation will automatically create the semaphore if it doesn’t exist; it then checks for available locks and waits if all the locks are busy. It returns a lock structure that includes:

{
  "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "tenant_id": "local-host-name",
  "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

The fair semaphore lock does not use mutexes internally, but for API compatibility it builds a mutex with a key based on the combination of the semaphore key and the version. You can then query the semaphore state as follows:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "5779b01f-eaea-4043-8ae0-9f8b942c2727",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": true,
}

Or fetch the state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  [
  {
    "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "tenant_id": "local-host-name",
    "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
    "expires_at": "2022-10-11T04:41:43.845711",
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...

Note: The mutex_key will look slightly different for unlocked mutexes, as a mutex key isn’t needed for the internal implementation.

Renewing the Lease of a Fair Semaphore Lock

You can renew the lease of a fair semaphore using the same syntax as above, e.g.:

let opts = SendHeartbeatOptionsBuilder::new(
  			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = lock_manager.send_heartbeat(&opts)
					.await.expect("should renew lock");

Note: Due to the internal implementation of the fair semaphore, the version won’t change upon lease renewal.

Releasing the Lease of a Fair Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new(
    			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_opt_semaphore_key("one")
                .build();

lock_manager.release_lock(&opts)
				.await.expect("should release lock");

Command Line Interface

In addition to the Rust interface, the distributed locks library also provides a command-line interface for managing mutex- and semaphore-based locks, e.g.:

Mutexes and Semaphores based Distributed Locks with databases.

Usage: db-locks [OPTIONS] [PROVIDER] <COMMAND>

Commands:
  acquire

  heartbeat

  release

  get-mutex

  delete-mutex

  create-mutex

  create-semaphore

  get-semaphore

  delete-semaphore

  get-semaphore-mutexes

  help
          Print this message or the help of the given subcommand(s)

Arguments:
  [PROVIDER]
          Database provider [default: rdb] [possible values: rdb, ddb, redis]

Options:
  -t, --tenant <TENANT>
          tentant-id for the database [default: local-host-name]
  -f, --fair-semaphore <FAIR_SEMAPHORE>
          fair semaphore lock [default: false] [possible values: true, false]
  -j, --json-output <JSON_OUTPUT>
          json output of result from action [default: false] [possible values: true, false]
  -c, --config <FILE>
          Sets a custom config file
  -h, --help
          Print help information
  -V, --version
          Print version information

For example, you can acquire a fair semaphore lock as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis acquire --key one --semaphore-max-size 10

Which would return:

{
  "mutex_key": "one_69816448-7080-40f3-8416-ede1b0d90e80",
  "tenant_id": "local-host-name",
  "version": "69816448-7080-40f3-8416-ede1b0d90e80",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

You can run the following command to renew the above lock:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis heartbeat --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

And then release it as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis release --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

Summary

I was able to meet the initial goals for implementing distributed locks, though this library is still early in development. You can download and try it from https://github.com/bhatti/db-locks. Feel free to send your feedback or contribute to this library.

September 10, 2021

Notes from “Monolith to Microservices”

Filed under: Uncategorized — admin @ 2:04 pm

I recently read Sam Newman’s book “Monolith to Microservices”. I had read his previous book, “Building Microservices”, which focused more on the design and implementation of microservices, and there is some overlap between the two books.

Chapter 1 – Just Enough Microservices

The first chapter defines how microservices can be designed to be deployed independently by modeling around a business domain.

Benefits

The major benefits of microservices include:

Independent Deployability

Modeled Around a Business Domain

The author explains that one reason the three-tier architecture with UI/business-logic/database layers is so common is Conway’s law, which predicts that a system design mimics the organization’s communication structure.

Own Their Own Data

In order to reduce coupling, the author recommends against sharing data between microservices.

Problems

Though microservices provide isolation and flexibility, they also add the complexity that comes with network communication, such as higher latencies, distributed transactions, and handling network failures. Other problems include:

User Interface

The author cautions against focusing only on the server side and leaving the UI as a monolith.

Technology

The author also cautions against chasing shiny new technologies instead of leveraging what you already know.

Size

The size of a microservice depends on the context; being small should not be the primary concern.

Ownership

The microservices architecture allows strong ownership, but it requires that services are designed around the business domain and product lines.

Monolith

The author explains monolithic apps where all code is packaged into a single process.

Modular Monolith

In a modular monolith, the code is broken into separate modules that are combined into a single unit for deployment.

Distributed Monolith

If the boundaries of microservices are not loosely coupled, they can result in a distributed monolith that has the disadvantages of both monoliths and microservices.

Challenges and Benefits of Monolith

The author explains that a monolithic design is not necessarily legacy, but a choice that depends on the context.

Cohesion and Coupling

He uses cohesion and coupling when defining microservices: stable systems encourage high cohesion and low coupling, which enable independent deployment and minimize chatty services.

Implementation Coupling

Implementation coupling may be caused by sharing a domain model or a database.

Temporal Coupling

Temporal coupling occurs when synchronous APIs are used to perform an operation.

Deployment Coupling

Deployment coupling adds the risk of redeploying unchanged modules along with the change.

Domain Coupling

Domain coupling is caused by sharing a full domain object instead of events or a reduced set of relevant information.

Domain-Driven Design

The author reviews domain-driven design concepts such as aggregate and bounded context.

Aggregate

In DDD, an aggregate is a grouping of objects whose lifecycle can be managed with a state machine; a microservice can be designed so that it handles the lifecycle and storage of one or more aggregates.

Bounded Context

A bounded context represents a boundary within the business domain that may contain one or more aggregates. These concepts can be used to define service boundaries so that each service is cohesive and has a well-defined interface.

Chapter 2 – Planning a Migration

Chapter two defines a migration path for microservices by establishing goals for the migration and why you should adopt a microservice architecture.

Why Might You Choose Microservices

Common reasons for such an architecture include:

  • improving autonomy
  • reducing time to market
  • scaling independently
  • improving robustness
  • scaling the number of developers while minimizing coordination
  • embracing new technologies

When Might Microservices Be a Bad Idea?

The author also describes scenarios when a microservice architecture is a bad idea such as:

  • when the business domain is unclear
  • when adopting microservices too early in a startup
  • customer-installed software

Changing Organizations

The author describes some ways organizations can be persuaded to adopt this architecture, using Dr. John Kotter’s eight-step process:

  • establishing a sense of urgency
  • creating the guiding coalition
  • developing a vision and strategy
  • communicating the change vision
  • empowering employees
  • generating short-term wins.

Importance of Incremental Migration

Incremental migration to a microservice architecture is important so that you can release services to production and learn from actual use.

Cost of Change

The author explains the cost of change in terms of reversible and irreversible decisions, a framing commonly used at Amazon.

Domain Driven Design

The author goes over domain-driven design again and shows how bounded contexts can define the boundaries of microservices. You can use event storming to build a shared domain model, where participants first identify domain events and then group them. You can then pick a context with few inbound dependencies to start with and use the strangler fig pattern for incremental migration. The author also shows a two-axis model for prioritizing service decomposition by comparing benefit versus ease of decomposition.

Reorganizing Teams

The chapter then reviews restructuring teams so that responsibilities can be reassigned to cross-functional teams that fully own and operate specific microservices.

How Will You Know if the Transition is Working?

You can determine if the transition is working by:

  • having regular checkpoints
  • quantitative measures
  • qualitative measures
  • avoiding the sunk cost fallacy

Chapter 3 – Splitting the Monolith

Chapter three describes how to split the monolith in small steps.

To change the Monolith or Not?

You will have to decide whether to move existing code as-is or reimplement it.

Refactoring the Monolith

A key blocker in breaking up the monolith might be a tightly coupled design that requires some refactoring before the migration. The next step in this process might be a modular monolith, where you have a single unit of deployment composed of statically linked modules.

Pattern: Strangler Fig Application

The Strangler Fig Application incrementally migrates existing code by moving modules to external microservices. In some cases those microservices may need to invoke other common behavior in the monolith.

HTTP Proxy

If the monolith sits behind an HTTP reverse proxy that intercepts incoming calls, the proxy can be extended to redirect requests to the new service. If the new service implements a new protocol, this may require other changes in the proxy layer, which adds risk and goes against the general recommendation of “keep the pipes dumb, the endpoints smart.” One way to remediate this is to create a layer that converts the protocol from the legacy format to the new one, such as REST to gRPC.

Service Mesh

Instead of a centralized proxy, you can use a service mesh, where a proxy such as Envoy is deployed alongside each service and handles the communication with that service.

Message Interception and Content-based Routing

If the monolith is using messaging, you can intercept messages and use a content-based router to send messages to the new service.

Pattern: UI Composition

UI composition looks at how the user interface can be migrated from a monolithic backend to a microservice architecture.

Page Composition

Page composition migrates one page at a time based on product verticals, ensuring that old page links are replaced with the new URLs as they change. You can choose a low-risk vertical for the UI migration to reduce the risk of breaking functionality.

Widget Composition

Widget composition reduces the UI migration risk by replacing a single widget at a time. For example, you may use Edge-Side Includes (ESI) to define a template in the web page, and a web server splices in the content. This technique is less common these days because the browser itself can make requests to populate a widget. Orbitz used this approach to render UI modules from a central orchestration service, but due to its monolithic design the orchestration service became a bottleneck for coordinating changes and was then migrated to microservices incrementally.

Mobile Applications

These UI composition changes can also be applied to mobile apps; however, a mobile app is itself a monolith and the whole application needs to be deployed. Some organizations, such as Spotify, allow dynamic changes to be driven from the server side.

Micro Frontends

Modern web browsers and standards such as the Web Components specification help build single-page apps and micro frontends using widget-based composition.

UI composition is highly effective for migrating vertical slices, provided you have the ability to change the existing user interface.

Pattern: Branch by Abstraction

“Branch by abstraction” can be used alongside incremental migration with the strangler fig when the functionality is deeply nested and other developers may be making changes to the same codebase. To prevent merge conflicts from large changes and to keep disruption for other developers minimal, you create an abstraction for the functionality to be replaced. This abstraction can then be implemented by both the existing code and the new implementation. You can switch the abstraction over to the new implementation once it is done and clean up the old implementation, as sketched after the steps below.

Step1: Create abstraction

Create an abstraction using “Extract Interface” refactoring.

Step2: Use abstraction

Refactor the existing clients to use the new abstraction point.

Step3: Create new implementation

Create a new implementation of the abstraction inside the monolith that calls the external service. You may simply return “Not Implemented” errors in the new implementation and deploy the code to production, since this new implementation isn’t actually being called yet.

Step4: Switch implementation

Once the new implementation is done, you can switch the abstraction to point to the new implementation. You may also use feature flags to toggle back and forth quickly.

Step5: Clean up

Once the new implementation is fully working, the old implementation can be removed, and you may also remove the abstraction if it is no longer needed.
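Putting the steps together, a minimal Rust sketch of branch by abstraction (hypothetical types, not from the book) might look like this:

// Steps 1-2: the abstraction that both implementations share and callers use.
trait NotificationSender {
    fn send(&self, message: &str);
}

// Existing behavior inside the monolith.
struct LegacyMonolithSender;
impl NotificationSender for LegacyMonolithSender {
    fn send(&self, message: &str) {
        println!("legacy path: {}", message);
    }
}

// Step 3: new implementation that will call the extracted service.
struct NewServiceSender;
impl NotificationSender for NewServiceSender {
    fn send(&self, message: &str) {
        println!("new service path: {}", message);
    }
}

// Step 4: a flag (or feature toggle) decides which implementation callers get;
// step 5 removes LegacyMonolithSender once the new path is proven.
fn sender(use_new_service: bool) -> Box<dyn NotificationSender> {
    if use_new_service {
        Box::new(NewServiceSender)
    } else {
        Box::new(LegacyMonolithSender)
    }
}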

Fallback Mechanism

A variation of the branch by abstraction pattern, called verify branch by abstraction, implements live verification: if the new implementation fails, the old implementation is used instead.

Overall, branch by abstraction is a fairly general-purpose pattern and useful in most cases where you can change the existing code.

Pattern: Parallel Run

As the strangler fig pattern and the branch by abstraction pattern allow both old and new implementations to coexist in production, you can use a parallel run to call both implementations and compare the results. Typically, the old implementation is considered the source of truth until the new implementation can be verified (examples: a new pricing system, FTP uploads).

N-Version Programming

Critical control systems such as flight control systems use redundant subsystems to interpret signals and then use a quorum to agree on the result.

Verification Techniques

In addition to simply comparing results, you may also need to compare nonfunctional aspects such as latency and failure rate.

Using Spies

A spy is used in unit testing to stub out a piece of functionality, such as communication with an external service, and to verify the results (examples: sending an email or a remote notification). In this setting, a spy generally runs as an external process, and you may use record/replay of events for verification. GitHub’s Scientist is a notable library for this pattern.

Dark Launching and Canary Releasing

A parallel run can be combined with canary releasing to test the new implementation before releasing it to all users. Similarly, dark launching enables the new implementation for only selected users (as in A/B testing).

Parallel run requires a lot of effort so care must be taken when it’s used.

Pattern: Decorating Collaborator

If you need to trigger new behavior based on something happening inside the monolith, you can use the decorating collaborator pattern to attach new functionality (example: a loyalty program that earns points on orders). You may use a proxy to intercept the request and invoke the new functionality. On the downside, the new functionality may require additional data, which adds more complexity and latency.

This is a simple and elegant approach, but it works best when the required information can be extracted from the request.

Pattern: Change Data Capture

This pattern reacts to changes made in a datastore instead of intercepting the calls. The underlying capture system is coupled to the monolith’s datastore (example: issuing loyalty cards, where a trigger on insert calls a loyalty card printing service).

Implementing Change Data Capture

Database triggers

For example, defining a trigger on a relational database that calls a service when a record is inserted.

Transaction log pollers

The transaction log of the database can be inspected by external tools to capture data changes, which can then be passed to message brokers or other services.

Batch delta copier

This simply scans the database on a regular schedule for the data that has changed and copies this data to the destination.

Change data capture has a lot of implementation challenges, so its use should be kept to a minimum.

Chapter 4 – Decomposing the Database

This chapter reviews patterns for managing a single shared database:

Pattern: The Shared Database

Implementation coupling is generally caused by a shared database, because the ownership and usage of the database schema are not clear. This weakens the cohesion of the business logic because behavior is spread across multiple services. The author points out that the only two situations where you may share the database are when using it for read-only static reference data, or when a service directly exposes a database as its interface to multiple consumers (database-as-a-service interface). In some cases the work involved in splitting the database is too large for an incremental migration, and you may use the following coping patterns to manage the shared database.

Pattern: Database View

When sharing a database, you may define database views for different services to limit the data that is visible to the service.

The Database as a Public Contract

When sharing a database, it might be difficult to tell who is reading or writing the data, especially when different applications use the same credentials. This also prevents changing the database schema, because some of the applications might no longer be actively maintained.

Views to Present

One way to change the schema without breaking existing applications is to define views that look like the old schema. A database view may also project only limited information to implement a form of information hiding. In some cases you may need a materialized view to improve performance. You should use this pattern only when the existing monolithic schema can’t be decomposed.

Pattern: Database Wrapping Service

A database wrapping service hides the database behind a service. This can be used when the underlying schema can’t be broken down, and it provides better support for customized projections and for writes than database views.

Pattern: Database-as-a-Service Interface

In cases where consumers just need to query the data, you may create a dedicated reporting database that is exposed as a read-only endpoint. A mapping engine listens for changes in the internal database and persists them in the reporting database, to be queried by internal or external consumers.

Implementing a Mapping Engine

You may use a change data capture system (e.g. Debezium), a batch process that copies the data, or a message broker that listens for data events. This allows presenting the data in a different database technology and provides more flexibility than database views.

Pattern: Aggregate Exposing Monolith

When a microservice needs data that lives inside the monolith’s database, you can expose a service endpoint on the monolith that provides access to the domain aggregate. This pattern exposes the information through a well-defined interface and, despite the additional work, is safer than sharing the database.

Pattern: Change Data Ownership

If the monolith needs to access data in a shared database that should belong to the new microservice, then the monolith can be updated to call the new service and treat it as the source of truth.

Database Synchronization

The strangler fig pattern allows switching back to the monolith if an issue is found in the microservice, but it requires that the data between the monolith and the new service remains in sync. You may use a database view and a shared database in the short term until the migration is successfully completed. Alternatively, you may sync both databases via code, but this requires careful thought.

Pattern: Synchronize Data in Application

Migrating data from one database to another requires performing synchronization between two data sources.

Step 1: Bulk Synchronize Data

You may take a snapshot of the source data and import it into the new data source while the existing system is kept running. As the source data may change while the import is running, a change data capture process can be implemented to apply the changes made since the snapshot. You can then deploy the new version of the application once this process completes.

Step 2: Synchronize on Write, Read from Old Schema

Once both databases are in sync, the new application writes all data to both databases while reading from the old database.

Step 3: Synchronize on Write, Read from New Schema

Once you verify that reads from the new database work, you can switch the application to use the new database as the source of truth.

Pattern: Tracer Write

This is a variation of the synchronize data in application pattern where the source of truth is moved incrementally and both sources are considered sources of truth during the migration. For example, you may migrate just one domain aggregate at a time, and other services may use either data source depending on the information they need.

Data Synchronization

If duplicated data can become inconsistent, you may need to apply one of the following options:

  • Write to one source – data is synchronized to the other source of truth after the write.
  • Send writes to both sources – the client makes a call to both sources, or uses an intermediary to broadcast the request.
  • Send writes to either source – the data is synchronized behind the scenes.

The last option should be avoided as it requires two-way synchronization. In the other cases, there will be some delay before the data becomes consistent (eventual consistency).

Splitting Apart the Database

Physical versus Logical Database Separation

A physical database can host multiple logically separated schemas, so migration to separate physical databases can be planned later to improve robustness, throughput, and latency. A single physical database can become a single point of failure, but this can be remedied by using multi-primary database modes.

Splitting the Database First, or the Code?

Split the Database First

This may cause multiple database calls instead of a single action such as a SELECT statement, or break transactional integrity, so you can detect performance problems earlier. However, it won’t yield many short-term benefits.

Pattern: Repository per bounded context

Breaking down the repository layer along the boundaries of bounded contexts helps you understand the dependencies and coupling between tables.

Pattern: Database per bounded context

This allows you to decompose the database along the lines of bounded contexts so that each bounded context uses a distinct schema. This pattern can be applied in startups, where requirements may change drastically, since you can keep multiple schemas while still using a monolithic architecture.

Split the Code First

This helps you understand the data dependencies of the new service and gives you the benefit of independent deployment, thus offering short-term improvements.

Pattern: Monolith as data access layer

This allows creating an API in the monolith to provide access to the data.

Pattern: Multi-schema storage

When adding new tables while migrating away from the monolith, put them in their own schema.

Split Database and Code Together

This splits the code and data at once, but it takes more effort.

Pattern: Split Table

This splits a single shared table into multiple tables based on boundaries of bounded contexts. However, this may break database transactions.

Pattern: Move Foreign-Key Relationship to Code

Moving the Join

Instead of using a database join, the new service will need to query the data from the other service.

Database Consistency

As you can't rely on referential integrity enforced by the database across multiple schemas, you may need to enforce it in the application, for example:

  • Check for existence before deletion, though this can be error prone.
  • Handle deletion gracefully – for example by not showing the missing information; services may also subscribe to add/delete events for related data (recommended; see the sketch after this list).
  • Don't allow deletion.
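A minimal sketch of the recommended option, assuming a hypothetical CatalogClient for the service that now owns the related data (the names are illustrative only): a missing record is rendered as a placeholder rather than failing the whole request.

import java.util.Optional;

// Hypothetical client for the service that owns the catalog data.
interface CatalogClient {
    Optional<Item> findItem(long itemId); // empty if the item was deleted by the other service
}

class Item { long id; String title; }

class OrderView {
    private final CatalogClient catalog;

    OrderView(CatalogClient catalog) {
        this.catalog = catalog;
    }

    // Handle deletion gracefully: show a placeholder instead of breaking the page.
    String describeLine(long itemId, int quantity) {
        return catalog.findItem(itemId)
                .map(item -> quantity + " x " + item.title)
                .orElse(quantity + " x (item no longer available)");
    }
}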

Shared Static Data

Duplicate static reference data

This will result in some data inconsistencies.

Pattern: Dedicated reference data schema

However, you may need to share the physical database.

Pattern: Static reference data library

This may not be suitable when using different programming languages and you will have to support multiple versions of the library.

Pattern: Static reference data service

This will add another dependency but it can be cached at the client side with update events to sync the local cache.

Transactions

ACID Transactions

Maintaining ACID guarantees is hard across multiple schemas, but you may use a state such as PENDING/VERIFIED to manage atomicity.
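A minimal sketch of this state-based approach, assuming hypothetical order and payment stores that live in separate schemas (all names are illustrative): the order is first written in a PENDING state and only marked VERIFIED after the dependent write succeeds, so readers can ignore half-finished work.

enum OrderState { PENDING, VERIFIED, FAILED }

class Order { long id; OrderState state = OrderState.PENDING; }

// Hypothetical stores backed by two different schemas.
interface OrderStore   { void save(Order order); void updateState(long id, OrderState state); }
interface PaymentStore { void charge(long orderId, long amountCents); }

class CheckoutService {
    private final OrderStore orders;
    private final PaymentStore payments;

    CheckoutService(OrderStore orders, PaymentStore payments) {
        this.orders = orders;
        this.payments = payments;
    }

    void placeOrder(Order order, long amountCents) {
        orders.save(order);                                    // written as PENDING
        try {
            payments.charge(order.id, amountCents);            // write in the second schema
            orders.updateState(order.id, OrderState.VERIFIED); // readers only trust VERIFIED rows
        } catch (RuntimeException e) {
            orders.updateState(order.id, OrderState.FAILED);   // or clean up the pending row
            throw e;
        }
    }
}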

Two-Phase Commits

This breaks a transaction into a voting phase and a commit phase; a rollback message is sent if any worker doesn't vote to commit.

Distributed Transactions – just say No

Sagas

A saga, or long-lived transaction (LLT), uses an algorithm that coordinates multiple state changes while avoiding locking resources for long periods. It breaks the LLT down into a sequence of short-lived transactions that can occur independently.

Saga Failure Modes

Sagas provide backward and forward recovery: backward recovery reverts the failure by using compensating transactions, while forward recovery continues from the point of failure by retrying.

Note: A rollback will undo all previously executed transactions. You can move the steps that are most likely to fail earlier in the saga to avoid triggering compensating transactions across a large number of steps.

Implementing Sagas
  • Orchestrated sagas – a central orchestrator coordinates the steps of the saga, possibly using BPM or other tools; large sagas can be broken down across multiple orchestrators (see the sketch after this list).
  • Choreographed sagas – the steps broadcast events using a message broker. However, the scope of the saga transaction may not be readily visible.
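A minimal sketch of an orchestrated saga with backward recovery, assuming hypothetical payment, inventory and shipping clients (the interfaces and method names are illustrative only): each completed step registers a compensating action, and the compensations are executed in reverse order if a later step fails.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical service clients for illustration.
interface PaymentService   { void charge(long orderId); void refund(long orderId); }
interface InventoryService { void reserve(long orderId); void release(long orderId); }
interface ShippingService  { void schedule(long orderId); }

class OrderSagaOrchestrator {
    private final PaymentService payments;
    private final InventoryService inventory;
    private final ShippingService shipping;

    OrderSagaOrchestrator(PaymentService payments, InventoryService inventory, ShippingService shipping) {
        this.payments = payments;
        this.inventory = inventory;
        this.shipping = shipping;
    }

    void run(long orderId) {
        Deque<Runnable> compensations = new ArrayDeque<>();
        try {
            payments.charge(orderId);
            compensations.push(() -> payments.refund(orderId));

            inventory.reserve(orderId);
            compensations.push(() -> inventory.release(orderId));

            shipping.schedule(orderId);
        } catch (RuntimeException e) {
            // Backward recovery: undo the completed steps in reverse order.
            while (!compensations.isEmpty()) {
                compensations.pop().run();
            }
            throw e;
        }
    }
}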

Chapter 5 – Growing Pains

More Services, More Pain

Ownership at Scale

  • Strong code ownership – recommended for large-scale microservices
  • Weak code ownership
  • Collective code ownership

Breaking Changes

A change in a microservice may break backward compatibility for its consumers. You can avoid breaking contracts when changing a service by using explicit schemas and maintaining their semantics. If you must break backward compatibility, you may support multiple versions of the service side by side so that existing clients can migrate to the new version.

Reporting

A monolithic database simplifies reporting but with different schemas and databases, you may need to build a reporting database to aggregate data from different services.

Monitoring and Troubleshooting

A monolithic app is easier to monitor, but with multiple microservices you will need log aggregation and a correlation id, together with tracing tools, to follow a transaction's events across different services.
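A minimal sketch of propagating a correlation id through the logs, using SLF4J's MDC so that every log line emitted while handling a request carries the same id (the header name and the way the wrapper is wired into a request pipeline are assumptions, not tied to any specific tracing product):

import java.util.UUID;
import java.util.function.Supplier;
import org.slf4j.MDC;

class CorrelationIdSupport {
    static final String HEADER = "X-Correlation-Id"; // assumed header name

    // Reuse the incoming correlation id if present, otherwise create one,
    // and keep it in the logging context for the duration of the request.
    static <T> T withCorrelationId(String incomingId, Supplier<T> handler) {
        String correlationId = (incomingId != null) ? incomingId : UUID.randomUUID().toString();
        MDC.put("correlationId", correlationId); // log pattern can include %X{correlationId}
        try {
            return handler.get();
        } finally {
            MDC.remove("correlationId");
        }
    }
}

The same id must also be forwarded on outgoing calls (for example as the X-Correlation-Id header) so that the log aggregator can stitch together the events of one transaction across services.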

Test in Production

You may use synthetic transactions to perform end-to-end testing in production.

Observability

You can gather information about the system by tracing, logs and other system events.

Local Developer Experience

When setting up a local environment, you may need to set up a large number of services, which can slow down the development process. You may need to stub out services or use tools such as the Telepresence proxy to call remote services.

Running Too Many Things

You may use Kubernetes to manage the deployment and troubleshooting of the microservices.

End-to-End Testing

A microservice architecture increases the scope of end-to-end testing, and you have to debug false negatives caused by environmental issues. You can use the following approaches for end-to-end testing:

  • Limit scope of functional automated tests
  • Use consumer-driven contracts
  • Use automated release remediation and progressive delivery
  • Continually refine your quality feedback cycles

Global versus Local Optimization

You may have to solve the same problem, such as service deployment, across multiple services. You may need to classify problems as irreversible or reversible decisions and hold a broader discussion for the irreversible ones.

Robustness and Resiliency

Distributed systems exhibit a variety of failures so you may need to run redundant services, use asynchronous communication or apply other patterns such as circuit breakers, retries, etc.
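A minimal sketch of one such pattern, a retry with exponential backoff (production systems usually reach for a library such as Resilience4j; treat this as an illustration of the idea rather than a recommended implementation):

import java.util.concurrent.Callable;

class Retry {
    // Retry a call up to maxAttempts times, doubling the delay between attempts.
    static <T> T withBackoff(Callable<T> call, int maxAttempts, long initialDelayMillis) throws Exception {
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxAttempts) {
                    break;
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
        throw last; // maxAttempts must be at least 1
    }
}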

Orphaned Services

Orphaned services don't have clear owners, so you can't immediately fix them if they stop working. You may need a service registry or other tools to track services; however, some services may predate such tools.

August 15, 2021

Structured Concurrency with Swift

Filed under: Concurrency,Uncategorized — Tags: , , , — admin @ 6:19 pm

I wrote about the support for structured concurrency in Javascript/Typescript, Erlang/Elixir, Go, Rust, Kotlin and Swift last year (Part-I, Part-II, Part-III, Part-IV), but the Swift language was still developing its async/await and actor support. Swift 5.5 will finally make these new concurrency features available; they are described below:

Async/Await

As described in Part-IV, Swift APIs previously used completion handlers for asynchronous methods that suffered from:

  • Poor error handling because you could not use a single mechanism to handle errors/exceptions; separate callbacks for errors were needed instead.
  • Difficult to cancel an asynchronous operation or exit early after a timeout.
  • Requires global reasoning about shared state in order to prevent race conditions.
  • Stack traces from the asynchronous thread don't include the originating request, so the code becomes hard to debug.
  • As the Swift/Objective-C runtime uses native threads, creating a lot of background tasks consumes expensive thread resources and may cause excessive context switching.
  • Nested use of completion handlers turns the code into a callback hell.

Following example shows poor use of control flow and deficient error handling when using completion handlers:

func fetchThumbnails(for ids: [String],
    completion handler: @escaping ([String: UIImage]?, Error?) -> Void) {
    guard let id = ids.first else { return handler([:], nil) }
    let request = thumbnailURLRequest(for: id)
    URLSession.shared.dataTask(with: request) { data, response, error in
        guard let response = response,
              let data = data else { return handler(nil, error) } // Poor error handling
        UIImage(data: data)?.prepareThumbnail(of: thumbSize) { image in
            guard let image = image else { return handler(nil, ThumbnailError()) }
        }
        fetchThumbnails(for: Array(ids.dropFirst())) { thumbnails, error in
            // cannot use loop
            ...
        }
    }
}

Though the use of Promise libraries helps a bit, the code still suffers from the dichotomy between control flow and error handling. Here is the equivalent code using async/await:

func fetchThumbnails(for ids: [String]) async throws -> [String: UIImage] {
    var thumbnails: [String: UIImage] = [:]
    for id in ids {
        let request = thumbnailURLRequest(for: id)
        let (data, response) = try await URLSession.shared.data(for: request)
        try validateResponse(response)
        guard let image = await UIImage(data: data)?.byPreparingThumbnail(ofSize: thumbSize) else { throw ThumbnailError() }
        thumbnails[id] = image
    }
    return thumbnails
}

As you can see, the above code not only improves control flow and adds uniform error handling, it also enhances readability by removing the nested structure of completion handlers.

Tasks Hierarchy, Priority and Cancellation

When a new task is created using async/await, it inherits the priority and task-local values of its parent task, which are then passed down the entire hierarchy of child tasks. When a parent task is cancelled, the Swift runtime automatically cancels all child tasks; however, Swift uses cooperative cancellation, so child tasks must check the cancellation state or they may continue to execute, though the results of cancelled tasks are discarded.

Continuations and Scheduling

Swift previously used native threads to schedule background tasks, where new threads were automatically created when a thread was blocked or waiting for another resource. The new Swift runtime creates native threads based on the number of cores, and background tasks use continuations to schedule work on those threads. When a task is blocked, its state is saved on the heap and another task is scheduled on the thread. The await syntax suspends the current task and releases the thread until the child task is completed. This cooperative scheduling requires runtime support for non-blocking I/O operations and system APIs so that native threads are not blocked and can continue to work on other background tasks. It also limits background tasks from using semaphores and locks, which are discussed below.

For example, when a thread working on a background task (say, updateDatabase) starts child tasks such as add or save, those tasks are saved as continuations on the heap. If the current task is suspended, the thread can pick up another task instead of blocking.

Multiple Asynchronous Tasks

The async/await in Swift also allows scheduling multiple asynchronous tasks and then awaiting for them later, e.g.

struct MarketData {
    let symbol: String
    let price: Int
    let volume: Int
}

struct HistoryData {
    let symbol: String
    let history: [Int]
    func sum() -> Int {
      history.reduce(0, +)
    }
}

func fetchMarketData(symbol: String) async throws -> MarketData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(MarketData(symbol: symbol, price: 10, volume: 200)))
        }
    }
}

func fetchHistoryData(symbol: String) async throws -> HistoryData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(HistoryData(symbol: symbol, history: [5, 10, 15, 20])))
        }
    }
}

func getMovingAverage(symbol: String) async throws -> Int {
    async let marketData = fetchMarketData(symbol: symbol)
    async let historyData = fetchHistoryData(symbol: symbol)
    let sum = try await marketData.price + historyData.sum()
    return try await sum / (historyData.history.count+1)
}

The async let syntax is called concurrent binding where the child task executes in parallel to the parent task.

Task Groups

Task groups allow dispatching multiple background tasks that execute concurrently, and Swift automatically cancels all child tasks when the parent task is cancelled. The following example demonstrates the group API:

func downloadImage(id: String) async throws -> UIImage {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(UIImage()))
        }
    }
}

func downloadImages(ids: [String]) async throws -> [String: UIImage] {
    var images: [String: UIImage] = [:]
    try await withThrowingTaskGroup(of: (String, UIImage).self) { group in
        for id in ids {
            group.addTask(priority: .background) {
                return (id, try await downloadImage(id: id))
            }
        }
        for try await (id, image) in group {
            images[id] = image
        }
    }
    return images
}

As these features were still in development, Swift recently renamed the group.async API to group.addTask. In the above example, images are downloaded in parallel and the for try await loop then gathers the results.

Data Races

The Swift compiler will warn you if you try to mutate shared state from multiple background tasks. In the above example, each asynchronous task returns a tuple of image-id and image instead of mutating the shared dictionary. The parent task then mutates the dictionary using the results from the child tasks in the for try await loop.

Cancellation

You can also cancel a background task using cancel API or cancel all child tasks of a group using group.cancelAll(), e.g.

group.cancelAll()

The Swift runtime automatically cancels all child tasks if any of the background tasks fails. You can store a reference to a child task in an instance variable if you need to cancel the task from a different method, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]

    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
              ...
            }
    }
    func collectionView(_ collectionView: UICollectionView,
        didEndDisplaying cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
        imageTasks[item]?.cancel()
    }
}

As cancellation in Swift is cooperative, you must check the cancellation state explicitly; otherwise the task will continue to execute, though Swift will discard its results, e.g.

if Task.isCancelled {
    return // return early
}

Timeout

The Task and async/await APIs don't directly support timeouts, so you must implement them manually, similar to cooperative cancellation.

Semaphores and Locks

Swift does not recommend using semaphores and locks with background tasks because tasks may be suspended while waiting for an external resource and can later be resumed on a different thread. The following example shows an incorrect use of a semaphore with a background task:

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void) {
  let semaphore = DispatchSemaphore(value: 0)
  Task {
    await asyncUpdateDatabase()
    semaphore.signal()
  }
  semaphore.wait() // Do not use unsafe primitives to wait across task boundaries
}

TaskLocal

You can annotate certain properties with @TaskLocal; they are stored in the context of the Task and are available to the task and all of its children, e.g.

enum TracingExample {
    @TaskLocal
    static let traceID: TraceID?
}
...
guard let traceID = TracingExample.traceID else {
  print("no trace id")
  return
}
print(traceID)

Detached Tasks (Unstructured)

The task and async/await APIs above are based on structured concurrency, where the parent task does not complete until all child background tasks are done with their work. However, Swift also allows launching detached tasks that continue to execute in the background without the parent waiting for their results, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]
    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
                defer { imageTasks[item] = nil }
                let images = try await getImages(for: ids)
                Task.detached(priority: .background) {
                    await withThrowingTaskGroup(of: Void.self) { g in
                        g.addTask { try await addImageCache(for: images) }
                        g.addTask { try await logImages(for: images) }
                    }
                }
                display(images, in: cell)
            }
    }
}

Legacy APIs

Legacy code that uses completion handlers can use the following continuation APIs to support the async/await syntax:

func persistPosts() async throws -> [Post] {
    typealias PostContinuation = CheckedContinuation<[Post], Error>
    return try await withCheckedThrowingContinuation { (continuation: PostContinuation) in
        self.getPersistentPosts { posts, error in
            if let error = error {
                continuation.resume(throwing: error)
            } else {
                continuation.resume(returning: posts)
            }
        }
    }
}

In the above example, the getPersistentPosts method uses a completion handler, and the persistPosts method provides a bridge so that you can use the async/await syntax. The resume method can only be called once per continuation.

You may also save the continuation in an instance variable when you need to resume it from another method, e.g.

class MyViewController: UIViewController {
    private var activeContinuation: CheckedContinuation<[Post], Error>?
    func sharePostsFromPeer() async throws -> [Post] {
        try await withCheckedThrowingContinuation { continuation in
            self.activeContinuation = continuation
            self.peerManager.syncSharedPosts()
        }
    }
}
extension MyViewController: PeerSyncDelegate {
    func peerManager(_ manager: PeerManager, received posts: [Post]) {
        self.activeContinuation?.resume(returning: posts)
        self.activeContinuation = nil
    }
    func peerManager(_ manager: PeerManager, hadError error: Error) {
        self.activeContinuation?.resume(throwing: error)
        self.activeContinuation = nil
    }
}

Implementing WebCrawler Using Async/Await

The following example shows an implementation of the WebCrawler, described in Part I of the concurrency series, using async/await:

import Foundation
struct Request {
    let url: String
    let depth: Int
    let deadline: DispatchTime
}
enum CrawlError: Error {
    case timeoutError(String)
}
let MAX_DEPTH = 4
let MAX_URLS = 11
let DOMAINS = [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "gh.com",
  "hi.com",
  "ij.com",
  "jk.com",
  "kl.com",
  "lm.com",
  "mn.com",
  "no.com",
  "op.com",
  "pq.com",
  "qr.com",
  "rs.com",
  "st.com",
  "tu.com",
  "uv.com",
  "vw.com",
  "wx.com",
  "xy.com",
  "yz.com",
];
public func crawl(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawl(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}
public func crawlWithActors(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawlWithActors(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}

///////////////// PRIVATE METHODS ////////////////
func doCrawl(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    try await withThrowingTaskGroup(of: (Request, Int).self) { group in
        for req in requests {
	    group.addTask(priority: .background) {
	        return (req, try await handleRequest(req))
	    }
        }
        for try await (req, childURLs) in group {
	    if totalChildURLs % 10 == 0 {
		print("received request \(req)")
	    }
	    totalChildURLs += childURLs
        }
    }
    return totalChildURLs
}
func doCrawlWithActors(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    let crawler = CrawlActor()
    for req in requests {
     	let childURLs = try await crawler.handle(req)
	totalChildURLs += childURLs
    }
    return totalChildURLs
}
func handleRequest(_ request: Request) async throws -> Int {
    let contents = try await download(request.url)
    let newContents = try await jsrender(request.url, contents)
    if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
        try await index(request.url, newContents)
        let urls = try await parseURLs(request.url, newContents)
        let childURLs = try await doCrawl(urls: urls, depth: request.depth + 1, deadline: request.deadline)
        return childURLs + 1
    } else {
        return 0
    }
}
func download(_ url: String) async throws -> String {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    return randomString(100)
}
func jsrender(_ url: String, _ contents: String) async throws -> String {
    // for SPA apps that use javascript for rendering contents
    return contents
}
func index(_ url: String, _ contents: String) async throws {
    // apply standardize, stem, ngram, etc for indexing
}
func parseURLs(_ url: String, _ contents: String) async throws -> [String] {
    // tokenize contents and extract href/image/script urls
    var urls = [String]()
    for _ in 0..<MAX_URLS {
        urls.append(randomUrl())
    }
    return urls
}
func hasContentsChanged(_ url: String, _ contents: String) -> Bool {
    return true
}
func isSpam(_ url: String, _ contents: String) -> Bool {
    return false
}
func randomUrl() -> String {
    let number = Int.random(in: 0..<DOMAINS.count)
    return "https://" + DOMAINS[number] + "/" + randomString(20)
}
func randomString(_ length: Int) -> String {
  let letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  return String((0..<length).map{ _ in letters.randomElement()! })
}

The crawl method takes a list of URLs and a deadline and invokes doCrawl, which crawls the URLs in parallel and waits for the results using the try await keywords. The doCrawl method recursively crawls child URLs up to the MAX_DEPTH limit. The main crawl method defines the boundary for concurrency and returns the count of child URLs.

Following are major features of the structured concurrency in Swift:

  • Concurrency scope – the async/await syntax defines a scope of concurrency where all child background tasks must be completed before returning from the asynchronous function.
  • Composability – the async methods in the above implementation show that asynchronous code can be easily composed.
  • Error handling – the async/await syntax uses normal try/catch syntax for error checking instead of the specialized syntax of Promises or callback functions.
  • The Swift runtime schedules asynchronous tasks on a fixed number of native threads and automatically suspends tasks when they wait for I/O or other resources.

Following are the major shortcomings of Swift's support for structured concurrency:

  • The most glaring omission in the above implementation is a timeout, which Swift's implementation does not support.
  • The Swift runtime manages the scheduling of tasks, and you cannot pass your own execution dispatcher for scheduling background tasks.

Actors

The Actor Model is a classic abstraction from the 1970s for managing concurrency, where an actor keeps its internal state private and uses message passing for interaction with its state and behavior. An actor works on only one message at a time, which prevents data races when it is accessed from multiple background tasks. I have previously written about actors and described them in Part II of the concurrency series when covering Erlang and Elixir.

actor

Instead of protecting shared state with a serial dispatch queue, such as:

final class Counter {
    private var queue = DispatchQueue(label: "counter.queue")
    private var _count : Int = 0
    var count: Int {
        queue.sync {
            _count
        }
    }

    func incr() {
        queue.async(flags: .barrier) {
            self._count += 1
        }
    }
    func decr() {
        queue.async(flags: .barrier) {
            self._count -= 1
        }
    }
}

The actor syntax simplifies such an implementation and removes all the boilerplate, e.g.

actor Counter {
    var count: Int = 0
    func incr() {
        count += 1
    }
    func decr() {
        count -= 1
    }
}

The above syntax prevents direct access to the internal state, and you must use the await syntax to access the state or behavior, e.g.

Task {
    let c = Counter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<100 {
            group.addTask {
                await c.incr()
            }
        }
    }
    print("count \(await c.count)")
}

Priority Inversion Principle

The dispatch queue API deals with priority inversion, where a high priority task is stuck behind low priority tasks, by bumping up the priority of the low priority tasks ahead of it in the queue; the runtime then executes the high priority task after completing those low priority tasks. The actor runtime can instead pick the high priority task directly from the actor's queue without waiting for completion of the lower priority tasks ahead of it.

Actor Reentrancy

If an actor invokes another actor or a background task in one of its functions, it may be suspended until that task is completed. In the meantime, another client may invoke the actor and modify its state, so you need to re-check your assumptions about the internal state after resuming. Because a continuation used for the background task may be scheduled on a different thread after resuming, you cannot rely on DispatchSemaphore, NSLock, NSRecursiveLock, etc. for synchronization.

The following code, adapted from WWDC-2021 to the released Task API, shows how reentrancy can be handled safely:

actor ImageDownloader {
    private enum CacheEntry {
        case inProgress(Task<Image, Error>)
        case ready(Image)
    }
    private var cache: [URL: CacheEntry] = [:]
    func downloadAndCache(from url: URL) async throws -> Image? {
        if let cached = cache[url] {
            switch cached {
            case .ready(let image):
                return image
            case .inProgress(let handle):
                return try await handle.value
            }
        }
        let handle = Task {
            try await downloadImage(from: url)
        }
        cache[url] = .inProgress(handle)
        do {
            let image = try await handle.value
            cache[url] = .ready(image)
            return image
        } catch {
            cache[url] = nil
            throw error
        }
    }
}

The ImageDownloader actor in the above example downloads and caches images. The actor may be suspended while it is downloading an image, and another client can reenter the downloadAndCache method and try to download the same image. The above code prevents duplicate requests and reuses the existing in-progress request to serve multiple concurrent clients.

Actor Isolation

Actors in Swift prevent calling their methods synchronously from outside the actor, but you can annotate methods with nonisolated if you need to call them directly; such methods cannot access mutable state, e.g.

actor Account {
    let id: String
    var balance: Double = 0
    init(id: String) {
        self.id = id
    }
}
extension Account: Hashable {
    nonisolated func hash(into hasher: inout Hasher) {
        hasher.combine(id)
   }
   static func == (lhs: Account, rhs: Account) -> Bool {
        return lhs.id == rhs.id
   }
}

Sendable

Actors require that any data passed into or out of them is safe to share across concurrency domains and conforms to the Sendable protocol, such as:

  • Value types
  • Actors
  • Immutable classes
  • Synchronized classes
  • @Sendable functions

struct Book: Sendable {
    var title: String
    var authors: [Author]
}

@MainActor

UI apps require that all UI updates are performed on the main thread, and previously you had to dispatch UI work to the DispatchQueue.main queue. Swift now allows marking functions, classes or structs with a special @MainActor annotation so that those functions are automatically executed on the main thread, e.g.

@MainActor func checkedOut(_ books: [Book]) {
  booksView.checkedOutBooks = books
}
...
await checkedOut(booksOnLoan)

Following example shows how a view-controller can be annotated with the @MainActor annotations:

@MainActor class MyViewController: UIViewController {
  func onPress() { ... }          // implicitly on main-thread

  nonisolated func fetch() async {
    ...
  }
}

In the above example, all methods of MyViewController are executed on the main thread; however, you can exclude specific methods via the nonisolated keyword.

@globalActor

The @globalActor annotation defines a singleton global actor, and @MainActor is one such global actor. You can also define your own global actor, such as:

@globalActor
public struct GlobalSettings {
  public actor SettingsActor {
     func rememberPassword() -> Bool {
        return UserDefaults.standard.bool(forKey: "rememberPassword")
     }
  }

  public static let shared = SettingsActor()
}

...
let rememberPassword = await GlobalSettings.shared.rememberPassword()

Message Pattern Matching

As actors in Swift use methods to invoke operations, they don't support pattern matching as in Erlang/Elixir, which lets you select the next message to process by matching one or more fields of the message.

Local only

Unlike actors in Erlang or Elixir, actors in Swift can only communicate with other actors in the same process or application and they don’t support distributed communication to remote actors.

Actor Executor/Dispatcher

The Actor protocol defines the following property to access the executor:

var unownedExecutor: UnownedSerialExecutor

However, unownedExecutor is a read-only property that cannot be changed at this time.

Implementing WebCrawler Using Actors and Tasks

The following example shows an implementation of the WebCrawler, described in Part I of the concurrency series, using actors and tasks:

import Foundation
actor CrawlActor {
    public func handle(_ request: Request) async throws -> Int {
	let contents = try await download(request.url)
	let newContents = try await jsrender(request.url, contents)
  	if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
    	    try await index(request.url, newContents)
    	    let urls = try await parseURLs(request.url, newContents)
    	    let childURLs = try await doCrawlWithActors(urls: urls, depth: request.depth + 1, deadline: request.deadline)
    	    return childURLs + 1
  	} else {
    	    return 0
  	}
    }
}

The above implementation uses actors for processing crawling requests but shares the other code for parsing and downloading web pages. As an actor provides serialized access to its state and behavior, you can't use a single actor to implement a highly concurrent web crawler. Instead, you may divide the web domains that need to be crawled across a pool of actors that share the work.

Performance Comparison

The following table from Part-IV summarizes the runtimes of the various web crawler implementations discussed in this blog series when crawling 19K URLs, which resulted in about 76K messages to asynchronous methods/coroutines/actors:

Language     Design                    Runtime (secs)
Typescript   Async/Await               0.638
Erlang       Spawning Process          4.636
Erlang       PMAP                      4.698
Elixir       Spawning OTP Children     43.5
Elixir       Task async/await          187
Elixir       Worker-pool with queue    97
GO           Go-routine/channels       1.2
Rust         Async/Await               4.3
Kotlin       Async/Await               0.736
Kotlin       Coroutine                 0.712
Swift        Async/Await               63
Swift        Actors/Async/Await        65
Note: The purpose of the above results was not to run micro-benchmarks but to show the rough cost of spawning thousands of asynchronous tasks.

You can download full code for Swift example from https://github.com/bhatti/concurency-katas/tree/main/swift.

Overall, Swift's new structured concurrency features, including async/await and actors, are a welcome addition to the platform. On the downside, the Swift concurrency APIs lack support for timeouts and custom dispatchers/executors, and the micro-benchmarks showed higher overhead than expected. On the positive side, the Swift runtime catches errors due to data races, and the new async/await/actor syntax prevents bugs that were previously caused by incorrect use of completion handlers and error handling. This will help developers write more robust and responsive apps in the future.

August 3, 2020

Summary of Data Consistency in Relational and NoSQL Databases

Filed under: Uncategorized — admin @ 8:34 pm

Relational databases generally guarantee transactions in terms of ACID properties, which include:

  • A – Atomicity – transaction either succeeds or fails.
  • C – Consistency – all data will remain consistent.
  • I – Isolation – transaction will not be affected by other transactions.
  • D – Durability – changes from the transaction will be stored persistently.

Following is a list of read phenomena that transaction isolation levels guard against:

  • Dirty Read – a transaction can read data that has not yet been committed by another transaction.
  • Non Repeatable Read – a transaction sees different data when reading same row again due to concurrency.
  • Phantom Read – a transaction sees different set of rows when running the same query again.

The SQL standard defines the following isolation levels (see the JDBC example after this list):

  • Read-Uncommitted – a transaction may see uncommitted changes by other transactions, thus allowing dirty reads.
  • Read Committed – a transaction only sees committed changes, thus preventing dirty reads.
  • Repeatable Read – prevents non-repeatable reads
  • Serializable – the highest isolation level, where concurrent transactions appear to execute serially.
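For example, in JDBC you choose one of these levels per connection; a minimal sketch (the connection URL and table are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class IsolationExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/demo", "user", "pass")) {
            conn.setAutoCommit(false);
            // Serializable prevents dirty, non-repeatable and phantom reads at the cost of more locking/aborts.
            conn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
            try (Statement stmt = conn.createStatement()) {
                stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE id = 1");
                stmt.executeUpdate("UPDATE accounts SET balance = balance + 100 WHERE id = 2");
            }
            conn.commit();
        }
    }
}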

NoSQL-based distributed systems define the following consistency models:

  • Strict consistency – the strongest consistency level, which returns the most recent update when reading a value.
  • Sequential consistency – a weaker model, as defined by Lamport (1979).
  • Linearizability (atomic) – guarantees sequential consistency with a real-time constraint.
  • Causal consistency – a weaker model than linearizability that guarantees writes that are causally related are seen in the same order.

Most NoSQL databases lack ACID transaction guarantees and instead offer tradeoffs in terms of the CAP theorem and PACELC, where the CAP theorem states that a database can only guarantee two of the following three properties:

  • Consistency – Every node in the cluster responds with the most recent data that may require blocking the request until all replicas are updated.
  • Availability – Every node returns an immediate response even if the response isn’t the most recent data.
  • Partition Tolerance – The system continues to operate even if a node loses connectivity with other nodes.

Consistency in CAP is different from consistency in ACID: in ACID it means that a transaction won't corrupt the database and that correctness is preserved across ordered transactions, whereas in CAP it means maintaining the linearizability property, which guarantees reading the most up-to-date data. Serializability is the highest form of isolation between transactions in the ACID model (multi-operation, multi-object, arbitrary total order), whereas linearizability is a single-operation, single-object, real-time ordering guarantee that applies to distributed systems.

In the event of a network failure (see MTBF, MTTF, MTTR), you must choose partition tolerance, so the real choice is between AP and CP (availability vs consistency). The PACELC theorem extends CAP: in the presence of network partitioning (P) you choose between availability (A) and consistency (C), and otherwise (E) you choose between latency (L) and consistency (C). Most NoSQL databases choose availability and support Basically Available, Soft state, Eventual consistency (BASE) instead of strict serializability or linearizability. Eventual consistency only guarantees liveness, i.e. updates will be observed eventually. Some modern NoSQL databases also support strong eventual consistency using conflict-free replicated data types (CRDTs).
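A minimal sketch of one such CRDT, a grow-only counter (G-counter): each replica only increments its own slot, and merging two replicas takes the per-replica maximum, so replicas converge regardless of the order in which they exchange state.

import java.util.HashMap;
import java.util.Map;

// Grow-only counter: a state-based CRDT.
class GCounter {
    private final String replicaId;
    private final Map<String, Long> counts = new HashMap<>();

    GCounter(String replicaId) {
        this.replicaId = replicaId;
    }

    void increment() {
        counts.merge(replicaId, 1L, Long::sum); // each replica only touches its own entry
    }

    long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    // Merge is commutative, associative and idempotent: take the maximum per replica.
    void merge(GCounter other) {
        other.counts.forEach((id, count) -> counts.merge(id, count, Math::max));
    }
}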

September 30, 2016

Review of “Simple architecture for complex enterprises”

Filed under: Uncategorized — admin @ 11:24 am

“Simple architecture for complex enterprises” focuses on tackling complexity in IT systems. There are a number of methodologies such as Zachman, TOGAF and FEA, but they don't address how to manage complexity. The author shares the following concerns that arise when implementing an enterprise architecture:

  • Unreliable Enterprise Information – when enterprise cannot access or trust its information
  • Untimely Enterprise Information – when reliable information is not available in a timely fashion.
  • New Projects Underway – when building a new complex IT project without understanding its relationship to the business processes.
  • New Companies Being Acquired
  • Enterprise Wants to Spin Off Unit
  • Need to Identify Outsourcing Opportunities
  • Regulatory Requirements
  • Need to Automate Relationships with Internal Partners
  • Need to Automate Relationships with Customers
  • Poor Relationship Between IT and Business Units
  • Poor Interoperability of IT Systems
  • IT Systems Unmanageable – when IT systems are built piecemeal and patched together.

The author defines enterprise architecture as:

“An enterprise architecture is a description of the goals of an organization, how these goals are realized by business processes, and how these business processes can be better served through technology.”

The author asserts the need for planning when building enterprise IT systems and argues that complexity hinders the success of these systems, citing several examples from government and industry. The author then describes the Zachman framework for organizing architecture artifacts and design documents. John Zachman proposed six descriptive foci in the framework: data, function, network, people, time and motivation.

The author explains that the Zachman framework does not address the complexity of systems. Next, the author explains TOGAF (The Open Group Architecture Framework), which has four categories:

  • Business architecture – business processes
  • Application architecture
  • Data architecture
  • Technical architecture – hardware / software infrastructure

TOGAF defines the ADM (Architecture Development Method) as a recipe for creating an architecture. The author considers TOGAF a process rather than a framework, one that can complement Zachman. TOGAF defines the following levels of the enterprise continuum:

  • enterprise continuum
  • foundation architectures
  • common system architectures
  • industry architectures
  • organization architectures (ADM)

TOGAF defines knowledge bases such as the TRM (Technical Reference Model) and SIB (Standards Information Base). The ADM defines the following phases:

  • Phase: Prelim – framework and principles
  • Phase: A – architecture vision (statement of architecture work, architecture vision)
  • Phase: B – business architecture (input frame stakeholders to get baseline and business objectives)
  • Phase: C – information system architectures (baseline data architecture, review principles, models, data architecture)
  • Phase: D – technology architecture – infrastructure
  • Phase: E – opportunities and solutions
  • Phase: F – migration planning
  • Phase: G – implementation governance
  • Phase: H – architecture change management and then it goes back to Phase A.

TOGAF also lacks complexity management, so the author then explains the Federal Enterprise Architecture (FEA), which includes reference models for business, services, components, technology and data. FEA organizes the EA into segments of business functionality and enterprise services. FEA creates five reference models:

  • The Business Reference Model (BRM) – business view
  • The Component Reference Model (CRM)
  • The Technical Reference Model (TRM)
  • The Data Reference Model
  • The Performance Reference Model

In chapter two, the author explains how complexity affects a system: a 2 x 2 x 2 Rubik's Cube has 8 interior cubes and 3.7 x 10^6 permutations, but a 4 x 4 x 4 Rubik's Cube has 64 interior cubes and 7.4 x 10^45 permutations. The relative complexity of the 4 x 4 x 4 cube is vastly higher, and the author argues that by partitioning the 4 x 4 x 4 Rubik's Cube into eight 2 x 2 x 2 cubes, you can lower its complexity. The author defines the following five laws of partitions:

  • Partitions must be true partitions
  • Partition definitions must be appropriate (e.g. organizing clothing store by color may not be helpful to customers)
  • Partition subset numbers must be appropriate
  • Partition subset sizes must be roughly equal
  • Subset interactions must be minimal and well defined

Further, the author suggests simplification to reduce complexity when partitioning, either by removing partition subsets along with their associated items or by removing items from one or more partition subsets while leaving the subsets themselves in place. The process of partitioning can be done iteratively by choosing one of the partition subsets and simplifying it. The author narrates the story of John Boyd, who came up with the iterative loop observe, orient, decide, act (OODA) while studying how pilots fought dogfights in the Air Force; he observed that the faster you iterate on the loop, the better your chances of winning the dogfight.

In chapter three, the author shows how mathematics can be used for partitioning. He describes the number of system states as the best measure of complexity, and the relative complexity of two systems as the ratio of the number of states in those systems. For example, a system with two variables, each taking six states, can take 6^2 states, i.e.,

C = S^V, where C is the complexity, V is the number of variables and S is the average number of significant states per variable.

In a business process, the number of decision points and the number of paths at each decision point are the best measure of complexity, i.e.,

O = P^D, where D is the number of decision points, P is the number of paths at each decision point and O is the number of possible outcomes.

The author introduces the concept of homomorphism, where observing one system lets you make predictions about another system; for example, the relationships between dice systems, software systems and business processes are homomorphic. A system with two six-sided dice has 36 possible states (P^D, or 6^2). However, we can reduce the number of states by dividing the dice into multiple buckets, e.g. putting each of the two dice into its own bucket gives 6 + 6 = 12 states instead of 36. The general formula for the number of states of B buckets, each with D dice of F faces, is:

B * F^D
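A small worked example of this formula: 12 six-faced dice kept in a single bucket have 6^12 (about 2.2 billion) states, while partitioning them into 6 buckets of 2 dice leaves only 6 * 6^2 = 216 states, which illustrates why partitioning reduces complexity so dramatically.

public class PartitionComplexity {
    // Number of states for B buckets, each holding D dice with F faces: B * F^D
    static double states(int buckets, int dicePerBucket, int faces) {
        return buckets * Math.pow(faces, dicePerBucket);
    }

    public static void main(String[] args) {
        System.out.printf("1 bucket of 12 dice : %.0f states%n", states(1, 12, 6)); // ~2.2 billion
        System.out.printf("6 buckets of 2 dice : %.0f states%n", states(6, 2, 6));  // 216
    }
}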

This chapter also describes the concept of equivalence relations, which have the following properties:

  • E(a, a) – always true — reflexivity
  • E(a, b) implies E(b, a) — symmetry
  • E(a, b) and E(b, c) implies E(a, c) — transitivity

In chapter four, the author explains simple iterative partitions (SIP), used to create a diagrammatic overview of the enterprise system that focuses on what the enterprise does (as opposed to how). SIP starts with an autonomous business capability (ABC), which represents an equivalence class, one of the sets that make up the partition. The ABC model includes a process component and a technology component, with relationships for implementation and deployment. In addition to implementation and deployment, the author adds an ABC type, which is used as a category of ABCs such as human resources. These types can be defined hierarchically, and different implementations of the same ABC type are considered siblings. Implementations can also be composed so that one ABC is part of another ABC. Another type of relationship is the partner relationship at the implementation or deployment level, where one ABC may create an information request, an information broadcast or a work request.

In chapter five, the author explains the SIP process, which has the following goals:

  • Complexity control
  • Logic-based decisions
  • Value-driven deliverables
  • Reproducible results
  • Verifiable architectures
  • Flexible methodology

The SIP process consists of the following six primary phases:

<------Preliminary------>   <--------------Preparatory------------------->   <------Iteration------>
Phase-0       Phase-1       Phase-2        Phase-3          Phase-4                 Phase-5
Evaluation    Preparation   Partitioning   Simplification   Prioritization          Iteration

Phase-0 (enterprise architecture evaluation) addresses the following issues:

  • Unreliable enterprise information
  • Untimely enterprise information
  • New complex projects underway
  • New companies being acquired
  • Enterprise wants to spin off unit
  • Need to identify outsourcing opportunities
  • Regulatory requirements
  • Need to automate relationships with external partners
  • Need to automate relationships with customers
  • Poor relationships between IT and business units
  • Poor interoperability of IT systems
  • IT systems unmanageable

Phase-1 (SIP preparation) has the following deliverables:

  • Audit of organizational readiness
  • Training
  • Governance model
  • SIP blend
  • Enterprise-specific tools

Phase-2 (partitioning) decomposes the enterprise into discrete ABC (autonomous business capability) units. Phase-3 (partition simplification) applies the five laws of partitions:

  • Partitions must be true partitions
  • Partition definitions must be appropriate
  • Partition numbers must be appropriate
  • Partition sizes must be roughly equal
  • Partition interactions must be minimal and well defined.

Phase-4 (ABC prioritization) uses value graph analysis to estimate potential payoff and risk. The value graph analysis addresses the following factors:

  • Market drivers
  • Cost
  • Organizational risk
  • Technical risk
  • Financial value
  • Organizational preparedness
  • Team readiness
  • Status quo

Phase-5 (ABC iteration) uses an iterative approach to simplify the architecture.

Chapter six describes the NPfIT project as a case study in complexity. NPfIT promised an integrated system connecting every patient, physician, laboratory, pharmacy and healthcare provider in the UK. Its infrastructure provided a new national network, directory services and a care records service (CRS). NPfIT was split into five regional groups of patients, and it allowed appointments at any facility, prescription fulfillment and picture archiving. Despite a huge budget of $9.8 billion, there were several concerns, such as failure to communicate, a monolithic approach, stifling of innovation, lack of record confidentiality and poor quality of shared data. The SIP approach would have helped: phase-1 audits organizational readiness, training and partitioning; phase-2 would have addressed complexity and dropped the multiple regional implementations; and phase-3 would have simplified partitions into subsets such as patient registration, appointment booking, prescriptions, patient records and lab tests.

Chapter seven focuses on guarding technical boundaries. For example, two systems may communicate via RPC, shared databases or a data access layer, but the author suggests a service-oriented architecture (SOA) for interoperability and better scalability. The author suggests the use of guards or an envoy entity for handling outgoing or incoming messages to the system. He defines the following rules to encapsulate the software for a given ABC:

  • Autonomy
  • Explicit boundaries
  • Partitioning of functionality
  • Dependencies defined by policy
  • Asynchronicity
  • Partitioning of data
  • No cross-fortress transactions
  • Single-point security
  • Inside trust
  • Keep it simple

Chapter eight summarizes the book, explaining why complexity is the real enemy and how simplicity pays. It reiterates how the SIP approach can simplify an architecture by partitioning the system into ABC units.

August 17, 2014

PlexService Overview – a Micro-service framework for defining HTTP/Websockets and JMS based Services

Filed under: Uncategorized — admin @ 9:19 pm

I recently created a new framework, PlexService, for building micro-services that can be accessed via HTTP, Websockets or JMS interfaces. You can choose among these access mechanisms based on the needs of your services. For example, as JMS services are inherently asynchronous, they provide a good foundation for building scalable and reactive services. You may choose the HTTP stack for implementing REST services or Websockets for implementing interactive services.

The PlexService framework provides basic support for encoding POJO objects into JSON for service consumption. Developers define the service configuration via annotations to specify gateway types, encoding schemes, end-points, etc.

PlexService provides support for role-based security, where you can specify the list of roles that can access each service. The service providers implement role verification, which is then enforced by the PlexService framework.

If you implement all services over JMS, you can easily expose them via HTTP or Websockets by configuring the web-to-JMS bridge. The bridge routes all requests from HTTP/Websockets to JMS and listens for incoming messages, which are then routed back to the web clients.

PlexService provides basic metrics such as latency, invocations, errors, etc., which are exposed via a JMX interface. PlexService uses Jetty for serving web services. Developers provide JMS containers at runtime if required.

Building/Installing

Checkout code using

 git clone git@github.com:bhatti/PlexService.git

Compile and build jar file using

 ./gradlew jar

Copy and add jar file manually in your application.

Defining role-based security

PlexService allows developers to define role-based security, which is invoked when accessing services, e.g.

 public class BuggerRoleAuthorizer implements RoleAuthorizer {
     private final UserRepository userRepository;
 
     public BuggerRoleAuthorizer(UserRepository userRepository) {
       this.userRepository = userRepository;
     }
 
     @Override
       public void authorize(Request request, String[] roles) throws AuthException {
         String sessionId = request.getSessionId();
         User user = userRepository.getUserBySessionId(sessionId);
         if (user == null) {
           throw new AuthException(Constants.SC_UNAUTHORIZED,
               request.getSessionId(), request.getRemoteAddress(),
               "failed to validate session-id");
         }
         for (String role : roles) {
           if (!user.getRoles().contains(role)) {
             throw new AuthException(Constants.SC_UNAUTHORIZED,
                 request.getSessionId(), request.getRemoteAddress(),
                 "failed to match role");
           }
         }
       }
 }

Typically, a login service will store the session-id, which is then passed to the implementation of RoleAuthorizer, e.g.

 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = Void.class, endpoint = "/login", method = Method.POST, codec = CodecType.JSON)
 public class LoginService extends AbstractUserService implements RequestHandler {
   public LoginService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
   public void handle(Request request) {
     String username = request.getStringProperty("username");
     String password = request.getStringProperty("password");
 
     User user = userRepository.authenticate(username, password);
     AbstractResponseBuilder responseBuilder = request.getResponseBuilder();
     if (user == null) {
       throw new AuthException(Constants.SC_UNAUTHORIZED,
               request.getSessionId(), request.getRemoteAddress(),
               "failed to authenticate");
     } else {
       responseBuilder.addSessionId(userRepository.getSessionId(user));
       responseBuilder.send(user);
     }   
   }
 }

In the above example, the session-id is added to the response upon successful login and is then passed with future requests. For HTTP services, you may use cookies to store session-ids; otherwise you would need to pass the session-id as a parameter.

Here is how you can invoke login-service from curl:

 
 curl --cookie-jar cookies.txt -v -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/login?username=erica&password=pass"

which would return:

 
 Content-Type: application/json
 Set-Cookie: PlexSessionID=5 Expires: Thu, 01 Jan 1970 00:00:00 GMT
 {"id":5,"username":"erica","email":"erica@plexobject.com","roles":["Employee"]}

Defining Services

Defining a REST service for creating a user

Here is how you can define a REST service:

@ServiceConfig(gateway = GatewayType.HTTP, requestClass = User.class, 
     rolesAllowed = "Administrator", endpoint = "/users", method = Method.POST, 
     codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements
 RequestHandler {
   public CreateUserService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }

The ServiceConfig annotation specifies that this service can be accessed via HTTP at the “/users” URI. PlexService will decode the JSON payload into a User object and will ensure that the service can only be accessed by a user who has the Administrator role.

Here is how you can invoke this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/users" -d "{\"username\":\"david\",\"password\":\"pass\",\"email\":\"david@plexobject.com\",\"roles\":[\"Employee\"]}"

Defining a Web service over Websockets for creating a user

Here is how you can define a Websocket based service:

 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = User.class, 
     rolesAllowed = "Administrator", endpoint = "/users", method = Method.POST, 
     codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements
 RequestHandler {
   public CreateUserService(UserRepository userRepository) {
     super(userRepository);
   }
 
   @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }

The ServiceConfig annotation specifies that this service can be accessed via Websocket at the “/users” endpoint. However, as opposed to an HTTP based service, this endpoint is not part of the HTTP request path and can be in any format as long as it's unique per service.

Here is how you can access websocket service from javascript:

 var ws = new WebSocket("ws://127.0.0.1:8181/users");
 ws.onopen = function() {
   var req = {"payload":"", "endpoint":"/login", "method":"POST", "username":"scott", "password":"pass"};
   ws.send(JSON.stringify(req));
 };
 
 ws.onmessage = function (evt) {
   alert("Message: " + evt.data);
 };
 
 ws.onclose = function() {
 };
 
 ws.onerror = function(err) {
 };

Note that websockets are not supported by all browsers, and the above code will work only on supported browsers such as IE 11+, FF 31+, Chrome 36+, etc.

Defining a JMS service for creating a user

Here is how you can create JMS service:

 @ServiceConfig(gateway = GatewayType.JMS, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "queue:{scope}-create-user-service-queue", 
       method = Method.MESSAGE, 
       codec = CodecType.JSON)
 public class CreateUserService extends AbstractUserService implements RequestHandler {
     public CreateUserService(UserRepository userRepository) {
     super(userRepository);
     }
 
     @Override
     public void handle(Request request) {
       User user = request.getPayload();
       user.validate();
       User saved = userRepository.save(user);
       request.getResponseBuilder().send(saved);
     }
 }

Note that the only difference is the type of gateway. PlexService also supports variables in end-points, which are populated from the configuration. For example, you may use a scope variable to create different queues/topics for different developers/environments. PlexService will serialize POJO classes into JSON when delivering messages over JMS.

Defining a REST service with parameterized URLs

PlexService allows developers to define URIs for services that contain variables. These variables are then populated from the actual requests and can be used for implementing REST services, e.g.

 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = BugReport.class, 
       rolesAllowed = "Employee", endpoint = "/projects/{projectId}/bugreports", 
       method = Method.POST, 
       codec = CodecType.JSON)
 public class CreateBugReportService extends AbstractBugReportService implements RequestHandler {
     public CreateBugReportService(BugReportRepository bugReportRepository,
             UserRepository userRepository) {
         super(bugReportRepository, userRepository);
     }
 
     @Override
     public void handle(Request request) {
         BugReport report = request.getPayload();
         report.validate();
         BugReport saved = bugReportRepository.save(report);
         request.getResponseBuilder().send(saved);
     }
 }

Here is an example of invoking this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" -X POST "http://127.0.0.1:8181/projects/2/bugreports" -d "{\"title\":\"As an administrator, I would like to assign roles to users so that they can perform required actions.\",\"description\":\"As an administrator, I would like to assign roles to users so that they can perform required actions.\",\"bugNumber\":\"story-201\",\"assignedTo\":\"mike\",\"developedBy\":\"mike\"}"
 

Using variables with Websocket based service

You can also use variables in websocket endpoints, similar to JMS endpoints, which are initialized from configuration parameters.

 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = BugReport.class, 
       rolesAllowed = "Employee", endpoint = "{variable}-create-bugreport-service-channel", 
       method = Method.MESSAGE, codec = CodecType.JSON)
 public class CreateBugReportService extends AbstractBugReportService implements
         RequestHandler {
     public CreateBugReportService(BugReportRepository bugReportRepository,
             UserRepository userRepository) {
         super(bugReportRepository, userRepository);
     }
 
     @Override
     public void handle(Request request) {
         BugReport report = request.getPayload();
         report.validate();
         BugReport saved = bugReportRepository.save(report);
         request.getResponseBuilder().send(saved);
     }
 
 }

Here is another example of consuming a websocket based service from JavaScript:

 var ws = new WebSocket("ws://127.0.0.1:8181/users");
 ws.onopen = function() {
   var req = {"payload":{"title":"my title", "description":"my description","bugNumber":"story-201", "assignedTo":"mike", "developedBy":"mike"},"PlexSessionID":"4", "endpoint":"/projects/2/bugreports/2/assign", "method":"POST"};
   ws.send(JSON.stringify(req));
 };
 
 ws.onmessage = function (evt) {
   alert("Message: " + evt.data);
 };
 
 ws.onclose = function() {
 };
 
 ws.onerror = function(err) {
 };

Defining a REST service for querying users

Here is an example REST service, which uses a GET request to query users:

 @ServiceConfig(gateway = GatewayType.HTTP, requestClass = User.class, 
     rolesAllowed = "Administrator", endpoint = "/users", method = Method.GET, 
     codec = CodecType.JSON)
 public class QueryUserService extends AbstractUserService implements RequestHandler {
     public QueryUserService(UserRepository userRepository) {
         super(userRepository);
     }
 
     @Override
     public void handle(Request request) {
         Collection<User> users = userRepository.getAll(new Predicate<User>() {
             @Override
             public boolean accept(User u) {
                 return true;
             }
         });
         request.getResponseBuilder().send(users);
     }
 }

Here is how you can invoke this service from curl:

 
 curl --cookie cookies.txt -k -H "Content-Type: application/json" "http://127.0.0.1:8181/users"   

which would return a JSON array such as:

 
 [{"id":2,"username":"alex","email":"alex@plexobject.com","roles":["Employee"]},{"id":3,"username":"jeff","email":"jeff@plexobject.com","roles":["Employee","Manager"]},{"id":4,"username":"scott","email":"scott@plexobject.com","roles":["Employee","Administrator","Manager"]},{"id":5,"username":"erica","email":"erica@plexobject.com","roles":["Employee"]}]

Defining a JMS service for querying users

Here is an example of defining the query-users service over JMS:

 @ServiceConfig(gateway = GatewayType.JMS, requestClass = User.class, 
       rolesAllowed = "Administrator", endpoint = "queue:{scope}-query-user-service-queue", 
       method = Method.MESSAGE, 
       codec = CodecType.JSON)
 public class QueryUserService extends AbstractUserService implements RequestHandler {
     public QueryUserService(UserRepository userRepository) {
         super(userRepository);
     }
 
     @Override
     public void handle(Request request) {
         Collection<User> users = userRepository.getAll(new Predicate<User>() {
             @Override
             public boolean accept(User u) {
                 return true;
             }
         });
         request.getResponseBuilder().send(users);
     }
 }

The end-point can contain variables such as scope that are initialized from configuration.

Registering services and starting service container

You will need to register the services with ServiceRegistry at runtime, which initializes and starts them, e.g.

 Collection<RequestHandler> services = new HashSet<>();
 services.add(new CreateUserService(userRepository));
 services.add(new UpdateUserService(userRepository));
 services.add(new QueryUserService(userRepository));
 services.add(new DeleteUserService(userRepository));
 services.add(new LoginService(userRepository));
 services.add(new CreateProjectService(projectRepository, userRepository));
 services.add(new UpdateProjectService(projectRepository, userRepository));
 services.add(new QueryProjectService(projectRepository, userRepository));
 services.add(new AddProjectMemberService(projectRepository, userRepository));
 services.add(new RemoveProjectMemberService(projectRepository, userRepository));
 services.add(new CreateBugReportService(bugreportRepository, userRepository));
 services.add(new UpdateBugReportService(bugreportRepository, userRepository));
 services.add(new QueryBugReportService(bugreportRepository, userRepository));
 services.add(new QueryProjectBugReportService(bugreportRepository, userRepository));
 
 services.add(new AssignBugReportService(bugreportRepository, userRepository));
 serviceRegistry = new ServiceRegistry(config, services, new BuggerRoleAuthorizer(userRepository));
 serviceRegistry.start();
 

Creating Http to JMS bridge

You may choose to write all services as JMS services and then expose them over HTTP using the bridge provided by PlexService, e.g.

 final String mappingJson = IOUtils.toString(new FileInputStream(args[1]));
 Collection<HttpToJmsEntry> entries = new JsonObjectCodec().decode(
     mappingJson, new TypeReference<List<HttpToJmsEntry>>() {
     });
 WebToJmsBridge bridge = new WebToJmsBridge(new Configuration(args[0]), entries, GatewayType.HTTP);
 bridge.startBridge();

Creating Websocket to JMS bridge

Similarly, you may expose JMS services over a websocket based transport using the bridge:

 
 final String mappingJson = IOUtils.toString(new FileInputStream(args[1]));
 Collection<HttpToJmsEntry> entries = new JsonObjectCodec().decode(
     mappingJson, new TypeReference<List<HttpToJmsEntry>>() {
     });
 WebToJmsBridge bridge = new WebToJmsBridge(new Configuration(args[0]), entries, GatewayType.WEBSOCKET);
 bridge.startBridge();

Here is the JSON configuration for the bridge:

[
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports/{id}/assign","method":"POST",
     "destination":"queue:{scope}-assign-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports","method":"GET",
     "destination":"queue:{scope}-query-project-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users","method":"GET",
     "destination":"queue:{scope}-query-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects","method":"GET",
     "destination":"queue:{scope}-query-projects-service","timeoutSecs":30},
   {"codecType":"JSON","path":"/bugreports","method":"GET",
     "destination":"queue:{scope}-bugreports-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}/membership/add","method":"POST",
     "destination":"queue:{scope}-add-project-member-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}/membership/remove","method":"POST",
     "destination":"queue:{scope}-remove-project-member-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports","method":"POST",
     "destination":"queue:{scope}-create-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users","method":"POST",
     "destination":"queue:{scope}-create-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects","method":"POST",
     "destination":"queue:{scope}-create-projects-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users/{id}","method":"POST",
     "destination":"queue:{scope}-update-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/users/{id}/delete","method":"POST",
     "destination":"queue:{scope}-delete-user-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{id}","method":"POST",
     "destination":"queue:{scope}-update-project-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/projects/{projectId}/bugreports/{id}","method":"POST",
     "destination":"queue:{scope}-update-bugreport-service-queue","timeoutSecs":30},
   {"codecType":"JSON","path":"/login","method":"POST",
     "destination":"queue:{scope}-login-service-queue","timeoutSecs":30}]

Defining a Streaming Quotes Service over Websockets

Suppose you are building a high-performance streaming service for providing real-time stock quotes; you can easily build it using the PlexService framework, e.g.

 @ServiceConfig(gateway = GatewayType.WEBSOCKET, requestClass = Void.class, 
     endpoint = "/quotes", method = Method.MESSAGE, codec = CodecType.JSON)
 public class QuoteServer implements RequestHandler {
     public enum Action {
         SUBSCRIBE, UNSUBSCRIBE
     }
 
     static final Logger log = LoggerFactory.getLogger(QuoteServer.class);
 
     private QuoteStreamer quoteStreamer = new QuoteStreamer();
 
     @Override
     public void handle(Request request) {
         String symbol = request.getProperty("symbol");
         String actionVal = request.getProperty("action");
         log.info("Received " + request);
         ValidationException
                 .builder()
                 .assertNonNull(symbol, "undefined_symbol", "symbol",
                         "symbol not specified")
                 .assertNonNull(actionVal, "undefined_action", "action",
                         "action not specified").end();
         Action action = Action.valueOf(actionVal.toUpperCase());
         if (action == Action.SUBSCRIBE) {
             quoteStreamer.add(symbol, request.getResponseBuilder());
         } else {
             quoteStreamer.remove(symbol, request.getResponseBuilder());
         }
     }
 
     public static void main(String[] args) throws Exception {
         Configuration config = new Configuration(args[0]);
         QuoteServer service = new QuoteServer();
         Collection<RequestHandler> services = new ArrayList<>();
          services.add(service);
         //
         ServiceRegistry serviceRegistry = new ServiceRegistry(config, services, null);
         serviceRegistry.start();
         Thread.currentThread().join();
     }
 }

The above example defines a service that listens on websockets and responds to subscribe or unsubscribe requests from web clients.

You can define a mock QuoteStreamer as follows, which periodically sends quotes to all subscribers:

public class QuoteStreamer extends TimerTask {
     private int delay = 1000;
     private Map<String, Collection<ResponseDispatcher>> subscribers = new ConcurrentHashMap<>();
     private QuoteCache quoteCache = new QuoteCache();
     private final Timer timer = new Timer(true);
 
     public QuoteStreamer() {
         timer.schedule(this, delay, delay);
     }
 
     public void add(String symbol, ResponseDispatcher dispatcher) {
         symbol = symbol.toUpperCase();
         synchronized (symbol.intern()) {
             Collection<ResponseDispatcher> dispatchers = subscribers
                     .get(symbol);
             if (dispatchers == null) {
                 dispatchers = new HashSet<ResponseDispatcher>();
                 subscribers.put(symbol, dispatchers);
             }
             dispatchers.add(dispatcher);
         }
     }
 
     public void remove(String symbol, ResponseDispatcher dispatcher) {
         symbol = symbol.toUpperCase();
         synchronized (symbol.intern()) {
             Collection<ResponseDispatcher> dispatchers = subscribers
                     .get(symbol);
             if (dispatchers != null) {
                 dispatchers.remove(dispatcher);
             }
         }
     }
 
     @Override
     public void run() {
         for (Map.Entry<String, Collection<ResponseDispatcher>> e : subscribers
                 .entrySet()) {
             Quote q = quoteCache.getLatestQuote(e.getKey());
             Collection<ResponseDispatcher> dispatchers = new ArrayList<>(
                     e.getValue());
             for (ResponseDispatcher d : dispatchers) {
                 try {
                     d.send(q);
                 } catch (Exception ex) {
                     remove(e.getKey(), d);
                 }
             }
         }
     }
 }
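The QuoteStreamer above relies on a Quote value object and a QuoteCache that are not shown in this post. The following is a hypothetical minimal sketch of both, just enough for the example to compile and produce changing prices (the real classes in plexsvc-sample may differ):

 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
 class Quote {
     public final String company;
     public final double last;
     public final long timestamp;
 
     Quote(String company, double last) {
         this.company = company;
         this.last = last;
         this.timestamp = System.currentTimeMillis();
     }
 }
 
 class QuoteCache {
     private final Random random = new Random();
     private final Map<String, Double> lastPrices = new ConcurrentHashMap<>();
 
     // Random-walk the previous price so that subscribers see changing values.
     public Quote getLatestQuote(String symbol) {
         Double previous = lastPrices.get(symbol);
         double base = previous != null ? previous : 100.0;
         double next = Math.max(1.0, base + (random.nextDouble() - 0.5));
         lastPrices.put(symbol, next);
         return new Quote(symbol, next);
     }
 }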

Here is a sample JavaScript/HTML client, which allows users to subscribe to different stock symbols:

      <script>
        var ws = new WebSocket("ws://127.0.0.1:8181/quotes");
       ws.onopen = function() {
       };
       var lasts = {};
       ws.onmessage = function (evt) {
         //console.log(evt.data);
         var quote = JSON.parse(evt.data).payload;
         var d = new Date(quote.timestamp);
         $('#time').text(d.toString());
         $('#company').text(quote.company);
         $('#last').text(quote.last.toFixed(2));
         var prev = lasts[quote.company];
         if (prev != undefined) {
           var change = quote.last - prev;
           if (change >= 0) {
             $('#change').css({'background-color':'green'});
           } else {
             $('#change').css({'background-color':'red'});
           }
           $('#change').text(change.toFixed(2));
         } else {
           $('#change').text('N/A');
         }
         lasts[quote.company] = quote.last;
       };
 
       ws.onclose = function() {
       };
 
       ws.onerror = function(err) {
       };
       function send(payload) {
         $('#input').text(payload);
         ws.send(payload);
       }
       $(document).ready(function() {
         $("#subscribe").click(function() {
           var symbol = $("#symbol").val();
           var req = {"endpoint":"/quotes", "symbol":symbol, "action":"subscribe"};
           send(JSON.stringify(req));
         });
       });
       $(document).ready(function() {
         $("#unsubscribe").click(function() {
           var symbol = $("#symbol").val();                                                                                            
           var req = {"endpoint":"/quotes", "symbol":symbol, "action":"unsubscribe"};
           send(JSON.stringify(req));
         });
       });
      </script>
 
   <body>
     <form>
       Symbol:<input type="text" id="symbol" value="AAPL" size="4" />
       <input type="button" id="subscribe" value="Subscribe"/>
       <input type="button" id="unsubscribe" value="Unsubscribe"/>
     </form>
 
     <br>
 
     <table id="quotes" class="quote" width="600" border="2" cellpadding="0" cellspacing="3">
       <thead>
         <tr>
           <th>Time</th>
           <th>Company</th>
           <th>Last</th>
           <th>Change</th>
         </tr>
       </thead>
       <tbody>
         <tr>
           <td id="time"></td>
           <td id="company"></td>
           <td id="last"></td>
           <td id="change"></td>
         </tr>
       </tbody>
     </table>
   </body>

PlexService includes this sample code; you can start the streaming quote server by running the “quote.sh” command and then open the quote.html file in your browser.

Using JMX

PlexService uses JMX to expose key metrics and lifecycle methods to start or stop services. You can use jconsole to access the JMX controls, e.g.

 
 jconsole localhost:9191
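If you want to read those metrics programmatically rather than through jconsole, here is a minimal sketch that connects to the same JMX port and lists every registered MBean. It assumes a standard RMI connector on port 9191 (matching the jconsole example) and does not hard-code any PlexService-specific MBean names, since those are not listed here:

 import java.util.Set;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 
 public class JmxMetricsDump {
     public static void main(String[] args) throws Exception {
         // Assumes a standard RMI JMX connector on the port used in the jconsole example.
         JMXServiceURL url = new JMXServiceURL(
                 "service:jmx:rmi:///jndi/rmi://localhost:9191/jmxrmi");
         JMXConnector connector = JMXConnectorFactory.connect(url);
         try {
             MBeanServerConnection connection = connector.getMBeanServerConnection();
             // List every registered MBean so you can locate the PlexService ones.
             Set<ObjectName> names = connection.queryNames(null, null);
             for (ObjectName name : names) {
                 System.out.println(name);
             }
         } finally {
             connector.close();
         }
     }
 }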

Summary

PlexService comes with a full-fledged sample application under the plexsvc-sample folder, and you can browse the JavaDocs to view the APIs.

 
