Shahzad Bhatti Welcome to my ramblings and rants!

October 10, 2022

Implementing Distributed Locks (Mutex and Semaphore) with Databases

Filed under: Concurrency,Rust,Uncategorized — admin @ 10:55 pm

Overview

I recently needed a way to control access to shared resources in a distributed system for concurrency control. You may use Consul or ZooKeeper (or low-level Raft/Paxos implementations if you are brave) for managing distributed locks, but I wanted to reuse an existing database without adding another dependency to my tech stack. Most databases support transactions or conditional updates with varying degrees of transactional guarantees, but they can’t be used for distributed locks if the business logic you need to protect resides outside the database. I found a lock client library based on AWS databases, but it didn’t support semaphores. The library implementation also tightly coupled the concerns of lock management and database access, so it wasn’t easy to extend. For example, the following diagram shows the cyclic dependencies in the code:

class diagram

Due to the above deficiencies in existing solutions, I decided to implement my own distributed locks library in Rust with the following capabilities:

  • Allow creating lease-based locks that can either protect a single shared resource with a Mutex lock or protect a finite set of shared resources with a Semaphore lock.
  • Allow renewing leases at periodic intervals so that stale locks can be acquired by other users.
  • Allow releasing Mutex and Semaphore locks explicitly after the user performs a critical action.
  • CRUD APIs to manage Mutex and Semaphore entities in the database.
  • Multi-tenancy support for different clients in case the database is shared by multiple users.
  • Fair locks that grant access on a first-come, first-served basis when the same lock is acquired concurrently.
  • A scalable solution supporting tens of thousands of concurrent mutexes and semaphores.
  • Support for multiple data stores: relational databases such as MySQL, PostgreSQL and SQLite, as well as NoSQL/cache data stores such as AWS DynamoDB and Redis.

High-level Design

I chose Rust to build the library for managing distributed locks due to strict performance and correctness requirements. The following diagram shows the high-level components in the new library:

LockManager Interface

The client interacts with the LockManager, which defines the following operations to acquire, release and renew lock leases and to manage the lifecycle of Mutexes and Semaphores:

#[async_trait]
pub trait LockManager {
    // Attempts to acquire a lock until it either acquires the lock, or a specified additional_time_to_wait_for_lock_ms is
    // reached. This method will poll database based on the refresh_period. If it does not see the lock in database, it
    // will immediately return the lock to the caller. If it does see the lock, it will note the lease expiration on the lock. If
    // the lock is deemed stale, (that is, there is no heartbeat on it for at least the length of its lease duration) then this
    // will acquire and return it. Otherwise, if it waits for as long as additional_time_to_wait_for_lock_ms without acquiring the
    // lock, then it will return LockError::NotGranted.
    //
    async fn acquire_lock(&self, opts: &AcquireLockOptions) -> LockResult<MutexLock>;

    // Releases the given lock if the current user still has it, returning true if the lock was
    // successfully released, and false if someone else already stole the lock. Deletes the
    // lock item if it is released and delete_lock_item_on_close is set.
    async fn release_lock(&self, opts: &ReleaseLockOptions) -> LockResult<bool>;

    // Sends a heartbeat to indicate that the given lock is still being worked on.
    // This method will also set the lease duration of the lock to the given value.
    // This will also either update or delete the data from the lock, as specified in the options
    async fn send_heartbeat(&self, opts: &SendHeartbeatOptions) -> LockResult<MutexLock>;

    // Creates mutex if doesn't exist
    async fn create_mutex(&self, mutex: &MutexLock) -> LockResult<usize>;

    // Deletes mutex lock if not locked
    async fn delete_mutex(&self,
                          other_key: &str,
                          other_version: &str,
                          other_semaphore_key: Option<String>) -> LockResult<usize>;

    // Finds out who owns the given lock, but does not acquire the lock. It returns the metadata currently associated with the
    // given lock. If the client currently has the lock, it will return the lock, and operations such as release_lock will work.
    // However, if the client does not have the lock, then operations like releaseLock will not work (after calling get_lock, the
    // caller should check mutex.expired() to figure out if it currently has the lock.)
    async fn get_mutex(&self, mutex_key: &str) -> LockResult<MutexLock>;

    // Creates or updates semaphore with given max size
    async fn create_semaphore(&self, semaphore: &Semaphore) -> LockResult<usize>;

    // Returns semaphore for the key
    async fn get_semaphore(&self, semaphore_key: &str) -> LockResult<Semaphore>;

    // find locks by semaphore
    async fn get_semaphore_mutexes(&self,
                                   other_semaphore_key: &str,
    ) -> LockResult<Vec<MutexLock>>;

    // Deletes semaphore if all associated locks are not locked
    async fn delete_semaphore(&self,
                              other_key: &str,
                              other_version: &str,
    ) -> LockResult<usize>;
}

The LockManager interacts with a LockStore to access mutexes and semaphores, which in turn delegates to implementations of the mutex and semaphore repositories for lock management. The library defines two implementations of LockStore: first, DefaultLockStore, which supports mutexes and semaphores, where mutexes are used to acquire a singular lock and semaphores are used to acquire a lock from a set of finite shared resources; second, FairLockStore, which uses a Redis-specific implementation of fair semaphores for managing lease-based semaphores that grant locks in first-come, first-served order. The LockManager supports waiting for a lock that is not immediately available by periodically checking for the availability of the mutex or semaphore. Due to this periodic polling, the fair semaphore algorithm won’t preserve FIFO order if a new client requests a lock while a previous lock request is waiting for the next polling interval.

Create Lock Manager

You can instantiate a Lock Manager with a relational database store as follows:

let config = LocksConfig::new("test_tenant");
let mutex_repo = factory::build_mutex_repository(RepositoryProvider::Rdb, &config)
	.await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Rdb, &config)
	.await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry()).expect("failed to initialize lock manager");

Alternatively, you can choose AWS DynamoDB as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Ddb, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Or a Redis-based data store as follows:

let mutex_repo = factory::build_mutex_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build mutex repository");
let semaphore_repo = factory::build_semaphore_repository(
  	RepositoryProvider::Redis, &config).await.expect("failed to build semaphore repository");
let store = Box::new(DefaultLockStore::new(&config, mutex_repo, semaphore_repo));

Note: The AWS DynamoDB implementation uses the strongly consistent reads feature, because DynamoDB reads are only eventually consistent by default.

Acquiring a Mutex Lock

You will need to build options with the key name and lease duration, and then acquire the lock:

let opts = AcquireLockOptionsBuilder::new("mylock")
	.with_lease_duration_secs(10).build();
let lock = lock_manager.acquire_lock(&opts).await
	.expect("should acquire lock");

The acquire_lock operation will automatically create the mutex lock if it doesn’t exist; otherwise it will wait up to the lease time if the lock is not available. It returns a mutex lock structure such as:

{
  "mutex_key":"one",
  "tenant_id":"local-host-name",
  "version":"258d513e-bae4-4d91-8608-5d500be27593",
  "lease_duration_ms":15000,
  "locked":true,
  "expires_at":"2022-10-11T03:04:43.126542"
}

Renewing the lease of Lock

A lock is only held for the duration specified by lease_duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .with_lease_duration_secs(15)
                .build();

let updated_lock = lock_manager.send_heartbeat(&opts).await
					.expect("should renew lock");

Note: The lease renewal also updates the version of the lock, so you will need to use the updated version to renew or release the lock.

Releasing the lease of Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new("one", "258d513e-bae4-4d91-8608-5d500be27593")
                .build();
lock_manager.release_lock(&opts).await
				.expect("should release lock");

Acquiring a Semaphore based Lock

Semaphores allow you to define a set of locks for a resource with a maximum size. Acquiring a semaphore is similar to acquiring a regular lock, except that you also specify the semaphore size, e.g.:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = lock_manager.acquire_lock(&opts).await
				.expect("should acquire semaphore lock");

The acquire_lock operation will automatically create the semaphore if it doesn’t exist; it will then check for available locks and wait if all of them are busy. It returns a lock structure such as:

{
  "mutex_key":"one_0000000000",
  "tenant_id":"local-host-name",
  "version":"5ad557df-dbe6-439d-8a31-dc367e32eab9",
  "lease_duration_ms":15000,
  "semaphore_key":"one",
  "locked":true,
  "expires_at":"2022-10-11T04:03:33.662484"
}

The semaphore lock internally creates mutexes numbered from 0 to max-size (exclusive). You can get the semaphore details using:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "4ff77432-ed84-48b5-9831-8e53f56c2620",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": false,
}

Or fetch the state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  {
    "mutex_key": "one_0000000000",
    "tenant_id": "local-host-name",
    "version": "ba5a62e5-80f1-474e-a895-c4a18d252cb9",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "749b4ded-e356-4ef5-a23b-73a4984130c8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...

Renewing the lease of Semaphore Lock

A lock is only held for the duration specified by lease_duration, but you can renew it periodically if needed:

let opts = SendHeartbeatOptionsBuilder::new(
  				"one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = lock_manager.send_heartbeat(&opts).await
					.expect("should renew lock");

Note: The lease renewal also updates the version of the lock, so you will need to use the updated version to renew or release the lock.

Releasing the lease of Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new("one_0000000000", "749b4ded-e356-4ef5-a23b-73a4984130c8")
                .with_opt_semaphore_key("one")
                .build();

lock_manager.release_lock(&opts).await
				.expect("should release lock");

Acquiring a Fair Semaphore

Fair semaphores are only available for Redis due to the internal implementation, and they must be enabled via the fair_semaphore configuration option; otherwise their usage is similar to the operations above, e.g.:

let mut config = LocksConfig::new("test_tenant");
config.fair_semaphore = Some(true);

let fair_semaphore_repo = factory::build_fair_semaphore_repository(
  	RepositoryProvider::Redis, &config)
	.await.expect("failed to create fair semaphore");
let store = Box::new(FairLockStore::new(&config, fair_semaphore_repo));
let locks_manager = LockManagerImpl::new(
  	&config, store, &default_registry())
	.expect("failed to initialize lock manager");

Then acquire the lock using the same semaphore syntax as before:

let opts = AcquireLockOptionsBuilder::new("my_pool")
                    .with_lease_duration_secs(15)
                    .with_semaphore_max_size(10)
                    .build();
let lock = lock_manager.acquire_lock(&opts).await
			.expect("should acquire semaphore lock");

The acquire_lock operation will automatically create the fair semaphore if it doesn’t exist; it will then check for available locks and wait if all of them are busy. It returns a lock structure such as:

{
  "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "tenant_id": "local-host-name",
  "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

The fair semaphore lock does not use mutexes internally, but for API compatibility it builds a mutex whose key is a combination of the semaphore key and the version. You can then query the semaphore state as follows:

let semaphore = locks_manager.get_semaphore("one").await
                .expect("failed to find semaphore");

That would return:

{
  "semaphore_key": "one",
  "tenant_id": "local-host-name",
  "version": "5779b01f-eaea-4043-8ae0-9f8b942c2727",
  "max_size": 10,
  "lease_duration_ms": 15000,
  "busy_count": 1,
  "fair_semaphore": true,
}

Or fetch the state of all mutexes associated with the semaphore using:

let mutexes = locks_manager.get_semaphore_mutexes("one").await
                .expect("failed to find semaphore mutexes");

Which would return:

  [
  {
    "mutex_key": "one_0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "tenant_id": "local-host-name",
    "version": "0fec9a7b-4354-4712-b537-ac14213bc5e8",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": true,
    "expires_at": "2022-10-11T04:41:43.845711",
  },
  {
    "mutex_key": "one_0000000001",
    "tenant_id": "local-host-name",
    "version": "",
    "lease_duration_ms": 15000,
    "semaphore_key": "one",
    "locked": false,
  },
  ...

Note: The mutex_key will be slightly different for unlocked mutexes because the mutex key isn’t needed by the internal implementation.

Renewing the lease of Fair Semaphore Lock

You can renew the lease of a fair semaphore using the same syntax as above, e.g.:

let opts = SendHeartbeatOptionsBuilder::new(
  			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_lease_duration_secs(15)
                .with_opt_semaphore_key("one")
                .build();
let updated_lock = lock_manager.send_heartbeat(&opts).await
					.expect("should renew lock");

Note: Due to the internal implementation of the fair semaphore, the version won’t change upon lease renewal.

Releasing the lease of Fair Semaphore Lock

You can build release options from the lock returned by the above API and then release it:

let opts = ReleaseLockOptionsBuilder::new(
    			"one_0fec9a7b-4354-4712-b537-ac14213bc5e8", "0fec9a7b-4354-4712-b537-ac14213bc5e8")
                .with_opt_semaphore_key("one")
                .build();

lock_manager.release_lock(&opts).await
				.expect("should release lock");

Command Line Interface

In addition to the Rust-based interface, the distributed locks library also provides a command-line interface for managing mutex and semaphore based locks, e.g.:

Mutexes and Semaphores based Distributed Locks with databases.

Usage: db-locks [OPTIONS] [PROVIDER] <COMMAND>

Commands:
  acquire

  heartbeat

  release

  get-mutex

  delete-mutex

  create-mutex

  create-semaphore

  get-semaphore

  delete-semaphore

  get-semaphore-mutexes

  help
          Print this message or the help of the given subcommand(s)

Arguments:
  [PROVIDER]
          Database provider [default: rdb] [possible values: rdb, ddb, redis]

Options:
  -t, --tenant <TENANT>
          tentant-id for the database [default: local-host-name]
  -f, --fair-semaphore <FAIR_SEMAPHORE>
          fair semaphore lock [default: false] [possible values: true, false]
  -j, --json-output <JSON_OUTPUT>
          json output of result from action [default: false] [possible values: true, false]
  -c, --config <FILE>
          Sets a custom config file
  -h, --help
          Print help information
  -V, --version
          Print version information

For example, you can acquire a fair semaphore lock as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis acquire --key one --semaphore-max-size 10

Which would return:

{
  "mutex_key": "one_69816448-7080-40f3-8416-ede1b0d90e80",
  "tenant_id": "local-host-name",
  "version": "69816448-7080-40f3-8416-ede1b0d90e80",
  "lease_duration_ms": 15000,
  "semaphore_key": "one",
  "locked": true,
}

You can run the following command to renew the above lock:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis heartbeat --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

And then release it as follows:

% REDIS_URL=redis://192.168.1.102 cargo run --  --fair-semaphore true --json-output true redis release --key one_69816448-7080-40f3-8416-ede1b0d90e80 --semaphore-key one --version 69816448-7080-40f3-8416-ede1b0d90e80

Summary

I was able to meet the initial goals for implementing distributed locks, though this library is still early in its development. You can download and try it from https://github.com/bhatti/db-locks. Feel free to send your feedback or contribute to this library.

May 12, 2022

Applying Laws of Scalability to Technology and People

As businesses grow with a larger customer base and more employees, they face challenges in meeting customer demands in terms of scaling their systems and maintaining rapid product development with bigger teams. Businesses aim to scale their systems linearly with additional computing and human resources. However, a monolithic or big-ball-of-mud architecture makes scaling systems linearly onerous. Similarly, teams become less efficient as they grow in size and turn into silos. A general approach to scaling business or technical problems is divide and conquer: partition the problem into multiple sub-problems. A number of factors affect the scalability of software architectures and organizations, such as the interactions among system components or the communication between teams. For example, coordination, communication and data/knowledge coherence among system components and teams become disproportionately expensive with growth in size. Software engineering and business management have developed a number of laws and principles that can be used to evaluate the constraints and trade-offs related to these scalability challenges. Following is a list of a few laws from the technology and business domains for scaling software architectures and business organizations:

Amdahl’s Law

Amdahl’s Law, named after Gene Amdahl, is used to predict the speedup of a task’s execution time when it is scaled to run on multiple processors. It states that the maximum speedup is limited by the serial fraction of the task execution, which becomes a source of resource contention:

Speed up (P, N) = 1 / [ (1 - P) + P / N ]

Where P is the fraction of the task that can run in parallel on N processors. When N becomes large, P / N approaches 0, so the speedup is restricted to 1 / (1 – P), where the serial fraction (1 – P) becomes a source of contention due to data coherence, state synchronization, memory access, I/O or other shared resources.
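
To make the diminishing returns concrete, here is a small, self-contained calculation of the speedup formula for a task that is 95% parallelizable (the numbers are purely illustrative):

// Amdahl's law: speedup(P, N) = 1 / ((1 - P) + P / N)
fn speedup(p: f64, n: f64) -> f64 {
    1.0 / ((1.0 - p) + p / n)
}

fn main() {
    // With P = 0.95, the speedup approaches 1 / (1 - P) = 20 no matter how many processors are added.
    for n in [2.0, 8.0, 64.0, 1024.0] {
        println!("N = {:>4}: speedup = {:.1}", n, speedup(0.95, n));
    }
}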

Amdahl’s law can also be described in terms of throughput using:

N / [ 1 + a (N - 1) ]

Where a is the serial fraction between 0 and 1. In parallel computing, there is a class of problems known as embarrassingly parallel workloads, where the parallel tasks have little or no dependency among them; their value of a is 0 because they don’t require any inter-task communication overhead.

Amdahl’s law can also be used to scale teams as an organization grows: teams can be organized as small, cross-functional groups to parallelize the feature work for different product lines or business domains, but the maximum speedup will still be limited by the serial fraction of the work. The serial work can be: build and deployment pipelines; reviewing and merging changes; communication and coordination between teams; and dependencies on deliverables from other teams. Fred Brooks described in his book The Mythical Man-Month how adding people to a highly divisible task can reduce the overall task duration, but other tasks are not so easily divisible: while it takes one woman nine months to make one baby, “nine women can’t make a baby in one month”.

The theoretical speedup of the latency of the execution of a program according to Amdahl’s law (credit wikipedia).

Brooks’s Law

Brooks’s law, coined by Fred Brooks, states that adding manpower to a late software project makes it later due to ramp-up time. As the size of a team increases, the ramp-up time for new employees also increases, and the communication overhead among team members grows quadratically, e.g.

Number of communication channels = N x (N - 1) / 2

Organizations can build small teams, such as two-pizza/single-threaded teams, so that the number of communication channels within each team does not explode (e.g., a team of 8 has 28 channels whereas a team of 50 has 1,225), and the cross-functional nature of the teams requires less communication with, and fewer dependencies on, other teams. Brooks’s law can equally be applied to technology when designing distributed services or components: each service should be designed as a loosely coupled module around a business domain to minimize communication with other services, and services should only communicate through well-designed interfaces.

Universal Scalability Law

The Universal Scalability Law is used for capacity planning and was derived from Amdahl’s law by Dr. Neil Gunther. It describes relative capacity in terms of concurrency, contention and coherency:

C(N) = N / [ 1 + a(N - 1) + B x N x (N - 1) ]

Where C(N) is the relative capacity, a is the serial fraction between 0 and 1 due to resource contention, and B is the delay for data coherency or consistency. As the data coherency cost (B) is quadratic in N, it becomes more expensive as N increases; e.g., using a consensus algorithm such as Paxos to reach state consistency among a large set of servers is impractical because it requires additional communication between all servers. Instead, large-scale distributed storage services generally use sharding/partitioning and gossip protocols with a leader-based consensus algorithm to minimize peer-to-peer communication.
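
A small calculation shows how the coherency term eventually makes adding more nodes counter-productive; the coefficients below (a = 0.05 for contention, B = 0.0002 for coherency) are purely illustrative:

// Universal Scalability Law: C(N) = N / (1 + a(N - 1) + B * N * (N - 1))
fn capacity(n: f64, a: f64, b: f64) -> f64 {
    n / (1.0 + a * (n - 1.0) + b * n * (n - 1.0))
}

fn main() {
    let (a, b) = (0.05, 0.0002); // illustrative contention and coherency coefficients
    for n in [1.0, 16.0, 32.0, 64.0, 128.0, 256.0] {
        println!("N = {:>3}: relative capacity = {:.1}", n, capacity(n, a, b));
    }
    // Capacity peaks around sqrt((1 - a) / B) nodes (~69 here) and then declines,
    // which is why reducing coordination and coherency costs matters more than adding nodes.
}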

The Universal Scalability Law can be applied to scale teams similarly to Amdahl’s law, where a models the serial work or dependencies between teams and B models the communication needed for a consistent understanding among team members. The cost of B can be minimized by building small, cross-functional teams so that teams can make progress independently. You can also apply this model to any decision-making process by keeping the set of stakeholders or decision makers small, so that they can reach agreement without grinding to a halt.

Gossip protocols also apply to people: they can be used along with a writing culture, lunch-and-learn sessions and osmotic communication to spread knowledge and learnings from one team to other teams.

Little’s Law

Little’s Law was developed by John Little to predict the number of items in a queue for a stable, non-preemptive system. It is part of queueing theory and is described mathematically as:

L = A W

Where L is the average number of items within the system or queue, A is the average arrival rate of items, and W is the average time an item spends in the system. Little’s law and queueing theory can be used for capacity planning of computing servers and for minimizing the number of items waiting in the queue (L).

Little’s law can be applied to predict the task completion rate in an agile process, where L represents the work-in-progress (WIP) for a sprint, A represents the arrival and departure rate or throughput/capacity of tasks, and W represents the lead time, or the average amount of time a task spends in the system.

WIP = Throughput x Lead-Time

Lead-Time = WIP / Throughput

You can use this relationship to reduce the work in progress or the lead time and to improve the throughput of task completion. Little’s law observes that you can accomplish more by keeping work-in-progress or inventory small. You will also be able to respond better to unpredictable delays if you keep a buffer in your capacity and avoid 100% utilization.
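
For example, applying WIP = Throughput x Lead-Time to a sprint with illustrative numbers:

// Little's law applied to a team: lead_time = WIP / throughput
fn main() {
    let throughput_per_week = 5.0; // tasks completed per week (illustrative)
    let wip = 20.0;                // tasks currently in progress
    let lead_time_weeks = wip / throughput_per_week;
    println!("lead time = {lead_time_weeks} weeks"); // 4 weeks; halving WIP halves the lead time
}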

Kingman’s formula

Kingman’s formula extends Little’s law by adding utilization and variability to predict the wait time before requests are served:

E(Wq) ≈ [ ρ / (1 - ρ) ] x [ (ca^2 + cs^2) / 2 ] x τ
(credit wikipedia)

Where τ is the mean service time, μ = 1/τ is the service rate, λ is the mean arrival rate, ρ = λ/μ is the utilization, ca is the coefficient of variation for arrivals, and cs is the coefficient of variation for service times. Kingman’s formula shows that queue sizes increase towards infinity as you reach 100% utilization, and that queues grow longer with greater variability of work. These insights can be applied to both technical and business processes so that you can build systems with greater predictability of processing time, smaller wait times E(Wq) and higher throughput.
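
The effect of utilization is easiest to see numerically; the sketch below evaluates Kingman’s approximation for a mean service time of 100 ms and moderate variability (illustrative values):

// Kingman's formula: E(Wq) ~ (rho / (1 - rho)) * ((ca^2 + cs^2) / 2) * tau
fn wait_time_ms(rho: f64, ca: f64, cs: f64, tau_ms: f64) -> f64 {
    (rho / (1.0 - rho)) * ((ca * ca + cs * cs) / 2.0) * tau_ms
}

fn main() {
    let (ca, cs, tau) = (1.0, 1.0, 100.0); // Poisson-like arrivals and service, 100 ms mean service time
    for rho in [0.5, 0.8, 0.9, 0.95, 0.99] {
        println!("utilization {:.0}%: expected wait ~{:.0} ms", rho * 100.0, wait_time_ms(rho, ca, cs, tau));
    }
    // The expected wait roughly doubles from 90% to 95% utilization and explodes as utilization nears 100%.
}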

Note: See Erlang analysis for serving requests in a system without a queue where new requests are blocked or rejected if there is not sufficient capacity in the system.

Gustafson’s Law

Gustafson’s law improves on Amdahl’s law with the keen observation that parallel computing enables solving larger problems, by running computations on very large data sets in a fixed amount of time. It is defined as:

S = s + p x N

S = s + (1 - s) x N

S = N + (1 - N) x s

where S is the theoretical speed up with parallelism, N is the number of processors, s is the serial fraction and p is the parallel part such that s + p = 1.

Gustafson’s law shows that limitations imposed by the sequential fraction of a program may be countered by increasing the total amount of computation. For example, with a serial fraction s = 0.05 and N = 64 processors, the scaled speedup is 64 + (1 - 64) x 0.05 ≈ 60.9. This allows solving bigger technical and business problems with greater computing and human resources.

Conway’s Law

Conway’s law states that an organization that designs a system will produce a design whose structure is a copy of the organization’s communication structure. It means that the architecture of a system is derived from the team structures of an organization; however, you can also use the architecture to derive the team structures. This allows building teams along architecture boundaries so that each team is small, cross-functional and cohesive. A study by the Harvard Business School found that large co-located teams tended to produce more tightly coupled and monolithic codebases, whereas small distributed teams produced more modular codebases. These lessons can be applied to scaling teams and architecture so that teams and system modules are built around organizational boundaries and independent concerns to promote autonomy and reduce tight coupling.

Pareto Principle

The Pareto principle states that for many outcomes, roughly 80% of consequences come from 20% of causes. This principle shows up in numerous technical and business problems: 20% of the code has 80% of the errors; customers use 20% of the functionality 80% of the time; 80% of the optimization improvements come from 20% of the effort, etc. It can also be used to identify hotspots or critical paths when scaling, as some microservices or teams may receive disproportionate demand. Though scaling computing resources is relatively easy, scaling a team beyond an organizational boundary is hard. You will have to apply other management tools such as prioritization, planning, metrics, automation and better communication to manage the critical work.

Metcalfe’s Law

Metcalfe’s law states that if there are N users of a telecommunications network, the value of the network is proportional to N². It’s also referred to as the network effect and applies to social networking sites.

Number of possible pair connections = N * (N – 1) / 2

Reed’s Law expanded this law and observed that the utility of large networks can scale exponentially with the size of the network.

Number of possible subgroups of a network = 2^N – N – 1

This law explains the popularity of social networking services via viral communication. These laws can be applied to model information flow between teams or message exchange between services, in order to avoid peer-to-peer communication within an extremely large group of people or set of nodes. A common alternative is to use a gossip protocol, or to designate a partition leader for each group that communicates with other leaders and then disseminates information within its own group.
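
A quick calculation shows how much faster the number of sub-groups grows than the number of pairwise connections, which is why fully connected communication does not scale:

// Metcalfe: pairwise connections = N(N - 1)/2; Reed: possible sub-groups = 2^N - N - 1
fn main() {
    for n in [5u32, 10, 20, 30] {
        let pairs = n * (n - 1) / 2;
        let subgroups = 2u64.pow(n) - u64::from(n) - 1;
        println!("N = {n:>2}: {pairs:>4} pairs, {subgroups} possible sub-groups");
    }
}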

Dunbar Number

Dunbar’s number is a suggested cognitive limit to the number of people with whom one can maintain stable social relationships. It has a commonly used value of 150 and can be used to limit direct communication connections within an organization.

Wirth’s Law and Parkinson’s Law

Wirth’s Law, named after Niklaus Wirth, posits that software tends to become slower at a rate that outpaces the speed at which hardware becomes faster. Despite processors becoming exponentially faster in line with Moore’s Law over the last few decades, developers often leverage those hardware gains to build more intricate and feature-rich software, which consumes the speedup; the use of programming languages and tools that do not prioritize efficiency further bloats the code.

In the realm of software development there are similar principles, such as Parkinson’s law, which states that work expands to fill the time available for its completion: given more time, software projects tend to become more complex or extended than initially necessary. Hofstadter’s Law offers a cautionary perspective on estimation: “It always takes longer than you expect, even when you take into account Hofstadter’s Law.” Brooks’s Law further adds that “adding manpower to a late software project makes it later.” These laws collectively emphasize that demand upon a resource tends to expand to match its supply, while adding resources late poses its own challenges due to the complexity of software development and project management.

Summary

The above laws show how you can partition a tightly coupled architecture and large teams into a modular architecture and small autonomous teams. For example, Amdahl’s and the Universal Scalability laws demonstrate that you have to account for the cost of serial work, coordination and communication between partitions as you parallelize a problem, because they become the bottleneck as you scale. Brooks’s and Metcalfe’s laws indicate that you will need to manage the number of communication paths among modules or teams, as they can grow quadratically and stifle your growth. Little’s law and Kingman’s formula establish that you need to reduce inventory or work in progress and avoid 100% utilization in order to provide reliable throughput. Conway’s law shows how architecture and team structures can be aligned for maximum autonomy and productivity. This allows you to accomplish more work by using small cross-functional teams that own independent product lines and build a modular architecture to reduce dependencies on other teams and subsystems. The Pareto principle can be used to make small changes to the architecture or teams that result in higher scalability and productivity. Wirth’s Law and Parkinson’s Law, when applied judiciously, can be instrumental in enhancing efficiency in software development: setting more stringent timelines and clear, concise objectives counteracts the tendency for work to expand to fill the available time. Dunbar’s number applies only to people, but it can be used to limit dependencies on external teams, as a human mind has a finite capacity for maintaining external relationships. However, before applying these laws, you should have clear goals and collect proper metrics and KPIs so that you can measure the baseline and the improvements from these laws. You should also be cautious about applying these laws prematurely for scalability, as doing so may make things worse. Finally, when solving scalability and performance related problems, it is vital to focus on global optimization to scale an entire organization or system, as opposed to local optimization that focuses strictly on a specific part of the system.

August 15, 2021

Structured Concurrency with Swift

Filed under: Concurrency,Uncategorized — Tags: , , , — admin @ 6:19 pm

I wrote about support for structured concurrency in Javascript/Typescript, Erlang/Elixir, Go, Rust, Kotlin and Swift last year (Part-I, Part-II, Part-III, Part-IV), but the Swift language support for async/await and actors was still in development. Swift 5.5 will finally make these new concurrency features available; they are described below:

Async/Await

As described in Part-IV, Swift APIs previously used completion handlers for asynchronous methods that suffered from:

  • Poor error handling, because there was no single way to handle errors/exceptions; separate callbacks for errors were needed instead.
  • Difficulty cancelling an asynchronous operation or exiting early after a timeout.
  • The need for global reasoning about shared state in order to prevent race conditions.
  • Stack traces from the asynchronous thread don’t include the originating request, so the code becomes hard to debug.
  • As the Swift/Objective-C runtime uses native threads, creating a lot of background tasks consumes expensive thread resources and may cause excessive context switching.
  • Nested use of completion handlers turns the code into callback hell.

The following example shows the poor control flow and deficient error handling that result from using completion handlers:

func fetchThumbnails(for ids: [String],
    completion handler: @escaping ([String: UIImage]?, Error?) -> Void) {
    guard let id = ids.first else { return handler([:], nil) }
    let request = thumbnailURLRequest(for: id)
    URLSession.shared.dataTask(with: request) { data, response, error in
        guard let response = response,
              let data = data else { return handler(nil, error) } // Poor error handling
        UIImage(data: data)?.prepareThumbnail(of: thumbSize) { image in
            guard let image = image else { return handler(nil, ThumbnailError()) }
        }
        fetchThumbnails(for: Array(ids.dropFirst())) { thumbnail, error in
            // cannot use loop
            ...
        }
    }
}

Though the use of Promise libraries helps a bit, it still suffers from a dichotomy of control flow and error handling. Here is the equivalent code using async/await:

func fetchThumbnails(for ids: [String]) async throws -> [String: UIImage] {
	var thumbnails: [String: UIImage] = [:]
    for id in ids {
    	let request = thumbnailURLRequest(for: id)
        let (data, response) = try await URLSession.shared.data(for: request)
        try validateResponse(response)
        guard let image = await UIImage(data: data)?.byPreparingThumbnail(ofSize: thumbSize) else { throw ThumbnailError() }
        thumbnails[id] = image
    }
    return thumbnails
}

As you can see, the above code not only improves the control flow and adds uniform error handling, but it also enhances readability by removing the nested structure of completion handlers.

Tasks Hierarchy, Priority and Cancellation

When a new task is created using async/await, it inherits the priority and task-local values of the parent task, which are then passed down to the entire hierarchy of child tasks. When a parent task is cancelled, the Swift runtime automatically cancels all child tasks. However, Swift uses cooperative cancellation, so child tasks must check the cancellation state, otherwise they may continue to execute; the results from cancelled tasks are discarded.

Continuations and Scheduling

Swift previously used native threads to schedule background tasks, where new threads were automatically created when a thread was blocked or waiting for another resource. The new Swift runtime creates native threads based on the number of cores, and background tasks use continuations to schedule work on those native threads. When a task is blocked, its state is saved on the heap and another task is scheduled for processing on the thread. The await syntax suspends the current task and releases the thread until the child task is completed. This cooperative scheduling requires runtime support for non-blocking I/O operations and system APIs so that native threads are not blocked and can continue to work on other background tasks. It also means background tasks should not use semaphores and locks, which are discussed below.

async function

In the above diagram, when a thread working on the background task “updateDatabase” starts the child tasks “add” or “save”, they are saved as continuations on the heap. If the current task is suspended, the thread can work on other tasks, as shown below:

Multiple Asynchronous Tasks

Async/await in Swift also allows scheduling multiple asynchronous tasks and then awaiting them later, e.g.

struct MarketData {
    let symbol: String
    let price: Int
    let volume: Int
}

struct HistoryData {
    let symbol: String
    let history: [Int]
    func sum() -> Int {
      history.reduce(0, +)
    }
}

func fetchMarketData(symbol: String) async throws -> MarketData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(MarketData(symbol: symbol, price: 10, volume: 200)))
        }
    }
}

func fetchHistoryData(symbol: String) async throws -> HistoryData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(HistoryData(symbol: symbol, history: [5, 10, 15, 20])))
        }
    }
}

func getMovingAverage(symbol: String) async throws -> Int {
    async let marketData = fetchMarketData(symbol: symbol)
    async let historyData = fetchHistoryData(symbol: symbol)
    let sum = try await marketData.price + historyData.sum()
    return try await sum / (historyData.history.count+1)
}

The async let syntax is called concurrent binding where the child task executes in parallel to the parent task.

Task Groups

Task groups allow dispatching multiple background tasks that are executed concurrently, and Swift automatically cancels all child tasks when the parent task is cancelled. The following example demonstrates use of the group API:

func downloadImage(id: String) async throws -> UIImage {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(UIImage()))
        }
    }
}

func downloadImages(ids: [String]) async throws -> [String: UIImage] {
    var images: [String: UIImage] = [:]
    try await withThrowingTaskGroup(of: (String, UIImage).self) { group in
        for id in ids {
            group.addTask(priority: .background) {
                return (id, try await downloadImage(id: id))
            }
        }
        for try await (id, image) in group {
            images[id] = image
        }
    }
    return images
}

As these features were still in development, Swift recently renamed the group.async API to group.addTask. In the above example, images are downloaded in parallel and then the for try await loop gathers the results.

Data Races

The Swift compiler will warn you if you try to mutate shared state from multiple background tasks. In the above example, each asynchronous task returns a tuple of image-id and image instead of mutating the shared dictionary. The parent task then mutates the dictionary with the results from the child tasks in the for try await loop.

Cancellation

You can also cancel a background task using the cancel API, or cancel all child tasks of a group using group.cancelAll(), e.g.

group.cancelAll()

The Swift runtime automatically cancels all child tasks if any of the background tasks fails. You can store a reference to a child task in an instance variable if you need to cancel it from a different method, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]

    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
              ...
            }
    }
    func collectionView(_ collectionView: UICollectionView,
        didEndDisplaying cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
        imageTasks[item]?.cancel()
    }
}

As cancellation in Swift is cooperative, you must check the cancellation state explicitly; otherwise the task will continue to execute, but Swift will discard its results, e.g.

if Task.isCancelled {
    return // return early
}

Timeout

The Task and async/await APIs don’t directly support timeouts, so you must implement them manually, similar to cooperative cancellation.
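
A common workaround is to race the operation against a sleeping task in a task group and cancel whichever loses. The helper below is a minimal sketch of that idea (it is not part of the standard library); it relies on cooperative cancellation, so the losing task must check for cancellation to stop early:

struct TimeoutError: Error {}

// Races `operation` against a timer; the first child to finish wins and the other is cancelled.
func withTimeout<T>(seconds: Double,
                    _ operation: @Sendable @escaping () async throws -> T) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        group.addTask { try await operation() }
        group.addTask {
            try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            throw TimeoutError()
        }
        guard let result = try await group.next() else { throw TimeoutError() }
        group.cancelAll() // cancel the loser (the timer or the slow operation)
        return result
    }
}

// Usage: let count = try await withTimeout(seconds: 5) { try await crawl(urls: urls, deadline: .now() + 5) }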

Semaphores and Locks

Swift does not recommend using semaphores and locks with background tasks, because a task may be suspended while waiting for an external resource and later resumed on a different thread. The following example shows incorrect use of a semaphore with background tasks:

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void) {
  let semaphore = DispatchSemaphore(value: 0)
  Task {
    await asyncUpdateDatabase()
    semaphore.signal()
  }
  semaphore.wait() // Do not use unsafe primitives to wait across task boundaries
}

TaskLocal

You can annotate certain properties with @TaskLocal; they are stored in the context of the Task and are available to the task and all of its children, e.g.

enum TracingExample {
    @TaskLocal
    static let traceID: TraceID?
}
...
guard let traceID = TracingExample.traceID else {
  print("no trace id")
  return
}
print(traceID)

Detached Tasks (Unstructured)

The above Task and async/await APIs are based on structured concurrency, where the parent task does not complete until all child background tasks are done with their work. However, Swift also allows launching detached tasks that continue to execute in the background without the parent waiting for their results, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]
    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
                defer { imageTasks[item] = nil }
                let images = try await getImages(for: ids)
                Task.detached(priority: .background) {
                    await withThrowingTaskGroup(of: Void.self) { g in
                        g.addTask { try await addImageCache(for: images) }
                        g.addTask { try await logImages(for: images) }
                    }
                }
                display(images, in: cell)
            }
    }
}

Legacy APIs

Legacy code that uses completion handlers can use the following continuation APIs to support the async/await syntax:

func persistPosts() async throws -> [Post] {
    typealias PostContinuation = CheckedContinuation<[Post], Error>
    return try await withCheckedThrowingContinuation { (continuation: PostContinuation) in
        self.getPersistentPosts { posts, error in
            if let error = error {
                continuation.resume(throwing: error)
            } else {
                continuation.resume(returning: posts)
            }
        }
    }
}

In the above example, the getPersistentPosts method uses a completion handler, and the persistPosts method provides a bridge so that you can use the async/await syntax. The resume method can only be called once per continuation.

You may also save the continuation in an instance variable when you need to resume it in another method, e.g.

class MyViewController: UIViewController {
    private var activeContinuation: CheckedContinuation<[Post], Error>?
    func sharePostsFromPeer() async throws -> [Post] {
        try await withCheckedThrowingContinuation { continuation in
            self.activeContinuation = continuation
            self.peerManager.syncSharedPosts()
        }
    }
}
extension MyViewController: PeerSyncDelegate {
    func peerManager(_ manager: PeerManager, received posts: [Post]) {
        self.activeContinuation?.resume(returning: posts)
        self.activeContinuation = nil
    }
    func peerManager(_ manager: PeerManager, hadError error: Error) {
        self.activeContinuation?.resume(throwing: error)
        self.activeContinuation = nil
    }
}

Implementing WebCrawler Using Async/Await

The following example shows an implementation of the WebCrawler described in Part I of the concurrency series using async/await:

import Foundation
struct Request {
    let url: String
    let depth: Int
    let deadline: DispatchTime
}
enum CrawlError: Error {
    case timeoutError(String)
}
let MAX_DEPTH = 4
let MAX_URLS = 11
let DOMAINS = [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "gh.com",
  "hi.com",
  "ij.com",
  "jk.com",
  "kl.com",
  "lm.com",
  "mn.com",
  "no.com",
  "op.com",
  "pq.com",
  "qr.com",
  "rs.com",
  "st.com",
  "tu.com",
  "uv.com",
  "vw.com",
  "wx.com",
  "xy.com",
  "yz.com",
];
public func crawl(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawl(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}
public func crawlWithActors(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawlWithActors(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}

///////////////// PRIVATE METHODS ////////////////
func doCrawl(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    try await withThrowingTaskGroup(of: (Request, Int).self) { group in
        for req in requests {
	    group.addTask(priority: .background) {
	        return (req, try await handleRequest(req))
	    }
        }
        for try await (req, childURLs) in group {
	    if totalChildURLs % 10 == 0 {
		print("received request \(req)")
	    }
	    totalChildURLs += childURLs
        }
    }
    return totalChildURLs
}
func doCrawlWithActors(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    let crawler = CrawlActor()
    for req in requests {
     	let childURLs = try await crawler.handle(req)
	totalChildURLs += childURLs
    }
    return totalChildURLs
}
func handleRequest(_ request: Request) async throws -> Int {
    let contents = try await download(request.url)
    let newContents = try await jsrender(request.url, contents)
    if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
        try await index(request.url, newContents)
        let urls = try await parseURLs(request.url, newContents)
        let childURLs = try await doCrawl(urls: urls, depth: request.depth + 1, deadline: request.deadline)
        return childURLs + 1
    } else {
        return 0
    }
}
func download(_ url: String) async throws -> String {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    return randomString(100)
}
func jsrender(_ url: String, _ contents: String) async throws -> String {
    // for SPA apps that use javascript for rendering contents
    return contents
}
func index(_ url: String, _ contents: String) async throws {
    // apply standardize, stem, ngram, etc for indexing
}
func parseURLs(_ url: String, _ contents: String) async throws -> [String] {
    // tokenize contents and extract href/image/script urls
    var urls = [String]()
    for _ in 0..<MAX_URLS {
        urls.append(randomUrl())
    }
    return urls
}
func hasContentsChanged(_ url: String, _ contents: String) -> Bool {
    return true
}
func isSpam(_ url: String, _ contents: String) -> Bool {
    return false
}
func randomUrl() -> String {
    let number = Int.random(in: 0..<DOMAINS.count)
    return "https://" + DOMAINS[number] + "/" + randomString(20)
}
func randomString(_ length: Int) -> String {
  let letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  return String((0..<length).map{ _ in letters.randomElement()! })
}

The crawl method takes a list of URLs with a deadline and invokes doCrawl, which crawls the URLs in parallel and then waits for the results using the try await keywords. The doCrawl method recursively crawls child URLs up to the MAX_DEPTH limit. The main crawl method defines the boundary for concurrency and returns the count of child URLs.

Following are the major features of structured concurrency in Swift:

  • Concurrency scope: async/await defines a scope of concurrency where all child background tasks must be completed before returning from the asynchronous function.
  • Composability: the async methods in the above implementation show that asynchronous code can be easily composed.
  • Error handling: async/await uses the normal try/catch syntax for error checking instead of the specialized syntax of Promises or callback functions.
  • The Swift runtime schedules asynchronous tasks on a fixed number of native threads and automatically suspends tasks when they wait for I/O or other resources.

Following are the major shortcomings in Swift for its support of structured concurrency:

  • The most glaring omission in the above implementation is a timeout, which Swift’s implementation does not support directly.
  • The Swift runtime manages the scheduling of tasks, and you cannot pass your own execution dispatcher for scheduling background tasks.

Actors

The Actor Model is a classic abstraction from the 1970s for managing concurrency, where an actor keeps its internal state private and uses message passing to interact with its state and behavior. An actor works on only one message at a time, which prevents data races when it is accessed from multiple background tasks. I previously wrote about actors in Part II of the concurrency series when covering Erlang and Elixir.

actor

Instead of protecting shared state with a serial dispatch queue such as:

final class Counter {
    private var queue = DispatchQueue(label: "counter.queue")
    private var _count : Int = 0
    var count: Int {
        queue.sync {
            _count
        }
    }

    func incr() {
        queue.async(flags: .barrier) {
            self._count += 1
        }
    }
    func decr() {
        queue.async(flags: .barrier) {
            self._count -= 1
        }
    }
}

The actor syntax simplifies such an implementation and removes all the boilerplate, e.g.

actor Counter {
    var count: Int = 0
    func incr() {
        count += 1
    }
    func decr() {
        count -= 1
    }
}

The above syntax prevents direct access to the internal state, and you must use the await syntax to access the state or behavior, e.g.

Task {
	let c = Counter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<100 {
            group.addTask {
                await c.incr()
            }
        }
    }
    print("count \(await c.count)")
}

Priority Inversion Principle

The dispatch queue API handles priority inversion, where a high-priority task is stuck behind low-priority tasks, by bumping up the priority of the low-priority tasks ahead of it in the queue; the runtime then executes the high-priority task after completing those low-priority tasks. The actor API can instead pick the high-priority task directly from the actor’s queue without waiting for the low-priority tasks ahead of it to complete.

Actor Reentrancy

If an actor invokes another actor or a background task in one of its functions, it may get suspended until that work is completed. In the meantime, another client may invoke the actor and modify its state, so you need to re-check your assumptions after an await before changing internal state. Since a continuation used for the background task may be scheduled on a different thread after the work resumes, you cannot rely on DispatchSemaphore, NSLock, NSRecursiveLock, etc. for synchronization.

Following code from WWDC-2021 shows how reentrancy can be handled safely:

actor ImageDownloader {
    private enum CacheEntry {
        case inProgress(Task.Handle<Image, Error>)
        case ready(Image)
    }
    private var cache: [URL: CacheEntry] = [:]
    func downloadAndCache(from url: URL) async throws -> Image? {
        if let cached = cache[url] {
            switch cached {
            case .ready(let image):
                return image
            case .inProgress(let handle):
                return try await handle.get()
            }
        }
        let handle = async {
            try await downloadImage(from: url)
        }
        cache[url] = .inProgress(handle)
        do {
            let image = try await handle.get()
            cache[url] = .ready(image)
            return image
        } catch {
            cache[url] = nil
            throw error
        }
    }
}

The ImageDownloader actor in the above example downloads and caches images. The actor is suspended while it's downloading an image, so another client can reenter the downloadAndCache method and request the same image. The above code prevents duplicate requests and reuses the in-flight request to serve multiple concurrent clients.

Actor Isolation

Actors in Swift prevent you from invoking their methods directly from outside the actor, but you can annotate methods with nonisolated if you need to call them synchronously; such methods cannot mutate the actor's state, e.g.

actor Account {
    let id: String
    var balance: Double = 0
    init(id: String) {
        self.id = id
    }
}
extension Account: Hashable {
    nonisolated func hash(into hasher: inout Hasher) {
        hasher.combine(id)
    }
    static func == (lhs: Account, rhs: Account) -> Bool {
        return lhs.id == rhs.id
    }
}

Sendable

Actors require that any data structure used in their internal state is thread safe and conforms to the Sendable protocol, such as:

  • Value types
  • Actors
  • Immutable classes
  • Synchronized classes
  • @Sendable Functions

struct Book: Sendable {
    var title: String
    var authors: [Author]
}

@MainActor

UI apps require that all UI updates are performed on the main thread; previously you had to dispatch UI work to the DispatchQueue.main queue. Swift now allows marking functions, classes or structs with the special @MainActor annotation so that those functions are automatically executed on the main thread, e.g.

@MainActor func checkedOut(_ books: [Book]) {
  booksView.checkedOutBooks = books
}
...
await checkedOut(booksOnLoan)

The following example shows how a view controller can be annotated with the @MainActor annotation:

@MainActor class MyViewController: UIViewController {
  func onPress() { ... }          // implicitly runs on the main thread
  nonisolated func fetch() async {
    ...
  }
}

In the above example, all methods of MyViewController are executed on the main thread; however, you can exclude specific methods via the nonisolated keyword.

@globalActor

The @globalActor annotation defines a singleton global actor, and @MainActor is one such global actor. You can also define your own global actor such as:

@globalActor
public struct GlobalSettings {
  public actor SettingsActor {
     func rememberPassword() -> Bool {
        return UserDefaults.standard.bool(forKey: "rememberPassword")
     }
  }

  public static let shared = SettingsActor()
}

...
let rememberPassword = await GlobalSettings.shared.rememberPassword()

Message Pattern Matching

As actors in Swift are invoked through method calls, they don't support pattern matching on messages the way Erlang/Elixir do, where you can select the next message to process by matching one or more fields of the message.

Local only

Unlike actors in Erlang or Elixir, actors in Swift can only communicate with other actors in the same process or application; they don't support distributed communication with remote actors.

Actor Executor/Dispatcher

The Actor protocol defines the following property to access the executor:

var unownedExecutor: UnownedSerialExecutor

However, unownedExecutor is a read-only property that cannot be changed at this time.

Implementing WebCrawler Using Actors and Tasks

The following example shows an implementation of the web crawler described in Part I of the concurrency series using actors and tasks:

import Foundation

actor CrawlActor {
    public func handle(_ request: Request) async throws -> Int {
        let contents = try await download(request.url)
        let newContents = try await jsrender(request.url, contents)
        if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
            try await index(request.url, newContents)
            let urls = try await parseURLs(request.url, newContents)
            let childURLs = try await doCrawlWithActors(urls: urls, depth: request.depth + 1, deadline: request.deadline)
            return childURLs + 1
        } else {
            return 0
        }
    }
}

The above implementation uses an actor for processing crawling requests but shares the other code for parsing and downloading web pages. As an actor provides serialized access to its state and behavior, you can't use a single actor to implement a highly concurrent web crawler. Instead, you may divide the web domains that need to be crawled among a pool of actors that can share the work.

Performance Comparison

The following table from Part-IV summarizes the runtimes of the various implementations of the web crawler when crawling 19K URLs, which resulted in about 76K messages to the asynchronous methods/coroutines/actors discussed in this blog series:

Language     Design                   Runtime (secs)
Typescript   Async/Await              0.638
Erlang       Spawning Process         4.636
Erlang       PMAP                     4.698
Elixir       Spawning OTP Children    43.5
Elixir       Task async/await         187
Elixir       Worker-pool with queue   97
GO           Go-routine/channels      1.2
Rust         Async/Await              4.3
Kotlin       Async/Await              0.736
Kotlin       Coroutine                0.712
Swift        Async/Await              63
Swift        Actors/Async/Await       65
Note: The purpose of the above results was not to run micro-benchmarks but to show the rough cost of spawning thousands of asynchronous tasks.

You can download the full code for the Swift examples from https://github.com/bhatti/concurency-katas/tree/main/swift.

Overall, Swift's new features for structured concurrency, including async/await and actors, are a welcome addition to the platform. On the downside, Swift's concurrency APIs lack support for timeouts and customized dispatchers/executors, and the micro-benchmarks showed higher overhead than expected. On the positive side, the Swift runtime catches errors due to data races, and the new async/await/actors syntax prevents bugs that were previously caused by incorrect use of completion handlers and error handling. This will help developers write more robust and responsive apps in the future.

December 26, 2020

Applying Structured Concurrency patterns in GO with case-study of Gitlab-runner

Filed under: Concurrency,GO — admin @ 9:31 pm

I recently wrote a series of blogs on structured concurrency (Part-I, Part-II, Part-III, Part-IV) and how it improves readability, concurrency scope, composition and flow control of concurrent code, and adds better support for error handling, cancellation, and timeouts. I have been using GO on a number of projects over the last few years and will share a few concurrency patterns that I have used or seen in other projects such as gitlab-runner. I demonstrated in the above series how structured concurrency considers the GO statement harmful, similar to GOTO in structured programming. Just as structured programming replaced GOTO with control-flow primitives such as single-entry/exit blocks, if-then, loops, and function calls, structured concurrency provides a scope of concurrency where the parent waits for all asynchronous code. I will show how common concurrency patterns in GO can take advantage of structured concurrency.

Asynchronous Tasks

The primary purpose of goroutines is to perform asynchronous tasks, such as requesting data from a remote API or database; e.g. here is sample code from gitlab-runner that uses goroutines to copy archive artifacts:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
	waitCh := make(chan error)
	go func() {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
		waitCh <- err
	}()

	select {
	case <-ctx.Done():
		_ = session.Signal(ssh.SIGKILL)
		_ = session.Close()
		return <-waitCh

	case err := <-waitCh:
		return err
	}
}

Here is an async/await based version, using async_await.go, that performs a similar task with structured concurrency:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
    ctx := context.Background()
    timeout := ..
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
        return nil, err
    }
    abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
        _ = session.Signal(ssh.SIGKILL)
        _ = session.Close()
        return nil, nil
    }
    _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
    return err
}

The above code defines the scope of concurrency and adds support for a timeout while making the code easier to comprehend.

Note: due to the lack of generics in GO, interface{} is used to accept the different types that may be passed to asynchronous tasks.
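
Since all of the following examples lean on async_await.go, here is a simplified, hypothetical sketch of the shape such an Execute/Await helper could take. The names and signatures below are assumptions for illustration only, not the actual library code, which adds richer abort handling and options.

// Hypothetical sketch of an Execute/Await helper (assumed imports: "context", "time").
// Handler is the unit of asynchronous work; payload carries optional input.
type Handler func(ctx context.Context, payload interface{}) (interface{}, error)

// Awaiter lets the caller wait for the result within a concurrency scope.
type Awaiter interface {
    Await(ctx context.Context, timeout time.Duration) (interface{}, error)
}

type task struct {
    resultCh chan result
    abort    Handler
    payload  interface{}
}

type result struct {
    value interface{}
    err   error
}

// Execute launches the handler in a goroutine and returns an Awaiter.
func Execute(ctx context.Context, handler Handler, abort Handler, payload interface{}) Awaiter {
    t := &task{resultCh: make(chan result, 1), abort: abort, payload: payload}
    go func() {
        value, err := handler(ctx, payload)
        t.resultCh <- result{value, err}
    }()
    return t
}

// Await blocks until the handler finishes, the context is cancelled, or the
// timeout elapses; on cancellation/timeout it invokes the abort handler.
func (t *task) Await(ctx context.Context, timeout time.Duration) (interface{}, error) {
    select {
    case r := <-t.resultCh:
        return r.value, r.err
    case <-ctx.Done():
        if t.abort != nil {
            _, _ = t.abort(ctx, t.payload)
        }
        return nil, ctx.Err()
    case <-time.After(timeout):
        if t.abort != nil {
            _, _ = t.abort(ctx, t.payload)
        }
        return nil, context.DeadlineExceeded
    }
}

The important property is that the goroutine and channel live inside the helper; the caller only sees a scoped Execute/Await pair with built-in cancellation and timeout.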

Racing Asynchronous Tasks

A common use of goroutines is spawning multiple asynchronous tasks and taking the result of the first task that completes, e.g. here is sample code from gitlab-runner that uses goroutines to copy the container's output and input streams:

	stdoutErrCh := make(chan error)
	go func() {
		_, errCopy := stdcopy.StdCopy(output, output, hijacked.Reader)
		stdoutErrCh <- errCopy
	}()

	// Write the input to the container and close its STDIN to get it to finish
	stdinErrCh := make(chan error)
	go func() {
		_, errCopy := io.Copy(hijacked.Conn, input)
		_ = hijacked.CloseWrite()
		if errCopy != nil {
			stdinErrCh <- errCopy
		}
	}()

	// Wait until either:
	// - the job is aborted/cancelled/deadline exceeded
	// - stdin has an error
	// - stdout returns an error or nil, indicating the stream has ended and
	//   the container has exited
	select {
	case <-ctx.Done():
		err = errors.New("aborted")
	case err = <-stdinErrCh:
	case err = <-stdoutErrCh:
	}

The above code creates the stdoutErrCh channel to capture errors from copying the output stream and the stdinErrCh channel to capture errors from writing the input stream, and then waits for either to finish.

Here is the equivalent code that uses structured concurrency with async/await primitives from async_racer.go:

    ctx := context.Background()
    timeout := ..
    handler1 := func(ctx context.Context) (interface{}, error) {
        _, err := stdcopy.StdCopy(output, output, hijacked.Reader)
        return nil, err
    }
    handler2 := func(ctx context.Context) (interface{}, error) {
        defer hijacked.CloseWrite()
        _, err := io.Copy(hijacked.Conn, input)
        return nil, err
    }
    future, _ := async.ExecuteRacer(ctx, handler1, handler2)
    _, err := future.Await(ctx, timeout)

The above code uses the async/await syntax to define the scope of concurrency and clarifies the intent of the business logic without the distraction of concurrency plumbing.

Performing cleanup when task is aborted or cancelled

If a goroutine spawns an external process for background work, you may need to kill that process in case the goroutine's task is cancelled or times out. For example, here is sample code from gitlab-runner that calls the KillAndWait function to terminate the external process when context.Done() fires:

   func (c *command) Run() error {
       err := c.cmd.Start()
       if err != nil {
           return fmt.Errorf("failed to start command: %w", err)
       }

       go c.waitForCommand()

       select {
       case err = <-c.waitCh:
           return err

       case <-c.context.Done():
           return newProcessKillWaiter(c.logger, c.gracefulKillTimeout, c.forceKillTimeout).
               KillAndWait(c.cmd, c.waitCh)
       }
   }

In the above code, the Run method starts a command, waits for its completion in another goroutine, and then listens for a response on the waitCh and context.Done() channels.

Here is how the async/await structure from async_await.go can apply structured concurrency to the above code:

   func (c *command) Run() error {
       timeout := ...
       ctx := context.Background()
       handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
           return nil, c.cmd.Run()
       }
       abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
           return nil, KillAndWait(c.cmd, c.waitCh)
       }
       _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
       return err
   }

Using GO Channels as data pipe/queue

GO channels are designed based on CSP rendezvous primitives where both the sender and receiver have to wait to exchange messages. However, you can add buffering to turn these channels into a bounded queue (providing back-pressure). Here is example code from gitlab-runner that uses channels to stream log messages:

 func (l *kubernetesLogProcessor) scan(ctx context.Context, logs io.Reader) (*bufio.Scanner, <-chan string) {
     logsScanner := bufio.NewScanner(logs)

     linesCh := make(chan string)
     go func() {
         defer close(linesCh)

         // This goroutine will exit when the calling method closes the logs stream or the context is cancelled
         for logsScanner.Scan() {
             select {
             case <-ctx.Done():
                 return
             case linesCh <- logsScanner.Text():
             }
         }
     }()

     return logsScanner, linesCh
 }

The above code creates the linesCh channel without any buffer and then creates a goroutine where logs are read and sent to the linesCh channel. As mentioned above, this blocks the sender until the receiver is ready, and you may lose log messages if the receiver is slow and the goroutine is killed before the logs can be read. Structured concurrency cannot help in this case, but we can simply use an event bus or a local message queue to stream these logs.
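
For illustration, adapting the snippet above, a small buffer turns the channel into a bounded queue: the producer only blocks once the buffer is full, which gives a slow consumer some slack while still applying back-pressure. The capacity of 100 here is an arbitrary assumption, not a recommendation.

// Sketch: a buffered channel acting as a bounded queue with back-pressure.
linesCh := make(chan string, 100)

go func() {
    defer close(linesCh)
    for logsScanner.Scan() {
        select {
        case <-ctx.Done():
            return
        case linesCh <- logsScanner.Text(): // blocks only when the buffer is full
        }
    }
}()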

WaitGroup to wait for completion of goroutines

The GO language supports sync.WaitGroup to wait for the completion of goroutines, but it's redundant if you are also using channels to receive replies. For example, here is sample code from gitlab-runner that uses a WaitGroup to wait for the completion of goroutines:

 func (e *executor) waitForServices() {
...
     // wait for all services to came up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
         wg := sync.WaitGroup{}
         for _, service := range e.services {
             wg.Add(1)
             go func(service *types.Container) {
                 _ = e.waitForServiceContainer(service, time.Duration(waitForServicesTimeout)*time.Second)
                 wg.Done()
             }(service)
         }
         wg.Wait()
     }
 }

Here is how the above code can be replaced with the async/await syntax from my async_await.go:

 func (e *executor) waitForServices() {
...
     // wait for all services to came up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
        ctx := context.Background()
        timeout := time.Duration(waitForServicesTimeout)*time.Second
        futures := make([]async.Awaiter, len(e.services))
        handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
          service := payload.(*types.Container)
          return nil, e.waitForServiceContainer(service, timeout)
        }
        for i := 0; i < len(e.services); i++ {
          futures[i] = async.Execute(ctx, handler, async.NoAbort, e.services[i])
        }
        async.AwaitAll(ctx, timeout, futures...)
     }
 }

The above code is not shorter than the original, but it is more readable and eliminates subtle bugs where you might use the WaitGroup incorrectly, resulting in a deadlock.
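
As an illustration of such a subtle bug (a hypothetical example modeled loosely on the snippet above, not taken from gitlab-runner): calling wg.Add inside the spawned goroutine races with wg.Wait, so Wait can return before any work has even been registered.

var wg sync.WaitGroup
for _, service := range services {
    go func(service *types.Container) {
        wg.Add(1) // BUG: Add may run after Wait has already returned
        defer wg.Done()
        _ = waitForServiceContainer(service, timeout)
    }(service)
}
wg.Wait() // may return immediately because no Add has executed yet

The fix is to call wg.Add before spawning the goroutine, which is exactly the kind of detail a structured concurrency helper hides from callers.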

Fork-Join based Asynchronous Tasks

One common use of goroutines is to spawn multiple asynchronous tasks and wait for their completion, similar to the fork-join pattern; e.g. here is sample code from gitlab-runner that uses goroutines to perform cleanup of multiple services and sends back the results via a channel:

func (s *executor) cleanupServices() {
	ch := make(chan serviceDeleteResponse)
	var wg sync.WaitGroup
	wg.Add(len(s.services))

	for _, service := range s.services {
		go s.deleteKubernetesService(service.ObjectMeta.Name, ch, &wg)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	for res := range ch {
		if res.err != nil {
			s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.serviceName, res.err))
		}
	}
}

func (s *executor) deleteKubernetesService(serviceName string, ch chan<- serviceDeleteResponse, wg *sync.WaitGroup) {
	defer wg.Done()

	err := s.kubeClient.CoreV1().
		Services(s.configurationOverwrites.namespace).
		Delete(serviceName, &metav1.DeleteOptions{})
	ch <- serviceDeleteResponse{serviceName: serviceName, err: err}
}

The cleanupServices method above goes through a collection of services and calls deleteKubernetesService in a goroutine, which calls the Kubernetes API to remove the given service. It waits for all background goroutines using a WaitGroup and then receives any errors from the error channel and logs them.

Here is how you can use the async/await code from async_await.go, which applies structured concurrency by abstracting the low-level goroutines and channels:

func (s *executor) cleanupServices() {
    ctx := context.Background()
    timeout := time.Duration(5 * time.Second)
    futures := make([]async.Awaiter, len(s.services))
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
         serviceName := payload.(string)
         err := s.kubeClient.CoreV1().
      		Services(s.configurationOverwrites.namespace).
			Delete(serviceName, &metav1.DeleteOptions{})
         return nil, err
    }
    for i := 0; i < len(s.services); i++ {
        futures[i] = async.Execute(ctx, handler, async.NoAbort, s.services[i])
    }
    results := async.AwaitAll(ctx, timeout, futures...)
    for _, res := range results {
        if res.Err != nil {
            s.Errorln(fmt.Sprintf("Error cleaning up the pod service: %v", res.Err))
        }
    }
}

The above code uses structured concurrency by defining the scope of the asynchronous code in cleanupServices and adds better support for cancellation, timeout and error handling. Also, you no longer need a WaitGroup to wait for completion.
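
For context, here is a minimal, hypothetical sketch of a fork-join helper in the spirit of AwaitAll (names and signatures are assumptions, not the actual library). The point is that the WaitGroup lives inside the library, so calling code never touches the low-level primitives directly.

// Hypothetical fork-join helper (assumed imports: "context", "sync").
// ForkResult pairs a handler's output with its error.
type ForkResult struct {
    Value interface{}
    Err   error
}

// forkJoin runs every handler in its own goroutine and blocks until all of
// them finish, returning results in the same order as the handlers.
func forkJoin(ctx context.Context,
    handlers ...func(ctx context.Context) (interface{}, error)) []ForkResult {
    results := make([]ForkResult, len(handlers))
    var wg sync.WaitGroup
    wg.Add(len(handlers))
    for i, handler := range handlers {
        go func(i int, handler func(ctx context.Context) (interface{}, error)) {
            defer wg.Done()
            value, err := handler(ctx)
            results[i] = ForkResult{Value: value, Err: err}
        }(i, handler)
    }
    wg.Wait()
    return results
}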

Polling Asynchronous Task

In some cases, you may need to poll a background task to check its status or wait for its completion, e.g. here is sample code from gitlab-runner that waits for a pod until it's running:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
	pollInterval := config.GetPollInterval()
	pollAttempts := config.GetPollAttempts()
	for i := 0; i <= pollAttempts; i++ {
		select {
		case r := <-triggerPodPhaseCheck(c, pod, out):
			if !r.done {
				time.Sleep(time.Duration(pollInterval) * time.Second)
				continue
			}
			return r.phase, r.err
		case <-ctx.Done():
			return api.PodUnknown, ctx.Err()
		}
	}
	return api.PodUnknown, errors.New("timed out waiting for pod to start")
}

func triggerPodPhaseCheck(c *kubernetes.Clientset, pod *api.Pod, out io.Writer) <-chan podPhaseResponse {
	errc := make(chan podPhaseResponse)
	go func() {
		defer close(errc)
		errc <- getPodPhase(c, pod, out)
	}()
	return errc
}

The waitForPodRunning method above repeatedly calls triggerPodPhaseCheck, which creates a goroutine and invokes the Kubernetes API to get the pod status. It then returns the pod status on a channel that waitForPodRunning listens to.

Here is the equivalent code using async/await from async_polling.go:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
    timeout := config.getPodRunningTimeout()
    pollInterval := config.GetPollInterval()
    handler := func(ctx context.Context, payload interface{}) (bool, interface{}, error) {
        r := getPodPhase(c, pod, out)
        return r.done, r.phase, r.err
    }
    phase, err := async.ExecutePolling(ctx, handler, async.NoAbort, 0, pollInterval).
        Await(ctx, timeout)
    if err != nil {
        return api.PodUnknown, err
    }
    return phase.(api.PodPhase), nil
}

The above code removes the complexity of manually polling and managing goroutines/channels.
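
For context, a minimal, hypothetical sketch of such a polling helper: it calls the handler on a fixed interval until the handler reports completion, the context is cancelled, or the timeout elapses. The names below are illustrative; the real async_polling.go differs in details.

// Hypothetical polling helper (assumed imports: "context", "time").
// PollingHandler returns done=true when polling should stop.
type PollingHandler func(ctx context.Context) (done bool, result interface{}, err error)

func poll(ctx context.Context, handler PollingHandler,
    interval, timeout time.Duration) (interface{}, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        done, result, err := handler(ctx)
        if done || err != nil {
            return result, err
        }
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case <-ticker.C:
            // poll again on the next tick
        }
    }
}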

Background Task with watchdog

Another concurrency pattern in GO involves starting a background task and then launching another background goroutine to monitor the task or its runtime environment, so that the background task can be terminated if the watchdog finds any errors. For example, here is sample code from gitlab-runner that executes a command inside a Kubernetes pod and launches another goroutine to monitor the pod status:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
    containerName = ...
    containerCommand = ...
...

	podStatusCh := s.watchPodStatus(ctx)

	select {
	case err := <-s.runInContainer(containerName, containerCommand):
		s.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
		var terminatedError *commandTerminatedError
		if err != nil && errors.As(err, &terminatedError) {
			return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
		}

		return err
	case err := <-podStatusCh:
		return &common.BuildError{Inner: err}
	case <-ctx.Done():
		return fmt.Errorf("build aborted")
	}
}

func (s *executor) watchPodStatus(ctx context.Context) <-chan error {
	// Buffer of 1 in case the context is cancelled while the timer tick case is being executed
	// and the consumer is no longer reading from the channel while we try to write to it
	ch := make(chan error, 1)

	go func() {
		defer close(ch)

		t := time.NewTicker(time.Duration(s.Config.Kubernetes.GetPollInterval()) * time.Second)
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				err := s.checkPodStatus()
				if err != nil {
					ch <- err
					return
				}
			}
		}
	}()

	return ch
}

func (s *executor) runInContainer(name string, command []string) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)

		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		err := retryable.Run()
		if err != nil {
			errCh <- err
		}

		exitStatus := <-s.remoteProcessTerminated
		if *exitStatus.CommandExitCode == 0 {
			errCh <- nil
			return
		}

		errCh <- &commandTerminatedError{exitCode: *exitStatus.CommandExitCode}
	}()

	return errCh
}

In the above code, runWithAttach calls watchPodStatus to monitor the status of the pod in a background goroutine and then executes the command via runInContainer.

Here is the equivalent code using async/await from async_watchdog.go:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
    containerName = ...
    containerCommand = ...

...
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
        return nil, s.runInContainer(containerName, containerCommand)
    }
    watchdogHandler := func(ctx context.Context, payload interface{}) error {
        return s.checkPodStatus()
    }

    _, err := async.ExecuteWatchdog(ctx, handler, watchdogHandler, async.NoAbort, nil, poll).
        Await(ctx, timeout)
    if err != nil {
        return &common.BuildError{Inner: err}
    }
    return nil
}

func (s *executor) runInContainer(name string, command []string) error {
		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		return retryable.Run()
}

The above code removes the extraneous complexity of concurrency logic embedded within functional code and makes it easier to comprehend, with improved support for concurrency scope, error handling, timeout and cancellation.
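
For context, a minimal, hypothetical sketch of such a watchdog helper: it runs the main handler in one goroutine and periodically invokes the watchdog handler in another, and whichever fails first decides the outcome. The names are assumptions; the real async_watchdog.go differs in details.

// Hypothetical watchdog helper (assumed imports: "context", "time").
func executeWithWatchdog(ctx context.Context,
    handler func(ctx context.Context) error,
    watchdog func(ctx context.Context) error,
    pollInterval time.Duration) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // stops the watchdog once the main task returns

    mainCh := make(chan error, 1)
    watchCh := make(chan error, 1)

    // Main task runs in its own goroutine.
    go func() { mainCh <- handler(ctx) }()

    // Watchdog polls periodically and reports the first error it sees.
    go func() {
        ticker := time.NewTicker(pollInterval)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                if err := watchdog(ctx); err != nil {
                    watchCh <- err
                    return
                }
            }
        }
    }()

    select {
    case err := <-mainCh:
        return err
    case err := <-watchCh:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}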

Other accidental complexity in Gitlab-Runner

Besides concurrency, here are a few other design choices that add accidental complexity to gitlab-runner:

Abstraction

The primary goal of gitlab-runner is to abstract the executor framework so that it can use different platforms such as Docker, Kubernetes, SSH, or Shell to execute processes. However, it doesn't define an interface for behavior common to these executors, such as managing runtime containers or executing a command.
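
A hypothetical sketch of what such a common interface might look like follows; the method names and types are illustrative assumptions, not taken from gitlab-runner.

// Hypothetical interface for behavior shared by all executors
// (assumed imports: "context", "io"; ExecutorConfig is an illustrative type).
type ContainerExecutor interface {
    // Prepare provisions the runtime environment (pod, container, shell session, ...).
    Prepare(ctx context.Context, config ExecutorConfig) error
    // Exec runs a command inside the prepared environment, wiring up its streams.
    Exec(ctx context.Context, cmd []string, stdin io.Reader, stdout io.Writer) error
    // Cleanup releases any resources created by Prepare.
    Cleanup(ctx context.Context) error
}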

Adapter/Gateway pattern

A common pattern to abstract a third-party library or API is to use an adapter or gateway, but gitlab-runner mixes external APIs with internal executor logic. The Kubernetes executor in gitlab-runner defines logic both for interacting with the external Kubernetes server and for managing Kubernetes pods and executing processes inside those pods. For example, my initial intent in looking at gitlab-runner was to adopt its APIs for interacting with Kubernetes, but I could not reuse any code as a library and instead had to copy the relevant code for my use-case.

Separation of Concerns

The gitlab-runner is not only riddled with concurrency-related primitives such as goroutines and channels, but it also mixes other aspects such as configuration, feature flags, logging, and monitoring. For example, it uses configuration for defining containers, services, and volumes for Kubernetes, but it hard-codes various internal configurations for build, helper, and monitoring containers instead of injecting them via external configuration. Similarly, it hard-codes volumes for the repo, cache, logging, etc. A common design pattern in building software is to use layers of abstraction or separation of concerns, where each layer or module is responsible for a single concern (the single-responsibility principle). For example, you could divide executors into three layers: an adapter layer, a middle layer, and a high-level layer, where the adapter layer strictly interacts with the underlying platform such as Kubernetes and no other layer needs to know the internal types or APIs of Kubernetes. The middle layer uses the adapter layer to delegate any executor-specific behavior and defines methods to configure containers. The high-level layer is responsible for injecting dependent services, containers, and configuration to execute jobs within the gitlab environment. I understand some of these abstractions might become leaky if there are significant differences among executors, but such a design allows broader reuse of the lower-level layers in other contexts.
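
To make the layering concrete, here is a hypothetical sketch of how those boundaries could be expressed in GO; all type and method names are illustrative assumptions, not gitlab-runner code.

// Adapter layer: the only code that talks to the Kubernetes client
// (PodSpec, ContainerConfig and JobSpec are illustrative types).
type KubernetesAdapter interface {
    CreatePod(ctx context.Context, spec PodSpec) (string, error)
    ExecInPod(ctx context.Context, podName string, cmd []string) error
    DeletePod(ctx context.Context, podName string) error
}

// Middle layer: configures containers and delegates platform work to the adapter.
type ContainerRuntime interface {
    Configure(containers []ContainerConfig) error
    Run(ctx context.Context, cmd []string) error
}

// High-level layer: wires services, containers and configuration to execute a job.
type JobExecutor interface {
    ExecuteJob(ctx context.Context, job JobSpec) error
}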

Summary

Modern scalable and performant applications demand concurrency, but sprinkling low-level concurrency patterns all over your code adds significant accidental or extraneous complexity. In this blog, I demonstrated how encapsulating concurrent code within a library of structured concurrency patterns can simplify your code. Concurrency is hard, especially with lower-level primitives such as WaitGroup, Mutex, goroutines, and channels in GO, and incorrect use of these primitives can lead to deadlocks or race conditions. Using a small library that abstracts common concurrency patterns can reduce the probability of concurrency-related bugs. Finally, due to the lack of immutability, strong ownership, generics, or a scope of concurrency in GO, you still have to watch for race conditions and analyze your code carefully for deadlocks, but applying structured concurrency can help you manage concurrent code better.
