Shahzad Bhatti Welcome to my ramblings and rants!

October 10, 2022

Implementing Distributed Locks (Mutex and Semaphore) with Databases

Filed under: Concurrency,Rust,Uncategorized — admin @ 10:55 pm

Overview

I recently needed a way to control access to shared resources in a distributed system. Though you may use Consul or Zookeeper (or low-level Raft / Paxos implementations if you are brave) for managing distributed locks, I wanted to reuse an existing database without adding another dependency to my tech stack. Most databases support transactions or conditional updates with varying degrees of transactional guarantees, but these features alone can’t serve as distributed locks if the business logic you need to protect resides outside the database. I found a lock client library based on AWS databases, but it didn’t support semaphores. Its implementation tightly coupled the concerns of lock management and database access, so it wasn’t easy to extend. For example, the following diagram shows the cyclic dependencies in its code:

class diagram

Due to the above deficiencies in existing solutions, I decided to write my own implementation of distributed locks in Rust with the following capabilities:

  • Allow creating lease based locks that can either protect a single shared resource with a Mutex lock or protect a finite set of shared resources with a Semaphore lock.
  • Allow renewing leases based on periodic intervals so that stale locks can be acquired by other users.
  • Allow releasing Mutex and semaphore locks explicitly after the user performs a critical action.
  • CRUD APIs to manage Mutex and Semaphore entities in the database.
  • Multi-tenancy support for different clients in case the database is shared by multiple users.
  • Fair locks that grant access on a first-come, first-served basis when the same lock is acquired concurrently.
  • Scalable solution for supporting tens of thousands of concurrent mutexes and semaphores.
  • Support for multiple data stores, including relational databases such as MySQL, PostgreSQL and Sqlite as well as NoSQL/cache data stores such as AWS Dynamo DB and Redis.

High-level Design

I chose Rust to build the library for managing distributed locks due to strict performance and correctness requirements. Following diagram shows the high-level components in the new library:

LockManager Interface

The client interacts with the LockManager that defines following operations to acquire, release, renew lock leases and manage lifecycle of Mutexes and Semaphores:

Rust

The LockManager interacts with a LockStore to access mutexes and semaphores, which in turn delegates to implementations of mutex and semaphore repositories for lock management. The library defines two implementations of LockStore. The first, DefaultLockStore, supports both mutexes and semaphores, where a mutex is used to acquire a single lock and a semaphore is used to acquire a lock from a finite set of shared resources. The second, FairLockStore, uses a Redis-specific implementation of fair semaphores for managing lease-based semaphores that are granted in first-come, first-served order. If a lock is not immediately available, the LockManager can wait for it by periodically checking the availability of the mutex or semaphore. Due to this periodic polling, the fair semaphore algorithm won’t preserve FIFO order if a new client requests a lock while a previous request is waiting for its next polling interval.

Create Lock Manager

You can instantiate a Lock Manager with relational database store as follows:

Rust

Alternatively, you can choose AWS Dynamo DB as follows:

Rust

Or Redis based data-store as follows:

Rust

Note: The AWS Dynamo DB store uses the strongly consistent reads feature because Dynamo DB reads are eventually consistent by default.

Acquiring a Mutex Lock

You will need to build options for acquiring with key name and lease period in milliseconds and then acquire it:

Rust

The acquire_lock operation will automatically create the mutex lock if it doesn’t exist; otherwise, if the lock is not available, it will wait for the period of the lease time. It returns a structure for the mutex lock that includes:

JSON

Renewing the lease of Lock

A lock is only available for the duration specified in lease_duration period, but you can renew it periodically if needed:

Rust

Note: The lease renewal will also update the version of lock so you will need to use the updated version to renew or release the lock.
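The lease and version behavior described in this section can be sketched with a small in-memory model. This is only an illustration under stated assumptions, not the db-locks API: `LeaseMutex`, `try_acquire`, `renew` and `release` are hypothetical names, and the clock is passed in as milliseconds so the behavior is deterministic.

```rust
/// A hypothetical in-memory lease-based mutex (illustrative only, not the db-locks API).
#[derive(Debug, Default)]
pub struct LeaseMutex {
    version: u64,       // bumped on every successful acquire or renew
    expires_at_ms: u64, // time at which the current lease becomes stale
    locked: bool,
}

impl LeaseMutex {
    /// Acquires the lock if it is free or its lease has expired.
    /// Returns the version the holder must present to renew or release.
    pub fn try_acquire(&mut self, now_ms: u64, lease_ms: u64) -> Option<u64> {
        if self.locked && now_ms < self.expires_at_ms {
            return None; // held by someone whose lease is still valid
        }
        self.locked = true;
        self.version += 1;
        self.expires_at_ms = now_ms + lease_ms;
        Some(self.version)
    }

    /// Extends the lease when the caller presents the current version
    /// and the lease has not expired yet. Returns the updated version.
    pub fn renew(&mut self, version: u64, now_ms: u64, lease_ms: u64) -> Option<u64> {
        if !self.locked || version != self.version || now_ms >= self.expires_at_ms {
            return None;
        }
        self.version += 1;
        self.expires_at_ms = now_ms + lease_ms;
        Some(self.version)
    }

    /// Releases the lock only when the caller presents the current version.
    pub fn release(&mut self, version: u64) -> bool {
        if !self.locked || version != self.version {
            return false;
        }
        self.locked = false;
        true
    }
}
```

In this sketch a renewal returns a new version, so a caller that keeps using the old version can no longer renew or release, which mirrors the note above. In a database-backed store the same check would typically be a conditional update on the version column.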

Releasing the lease of Lock

You can build options for releasing from the lock returned by above API as follows and then release it:

Rust

Acquiring a Semaphore based Lock

The semaphores allow you to define a set of locks for a resource with a maximum size. The operation for acquiring semaphore is similar to acquiring regular lock except you specify semaphore size, e.g.:

Rust

The acquire_lock operation will automatically create semaphore if it doesn’t exist and it will then check for available locks and wait if all the locks are busy. This will return a structure for lock that includes:

JSON

The semaphore lock will create mutexes internally that will be numbered from 0 to max-size (exclusive). You can get semaphore details using:

Rust

That would return:

JSON

Or, fetch state of all mutexes associated with the semaphore using:

Rust

Which would return:

JSON
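The idea of a semaphore backed by mutexes numbered from 0 to max-size (exclusive) can be sketched as follows. This is a simplified in-memory model rather than the library’s implementation; `SlotSemaphore` and its methods are hypothetical names, and each slot only tracks a lease expiry in milliseconds.

```rust
/// Hypothetical semaphore backed by `max_size` numbered slots (0..max_size).
pub struct SlotSemaphore {
    /// Lease expiry per slot; `None` means the slot is free.
    slots: Vec<Option<u64>>,
}

impl SlotSemaphore {
    pub fn new(max_size: usize) -> Self {
        Self { slots: vec![None; max_size] }
    }

    /// Takes the first slot that is free or whose lease has expired
    /// and returns its index, or `None` when every slot is busy.
    pub fn try_acquire(&mut self, now_ms: u64, lease_ms: u64) -> Option<usize> {
        let idx = self.slots.iter().position(|slot| match slot {
            None => true,
            Some(expires_at) => now_ms >= *expires_at,
        })?;
        self.slots[idx] = Some(now_ms + lease_ms);
        Some(idx)
    }

    /// Frees the slot at `idx`; returns false for unknown or already free slots.
    pub fn release(&mut self, idx: usize) -> bool {
        if let Some(slot) = self.slots.get_mut(idx) {
            if slot.is_some() {
                *slot = None;
                return true;
            }
        }
        false
    }
}
```

A semaphore of size 2 in this model hands out slots 0 and 1 and then refuses further requests until a slot is released or its lease expires.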

Renewing the lease of Semaphore Lock

A lock is only available for the duration specified in lease_duration period, but you can renew it periodically if needed:

Rust

Note: The lease renewal will also update the version of lock so you will need to use the updated version to renew or release the lock.

Releasing the lease of Semaphore Lock

You can build options for releasing from the lock returned by above API as follows and then release it:

Rust

Acquiring a Fair Semaphore

Fair semaphores are only available for Redis due to their internal implementation, and they must be enabled via the fair_semaphore configuration option. Otherwise, usage is similar to the above operations, e.g.:

Rust

Then acquire lock similar to the semaphore syntax as before:

Rust

The acquire_lock operation will automatically create semaphore if it doesn’t exist and it will then check for available locks and wait if all the locks are busy. This will return a structure for lock that includes:

JSON

The fair semaphore lock does not use mutexes internally, but for API compatibility it builds a mutex with a key based on a combination of the semaphore key and version. You can then query semaphore state as follows:

Rust

That would return:

JSON

Or, fetch state of all mutexes associated with the semaphore using:

Rust

Which would return:

JSON

Note: The mutex_key will be slightly different for unlocked mutexes as mutex-key isn’t needed for internal implementation.

Renewing the lease of Fair Semaphore Lock

You can renew lease of fair semaphore similar to above semaphore syntax, e.g.:

Rust

Note: Due to internal implementation of fair semaphore, the version won’t be changed upon lease renewal.

Releasing the lease of Fair Semaphore Lock

You can build options for releasing from the lock returned by above API as follows and then release it:

Rust

Command Line Interface

In addition to a Rust based interface, the distributed locks library also provides a command line interface for managing mutex and semaphore based locks, e.g.:

Shell

For example, you can acquire fair semaphore lock as follows:

Shell

Which would return:

JSON

You can run following command for renewing above lock:

Shell

And then release it as follows:

Shell

Summary

I was able to meet the initial goals for implementing distributed locks, though this library is still early in development. You can download and try it from https://github.com/bhatti/db-locks. Feel free to send your feedback or contribute to this library.

May 12, 2022

Applying Laws of Scalability to Technology and People

As businesses grow with a larger customer base and hire more employees, they face challenges in meeting customer demands, both in scaling their systems and in maintaining rapid product development with bigger teams. Businesses aim to scale systems linearly with additional computing and human resources. However, a system architecture such as a monolith or a ball of mud makes scaling systems linearly onerous. Similarly, teams become less efficient as they grow in size and turn into silos. A general solution to scaling business or technical problems is to use divide & conquer and partition the problem into multiple sub-problems. A number of factors affect the scalability of software architectures and organizations, such as the interactions among system components or communication between teams. For example, the coordination, communication and data/knowledge coherence among the system components and teams become disproportionately expensive with the growth in size. The fields of software systems and business management have developed a number of laws and principles that can be used to evaluate constraints and trade-offs related to the scalability challenges. Following is a list of a few laws from the technology and business domains for scaling software architectures and business organizations:

Amdahl’s Law

Amdahl’s Law, named after Gene Amdahl, is used to predict the speed up of a task’s execution time when it’s scaled to run on multiple processors. It states that the maximum speed up will be limited by the serial fraction of the task execution, as it will create resource contention:

Speed up (P, N) = 1 / [ (1 - P) + P / N ]

Where P is the fraction of task that can run in parallel on N processors. When N becomes large, P / N approaches 0 so speed up is restricted to 1 / (1 – P) where the serial fraction (1 – P) becomes a source of contention due to data coherence, state synchronization, memory access, I/O or other shared resources.

Amdahl’s law can also be described in terms of throughput using:

N / [ 1 + a (N - 1) ]

Where a is the serial fraction between 0 and 1. In parallel computing, embarrassingly parallel workloads are a class of problems where the parallel tasks have little or no dependency on one another, so their value for a will be 0 because they don’t require any inter-task communication.
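Both forms of Amdahl’s law can be checked numerically. The sketch below uses hypothetical function names and simply evaluates the two formulas; with a = 1 – P they give the same number.

```rust
/// Speed up for parallel fraction `p` on `n` processors: 1 / [(1 - p) + p / n].
pub fn amdahl_speedup(p: f64, n: f64) -> f64 {
    1.0 / ((1.0 - p) + p / n)
}

/// Upper bound on speed up as `n` grows without limit: 1 / (1 - p).
pub fn amdahl_limit(p: f64) -> f64 {
    1.0 / (1.0 - p)
}

/// Throughput form with serial fraction `a`: n / [1 + a (n - 1)].
pub fn amdahl_throughput(a: f64, n: f64) -> f64 {
    n / (1.0 + a * (n - 1.0))
}
```

For example, a task that is 95% parallel runs about 5.9 times faster on 8 processors and can never exceed a 20 times speed up, no matter how many processors are added.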

Amdahl’s law can be used to scale teams as an organization grows, where the teams can be organized as small, cross-functional groups to parallelize the feature work for different product lines or business domains; however, the maximum speed up will still be limited by the serial fraction of the work. The serial work can be: build and deployment pipelines; reviewing and merging changes; communication and coordination between teams; and dependencies for deliverables from other teams. Fred Brooks described in his book The Mythical Man-Month how adding people to a highly divisible task can reduce overall task duration but other tasks are not so easily divisible: while it takes one woman nine months to make one baby, “nine women can’t make a baby in one month”.

The theoretical speedup of the latency of the execution of a program according to Amdahl’s law (credit wikipedia).

Brooks’s Law

Brooks’s law, coined by Fred Brooks, states that adding manpower to a late software project makes it later due to ramp up time. As the size of a team increases, the ramp up time for new employees also increases due to quadratic communication overhead among team members, e.g.

Number of communication channels = N x (N - 1) / 2

Organizations can build small teams, such as two-pizza/single-threaded teams, where the number of communication channels within each team does not explode and the cross-functional nature of the teams requires less communication with and fewer dependencies on other teams. Brooks’s law can equally be applied to technology when designing distributed services or components, so that each service is designed as a loosely coupled module around a business domain to minimize communication with other services, and services only communicate using well-designed interfaces.

Universal Scalability Law

The Universal Scalability Law is used for capacity planning and was derived from Amdahl’s law by Dr. Neil Gunther. It describes relative capacity in terms of concurrency, contention and coherency:

C(N) = N / [1 + a(N – 1) + B.N (N – 1) ]

Where C(N) is the relative capacity, a is the serial fraction between 0 and 1 due to resource contention and B is the delay for data coherency or consistency. As the data coherency term (B) is quadratic in N, it becomes more expensive as N increases; e.g. using a consensus algorithm such as Paxos is impractical for reaching state consistency among a large set of servers because it requires additional communication between all servers. Instead, large scale distributed storage services generally use sharding/partitioning and a gossip protocol with a leader-based consensus algorithm to minimize peer to peer communication.
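The effect of the quadratic coherency term can be seen by evaluating the formula. The following sketch uses hypothetical function names; the peak formula N* = sqrt((1 – a) / B) is the standard result of setting the derivative of C(N) to zero and is not stated in the text above.

```rust
/// Relative capacity C(N) = N / [1 + a (N - 1) + b N (N - 1)].
pub fn usl_capacity(n: f64, a: f64, b: f64) -> f64 {
    n / (1.0 + a * (n - 1.0) + b * n * (n - 1.0))
}

/// Concurrency level at which capacity peaks: sqrt((1 - a) / b).
/// Returns `None` when there is no coherency cost (b <= 0), i.e. no finite peak.
pub fn usl_peak(a: f64, b: f64) -> Option<f64> {
    if b > 0.0 {
        Some(((1.0 - a) / b).sqrt())
    } else {
        None
    }
}
```

With a = 0.05 and B = 0.001, capacity peaks at roughly 31 nodes; going from 32 to 64 nodes in this model lowers capacity instead of raising it. With B = 0 the formula reduces to the throughput form of Amdahl’s law.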

The Universal Scalability Law can be applied to scale teams similar to Amdahl’s law, where a models serial work or dependencies between teams and B models communication and consistent understanding among the team members. The cost of B can be minimized by building small cross-functional teams so that teams can make progress independently. You can also apply this model to any decision making process by keeping the number of stakeholders or decision makers small so that they can easily reach agreement without grinding to a halt.

Gossip protocols also apply to people and can be used along with a writing culture, lunch & learn and osmotic communication to spread knowledge and learnings from one team to other teams.

Little’s Law

Little’s Law was developed by John Little to predict the number of items in a queue for a stable, non-preemptive system. It is part of queueing theory and is described mathematically as:

L = A W

Where L is the average number of items within the system or queue, A is the average arrival rate of items and W is the average time an item spends in the system. Little’s law and queueing theory can be used for capacity planning for computing servers and for minimizing the waiting time in the queue (W).

Little’s law can be applied for predicting the task completion rate in an agile process, where L represents work-in-progress (WIP) for a sprint; A represents the arrival and departure rate or throughput/capacity of tasks; and W represents lead-time or the average amount of time in the system.

WIP = Throughput x Lead-Time

Lead-Time = WIP / Throughput

You can use this relationship to reduce the work in progress or lead time and improve throughput of tasks completion. Little’s law observes that you can accomplish more by keeping work-in-progress or inventory small. You will be able to better respond to unpredictable delays if you keep a buffer in your capacity and avoid 100% utilization.
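As a quick worked example of the relationship above, the sketch below (hypothetical function names) computes lead time from WIP and throughput, and WIP from throughput and lead time.

```rust
/// Lead-Time = WIP / Throughput (Little's law rearranged).
pub fn lead_time(wip: f64, throughput: f64) -> f64 {
    wip / throughput
}

/// WIP = Throughput x Lead-Time.
pub fn work_in_progress(throughput: f64, lead_time: f64) -> f64 {
    throughput * lead_time
}
```

A team with 20 tasks in progress that completes 5 tasks per week has an average lead time of 4 weeks; cutting WIP to 10 at the same throughput halves the lead time to 2 weeks.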

Kingman’s Formula

Kingman’s formula expands Little’s law by adding utilization and variability for predicting the wait time before requests are served:

\mathbb{E}(W_q) \approx \left(\frac{\rho}{1-\rho}\right) \left(\frac{c_a^2 + c_s^2}{2}\right) \tau
(credit wikipedia)

where τ is the mean service time, μ (1/τ) is the service rate, λ is the mean arrival rate, ρ = λ/μ is the utilization, ca is the coefficient of variation for arrivals and cs is the coefficient of variation for service times. Kingman’s formula shows that the queue size grows towards infinity as you approach 100% utilization and that you will have longer queues with greater variability of work. These insights can be applied to both technical and business processes so that you can build systems with greater predictability of processing time, smaller wait time E(Wq) and higher throughput (λ).
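The blow-up near full utilization is easy to see numerically. The sketch below is a direct evaluation of the approximation using a hypothetical function name; it takes the utilization, the two coefficients of variation and the mean service time, and returns `None` when utilization is outside the range [0, 1) where the approximation applies.

```rust
/// Approximate mean waiting time in the queue:
/// (rho / (1 - rho)) * ((ca^2 + cs^2) / 2) * tau.
pub fn kingman_wait(rho: f64, ca: f64, cs: f64, tau: f64) -> Option<f64> {
    // The approximation is only meaningful for utilization in [0, 1).
    if !(0.0..1.0).contains(&rho) {
        return None;
    }
    Some((rho / (1.0 - rho)) * ((ca * ca + cs * cs) / 2.0) * tau)
}
```

With both coefficients of variation equal to 1 and a mean service time of 1 unit, the expected wait is 1 unit at 50% utilization, 9 units at 90% and 99 units at 99%.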

Note: See Erlang analysis for serving requests in a system without a queue where new requests are blocked or rejected if there is not sufficient capacity in the system.

Gustafson’s Law

Gustafson’s law improves Amdahl’s law with a keen observation that parallel computing enables solving larger problems by computations on very large data sets in a fixed amount of time. It is defined as:

S = s + p x N

S = s + (1 – s) x N

S = N + (1 – N) x s

where S is the theoretical speed up with parallelism, N is the number of processors, s is the serial fraction and p is the parallel part such that s + p = 1.
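A small numeric check of the scaled speed up, using a hypothetical function name, is sketched below.

```rust
/// Scaled speed up S = N + (1 - N) * s for serial fraction `s` on `n` processors.
pub fn gustafson_speedup(s: f64, n: f64) -> f64 {
    n + (1.0 - n) * s
}
```

With a serial fraction of 0.05 on 8 processors this gives 7.65, which matches the equivalent form s + (1 – s) x N = 0.05 + 0.95 x 8.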

Gustafson’s law shows that limitations imposed by the sequential fraction of a program may be countered by increasing the total amount of computation. This allows solving bigger technical and business problems with greater computing and human resources.

Conway’s Law

Conway’s law states that an organization that designs a system will produce a design whose structure is a copy of the organization’s communication structure. It means that the architecture of a system is derived from the team structures of an organization; however, you can also use the architecture to derive the team structures. This allows building teams along the architecture boundaries so that each team is small, cross-functional and cohesive. A study by the Harvard Business School found that large co-located teams often tended to produce more tightly-coupled and monolithic codebases, whereas small distributed teams produced more modular codebases. These lessons can be applied to scaling teams and architecture so that teams and system modules are built around organizational boundaries and independent concerns to promote autonomy and reduce tight coupling.

Pareto Principle

The Pareto principle states that for many outcomes, roughly 80% of consequences come from 20% of causes. This principle shows up in numerous technical and business problems: 20% of the code contains 80% of the errors; customers use 20% of the functionality 80% of the time; 80% of optimization improvements come from 20% of the effort, etc. It can also be used to identify hotspots or critical paths when scaling, as some microservices or teams may receive disproportionate demands. Though scaling computing resources is relatively easy, scaling a team beyond an organization boundary is hard. You will have to apply other management tools such as prioritization, planning, metrics, automation and better communication to manage critical work.

Metcalfe’s Law

Metcalfe’s law states that if there are N users of a telecommunications network, the value of the network is proportional to N². It’s also referred to as the network effect and applies to social networking sites.

Number of possible pair connections = N * (N – 1) / 2

Reed’s Law expanded this law and observed that the utility of large networks can scale exponentially with the size of the network.

Number of possible subgroups of a network = 2^N – N – 1

This law explains the popularity of social networking services via viral communication. These laws can be applied to model information flow between teams or message exchange between services, so that you avoid peer to peer communication within an extremely large group of people or set of nodes. A common alternative is to use a gossip protocol or to designate a partition leader for each group that communicates with other leaders and then disseminates information to the group internally.
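To compare how quickly the two counts grow, the sketch below (hypothetical function names) evaluates the pair-connection formula and Reed’s subgroup formula for a group of N members.

```rust
/// Number of possible pair connections: N * (N - 1) / 2.
pub fn pair_connections(n: u64) -> u64 {
    // saturating_sub avoids underflow for n = 0
    n * n.saturating_sub(1) / 2
}

/// Number of possible subgroups with at least two members: 2^N - N - 1.
/// Returns `None` when 2^N does not fit in a u128.
pub fn reed_subgroups(n: u32) -> Option<u128> {
    if n >= 128 {
        return None;
    }
    Some((1u128 << n) - n as u128 - 1)
}
```

A group of 10 has 45 possible pairs but 1013 possible subgroups, and at 20 members the numbers are 190 and 1,048,555, which is why all-to-all communication stops being practical long before a group looks large.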

Dunbar Number

Dunbar’s number is a suggested cognitive limit to the number of people with whom one can maintain stable social relationships. It has a commonly used value of 150 and can be used to limit direct communication connections within an organization.

Wirth’s Law and Parkinson’s Law

Wirth’s Law, named after Niklaus Wirth, posits that software is getting slower more rapidly than hardware is becoming faster. Over the last few decades, processors have become exponentially faster, as predicted by Moore’s Law, but developers often use that gain to build more complex and feature-rich software that consumes all of the speed improvement. Another factor is that faster hardware allows developers to use programming languages and tools that do not prioritize efficiency, so the code becomes bloated.

In the realm of software development, there are similar principles, such as Parkinson’s law, which suggests that work expands to fill the time allotted for its completion. This implies that given more time, software projects may become more complex or extended than initially necessary. Moreover, Hofstadter’s Law offers a cautionary perspective, stating, “It always takes longer than you expect, even when you take into account Hofstadter’s Law.” This highlights the often-unexpected delays in software development timelines. Brooks’s Law further adds to these insights with the adage, “Adding manpower to a late software project makes it later.” These laws collectively emphasize that the demand upon a resource tends to expand to match the supply of the resource, but adding resources later also poses challenges due to complexity in software development and project management.

Principle of Priority Inversion

In modern operating systems, priority inversion arises when a high-priority process needs resources or data held by a low-priority process, but the low-priority process never gets a chance to execute due to its lower priority. As a result, the high-priority process can be blocked indefinitely. To avoid this, schedulers in modern operating systems temporarily raise the priority of the lower-priority process (a technique known as priority inheritance) so that it can complete its task and release the necessary resources, allowing the high-priority process to continue.

This same principle applies to organizational dynamics when scaling teams and projects. Imagine a high-priority initiative that requires collaboration from another team whose priorities do not align. Without proper coordination, the team working on the high-priority initiative may never get the support they need, leading to delays or blockages. Just as an operating system needs a priority adjustment to keep processes running smoothly, an organization must ensure alignment across teams. A solution is to maintain a global prioritized list of projects that is visible to all teams. This ensures that the most critical initiatives are recognized and appropriately supported by every team, regardless of their individual workloads, so that teams working on essential projects can quickly receive the help or resources they need and progress does not stall because of misaligned priorities.

Load Balancing (Round Robin, Least Connection)

Load balancing algorithms distribute tasks across multiple servers to optimize resource utilization and prevent any single server from becoming overwhelmed. Common strategies include round-robin (distributing tasks evenly across servers) and least connection (directing new tasks to the server with the fewest active connections).
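The two strategies can be sketched in a few lines. The names below are hypothetical, servers are identified by index only, and connection counts are supplied by the caller rather than tracked by a real proxy.

```rust
/// Round-robin selector over `servers` backends, identified by index.
pub struct RoundRobin {
    next: usize,
    servers: usize,
}

impl RoundRobin {
    pub fn new(servers: usize) -> Self {
        Self { next: 0, servers }
    }

    /// Returns the next server index in rotation, or `None` if there are no servers.
    pub fn pick(&mut self) -> Option<usize> {
        if self.servers == 0 {
            return None;
        }
        let chosen = self.next;
        self.next = (self.next + 1) % self.servers;
        Some(chosen)
    }
}

/// Least-connection choice: index of the server with the fewest active
/// connections (the first one wins on ties), or `None` for an empty slice.
pub fn least_connections(active: &[usize]) -> Option<usize> {
    active
        .iter()
        .enumerate()
        .min_by_key(|&(_, &count)| count)
        .map(|(index, _)| index)
}
```

Round robin ignores how busy each server is, which is fine when tasks are uniform; least connection adapts when some tasks take much longer than others.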

Load balancing can be applied to distribute work among teams or individuals. For instance, round-robin can ensure that tasks are equally assigned to team members, while the least-connection principle can help assign tasks to those with the lightest workload, ensuring no one is overloaded. This leads to more efficient task management, better resource allocation, and balanced work distribution.

MapReduce

MapReduce splits a large task into smaller sub-tasks (map step) that can be processed in parallel, then aggregates the results (reduce step) to provide the final output. In a large project, teams or individuals can be assigned sub-tasks that they can work on independently. Once all the sub-tasks are complete, the results can be aggregated to deliver the final outcome. This fosters parallelism, reduces bottlenecks, and allows for scalable team collaboration, especially for large or complex projects.
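A minimal single-machine illustration of the map and reduce steps, using scoped threads from the Rust standard library, is sketched below. The function name and the choice of summing squares are arbitrary and only for demonstration; a real MapReduce framework also handles distribution across machines, shuffling and fault tolerance.

```rust
use std::thread;

/// Splits `data` into chunks, squares and sums each chunk on its own thread
/// (map step), then adds the partial sums together (reduce step).
pub fn sum_of_squares(data: &[u64], workers: usize) -> u64 {
    let workers = workers.max(1);
    // Ceiling division, with a minimum chunk size of 1 so `chunks` never panics.
    let chunk_size = ((data.len() + workers - 1) / workers).max(1);
    thread::scope(|scope| {
        let handles: Vec<_> = data
            .chunks(chunk_size)
            .map(|part| scope.spawn(move || part.iter().map(|x| x * x).sum::<u64>()))
            .collect();
        handles
            .into_iter()
            .map(|handle| handle.join().expect("worker thread panicked"))
            .sum::<u64>()
    })
}
```

Each worker only touches its own chunk, so no locking is needed; the only coordination point is the final reduce, which is the serial fraction in Amdahl’s terms.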

Deadlock Prevention (Banker’s Algorithm)

The Banker’s Algorithm is used to prevent deadlocks by allocating resources in such a way that there is always a safe sequence of executing processes, avoiding circular wait conditions. In managing interdependent teams or tasks, it’s important to avoid deadlocks where teams wait on each other indefinitely. By proactively ensuring that resources (e.g., knowledge, tools, approvals) are available before committing teams to work, project managers can prevent deadlock scenarios. Prioritizing resource allocation and anticipating dependencies can ensure steady progress without one team stalling another.
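The safety check at the heart of the Banker’s Algorithm can be sketched as follows. The function name is hypothetical, and the sketch assumes that `allocation` and `need` have one row per process and that every row has the same length as `available`.

```rust
/// Returns an order in which all processes can finish (a safe sequence),
/// or `None` if the state is unsafe.
/// Assumes `allocation` and `need` have one row per process and each row
/// has one entry per resource type, matching `available`.
pub fn safe_sequence(
    available: &[u32],
    allocation: &[Vec<u32>],
    need: &[Vec<u32>],
) -> Option<Vec<usize>> {
    let mut work = available.to_vec();
    let mut finished = vec![false; allocation.len()];
    let mut order = Vec::with_capacity(allocation.len());

    while order.len() < allocation.len() {
        // Find an unfinished process whose remaining need fits in `work`.
        let next = (0..allocation.len()).find(|&i| {
            !finished[i] && need[i].iter().zip(&work).all(|(n, w)| n <= w)
        })?;
        // Let it run to completion and reclaim everything it was holding.
        for (w, a) in work.iter_mut().zip(&allocation[next]) {
            *w += *a;
        }
        finished[next] = true;
        order.push(next);
    }
    Some(order)
}
```

A resource request is granted only if the state that would result still has a safe sequence; otherwise the requester waits. In team terms, this corresponds to checking that every dependency of a commitment can be satisfied in some order before accepting it.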

Consensus Algorithms (Paxos, Raft)

Consensus algorithms ensure that distributed systems agree on a single data value or decision, despite potential failures. Paxos and Raft are used to maintain consistency across distributed nodes. In projects involving multiple stakeholders or teams, reaching consensus on decisions can be challenging, especially with different priorities and viewpoints. Consensus-building techniques, inspired by these algorithms, could involve ensuring that key stakeholders agree before any significant action is taken, much like how Paxos ensures agreement across distributed systems. This avoids misalignment and fosters collaboration and trust across teams.

Rate Limiting

Rate limiting controls the number of requests or operations that can be performed in a given timeframe to prevent overloading a system. This concept applies to managing expectations, particularly in teams with multiple incoming requests. Rate limiting can be applied to protect teams from being overwhelmed by too many requests at once. By limiting how many tasks or requests a team can handle at a time, project managers can ensure a sustainable work pace and prevent burnout, much like how rate limiting helps protect system stability.
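One common way to implement rate limiting is a token bucket, sketched below. The text above does not name a specific algorithm, so this choice, along with the `TokenBucket` name and its fields, is an assumption for illustration; time is passed in as milliseconds to keep the example deterministic.

```rust
/// A hypothetical token-bucket rate limiter.
pub struct TokenBucket {
    capacity: f64,       // maximum burst size
    tokens: f64,         // tokens currently available
    refill_per_sec: f64, // sustained rate
    last_ms: u64,        // time of the last refill
}

impl TokenBucket {
    /// Creates a bucket that starts full.
    pub fn new(capacity: f64, refill_per_sec: f64, now_ms: u64) -> Self {
        Self { capacity, tokens: capacity, refill_per_sec, last_ms: now_ms }
    }

    /// Refills based on elapsed time, then takes one token if available.
    pub fn try_take(&mut self, now_ms: u64) -> bool {
        let elapsed_sec = now_ms.saturating_sub(self.last_ms) as f64 / 1000.0;
        self.tokens = (self.tokens + elapsed_sec * self.refill_per_sec).min(self.capacity);
        // Never move the clock backwards if a stale timestamp is passed in.
        self.last_ms = self.last_ms.max(now_ms);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

The capacity bounds short bursts while the refill rate bounds sustained load, which maps naturally to a team accepting a small backlog of urgent requests but only a fixed number of new requests per sprint.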

Summary

The above laws offer strategies for optimizing both technical systems and team dynamics. Amdahl’s Law and the Universal Scalability Law highlight the challenges of parallelizing work, emphasizing the need to manage coordination and communication overhead as bottlenecks when scaling teams or systems. Brooks’s and Metcalfe’s Laws reveal the quadratic growth of communication paths, suggesting that effective team scaling requires managing these paths to avoid coordination paralysis. Little’s Law and Kingman’s Formula suggest limiting work in progress and avoiding 100% resource utilization to ensure reliable throughput, while Conway’s Law underscores the alignment between team structures and system architecture. Teams and their responsibilities should mirror modular architectures, fostering autonomy and reducing cross-team dependencies.

The Pareto Principle can guide teams to make small but impactful changes in architecture or processes that yield significant productivity improvements. Wirth’s Law and Parkinson’s Law serve as reminders to prevent work bloat and unnecessary complexity by setting clear deadlines and objectives. Dunbar’s Number highlights the human cognitive limit in maintaining external relationships, suggesting that team dependencies should be kept minimal to maintain effective collaboration. The consensus algorithms used in distributed systems can be applied to decision-making and collaboration, ensuring alignment among teams. Error correction algorithms are useful for feedback loops, helping teams iteratively improve. Similarly, techniques like load balancing strategies can optimize task distribution and workload management across teams.

Before applying these laws, it is essential to have clear goals, metrics, and KPIs to measure baselines and improvements. Prematurely implementing these scalability strategies can exacerbate issues rather than resolve them. The focus should be on global optimization of the entire organization or system, rather than focusing on local optimizations that don’t align with broader goals.

August 15, 2021

Structured Concurrency with Swift

Filed under: Concurrency,Uncategorized — admin @ 6:19 pm

I wrote about support for structured concurrency in Javascript/Typescript, Erlang/Elixir, Go, Rust, Kotlin and Swift last year (Part-I, Part-II, Part-III, Part-IV), but async/await and actors support in the Swift language was still in development. Swift 5.5 will finally have these new concurrency features available, which are described below:

Async/Await

As described in Part-IV, Swift APIs previously used completion handlers for asynchronous methods that suffered from:

  • Poor error handling, because there was no single way to handle errors/exceptions; separate callbacks for errors were needed instead.
  • Asynchronous operations were difficult to cancel or exit early after a timeout.
  • Global reasoning about shared state was required in order to prevent race conditions.
  • Stack traces from the asynchronous thread didn’t include the originating request, so the code became hard to debug.
  • As the Swift/Objective-C runtime uses native threads, creating a lot of background tasks consumed expensive thread resources and could cause excessive context switching.
  • Nested use of completion handlers turned the code into callback hell.

Following example shows poor use of control flow and deficient error handling when using completion handlers:

Swift

Though Promise libraries help a bit, code using them still suffers from the dichotomy of control flow and error handling. Here is the equivalent code using async/await:

Swift

As you can see, above code not only improves control flow and adds uniform error handling but it also enhances readability by removing the nested structure of completion handlers.

Tasks Hierarchy, Priority and Cancellation

When a new task is created using async/await, it inherits the priority and local values of the parent task, which are then passed down the entire hierarchy of child tasks. When a parent task is cancelled, the Swift runtime automatically cancels all child tasks. However, Swift uses cooperative cancellation, so child tasks must check the cancellation state or they may continue to execute, although the results from cancelled tasks are discarded.

Continuations and Scheduling

Swift previously used native threads to schedule background tasks, where new threads were automatically created when a thread was blocked or waiting for another resource. The new Swift runtime creates native threads based on the number of cores, and background tasks use continuations to schedule their work on those native threads. When a task is blocked, its state is saved on the heap and another task is scheduled for processing on the thread. The await syntax suspends the current task and releases the thread until the child task is completed. This cooperative scheduling requires runtime support for non-blocking I/O operations and system APIs so that native threads are not blocked and continue to work on other background tasks. This also limits background tasks from using semaphores and locks, which are discussed below.

async function

In the above example, when a thread is working on a background task “updateDatabase” that starts child tasks “add” or “save”, it saves the tasks as continuations on the heap. However, if the current task is suspended, then the thread can work on other tasks as shown below:

Multiple Asynchronous Tasks

The async/await syntax in Swift also allows scheduling multiple asynchronous tasks and awaiting them later, e.g.

Swift

The async let syntax is called concurrent binding where the child task executes in parallel to the parent task.

Task Groups

Task groups allow dispatching multiple background tasks that are executed concurrently, and Swift automatically cancels all child tasks when the parent task is cancelled. The following example demonstrates use of the group API:

Swift

As these features are still in development, Swift has recently renamed the group.async API to group.addTask. In the above example, images are downloaded in parallel and then the for try await loop gathers the results.

Data Races

The Swift compiler will warn you if you try to mutate shared state from multiple background tasks. In the above example, the asynchronous task returns a tuple of image-id and image instead of mutating a shared dictionary. The parent task then mutates the dictionary using the results from the child tasks in the for try await loop.

Cancellation

You can also cancel a background task using cancel API or cancel all child tasks of a group using group.cancelAll(), e.g.

Swift

The Swift runtime automatically cancels all child tasks if any of the background tasks fails. You can store a reference to a child task in an instance variable if you need to cancel the task in a different method, e.g.

Swift

As cancellation in Swift is cooperative, you must check the cancellation state explicitly; otherwise the task will continue to execute, but Swift will reject the results, e.g.

Swift

Timeout

The task or async/await APIs don’t directly support timeout so you must implement it manually similar to cooperative cancellation.

Semaphores and Locks

Swift does not recommend using semaphores and locks with background tasks because a task is suspended when waiting for an external resource and can later be resumed on a different thread. The following example shows incorrect use of semaphores with background tasks:

Swift

TaskLocal

You can annotate certain properties with @TaskLocal; their values are stored in the context of a Task and are available to the task and all of its children, e.g.

Swift

Detached Tasks (Unstructured)

The above tasks and async/await APIs are based on structured concurrency, where the parent task is not completed until all child background tasks are done with their work. However, Swift also allows launching detached tasks that can continue to execute in the background without the parent waiting for their results, e.g.

Swift

Legacy APIs

Legacy code that uses completion handlers can use the following continuation APIs to support the async/await syntax:

Swift

In the above example, the getPersistentPosts method uses a completion handler and the persistPosts method provides a bridge so that you can use the async/await syntax. The resume method can only be called once for the continuation.

You may also save continuation in an instance variable when you need to resume in another method, e.g.

Swift

Implementing WebCrawler Using Async/Await

Following example shows implementation of WebCrawler using async/await described in Part I of the concurrency series:

Swift

The crawl method takes a list of URLs with a timeout and invokes doCrawl, which crawls the list of URLs in parallel and then waits for the results using the try await keywords. The doCrawl method recursively crawls child URLs up to the MAX_DEPTH limit. The main crawl method defines the boundary for concurrency and returns the count of child URLs.

Following are major features of the structured concurrency in Swift:

  • Concurrency scope: The async/await syntax defines the scope of concurrency, where all child background tasks must be completed before returning from the asynchronous function.
  • The async declared methods in the above implementation show that asynchronous code can be easily composed.
  • Error handling: The async/await syntax uses normal try/catch syntax for error checking instead of the specialized syntax of Promises or callback functions.
  • The Swift runtime schedules asynchronous tasks on a fixed number of native threads and automatically suspends tasks when they wait for I/O or other resources.

Following are the major shortcomings in Swift for its support of structured concurrency:

  • The most glaring omission in above implementation is timeout, which is not supported in Swift’s implementation.
  • Swift runtime manages scheduling of tasks and you cannot pass your own execution dispatcher for scheduling background tasks.

Actors

The Actor Model is a classic abstraction from the 1970s for managing concurrency, where an actor keeps its internal state private and uses message passing for interaction with its state and behavior. An actor can only work on one message at a time, which prevents data races when it is accessed from multiple background tasks. I have previously written about actors and described them in Part II of the concurrency series when covering Erlang and Elixir.

actor

Instead of creating a background task using serial queue such as:

Swift

The actor syntax simplifies such implementation and removes all boilerplate e.g.

Swift

Above syntax protects direct access to the internal state and you must use await syntax to access the state or behavior, e.g.

Swift

Priority Inversion Principle

The dispatch queue API applies priority inversion principle when a high priority task is behind low priority tasks, which bumps up the priority of low priority tasks ahead in the queue. The runtime environment then executes the high priority task after completing those low priority tasks. The actor API instead can choose high priority task directly from the actor’s queue without waiting for completion of the low priority tasks ahead in the queue.

Actor Reentrancy

If an actor invokes another actor or background task in its function, it may get suspended until the background task is completed. In the meantime, another client may invoke the actor and modify its state, so you need to re-check your assumptions when changing internal state after an await. Because a continuation used for the background task may be scheduled on a different thread after resuming the work, you cannot rely on DispatchSemaphore, NSLock, NSRecursiveLock, etc. for synchronization.

Following code from WWDC-2021 shows how reentrancy can be handled safely:

Swift

The ImageDownloader actor in the above example downloads and caches images. The actor is suspended while it’s downloading an image, so another client can reenter the downloadAndCache method and request the same image. The above code prevents duplicate requests by reusing the existing request to serve multiple concurrent clients.

Actor Isolation

Actors in Swift prevent invoking methods directly, but you can annotate methods with nonisolated if you need to call them directly; those methods cannot mutate the actor’s state, e.g.

Swift

Sendable

Actors require that any data structures used in their internal state are thread safe and implement the Sendable protocol, such as:

  • Value types
  • Actors
  • Immutable classes
  • Synchronized classes
  • @Sendable Functions

Swift

@MainActor

UI apps require that all UI updates are performed on the main thread, and previously you had to dispatch UI work to the DispatchQueue.main queue. Swift now allows marking functions, classes or structs with the special @MainActor annotation so that the functions are automatically executed on the main thread, e.g.

Swift

Following example shows how a view-controller can be annotated with the @MainActor annotations:

Swift

In the above example, all methods of MyViewController are executed on the main thread; however, you can exclude certain methods via the nonisolated keyword.

@globalActor

The @globalActor annotation defines a singleton global actor and @MainActor is a kind of global actor. You can also define your own global actor such as:

Swift

Message Pattern Matching

As actors in Swift use methods to invoke operations on actor, they don’t support pattern matching similar to Erlang/Elixir, which offer selecting next message to process by comparing one or more fields in the message.

Local only

Unlike actors in Erlang or Elixir, actors in Swift can only communicate with other actors in the same process or application and they don’t support distributed communication to remote actors.

Actor Executor/Dispatcher

The actor protocol defines following property to access the executor:

Swift

However, unownedExecutor is a read-only property that cannot be changed at this time.

Implementing WebCrawler Using Actors and Tasks

Following example shows implementation of WebCrawler using actors and tasks described in Part I of the concurrency series:

Swift

The above implementation uses actors for processing crawling requests, but it shares other code for parsing and downloading web pages. As an actor provides serialized access to its state and behavior, you can’t use a single actor to implement a highly concurrent web crawler. Instead, you may divide the web domain that needs to be crawled among a pool of actors that can share the work.

Performance Comparison

The following table from Part-IV summarizes the runtime of various implementations of the web crawler when crawling 19K URLs, which resulted in about 76K messages to the asynchronous methods/coroutines/actors discussed in this blog series:

Language | Design | Runtime (secs)
Typescript | Async/Await | 0.638
Erlang | Spawning Process | 4.636
Erlang | PMAP | 4.698
Elixir | Spawning OTP Children | 43.5
Elixir | Task async/await | 187
Elixir | Worker-pool with queue | 97
GO | Go-routine/channels | 1.2
Rust | Async/Await | 4.3
Kotlin | Async/Await | 0.736
Kotlin | Coroutine | 0.712
Swift | Async/Await | 63
Swift | Actors/Async/Await | 65

Note: The purpose of above results was not to run micro-benchmarks but to show rough cost of spawning thousands of asynchronous tasks.

You can download full code for Swift example from https://github.com/bhatti/concurency-katas/tree/main/swift.

Overall, Swift’s new features for structured concurrency, including async/await and actors, are a welcome addition to its platform. On the downside, the Swift concurrency APIs lack support for timeouts and customized dispatchers/executors, and the rough benchmarks above showed higher overhead than expected. On the positive side, the Swift runtime catches errors due to data races, and the new async/await/actors syntax prevents bugs that were previously caused by incorrect use of completion handlers and error handling. This will help developers write more robust and responsive apps in the future.

December 26, 2020

Applying Structured Concurrency patterns in GO with case-study of Gitlab-runner

Filed under: Concurrency,GO — admin @ 9:31 pm

I recently wrote a series of blogs on structured concurrency (Part-I, Part-II, Part-III, Part-IV) and how it improves readability, concurrency-scope, composition and flow-control of concurrent code and adds better support for error-handling, cancellation, and timeout. I have been using GO on a number of projects over the last few years and I will share a few concurrency patterns that I have used or seen in other projects such as gitlab-runner. I demonstrated in the above series how structured concurrency considers the GO statement harmful, similar to GOTO in structured programming. Just as structured programming replaced GOTO with control-flow primitives such as single-entry/exit, if-then, loops, and function calls, structured concurrency provides a scope of concurrency where the parent waits for all asynchronous code. I will show how common concurrency patterns in GO can take advantage of structured concurrency.

Asynchronous Tasks

The primary purpose of goroutines is to perform asynchronous tasks where you might be requesting data from a remote API or database, e.g. here is sample code from gitlab-runner that uses goroutines to copy archive artifacts:

Go

Here is async/await style code based on async_await.go that performs a similar task using structured concurrency:

Go

Above code defines scope of concurrency and adds support for timeout while making the code easier to comprehend.
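To make the idea concrete, here is a minimal sketch of what such an async/await helper could look like. It is not the actual async_await.go; the names Async, Awaiter and Await, and the demo functions, are hypothetical and only illustrate a concurrency scope with a timeout:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// outcome pairs a task's return value with its error.
type outcome struct {
	value interface{}
	err   error
}

// Awaiter is a hypothetical handle to a task started by Async.
type Awaiter struct {
	ch     chan outcome
	cancel context.CancelFunc
}

// Async starts fn in a goroutine and returns a handle for awaiting it.
func Async(ctx context.Context, fn func(ctx context.Context) (interface{}, error)) *Awaiter {
	ctx, cancel := context.WithCancel(ctx)
	ch := make(chan outcome, 1) // buffered so the goroutine can exit even after a timeout
	go func() {
		v, err := fn(ctx)
		ch <- outcome{v, err}
	}()
	return &Awaiter{ch: ch, cancel: cancel}
}

// Await blocks until the task finishes or the timeout elapses.
// Either way, the task's context is cancelled on return.
func (a *Awaiter) Await(timeout time.Duration) (interface{}, error) {
	defer a.cancel()
	select {
	case r := <-a.ch:
		return r.value, r.err
	case <-time.After(timeout):
		return nil, errors.New("async task timed out")
	}
}

// demoAwait awaits a fast task (a stand-in for copying an artifact).
func demoAwait() (interface{}, error) {
	task := Async(context.Background(), func(ctx context.Context) (interface{}, error) {
		return "copied", nil
	})
	return task.Await(time.Second)
}

// demoTimeout awaits a slow task that cooperatively honors cancellation.
func demoTimeout() (interface{}, error) {
	task := Async(context.Background(), func(ctx context.Context) (interface{}, error) {
		select {
		case <-time.After(5 * time.Second):
			return "late", nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	})
	return task.Await(20 * time.Millisecond)
}

func main() {
	fmt.Println(demoAwait())
	fmt.Println(demoTimeout())
}
```

The caller never touches a channel or a goroutine directly; the timeout and cancellation are part of the Await call, which is what gives the code its scope.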

Note: due to the lack of generics in GO at the time of writing, interface{} is used to accept the different types that may be passed to asynchronous tasks.

Racing Asynchronous Tasks

A common use of goroutines is spawning multiple asynchronous tasks and taking the result of the first task that completes, e.g. here is sample code from gitlab-runner that uses goroutines to copy stdout and stderr:

Go

Above code creates stdoutErrCh channel to capture errors from stdout and stdinErrCh channel to capture errors from stderr and then waits for either to finish.

Here is equivalent code that uses structured concurrency with async/await primitives from async_racer.go:

Go

Above code uses async/await syntax to define scope of concurrency and clarifies intent of the business logic without distraction of concurrency logic.
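Here is a rough sketch of the racing pattern, assuming a hypothetical Race helper (this is not the code from async_racer.go). The copyStream function only simulates a stream copy with a sleep:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Task is a hypothetical unit of asynchronous work.
type Task func(ctx context.Context) (interface{}, error)

// Race runs all tasks concurrently and returns the outcome of the first one
// to finish. The remaining tasks are told to stop through the cancelled context.
func Race(ctx context.Context, timeout time.Duration, tasks ...Task) (interface{}, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel() // signals the losing tasks to stop
	type outcome struct {
		value interface{}
		err   error
	}
	ch := make(chan outcome, len(tasks)) // buffered so losers never block on send
	for _, t := range tasks {
		go func(t Task) {
			v, err := t(ctx)
			ch <- outcome{v, err}
		}(t)
	}
	select {
	case o := <-ch:
		return o.value, o.err
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// copyStream simulates copying a stream that ends after duration d.
func copyStream(name string, d time.Duration) Task {
	return func(ctx context.Context) (interface{}, error) {
		select {
		case <-time.After(d):
			return name, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

// demoRace races a short copy against a long one.
func demoRace() (interface{}, error) {
	return Race(context.Background(), time.Second,
		copyStream("stdout", 10*time.Millisecond),
		copyStream("stderr", 500*time.Millisecond))
}

func main() {
	fmt.Println(demoRace())
}
```

The result channel is sized to the number of tasks so that a losing goroutine can still deliver its result and exit instead of leaking.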

Performing cleanup when task is aborted or cancelled

If a goroutine spawns an external process for background work, you may need to kill that process in case the goroutine task is cancelled or times out. For example, here is sample code from gitlab-runner that calls the KillAndWait function to terminate the external process when the context.Done() channel is closed:

Go

In the above code, the Run method starts a command in a goroutine, waits for its completion in another goroutine and then listens for a response from the waitCh and context.Done channels.

Here is how async/await structure from async_await.go can apply structured concurrency to above code:

Go
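As a minimal sketch of this cleanup pattern (again, not the author’s async_await.go), the hypothetical RunWithAbort helper below takes an abort callback that is invoked when the context is cancelled or times out. In the demo, closures stand in for waiting on and killing a real process:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// RunWithAbort runs work in a goroutine. If ctx is cancelled or times out
// before work returns, it calls abort (e.g. to kill an external process)
// and returns the context error.
func RunWithAbort(ctx context.Context, work func() error, abort func()) error {
	done := make(chan error, 1) // buffered so work can finish after an abort
	go func() { done <- work() }()
	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		abort()
		return ctx.Err()
	}
}

// demoAbort simulates a long-running process that only stops when killed.
func demoAbort() (bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	killed := make(chan struct{})
	aborted := false
	err := RunWithAbort(ctx,
		func() error { <-killed; return nil },    // stands in for waiting on the process
		func() { aborted = true; close(killed) }, // stands in for killing the process
	)
	return aborted, err
}

func main() {
	fmt.Println(demoAbort())
}
```

For real external commands, the standard library’s exec.CommandContext already kills the process when its context is done, so a helper like this mainly matters when the cleanup is something other than killing a child process.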

Using GO Channels as data pipe/queue

GO channels are designed based on CSP rendezvous primitives, where both sender and receiver have to wait to exchange messages. However, you can add buffering to turn these channels into bounded queues (for back-pressure). Here is example code from gitlab-runner that uses channels to stream log messages:

Go

The above code creates a channel linesCh without any buffer and then creates a goroutine where logs are read and sent to the linesCh channel. As I mentioned, the above code will block the sender until the receiver is ready to receive these log messages, and you may lose log messages if the receiver is slow and the goroutine is killed before the logs can be read. Structured concurrency cannot help in this case, but we can simply use an event-bus or a local message-queue to stream these logs.
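The buffering mentioned earlier can be sketched as follows. This is only an illustration of a bounded channel used as a queue; the streamLines and collect functions are hypothetical and are not taken from gitlab-runner:

```go
package main

import (
	"fmt"
	"strings"
)

// streamLines sends each line of input to a bounded channel and closes the
// channel when all lines have been sent.
func streamLines(input string, buffer int) <-chan string {
	linesCh := make(chan string, buffer)
	go func() {
		defer close(linesCh) // tells the receiver that no more lines will arrive
		for _, line := range strings.Split(input, "\n") {
			linesCh <- line // blocks only when the buffer is full (back-pressure)
		}
	}()
	return linesCh
}

// collect drains the channel until the producer closes it.
func collect(ch <-chan string) []string {
	var out []string
	for line := range ch {
		out = append(out, line)
	}
	return out
}

func main() {
	lines := collect(streamLines("step 1\nstep 2\nstep 3", 2))
	fmt.Println(len(lines), lines)
}
```

With a buffer of 2 the producer can run ahead of a slow consumer by at most two lines; closing the channel from the producer side lets the consumer’s range loop end cleanly.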

WaitGroup to wait for completion of goroutines

The GO language supports sync.WaitGroup to wait for completion of goroutines, but it’s redundant if you are also using channels to receive a reply. For example, here is sample code from gitlab-runner that uses WaitGroup to wait for completion of goroutines:

Go

Here is how above code can be replaced by async/await syntax from my async_await.go:

Go

Above code is not shorter than the original but it is more readable and eliminates subtle bugs where you might be using WaitGroup incorrectly, thus resulting in deadlock.

Fork-Join based Asynchronous Tasks

One common use of goroutines is to spawn multiple asynchronous tasks and wait for their completion, similar to the fork-join pattern, e.g. here is sample code from gitlab-runner that uses goroutines to perform cleanup of multiple services and sends back the results via a channel:

Go

The cleanupServices method above goes through a collection of services and calls deleteKubernetesService in a goroutine for each, which calls the kubernetes API to remove the given service. It waits for all background goroutines using WaitGroup and then receives any errors from the error channel and logs them.

Here is how you can use async/await code from async_await.go that applies structured concurrency by abstracting low-level goroutines and channels:

Go

Above code uses structured concurrency by defining scope of asynchronous code in cleanupServices and adds better support for cancellation, timeout and error handling. Also, you don’t need to use WaitGroup to wait for completion anymore.
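A fork-join helper along these lines might look like the sketch below. AwaitAll and deleteService are hypothetical names used for illustration; deleteService fails for one service so that the error collection is visible:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Task is a hypothetical unit of asynchronous work.
type Task func(ctx context.Context) error

// AwaitAll runs the tasks concurrently and waits until all of them finish or
// the timeout elapses. It returns one error slot per task, in task order.
func AwaitAll(ctx context.Context, timeout time.Duration, tasks ...Task) []error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	type outcome struct {
		index int
		err   error
	}
	ch := make(chan outcome, len(tasks)) // buffered so no goroutine blocks on send
	for i, t := range tasks {
		go func(i int, t Task) { ch <- outcome{i, t(ctx)} }(i, t)
	}
	errs := make([]error, len(tasks))
	reported := make([]bool, len(tasks))
	for range tasks {
		select {
		case o := <-ch:
			errs[o.index], reported[o.index] = o.err, true
		case <-ctx.Done():
			// Mark every task that has not reported yet with the context error.
			for i := range errs {
				if !reported[i] {
					errs[i] = ctx.Err()
				}
			}
			return errs
		}
	}
	return errs
}

// deleteService simulates removing a service; "broken" always fails.
func deleteService(name string) Task {
	return func(ctx context.Context) error {
		if name == "broken" {
			return errors.New("cannot delete service " + name)
		}
		return nil
	}
}

// demoCleanup deletes three services in parallel.
func demoCleanup() []error {
	return AwaitAll(context.Background(), time.Second,
		deleteService("db"), deleteService("broken"), deleteService("cache"))
}

func main() {
	for i, err := range demoCleanup() {
		fmt.Println(i, err)
	}
}
```

Counting results from a channel replaces the WaitGroup, and because each result carries its task index the caller can tell which service failed.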

Polling Asynchronous Task

In some cases, you may need to poll a background task to check its status or wait for its completion, e.g. here is sample code from gitlab-runner that waits for a pod until it’s running:

Go

The waitForPodRunning method above repeatedly calls triggerPodPhaseCheck, which creates a goroutine and then invokes kubernetes API to get pod status. It then returns pod status in a channel that waitForPodRunning listens to.

Here is equivalent code using async/await from async_polling.go:

Go

Above code removes complexity due to manually polling and managing goroutines/channels.
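The polling loop can be captured in one small helper. The sketch below is an assumption about how such a helper might be shaped, not the contents of async_polling.go; Poll and the list of pod phases are hypothetical:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Poll calls check immediately and then once per interval until check
// reports done, check returns an error, or ctx is cancelled or times out.
func Poll(ctx context.Context, interval time.Duration, check func(ctx context.Context) (bool, error)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		done, err := check(ctx)
		if err != nil || done {
			return err
		}
		select {
		case <-ticker.C: // wait for the next polling slot
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demoPoll simulates a pod that becomes Running on the third status check.
func demoPoll() (int, error) {
	phases := []string{"Pending", "Pending", "Running"} // hypothetical pod phases
	attempts := 0
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := Poll(ctx, 5*time.Millisecond, func(ctx context.Context) (bool, error) {
		phase := phases[attempts]
		attempts++
		return phase == "Running", nil
	})
	return attempts, err
}

func main() {
	fmt.Println(demoPoll())
}
```

The check function runs in the caller’s goroutine, so no channel is needed to hand the status back, and the overall deadline comes from the context rather than from a retry counter.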

Background Task with watchdog

Another concurrency pattern in GO involves starting a background task and then launching another background process to monitor the task or its runtime environment, so that it can terminate the background task if the watchdog finds any errors. For example, here is sample code from gitlab-runner that executes a command inside a kubernetes pod and then launches another goroutine to monitor the pod status:

Go

In the above code, runWithAttach calls watchPodStatus to monitor the status of the pod in a background goroutine and then executes the command in runInContainer.

Here is equivalent code using async/await from async_watchdog.go:

Go

Above code removes extraneous complexity due to concurrent code embedded with functional code and makes it easier to comprehend with improved support of concurrency scope, error handling, timeout and cancellation.
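Here is a simplified sketch of the watchdog pattern. RunWithWatchdog is a hypothetical helper, not the code from async_watchdog.go, and the demo only simulates the command and the pod monitor:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// RunWithWatchdog runs task and watchdog concurrently. If the watchdog
// returns an error first, the shared context is cancelled and that error is
// returned; otherwise the task's own result is returned.
func RunWithWatchdog(ctx context.Context, task, watchdog func(ctx context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stops whichever side is still running
	taskCh := make(chan error, 1)
	watchCh := make(chan error, 1)
	go func() { taskCh <- task(ctx) }()
	go func() { watchCh <- watchdog(ctx) }()
	select {
	case err := <-taskCh:
		return err
	case err := <-watchCh:
		if err != nil {
			return fmt.Errorf("watchdog: %w", err)
		}
		return <-taskCh // the watchdog stopped cleanly; keep waiting for the task
	}
}

// demoWatchdog simulates a command in a pod that may fail while it runs.
func demoWatchdog(podFails bool) error {
	task := func(ctx context.Context) error {
		select {
		case <-time.After(30 * time.Millisecond): // simulated command
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	watchdog := func(ctx context.Context) error {
		if podFails {
			return errors.New("pod entered Failed phase")
		}
		<-ctx.Done() // healthy pod: keep watching until the task is done
		return nil
	}
	return RunWithWatchdog(context.Background(), task, watchdog)
}

func main() {
	fmt.Println(demoWatchdog(false))
	fmt.Println(demoWatchdog(true))
}
```

Both goroutines share one cancellable context, so the deferred cancel cleans up the watchdog when the task finishes and stops the task when the watchdog reports a failure.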

Other accidental complexity in Gitlab-Runner

Besides concurrency, here are a few other design choices that add accidental complexity in gitlab-runner:

Abstraction

The primary goal of gitlab-runner is to abstract executor framework so that it can use different platforms such as Docker, Kubernetes, SSH, Shell, etc to execute processes. However, it doesn’t define an interface for common behavior such as managing runtime containers or executing a command for these executors.

Adapter/Gateway pattern

A common pattern to abstract third party library or APIs is to use an adapter or gateway pattern but gitlab-runner mixes external APIs with internal executor logic. The kubernetes executor in gitlab-runner defines logic for both interacting with external Kubernetes server and managing Kubernetes Pod or executing processes inside those pods. For example, my initial intent for looking at the gitlab-runner was to adopt APIs for interacting with Kubernetes but I could not reuse any code as a library and instead I had to copy relevant code for my use-case.

Separation of Concerns

The gitlab-runner is not only riddled with concurrency related primitives such as goroutines and channels, but it also mixes in other aspects such as configuration, feature-flags, logging, monitoring, etc. For example, it uses configurations for defining containers, services and volumes for Kubernetes, but it hard codes various internal configurations for build, helper and monitoring containers instead of injecting them via external configuration. Similarly, it hard codes volumes for repo, cache, logging, etc.

A common design pattern in building software is to use layers of abstraction or separation of concerns, where each layer or module is responsible for a single concern (single-responsibility principle). For example, you can divide executors into three layers: an adapter layer, a middle layer and a high-level layer. The adapter layer strictly interacts with the underlying platform such as Kubernetes, and no other layer needs to know the internal types or APIs of Kubernetes. The middle layer uses the adapter layer to delegate any executor specific behavior and defines methods to configure containers. The high-level layer is responsible for injecting dependent services, containers and configurations to execute jobs within the gitlab environment. I understand some of these abstractions might become leaky if there are significant differences among executors, but such a design allows broader reuse of the lower-level layers in other contexts.
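The layering described above could be sketched roughly as follows. All of the names here (ContainerAdapter, fakeAdapter, JobRunner) are hypothetical and do not exist in gitlab-runner; the fake adapter stands in for a real Kubernetes or Docker implementation:

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// ContainerAdapter is a hypothetical adapter-layer interface. Only its
// implementations would know the types and APIs of Kubernetes, Docker, etc.
type ContainerAdapter interface {
	Start(ctx context.Context, image string) (string, error)
	Exec(ctx context.Context, id string, command ...string) (string, error)
	Stop(ctx context.Context, id string) error
}

// fakeAdapter is an in-memory implementation used for illustration and tests.
type fakeAdapter struct{ running map[string]bool }

func (f *fakeAdapter) Start(ctx context.Context, image string) (string, error) {
	id := "fake-" + image
	f.running[id] = true
	return id, nil
}

func (f *fakeAdapter) Exec(ctx context.Context, id string, command ...string) (string, error) {
	if !f.running[id] {
		return "", fmt.Errorf("container %s is not running", id)
	}
	return id + ": " + strings.Join(command, " "), nil
}

func (f *fakeAdapter) Stop(ctx context.Context, id string) error {
	delete(f.running, id)
	return nil
}

// JobRunner is a middle-layer type that depends only on the interface.
type JobRunner struct{ adapter ContainerAdapter }

// Run starts a container, executes the command in it and always stops it.
func (r JobRunner) Run(ctx context.Context, image string, command ...string) (string, error) {
	id, err := r.adapter.Start(ctx, image)
	if err != nil {
		return "", err
	}
	defer r.adapter.Stop(ctx, id) // release the container even if Exec fails
	return r.adapter.Exec(ctx, id, command...)
}

// demoJob runs one command and reports how many containers are left running.
func demoJob() (string, int, error) {
	adapter := &fakeAdapter{running: map[string]bool{}}
	out, err := JobRunner{adapter: adapter}.Run(context.Background(), "alpine", "echo", "hi")
	return out, len(adapter.running), err
}

func main() {
	fmt.Println(demoJob())
}
```

Because JobRunner sees only the interface, the same middle layer can be exercised with a fake in tests and reused with a different platform adapter without touching the job logic.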

Summary

Modern scalable and performant applications demand concurrency, but sprinkling low-level concurrency patterns all over your code adds significant accidental or extraneous complexity. In this blog, I demonstrated how encapsulating concurrent code with a library of structured concurrency patterns can simplify your code. Concurrency is hard, especially with lower-level primitives such as WaitGroup, Mutex, goroutines and channels in GO. In addition, incorrect use of these concurrency primitives can lead to deadlocks or race conditions. Using a small library for abstracting common concurrency patterns can reduce the probability of concurrency related bugs. Finally, due to the lack of immutability, strong ownership, generics or scope of concurrency in GO, you still have to manage race conditions and analyze your code carefully for deadlocks, but applying structured concurrency can help manage concurrent code better.
