
August 15, 2021

Structured Concurrency with Swift

Filed under: Concurrency, Uncategorized — admin @ 6:19 pm

I wrote about structured concurrency support in Javascript/Typescript, Erlang/Elixir, Go, Rust, Kotlin and Swift last year (Part-I, Part-II, Part-III, Part-IV), but the Swift language was still developing its async/await and actor support. Swift 5.5 will finally make these new concurrency features available; they are described below:

Async/Await

As described in Part-IV, Swift APIs previously used completion handlers for asynchronous methods, which suffered from:

  • Poor error handling, because there was no single way to handle errors/exceptions; instead, separate callbacks were needed for errors.
  • Difficulty cancelling an asynchronous operation or exiting early after a timeout.
  • The need for global reasoning about shared state in order to prevent race conditions.
  • Stack traces from the asynchronous thread don’t include the originating request, so the code becomes hard to debug.
  • As the Swift/Objective-C runtime uses native threads, creating many background tasks consumes expensive thread resources and may cause excessive context switching.
  • Nested use of completion handlers turns the code into callback hell.

The following example shows the poor control flow and deficient error handling that result from completion handlers:

func fetchThumbnails(for ids: [String],
    completion handler: @escaping ([String: UIImage]?, Error?) -> Void) {
    guard let id = ids.first else { return handler([:], nil) }
    let request = thumbnailURLRequest(for: id)
    URLSession.shared.dataTask(with: request) { data, response, error in
        guard let response = response,
              let data = data else { return handler(nil, error) } // Poor error handling
        UIImage(data: data)?.prepareThumbnail(of: thumbSize) { image in
            guard let image = image else { return handler(nil, ThumbnailError()) }
            fetchThumbnails(for: Array(ids.dropFirst())) { thumbnails, error in
                // cannot use a loop; must recurse to process the remaining ids
                ...
            }
        }
    }.resume()
}

Promise libraries help a bit, but they still suffer from a dichotomy between control flow and error handling. Here is the equivalent code using async/await:

func fetchThumbnails(for ids: [String]) async throws -> [String: UIImage] {
    var thumbnails: [String: UIImage] = [:]
    for id in ids {
        let request = thumbnailURLRequest(for: id)
        let (data, response) = try await URLSession.shared.data(for: request)
        try validateResponse(response)
        guard let image = await UIImage(data: data)?.byPreparingThumbnail(ofSize: thumbSize) else { throw ThumbnailError() }
        thumbnails[id] = image
    }
    return thumbnails
}

As you can see, the above code not only improves control flow and adds uniform error handling, but also enhances readability by removing the nested structure of completion handlers.

Tasks Hierarchy, Priority and Cancellation

When a new task is created using async/await, it inherits the priority and task-local values of the parent task, which are then passed down the entire hierarchy of child tasks. When a parent task is cancelled, the Swift runtime automatically cancels all of its child tasks. However, Swift uses cooperative cancellation, so child tasks must check their cancellation state; otherwise they may continue to execute, though the results of cancelled tasks are discarded, as sketched below.
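
A minimal sketch of this behaviour, assuming a hypothetical process function:

func process(_ item: Int) async { /* do some work */ }

let parent = Task(priority: .userInitiated) {
    try await withThrowingTaskGroup(of: Void.self) { group in
        group.addTask {
            // The child inherits the parent's priority and task-local values.
            for item in 0..<1_000 {
                try Task.checkCancellation() // cooperative cancellation check
                await process(item)
            }
        }
        try await group.waitForAll()
    }
}
// Cancelling the parent propagates cancellation to all of its children.
parent.cancel()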

Continuations and Scheduling

Swift previously used native threads for background tasks, and new threads were automatically created when a thread was blocked or waiting for another resource. The new Swift runtime creates native threads based on the number of cores, and background tasks use continuations to schedule work on those threads. When a task is blocked, its state is saved on the heap and another task is scheduled on the thread. The await syntax suspends the current task and releases the thread until the child task is completed. This cooperative scheduling requires runtime support for non-blocking I/O operations and system APIs so that native threads are not blocked and can continue to work on other background tasks. It also precludes background tasks from using semaphores and locks, which are discussed below.

For example, when a thread working on a background task such as updateDatabase starts child tasks such as add or save, those tasks are saved as continuations on the heap. If the current task is suspended, the thread can then pick up other tasks.

Multiple Asynchronous Tasks

async/await in Swift also allows scheduling multiple asynchronous tasks and awaiting them later, e.g.:

struct MarketData {
    let symbol: String
    let price: Int
    let volume: Int
}

struct HistoryData {
    let symbol: String
    let history: [Int]
    func sum() -> Int {
      history.reduce(0, +)
    }
}

func fetchMarketData(symbol: String) async throws -> MarketData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(MarketData(symbol: symbol, price: 10, volume: 200)))
        }
    }
}

func fetchHistoryData(symbol: String) async throws -> HistoryData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(HistoryData(symbol: symbol, history: [5, 10, 15, 20])))
        }
    }
}

func getMovingAverage(symbol: String) async throws -> Int {
    async let marketData = fetchMarketData(symbol: symbol)
    async let historyData = fetchHistoryData(symbol: symbol)
    let sum = try await marketData.price + historyData.sum()
    return try await sum / (historyData.history.count+1)
}

The async let syntax is called a concurrent binding, where the child task executes in parallel with the parent task.

Task Groups

Task groups allow dispatching multiple background tasks that execute concurrently, and Swift automatically cancels all child tasks when the parent task is cancelled. The following example demonstrates the group API:

func downloadImage(id: String) async throws -> UIImage {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(returning: UIImage()) // stub image for illustration
        }
    }
}

func downloadImages(ids: [String]) async throws -> [String: UIImage] {
    var images: [String: UIImage] = [:]
    try await withThrowingTaskGroup(of: (String, UIImage).self) { group in
        for id in ids {
            group.addTask(priority: .background) {
                return (id, try await downloadImage(id: id))
            }
        }
        for try await (id, image) in group {
            images[id] = image
        }
    }
    return images
}

As these features were still in development, Swift recently renamed the group.async API to group.addTask. In the above example, images are downloaded in parallel, and the for try await loop then gathers the results.

Data Races

The Swift compiler warns you if you try to mutate shared state from multiple background tasks. In the above example, each asynchronous task returns a tuple of image-id and image instead of mutating the shared dictionary; the parent task then mutates the dictionary with the child tasks’ results in the for try await loop.

Cancellation

You can cancel a background task using the cancel API, or cancel all child tasks of a group using group.cancelAll(), e.g.:

group.cancelAll()

The Swift runtime automatically cancels all child tasks if any of the background tasks fails. You can store a reference to a child task in an instance variable if you need to cancel it from a different method, e.g.:

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]

    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
              ...
            }
    }
    func collectionView(_ collectionView: UICollectionView,
        didEndDisplaying cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
        imageTasks[item]?.cancel()
    }
}

As cancellation in Swift is cooperative, you must check the cancellation state explicitly; otherwise the task will continue to execute, though Swift will discard its results, e.g.:

if Task.isCancelled {
    return // return early
}

Timeout

The Task and async/await APIs don’t directly support timeouts, so you must implement them manually, similar to cooperative cancellation, e.g. by racing the work against a sleeping task as sketched below.
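
Here is one possible helper, a sketch assuming the names withTimeout and TimeoutError (not part of the standard library); it races the operation against Task.sleep in a throwing task group and cancels the loser:

struct TimeoutError: Error {}

func withTimeout<T>(seconds: Double,
                    operation: @Sendable @escaping () async throws -> T) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        // Race the real work against a task that throws when the deadline expires.
        group.addTask { try await operation() }
        group.addTask {
            try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            throw TimeoutError()
        }
        // Whichever child finishes first wins; cancel the other.
        let result = try await group.next()!
        group.cancelAll()
        return result
    }
}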

Semaphores and Locks

Swift does not recommend using semaphores and locks with background tasks because a task may be suspended while waiting for an external resource and later resumed on a different thread. The following example shows an incorrect use of semaphores with background tasks:

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void) {
  let semaphore = DispatchSemaphore(value: 0)
  Task {
    await asyncUpdateDatabase()
    semaphore.signal()
  }
  semaphore.wait() // Do not use unsafe primitives to wait across task boundaries
}
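
The structured alternative is simply to make the function async and await the operation directly; a minimal sketch:

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void) async {
    await asyncUpdateDatabase() // suspends the task instead of blocking a thread
}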

TaskLocal

You can annotate static properties with @TaskLocal; such values are stored in the context of the Task and are available to the task and all of its children, e.g.:

enum TracingExample {
    @TaskLocal
    static var traceID: TraceID?
}
...
guard let traceID = TracingExample.traceID else {
  print("no trace id")
  return
}
print(traceID)
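
A task-local value is bound for the duration of a scope using the projected value’s withValue method; a minimal sketch (assuming TraceID is a String typealias and handleRequest is a hypothetical async function):

typealias TraceID = String

func handleRequest() async {
    // TracingExample.traceID is visible here and in all child tasks.
    print(TracingExample.traceID ?? "no trace id")
}

await TracingExample.$traceID.withValue("1234-abcd") {
    await handleRequest()
}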

Detached Tasks (Unstructured)

The above Task and async/await APIs are based on structured concurrency, where the parent task does not complete until all of its child tasks are done with their work. However, Swift also allows launching detached tasks that continue to execute in the background without the parent waiting for their results, e.g.:

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]
    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
                defer { imageTasks[item] = nil }
                let images = try await getImages(for: ids)
                Task.detached(priority: .background) {
                    try await withThrowingTaskGroup(of: Void.self) { g in
                        g.addTask { try await addImageCache(for: images) }
                        g.addTask { try await logImages(for: images) }
                    }
                }
                display(images, in: cell)
            }
    }
}

Legacy APIs

Legacy code that uses completion handlers can use the following continuation APIs to support async/await syntax:

func persistPosts() async throws -> [Post] {
    typealias PostContinuation = CheckedContinuation<[Post], Error>
    return try await withCheckedThrowingContinuation { (continuation: PostContinuation) in
        self.getPersistentPosts { posts, error in
            if let error = error {
                continuation.resume(throwing: error)
            } else {
                continuation.resume(returning: posts)
            }
        }
    }
}

In the above example, the getPersistentPosts method uses a completion handler, and the persistPosts method provides a bridge so that callers can use async/await syntax. The resume method may only be called once per continuation.

You may also save the continuation in an instance variable when you need to resume it in another method, e.g.:

class MyViewController: UIViewController {
    private var activeContinuation: CheckedContinuation<[Post], Error>?
    func sharePostsFromPeer() async throws -> [Post] {
        try await withCheckedThrowingContinuation { continuation in
            self.activeContinuation = continuation
            self.peerManager.syncSharedPosts()
        }
    }
}
extension MyViewController: PeerSyncDelegate {
    func peerManager(_ manager: PeerManager, received posts: [Post]) {
        self.activeContinuation?.resume(returning: posts)
        self.activeContinuation = nil
    }
    func peerManager(_ manager: PeerManager, hadError error: Error) {
        self.activeContinuation?.resume(throwing: error)
        self.activeContinuation = nil
    }
}

Implementing WebCrawler Using Async/Await

The following example shows an implementation of the web crawler described in Part I of the concurrency series using async/await:

import Foundation
struct Request {
    let url: String
    let depth: Int
    let deadline: DispatchTime
}
enum CrawlError: Error {
    case timeoutError(String)
}
let MAX_DEPTH = 4
let MAX_URLS = 11
let DOMAINS = [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "gh.com",
  "hi.com",
  "ij.com",
  "jk.com",
  "kl.com",
  "lm.com",
  "mn.com",
  "no.com",
  "op.com",
  "pq.com",
  "qr.com",
  "rs.com",
  "st.com",
  "tu.com",
  "uv.com",
  "vw.com",
  "wx.com",
  "xy.com",
  "yz.com",
];
public func crawl(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawl(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}
public func crawlWithActors(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawlWithActors(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}

///////////////// PRIVATE METHODS ////////////////
func doCrawl(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
        return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    try await withThrowingTaskGroup(of: (Request, Int).self) { group in
        for req in requests {
            group.addTask(priority: .background) {
                return (req, try await handleRequest(req))
            }
        }
        for try await (req, childURLs) in group {
            if totalChildURLs % 10 == 0 {
                print("received request \(req)")
            }
            totalChildURLs += childURLs
        }
    }
    return totalChildURLs
}

func doCrawlWithActors(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
        return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    let crawler = CrawlActor()
    for req in requests {
        let childURLs = try await crawler.handle(req)
        totalChildURLs += childURLs
    }
    return totalChildURLs
}
func handleRequest(_ request: Request) async throws -> Int {
    let contents = try await download(request.url)
    let newContents = try await jsrender(request.url, contents)
    if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
        try await index(request.url, newContents)
        let urls = try await parseURLs(request.url, newContents)
        let childURLs = try await doCrawl(urls: urls, depth: request.depth + 1, deadline: request.deadline)
        return childURLs + 1
    } else {
        return 0
    }
}
func download(_ url: String) async throws -> String {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    return randomString(100)
}
func jsrender(_ url: String, _ contents: String) async throws -> String {
    // for SPA apps that use javascript for rendering contents
    return contents
}
func index(_ url: String, _ contents: String) async throws {
    // apply standardize, stem, ngram, etc for indexing
}
func parseURLs(_ url: String, _ contents: String) async throws -> [String] {
    // tokenize contents and extract href/image/script urls
    var urls = [String]()
    for _ in 0..<MAX_URLS {
        urls.append(randomUrl())
    }
    return urls
}
func hasContentsChanged(_ url: String, _ contents: String) -> Bool {
    return true
}
func isSpam(_ url: String, _ contents: String) -> Bool {
    return false
}
func randomUrl() -> String {
    let number = Int.random(in: 0..<DOMAINS.count)
    return "https://" + DOMAINS[number] + "/" + randomString(20)
}
func randomString(_ length: Int) -> String {
  let letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  return String((0..<length).map{ _ in letters.randomElement()! })
}

The crawl method takes a list of URLs with a deadline and invokes doCrawl, which crawls the URLs in parallel and waits for the results using the try await keywords. The doCrawl method recursively crawls child URLs up to the MAX_DEPTH limit. The main crawl method defines the boundary of concurrency and returns the count of child URLs.

Following are the major features of structured concurrency in Swift:

  • Concurrency scope: async/await defines a scope of concurrency where all child background tasks must complete before the asynchronous function returns.
  • Composition: the async methods in the above implementation show that asynchronous code can be easily composed.
  • Error handling: async/await uses the normal try/catch syntax for error checking instead of the specialized syntax of Promises or callback functions.
  • Scheduling: the Swift runtime schedules asynchronous tasks on a fixed number of native threads and automatically suspends tasks when they wait for I/O or other resources.

Following are the major shortcomings of Swift’s support for structured concurrency:

  • The most glaring omission in the above implementation is a timeout, which Swift’s implementation does not support.
  • The Swift runtime manages the scheduling of tasks, and you cannot supply your own execution dispatcher for scheduling background tasks.

Actors

The actor model is a classic abstraction from the 1970s for managing concurrency, where an actor keeps its internal state private and uses message passing for interaction with its state and behavior. An actor works on only one message at a time, which prevents data races when it is accessed from multiple background tasks. I previously wrote about actors in Part II of the concurrency series when covering Erlang and Elixir.

actor

Instead of synchronizing access to shared state with a dispatch queue, such as:

final class Counter {
    // A concurrent queue with barrier writes provides reader/writer synchronization.
    private let queue = DispatchQueue(label: "counter.queue", attributes: .concurrent)
    private var _count: Int = 0
    var count: Int {
        queue.sync {
            _count
        }
    }

    func incr() {
        queue.async(flags: .barrier) {
            self._count += 1
        }
    }
    func decr() {
        queue.async(flags: .barrier) {
            self._count -= 1
        }
    }
}

The actor syntax simplifies this implementation and removes the boilerplate, e.g.:

actor Counter {
    var count: Int = 0
    func incr() {
        count += 1
    }
    func decr() {
        count -= 1
    }
}

The above syntax prevents direct access to the internal state, and you must use await to access the actor’s state or behavior, e.g.:

Task {
    let c = Counter()
    await withTaskGroup(of: Void.self) { group in
        for _ in 0..<100 {
            group.addTask {
                await c.incr()
            }
        }
    }
    print("count \(await c.count)")
}

Priority Inversion Principle

The dispatch queue API deals with priority inversion, where a high-priority task is stuck behind low-priority tasks, by bumping up the priority of the low-priority tasks ahead of it in the queue; the runtime then executes the high-priority task after completing those low-priority tasks. The actor API can instead pick the high-priority task directly from the actor’s queue without waiting for completion of the low-priority tasks ahead of it.

Actor Reentrancy

If an actor invokes another actor or a background task from one of its functions, it may be suspended until that task is completed. In the meantime, another client may invoke the actor and modify its state, so you need to re-check your assumptions about the internal state after each suspension point. Also, because a continuation may be resumed on a different thread after the work resumes, you cannot rely on DispatchSemaphore, NSLock, NSRecursiveLock, etc. for synchronization.

The following code from WWDC 2021 shows how reentrancy can be handled safely:

actor ImageDownloader {
    private enum CacheEntry {
        case inProgress(Task<Image, Error>)
        case ready(Image)
    }
    private var cache: [URL: CacheEntry] = [:]
    func downloadAndCache(from url: URL) async throws -> Image? {
        if let cached = cache[url] {
            switch cached {
            case .ready(let image):
                return image
            case .inProgress(let handle):
                return try await handle.value
            }
        }
        let handle = Task {
            try await downloadImage(from: url)
        }
        cache[url] = .inProgress(handle)
        do {
            let image = try await handle.value
            cache[url] = .ready(image)
            return image
        } catch {
            cache[url] = nil
            throw error
        }
    }
}

The ImageDownloader actor in the above example downloads and caches images. The actor may be suspended while it’s downloading an image, during which another client can reenter the downloadAndCache method and try to download the same image; the code above prevents duplicate requests and reuses the in-progress request to serve multiple concurrent clients.

Actor Isolation

Actors in Swift prevent invoking their methods directly from outside the actor, but you can annotate a method with nonisolated if you need to call it synchronously; such methods cannot mutate actor state, e.g.:

actor Account {
    let id: String
    var balance: Double = 0
    init(id: String) {
        self.id = id
    }
}
extension Account: Hashable {
    nonisolated func hash(into hasher: inout Hasher) {
        hasher.combine(id)
    }
    static func == (lhs: Account, rhs: Account) -> Bool {
        return lhs.id == rhs.id
    }
}
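
Because hash(into:) is nonisolated and == is a static method, the actor can be used synchronously wherever Hashable is required, e.g. in a Set; a small sketch:

let checking = Account(id: "checking")
let savings = Account(id: "savings")
let accounts: Set<Account> = [checking, savings] // no await needed for hashing or equality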

Sendable

Actors require that any data types used in their internal state are safe to share across concurrency domains and conform to the Sendable protocol, such as:

  • Value types
  • Actors
  • Immutable classes
  • Synchronized classes
  • @Sendable functions

For example:

struct Book: Sendable {
    var title: String
    var authors: [Author]
}

@MainActor

UI apps require that all UI updates are performed on the main thread, and previously you had to dispatch UI work to the DispatchQueue.main queue. Swift now allows marking functions, classes or structs with the special @MainActor annotation so that those functions are automatically executed on the main thread, e.g.:

@MainActor func checkedOut(_ books: [Book]) {
  booksView.checkedOutBooks = books
}
...
await checkedOut(booksOnLoan)

The following example shows how a view controller can be annotated with @MainActor:

@MainActor class MyViewController: UIViewController {
  func onPress() { ... }      // implicitly runs on the main thread
  nonisolated func fetch() async {
    ...
  }
}

In the above example, all methods of MyViewController are executed on the main thread; however, you can exclude specific methods via the nonisolated keyword.

@globalActor

The @globalActor annotation defines a singleton global actor; @MainActor is one such global actor. You can also define your own global actor, such as:

@globalActor
public struct GlobalSettings {
  public actor SettingsActor {
     func rememberPassword() -> Bool {
        return UserDefaults.standard.bool(forKey: "rememberPassword")
     }
  }

  public static let shared = SettingsActor()
}

...
let rememberPassword = await GlobalSettings.shared.rememberPassword()
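
You can then isolate other declarations to this global actor with the @GlobalSettings attribute; a minimal sketch (updatePassword is a hypothetical function):

@GlobalSettings
func updatePassword(remember: Bool) {
    // Runs on the shared SettingsActor's executor.
    UserDefaults.standard.set(remember, forKey: "rememberPassword")
}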

Message Pattern Matching

As actors in Swift expose their operations as methods, they don’t support pattern matching like Erlang/Elixir, which let an actor select the next message to process by matching one or more fields of the message.

Local only

Unlike actors in Erlang or Elixir, actors in Swift can only communicate with other actors in the same process or application; they don’t support distributed communication with remote actors.

Actor Executor/Dispatcher

The Actor protocol defines the following property for accessing the executor:

var unownedExecutor: UnownedSerialExecutor

However, unownedExecutor is a read-only property that cannot be customized at this time.

Implementing WebCrawler Using Actors and Tasks

The following example shows an implementation of the web crawler described in Part I of the concurrency series using actors and tasks:

import Foundation

actor CrawlActor {
    public func handle(_ request: Request) async throws -> Int {
        let contents = try await download(request.url)
        let newContents = try await jsrender(request.url, contents)
        if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
            try await index(request.url, newContents)
            let urls = try await parseURLs(request.url, newContents)
            let childURLs = try await doCrawlWithActors(urls: urls, depth: request.depth + 1, deadline: request.deadline)
            return childURLs + 1
        } else {
            return 0
        }
    }
}

The above implementation uses an actor for processing crawl requests but shares the other code for parsing and downloading web pages. As an actor serializes access to its state and behavior, you can’t use a single actor to implement a highly concurrent web crawler; instead, you may partition the web domains to be crawled across a pool of actors that share the work.

Performance Comparison

The following table from Part-IV summarizes the runtimes of various web-crawler implementations when crawling 19K URLs, which resulted in about 76K messages to the asynchronous methods/coroutines/actors discussed in this blog series:

Language     Design                    Runtime (secs)
Typescript   Async/Await               0.638
Erlang       Spawning Process          4.636
Erlang       PMAP                      4.698
Elixir       Spawning OTP Children     43.5
Elixir       Task async/await          187
Elixir       Worker-pool with queue    97
Go           Go-routine/channels       1.2
Rust         Async/Await               4.3
Kotlin       Async/Await               0.736
Kotlin       Coroutine                 0.712
Swift        Async/Await               63
Swift        Actors/Async/Await        65
Note: The purpose of the above results was not to run micro-benchmarks but to show the rough cost of spawning thousands of asynchronous tasks.

You can download the full code for the Swift example from https://github.com/bhatti/concurency-katas/tree/main/swift.

Overall, Swift’s new structured concurrency features, including async/await and actors, are a welcome addition to the platform. On the downside, the Swift concurrency APIs lack support for timeouts and custom dispatchers/executors, and the micro-benchmarks showed higher overhead than expected. On the positive side, the Swift runtime catches errors due to data races, and the new async/await/actor syntax prevents bugs that were previously caused by incorrect use of completion handlers and error handling. This will help developers write more robust and responsive apps in the future.

March 24, 2021

Task Scheduling Algorithms

Filed under: Computing,Technology — admin @ 8:16 pm

A task is defined as a unit of work, and a task scheduler is responsible for selecting the best task to execute based on various criteria. The criteria for selecting the best scheduling policy include:

  • Maximize resource utilization such as CPU or memory without exceeding a certain limit such as 80%.
  • Maximize throughput such as tasks per second.
  • Minimize turnaround or wall time from task submission to completion.
  • Minimize waiting time in the ready queue before the task executes.

The above criteria assume that pending tasks are stored in a bounded ready queue before execution, so that no new tasks are accepted once the queue reaches its maximum capacity. In this context, a task is relatively coarse-grained and runs to completion, as opposed to CPU scheduling, which may use preemptive or cooperative scheduling with context switching to dispatch different processes.

Following is a list of common scheduling algorithms:

First-Come First-Serve (FCFS)

This algorithm simply uses a FIFO queue to process tasks in the order they are queued. On the downside, a long-running task can block other tasks and yield very large average wait times in the queue.

Shortest Job-First (SJF)

This algorithm picks the smallest job that needs to be done, then the next smallest, and so on. It yields the best average performance; however, it may not be possible to predict a job’s runtime before execution. You may need to pass hints about the job runtime or use historical data to predict it with an exponential average such as:

NewEstimatedRuntime = alpha * PreviousActualRuntime + (1.0 - alpha) * PreviousEstimatedRuntime

where alpha lies between 0.0 and 1.0 and represents a weighting factor for the relative importance of recent data compared to older data; e.g., an alpha of 0 ignores the previous actual runtime and 1.0 ignores the past history of estimates, as sketched below.
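
A minimal sketch of this estimator in Swift (the RuntimeEstimator type and its names are illustrative):

struct RuntimeEstimator {
    let alpha: Double            // weight of the most recent actual runtime
    private(set) var estimate: Double

    init(alpha: Double, initialEstimate: Double) {
        precondition((0.0...1.0).contains(alpha))
        self.alpha = alpha
        self.estimate = initialEstimate
    }

    // Fold the latest observed runtime into the exponential average.
    mutating func update(actualRuntime: Double) {
        estimate = alpha * actualRuntime + (1.0 - alpha) * estimate
    }
}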

Priority Scheduling

This algorithm picks the highest-priority job that needs to be done, then the next highest, and so on. It may result in starvation of low-priority tasks, which can wait indefinitely. You can fix this drawback by supporting aging, where the priority of old tasks is slowly bumped, as sketched below.
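
A small sketch of priority scheduling with aging (the PendingTask and PriorityScheduler types are illustrative, not a production scheduler):

import Foundation

struct PendingTask {
    let name: String
    var priority: Int        // higher value runs first
    let enqueuedAt: Date
}

struct PriorityScheduler {
    var queue: [PendingTask] = []

    // Bump the priority of tasks that have waited too long (aging).
    mutating func age(olderThan seconds: TimeInterval) {
        let now = Date()
        for i in queue.indices where now.timeIntervalSince(queue[i].enqueuedAt) > seconds {
            queue[i].priority += 1
        }
    }

    // Pick the highest-priority task for execution.
    mutating func next() -> PendingTask? {
        guard let i = queue.indices.max(by: { queue[$0].priority < queue[$1].priority }) else {
            return nil
        }
        return queue.remove(at: i)
    }
}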

Earliest Deadline

This algorithm gives the highest priority to the task with the earliest deadline, which is then picked as the next task to execute. This scheduler improves resource utilization based on estimated runtime and data requirements.

Speculative Scheduler

The speculative scheduler detects a slow-running task and starts another instance of it as a backup to minimize response time. This generally works for short jobs with homogeneous resources, but it doesn’t guarantee complete reliability.

Multilevel Queues

This algorithm categorizes tasks and dispatches each task to a queue based on its category; e.g., you may define distinct queues for small/medium/large tasks based on runtime, or define queues for interactive/background/system/batch tasks. In some cases, a task may need to jump from one queue to another based on its runtime characteristics, and you can use aging and priority inversion to promote low-priority tasks.

Resource aware scheduler

This scheduler tracks the provisioned resources and their utilization as required by the tasks. The scheduler may simply allocate the resources tasks need when scheduling them, or use an admission-control algorithm to prevent certain tasks from starting until the required resources are available.

Matchmaking scheduler

This scheduler uses affinity-based scheduling so that it routes tasks to the workers or nodes where the data resides locally, providing greater data locality.

Delay scheduler

This scheduler waits until the data required by a task is available on the node or worker where the task will run, in order to improve data locality. However, to prevent starvation, it eventually allows scheduling the task on another worker or node.

Capacity scheduler

This scheduler shares system resources among tasks, where tasks specify the resources they require, such as memory and CPU. The scheduler tracks resource utilization and allocates the resources specified by the tasks.

Fair scheduler

This scheduler ensures that common resources such as CPU, memory, disk and I/O are shared fairly among different tasks.


January 10, 2021

Building A Decentralized Messaging with End-to-End Encryption using Open Standards

Filed under: Computing, Encryption, Messaging — admin @ 4:00 pm

Abstract

A number of messaging apps such as FB Messenger, Telegram, Matrix Element, Signal, Status.im, Threema, Whatsapp, etc. offer end-to-end encrypted messaging; however, they all use proprietary APIs for message storage/relaying, contact discovery, key management, group administration and other services. These centralized APIs can be used to extract metadata from messages, to build a social graph from the contacts, or to forge message order/acknowledgements or group membership with malicious access. Though most of these apps support forward secrecy and post-compromise security (PCS) for pairwise communication, some discard PCS guarantees in group communication and incur a high cost for updating users’ keys in large groups. This paper reviews popular messaging apps for their capabilities and security profile. It explores industry standards for open messaging protocols that can be used to build a secure, decentralized and scalable messaging system with end-to-end encryption and post-compromise security guarantees for both pairwise and group communication. It then proposes an architecture with open standards and decentralized building blocks such as identity, key management, storage and messaging protocol that can be used to build a trustless and secure messaging system. Lastly, this paper shows how this decentralized architecture can be used to build other high-level decentralized applications for communication, collaboration, payments, social platforms, etc.

Note: This paper only examines text messaging between two parties or among groups; other features such as public channels, bots, payments, WebRTC, and audio and video chat are out of scope for this discussion.

Messaging Apps

The following section surveys popular messaging apps and evaluates their strengths and weaknesses:

Survey of Popular Messaging Apps

Briar

Briar is a decentralized messaging app with end-to-end encryption that is designed for activists, journalists and the general public. It can sync messages via a variety of network protocols such as Bluetooth, Wi-Fi or Tor [51]. Messages are encrypted using the Bramble Transport Protocol (BTP), which supports confidentiality, integrity, authenticity and forward secrecy. BTP establishes a root key between two parties and derives a temporary key for encrypting communication; the temporary key is rotated periodically. It supports group communication by defining a group as a label that can subscribe to a channel, similar to Usenet.

Facebook Messenger

Facebook Messenger is a closed-source messaging app that uses the Signal protocol for end-to-end encryption. Facebook uses centralized authentication and delivery services for message routing. Its privacy policies are quite appalling, and per Apple’s privacy policies, it collects a lot of metadata from users, such as [59]:

  • Purchase History
  • Other Financial Info
  • Precise Location
  • Coarse Location
  • Physical Address
  • Email Address
  • Name
  • Phone Number
  • Other User Contact Info
  • Contacts
  • Photos or Videos
  • Gameplay Content
  • Other User Content
  • Search History
  • Browsing History
  • User ID
  • Device ID
  • Product Interaction
  • Advertising Data
  • Other Usage Data
  • Crash Data
  • Performance Data
  • Other Diagnostic Data
  • Other Data Types
  • Health
  • Fitness
  • Payment Info
  • Audio Data
  • Customer Support
  • Sensitive Info

By comparison, Apple’s privacy labels list only the following for iMessage:

  • Email address
  • Phone number
  • Search history
  • Device ID

The iOS Messenger app disables App Transport Security (ATS), which can lead to insecure network connections, and uses the audio, fetch, location, remote-notification and voip modes in the background. The Android Messenger app uses a key smaller than 1024 bits for signing the app and allows dynamic code loading, which can be used to inject malicious code. The Android app also uses addJavaScriptInterface(), which allows JavaScript to call native operations and can be abused for malicious access.

Session App

Session is an open source application that uses decentralized infrastructure for messaging, relying on decentralized storage and an onion routing protocol to send end-to-end encrypted messages. It originally used the Signal protocol to provide end-to-end encryption for pairwise communication and Sender Keys for group communication. However, Session recently updated its messaging app to use the Session protocol instead, which is backward compatible with the Signal protocol and provides similar guarantees for end-to-end encryption. Session guarantees Perfect Forward Secrecy (PFS) and deniability, which are further strengthened by anonymous accounts and disappearing messages. The new Session protocol uses a long-term shared key instead of a sender key for group communication; these keys are shared via pairwise channels and are recreated when a user leaves the group. This also enables better support for multiple devices, which share the user’s long-term keypair with a new device and duplicate sent messages into its own swarm [57].

Session reduces metadata collection by using an X25519 public/private keypair as identity, as opposed to a phone number, and by using a decentralized network of nodes for routing messages, which uses an onion protocol to hide users’ IP addresses. Session uses a network of Loki Service Nodes based on the Loki blockchain, which itself is based on the Cryptonote protocol [20]. It uses proof-of-stake to protect against Sybil attacks and rewards service-node providers with the Loki cryptocurrency, which helps create a sustainable network as opposed to public networks such as Tor or I2P. The storage provided by a subset of swarm nodes on this network can be used for storing encrypted messages when recipients are offline or for sharing encrypted attachments with other members of the group. As the Session app does not use any centralized services, it shares prekey bundles via friend requests and adds additional metadata to each message for routing purposes. It also uses the Loki Name Service (LNS) to map the keypair-based identity to a short text username that can be shared with friends [42, 57].

Mobile Clients

The iOS Session app uses the CommonCrypto API and the Sodium, Curve25519Kit and SessionProtocolKit frameworks for crypto/double-ratchet primitives and includes fetch and remote-notification in its background modes. The iOS app also stores sensitive encryption key data in a Sqlite database that is not protected with the NSFileProtectionComplete or NSFileProtectionCompleteUntilFirstUserAuthentication flags.

Signal

Signal is an open source messaging app that uses Curve25519, HMAC-SHA256, X3DH and the Double Ratchet algorithm to support end-to-end encryption and forward secrecy. It uses a phone number as identity, and the Signal client generates a random password upon installation, which is sent to the Signal server with each request. It supports both direct communication between two users and group communication with multiple members.

A group message in Signal is treated as a direct message but with a 128-bit group-id. The Signal app sends a message for each member of the group to the Signal server for delivery. The server forwards the message and acknowledges messages from the sender, and the recipient acknowledges receipt to the Signal server. These acknowledgements do not use end-to-end encryption [5, 18, 32]. Signal doesn’t enforce any access control for group management: any user can add members, and no user can remove other members, though a member can remove herself from the group. Due to this lack of access control and of validation of group membership by the invitee, a malicious user who acquires a member’s phone number and group id can add a member and eavesdrop on messages. The server acknowledges messages from senders, and receivers acknowledge receipt to the server, but these acknowledgements are susceptible to forging. As all messages are routed through the Signal server with plaintext metadata such as sender, receiver, timestamps, IP addresses, etc., a malicious user with access to the server can change timestamps, thus affecting message order, or collect metadata about relationships among users [4, 18, 48, 83].

The Signal app recently added a feature, “Secure Value Recovery”, to upload the contact list without it being accessible by Signal. Signal uses Intel SGX (Software Guard Extensions) to process this data inside an enclave. However, Signal uses a low-entropy 4-digit PIN for encryption, which is susceptible to dictionary attacks. Also, SGX is vulnerable to “speculative execution” side-channel attacks, which can allow an attacker to extract secrets from SGX [16, 17].

Signal also uses a dark pattern to automatically opt users into this cloud backup by asking them to generate a PIN code without explaining the ramifications of their actions [27, 30]. Though Signal receives favorable coverage in the tech world, it suffers from WhatsApp envy, and it tends to copy dubious features from WhatsApp instead of building trustless security and federated delivery services like email for secure communication [52]. Instead, Moxie has actively tried to stop other open source projects such as LibreSignal from using their servers [60]. Moxie has also declined collaboration with open messaging specifications such as Matrix [63]. The open source nature of Signal is also questionable because their server code hadn’t been updated for almost a year and it’s unclear what changes they have added locally [78, 83].

Mobile Clients

The iOS Signal app uses the audio, fetch, remote-notification and voip modes in the background. The Android Signal app uses a key smaller than 1024 bits for signing the app and allows dynamic code loading, which can lead to malicious code injection. The Android Manifest file does not protect broadcast receivers with permissions such as Signature/SignatureOrSystem, which can result in data leaks.

Status.im

Status is an open source messaging platform and Web 3.0 browser based on the Ethereum network and the Status Network Token (SNT) utility crypto-token. Status supports confidentiality, authenticity, end-to-end encryption and forward secrecy. The end-to-end encryption is implemented using the Double Ratchet protocol and X3DH-based prekeys, and the cryptographic primitives include Curve25519, AES-256-GCM, KECCAK-256, SECP256K1, ECDSA, ECIES and AES-256-CTR with HMAC-SHA-256 [55]. Status uses Swarm for decentralized storage and Whisper for peer-to-peer communication, though it has recently switched to Waku for peer-to-peer communication for better scalability, spam resistance and support for libp2p. It uses a modified Signal protocol suited to decentralized infrastructure for end-to-end encryption, and the mobile app also supports payments and a crypto asset wallet. Users are identified by a SECP256k1 public key, and SNT token holders can register their usernames with the Ethereum Name Service (ENS) to create a readable and recoverable access point. As Status doesn’t use a phone number or email for identity, it provides better privacy and protection against censorship. Status lacks self-destructing/disappearing messages and audio and video calls. As push notifications generally require central services, Status supports notifications only while the application is running, but with the Whisper V5 protocol it can store an offline inbox, which can also be used for decentralized push notification [43, 44, 46].

Telegram

Telegram is a closed-source messaging app that was created by Pavel Durov. Telegram uses an in-house encryption protocol, as opposed to the Signal protocol used in most other apps, so it has not been vetted with comprehensive security analysis. It has been found to lack the security and privacy it claims; e.g., an investigation by Vice found that Telegram doesn’t enable end-to-end encryption by default. It also stores chat messages in the cloud, where they can be spied on by government or spy agencies. Telegram copies all contacts from your phone to its servers for matching with other Telegram users. As all outgoing messages are routed through the Telegram server, it collects metadata including IP addresses, device profiles, etc. that can be used to track users. Telegram was also found to expose users’ locations when viewing nearby users [25, 26, 40]. Researchers have also found a “crime-pizza” vulnerability, where an attacker could alter the order of messages coming from a client to the Telegram server in the cloud [81]. Due to its use of a customized encryption protocol and implementation, severe security bugs have been found in Telegram [62, 72, 81].

Mobile Clients

The iOS Telegram app does not use App Transport Security (ATS), which can lead to insecure network connections, and uses the deprecated and insecure methods unarchiveObjectWithData / unarchiveObjectWithFile for deserialization instead of decodeObjectForKey / decodeObjectOfClass. It uses the audio, fetch, location, remote-notification and voip modes in the background. The Android app uses hard-coded API keys, including Google Maps keys, and its Android Manifest file does not protect broadcast receivers with permissions such as Signature/SignatureOrSystem, which can result in data leaks.

Threema

Threema started as closed-source software but open sourced its messaging app in December 2020 under AGPLv3. The Threema messaging app uses federated servers to deliver messages and distribute user keys, but they don’t relay messages from one server to another. Threema uses a variation of the Signal protocol that performs Diffie-Hellman key exchanges (DHKE) using Curve25519, implements end-to-end encryption using the XSalsa20 cipher and validates integrity using Poly1305-AES MAC. Threema limits the maximum number of members in a group to 50 and allows only the creator to manage group membership. All group communication uses the same protocol as pairwise communication, except that group messages are not end-to-end acknowledged. However, all group communication reuses the same DHKE of the members’ long-term public keys, so it does not provide forward secrecy. Threema orders messages by received date, similar to Signal and WhatsApp, which can be forged by someone with access to the Threema server [18].

WhatsApp

WhatsApp is a closed-source messaging app that uses the Signal protocol for pairwise communication and a variation of sender keys for group communication. WhatsApp uses Noise Pipes based on 128-bit Curve25519, AES-GCM and SHA256 to protect communication between client and server [38]. WhatsApp uses the libsignal-protocol-c library, which uses the Signal key exchange protocol based on X3DH and the Double Ratchet algorithm, for communication among users [37]. A group contains a unique id, a set of admins and members, with a maximum of 256 members. Each user generates a sender key upon joining the group, which is shared with other members using pairwise channels. The group communication does not provide future secrecy, as sender keys are reused for encrypting group communication. The group’s sender keys are rotated when a member is removed; otherwise a compromised member could passively read future messages [5]. WhatsApp uses centralized servers for all group management instead of using cryptography to protect the integrity of group membership; as a result, the server may add a member to eavesdrop on the conversation [18, 32, 35]. All outgoing messages are routed through the WhatsApp server, which can collect metadata, forge acknowledgements or change the order of messages. WhatsApp uses a mobile phone number to identify each user and supports multiple devices per user by connecting secondary devices via a QR code. An incoming message is first received by the primary device, which then shares it with the secondary devices using pairwise channels; the secondary devices cannot receive messages if the primary device is offline [2, 18].

As Facebook now owns WhatsApp, it mandates sharing all metadata with Facebook, such as phone numbers, contacts, profile names, profile pictures, status messages, diagnostic data, etc. from the phone [22, 31]. Due to the widespread use of WhatsApp, a lot of security companies now provide hacking tools to governments to monitor journalists or activists, as reported in [28, 29, 45, 47, 54]. Apple’s privacy policies for WhatsApp now include all metadata collected by the app, which is now shared with Facebook [50].

(Figure: Apple ‘App Privacy’ labels)
Mobile Clients

The Android WhatsApp app uses a key smaller than 1024 bits to sign the app, hard-codes API keys including Google Maps keys, and its Android Manifest file does not protect broadcast receivers with permissions such as Signature/SignatureOrSystem, which can result in data leaks.

Strengths of aforementioned Messaging Apps

Following are major strengths of the messaging apps discussed above:

Ease of Use

WhatsApp is the gold standard for ease of use and simple messaging, and other apps such as Signal, Telegram and Threema offer a similar user experience.

Scalability

Messaging apps such as WhatsApp have been scaled to send 100B+ messages/day. This is the biggest strength of the tech giants that own these platforms, because they have built huge data centers to support messaging at this scale. On the other hand, these tech giants also use this infrastructure to extract as much data as they can from users and store it for marketing or surveillance purposes.

Confidentiality and Data Integrity

All of the messaging apps discussed above provide end-to-end encryption and reasonable data integrity. However, closed-source apps such as FB Messenger, WhatsApp and Telegram cannot be fully trusted because they use servers for group administration and collect a lot of metadata, which hinders data privacy.

Rapid Development

The centralized and controlled server environment allows messaging providers such as FB Messenger, WhatsApp, Signal and Telegram to rapidly develop and deploy new features. It also keeps the development cycles of the server side and the mobile app separate, as the server side can be updated without updating the mobile apps.

Weaknesses of aforementioned Messaging Apps

Following are major drawbacks of the messaging apps discussed above:

Single Point of Failure

The centralized APIs used by the above messaging apps become a single point of failure, and the apps cannot function without them [69].

Bug Fixes

The centralized API adds a dependency on the provider for bug fixes and updates, and despite open sourcing their code, providers may not accept issues or fix bugs reported by their users [75].

Walled Gardens

Popular messaging apps such as WhatsApp, FB Messenger, Signal and Threema do not use standard protocols for messaging, and you cannot easily create a new client to use their services [79].

Centralized Authentication

The messaging apps discussed above either use a provider account for authentication or generate an account upon installation. This account is used to authenticate all requests for message routing, contact discovery, acknowledgements, presence, notification and other operations. This design adds a dependency on the central server, and you cannot send or receive messages when the centralized servers are not accessible [69].

Service outage

Messaging is now part of critical communication infrastructure, and any disruption in the availability of a messaging provider’s data centers affects millions of people who cannot connect with others [68].

Proprietary Protocols and Vendor Lock-in

The messaging apps discussed above use proprietary protocols and customized APIs to lock users into their platforms. Messaging apps with a large user base, such as WhatsApp, Signal and Telegram, use the network effect to make it harder to interact with their systems from third-party clients or to switch to another messaging system. These platforms use the network effect to prevent competition and grow larger and larger over time. This is true even for open-source messaging platforms such as Signal, which does not collaborate with other open messaging specifications and prohibits use of its branding for setting up federated Signal servers or use of its servers from other apps such as LibreSignal [52, 60, 63].

Phone or Email as Identity

Most of the messaging apps discussed above use a phone number or email as identity, which weakens security and privacy. The requirement of having a phone number before using end-to-end encryption burdens users who don’t have a phone number or who want to use other computing devices such as tablets for messaging. They also require sharing phone numbers with other users for messaging, which infringes on users’ privacy. Phone numbers are controlled by telecommunication companies that can ban users or transfer control to malicious users via SIM swapping, social engineering or other attacks. Though messaging apps such as WhatsApp or Signal send a warning when identity keys are changed, users rarely pay attention in practice. Both Signal and WhatsApp also support a “registration PIN lock” for account recovery that requires both the phone number and the registration PIN code to modify identity keys, thus reducing the efficacy of PIN locks. Most countries require an identity card to obtain a phone number, which thwarts user privacy and can be used by governments to track whistleblowers or activists [19, 20, 54]. A recent Princeton study found that five major US carriers are vulnerable to SIM-swapping attacks that give access to a victim’s phone number [32, 33].

User Data and Privacy

Big tech companies control the data and privacy policies for many of these messaging apps and often integrate the messaging system with other parts of their platforms, such as the recent changes in WhatsApp privacy policies for sharing user data with Facebook. Other messaging apps such as Signal, which recently gained a large user base migrating off WhatsApp, lack proper policies regarding data privacy and improper use of their services [73]. Due to low hardware costs, this user data is often stored indefinitely and is not only used for ad tracking but is also shared with law enforcement or government agencies without informing users.

Metadata Collection

Though end-to-end encryption guarantees confidentiality of message contents so that only the recipient can see them, each message includes metadata related to sender, receiver and timestamps, which is needed for routing, delivery and ordering. The messenger services can easily tap into this metadata and Geo-location via the IP addresses of requests, because all messages are relayed through their servers. This data is a liability even when messaging systems such as Signal are not actively collecting it, because any malicious user or rogue employee may scrape or leak it.

Lack of Trustless Security

Messaging apps with centralized APIs such as WhatsApp, Signal and Telegram require trust in the company for securing data and services, which breaks a basic tenet of security: truly secure systems don’t require trust [52].

Usage Statistics

A number of messaging apps with centralized servers collect aggregate user statistics such as the number of messages, types of messages, Geo-location, etc. that can be used to censor activists, journalists or human rights defenders [23, 54].

Message Acknowledgement

Messaging apps such as WhatsApp or Signal send an acknowledgement when a message is sent or received, which is not secured by the end-to-end encryption. Security research has shown how these acknowledgements can be intercepted or forged by someone with malicious access to the servers.

Censorship

Messaging apps such as WhatsApp and Signal use a phone number as identity, which can be used to censor or ban users from the messaging apps. As these apps use centralized APIs for relaying all messages, the servers can refuse to send or receive messages based on metadata from the API request such as sender, receiver or Geo-location via IP address. Due to the centralized APIs, governments can easily disable access to the messaging servers, and while some apps including Signal support proxy servers, they are implemented in a leaky fashion that can be easily traced [76].

Complexity and Attack Surface

In order to compete with other messaging apps, providers have added a variety of additional features that increase the complexity of the messaging system. This complexity has become a liability for user security and privacy because it increases the attack surface. Major security companies such as NSO Group make millions of dollars selling zero-day exploits that are used to decrypt messages that are supposed to be guarded by end-to-end encryption. These exploits are often used to hack, persecute or kill journalists, activists or human rights defenders [18, 45, 47, 48, 54, 66].

Closed Source or Customized implementation

Though some messaging apps such as Session and Signal are open source, many other apps remain closed-source. End-to-end encryption can only be trusted if the implementation is fully open source and properly audited by an independent third party. A number of messaging apps use the Signal protocol for end-to-end encryption, but each uses customized encryption libraries, a customized implementation of the Signal protocol, and customized messaging protocols for authentication, communication and group management. Despite the availability of open source libraries such as libsodium for encryption, olm/megolm for the double-ratchet implementation and the MLS standard for group chat, they have gained very little adoption in these messaging apps. As a result, there is very little interoperability among them.

Summary of Evaluation

The following table summarizes capabilities and characteristics of the messaging apps discussed above:

Features | Briar | FB Messenger | Session | Signal | Status | Telegram | Threema | WhatsApp
Ease of Use | C | B | C | A | C | A | B | A
Security / Forward Secrecy | B | C | A | A | A | C | B | B
Identity | Local Keypair | FB Email or Phone number | Local Keypair | Phone number | Local Keypair | Phone number | Email | Phone number
Open Source | Yes | No | Yes | Yes | Yes | Open source clients but not server | Yes | No
Decentralization / Federation | Yes | No | Yes | No | Yes | No | Yes | No
Trustless Security | Yes | No | Yes | No | Yes | No | No | No
Metadata Privacy | B | F | A | C | A | C | C | D
Network Anonymity | B | F | A | F | A | F | F | F
Multiple Devices | No | Yes | Yes | Yes | No | Yes | Yes | Yes
End-to-End Encryption Protocol | Bramble Transport Protocol | Signal protocol with Sender Keys | Session Protocol | Signal Protocol | Signal Protocol with variations | Customized Protocol | Signal Protocol with variations | Signal Protocol with Sender Keys
Encryption Algorithms | random function, authenticated cipher | Closed Source | X25519, AES-GCM, ED25519 | Curve25519, HMAC-SHA256 | Curve25519, AES-256-GCM, KECCAK-256, SECP256K1, ECDSA | Customized | XSalsa20, Poly1305-AES MAC | Curve25519, AES-GCM and SHA256
Open Standards / Libraries | No | No | No | No | No | No | No | No

Open Messaging Standards

The following section reviews open standards for providing encrypted communication between two parties or among multiple participants in a group conversation:

Pairwise Communication Protocols

Email and PGP

Email uses SMTP (Simple Mail Transfer Protocol), defined by the IETF in 1982, for sending outgoing messages. Phil Zimmermann created PGP (Pretty Good Privacy) in 1991 to support confidentiality, authenticity and end-to-end encryption, and it later became an IETF standard in 1997 as OpenPGP. Email also supports the S/MIME standard, which uses a centralized public key infrastructure as opposed to the decentralized architecture of PGP. There has been very little adoption of these standards despite their long history, and they are difficult to use in practice. The email envelope includes a lot of metadata such as sender, recipient and timestamps that can be used for tracking communication. Also, Email/PGP lacks modern cryptography algorithms and does not support future secrecy or post-compromise security [23, 41].

XMPP

XMPP (eXtensible Messaging and Presence Protocol) is a chat protocol that became an IETF standard in 2004. XMPP uses XML streams to support asynchronous, end-to-end exchange of data. There is ongoing work to add end-to-end encryption to XMPP [23].

Off-the-record protocol (OTR)

The OTR protocol was released in 2004 as an extension to XMPP to provide end-to-end encryption, forward secrecy and deniability. OTR is designed for synchronous communication and requires both sender and receiver to be online; it does not work for group communication or when recipients are offline [23].

Matrix

Matrix.org is an open specification for decentralized communication using JSON. It uses the Olm library, an implementation of the Signal protocol, for end-to-end encryption between two users and MegOlm for group chat. The Olm/MegOlm libraries use Curve25519 for generating identity keys and prekeys, Ed25519 for signing keys before upload and the 3DH/Double-Ratchet algorithms for asynchronous encrypted messages [23, 24]. Matrix also provides interoperability with other protocols and can be used as a bridge to support other messaging apps [36, 65].

Open Whisper System Signal Protocol

The Signal protocol was created by Moxie Marlinspike for Open Whisper Systems and the TextSecure messaging app to provide end-to-end encryption so that only the intended recipient can see the plaintext message. It was later adopted by the Signal app, WhatsApp, Google Allo (defunct), FB Messenger and other messaging apps. The Signal protocol uses a ratchet system that changes the encryption key after each message to provide forward secrecy and post-compromise security.

This protocol requires each user to generate a long-term identity key-pair, a medium-term signed prekey pair and several ephemeral keys using the Curve25519 algorithm. The public keys of these pairs are packed into a prekey bundle and uploaded to a Key Exchange Server/Key Distribution Center for dissemination. The prekey bundle is downloaded by other users before sending a message and is used to build a new session key from the sender’s and recipient’s keys using the Extended Triple Diffie-Hellman (X3DH) key agreement protocol. This protocol works with offline users because a third-party server can store and forward the messages until the user is back online.

The Signal protocol uses AES-256 for encryption and HMAC-SHA256 for validating data integrity. X3DH generates a shared secret key, which is used to derive the “root key” and “sending chain key”. These keys are used by the Double Ratchet (DR) algorithm, which is derived from the Off-the-Record protocol. The DR algorithm derives a unique message key using a key derivation function (KDF) that is then used to send and receive encrypted messages. The outputs of the DH ratchet at each stage, along with some additional information to advance the receiving key chain, are attached to each encrypted message. The “sending chain key” is advanced when a message is sent, and the sender derives a new “sending chain key”. Similarly, the receiver advances the “receiving chain key” to generate a new decryption key. The “root key” is advanced upon session initialization so that each message uses a new ephemeral or ratchet key, which guarantees forward-secrecy and post-compromise security. The first message also attaches the sender’s prekey bundle so that the receiver can derive the complementary session [12, 13].
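To make the symmetric-key ratchet concrete, here is a minimal Go sketch of a single KDF-chain step; the HMAC-SHA256 construction follows the spirit of the specification [11], but the domain-separation labels (0x01/0x02) and the zeroed initial chain key are illustrative assumptions, not the actual libsignal implementation:

package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"fmt"
)

// kdfChain performs one step of a sending/receiving chain: from the current
// chain key it derives the next chain key and a one-time message key, which is
// the symmetric-key ratchet at the heart of the Double Ratchet algorithm.
// The 0x01/0x02 labels are illustrative domain separators.
func kdfChain(chainKey []byte) (nextChainKey, messageKey []byte) {
	ck := hmac.New(sha256.New, chainKey)
	ck.Write([]byte{0x01})
	nextChainKey = ck.Sum(nil)

	mk := hmac.New(sha256.New, chainKey)
	mk.Write([]byte{0x02})
	messageKey = mk.Sum(nil)
	return nextChainKey, messageKey
}

func main() {
	// In practice the initial chain key comes from the X3DH shared secret;
	// a zeroed key is used here only to make the example runnable.
	chainKey := make([]byte, 32)
	for i := 0; i < 3; i++ {
		var messageKey []byte
		chainKey, messageKey = kdfChain(chainKey)
		fmt.Printf("message %d key: %x...\n", i, messageKey[:8])
	}
}

Because each message key is derived from a one-way chain and then deleted, compromising the current chain key does not reveal previously derived message keys.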

OMEMO

OMEMO is an extension of XMPP developed in 2015 to provide end-to-end encryption using the Signal protocol [23].

The Messaging Layer Security (MLS) Protocol

Messaging Layer Security (MLS) is a standard protocol being built by an IETF working group for providing end-to-end encryption. It defines the specification for the security context in an architecture document and the specification for the protocol itself in a protocol document. Each user publishes init keys, consisting of credentials and public keys, ahead of time, which are handled by a delivery service. There are two types of messages: handshake messages, which are control messages for group membership with a global order, and application messages with text/multimedia contents.

Group Messaging Protocols

Group messaging with three or more members requires additional complexity with end-to-end encryption. Each messaging app uses its own protocol for group management and communication among members. Following are some solutions that are used in messaging apps:

mpOTR

The multi-party off-the-record messaging (mpOTR) protocol [3] extends OTR to provide security and deniability. It uses a number of interactive rounds of communication where all parties have to be online, which doesn’t work in the presence of unreliable or mobile networks [2].

Sender-keys with Signal Protocol

Sender-keys was developed as part of the Signal protocol and is used by a number of messaging libraries and apps such as libSignal, WhatsApp, Google Allo (defunct), FB Messenger, Session and Keybase for group messaging, but it’s no longer used by the Signal app. A user generates the sender-key randomly, encrypts it for each member using the pairwise key and then sends it to that member. As this sender key is reused for group communication and is not refreshed or removed after each message, this protocol cannot guarantee forward-secrecy or PCS. This protocol allows the sender to transmit a message once to the delivery server, which then fans out the message to each member of the group. These messaging apps regenerate sender-keys when the membership is updated, which creates O(N^2) messages for a group of size N because each member has to create a new sender key and send it to every other group member. However, a bad actor in the group can eavesdrop on message communication until that member is removed and sender keys are refreshed [2, 5, 18].
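The following Go sketch illustrates the sender-keys idea under simplified assumptions (AES-256-GCM as the group cipher, pairwise distribution elided); it is not the actual libSignal group code:

package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
)

func main() {
	// The sender generates a random 32-byte sender key; in a real app this key
	// is encrypted N times, once per member, over the pairwise Signal sessions.
	senderKey := make([]byte, 32)
	if _, err := rand.Read(senderKey); err != nil {
		panic(err)
	}

	block, err := aes.NewCipher(senderKey)
	if err != nil {
		panic(err)
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		panic(err)
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		panic(err)
	}

	// The message body is encrypted once; the server can fan out this single
	// ciphertext to all members. Rotating the sender key after a membership
	// change is what makes updates O(N) per member, i.e. O(N^2) overall.
	ciphertext := gcm.Seal(nil, nonce, []byte("hello group"), nil)
	fmt.Printf("one ciphertext (%d bytes) fanned out to all members\n", len(ciphertext))
}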

Open Whisper System Signal Protocol

When the Signal protocol is used pairwise for group communication, it requires encrypting and sending the message separately for each group member, and refreshing keys takes O(N^2) messages for a group of size N. Also, the double ratchet algorithm is very expensive in group chat because each message has to generate new session keys for every other group member. Group messaging also adds more complexity to authentication, participant consistency for group membership and ordering of messages.

The Double-Ratchet does not preserve forward-secrecy or post-compromise-security (PCS) in group messaging due to high computation and bandwidth overhead, so the Signal messenger instead uses sender-keys for group communication, where a single compromised member can compromise the entire group without informing the users. Regularly rebroadcasting sender-keys over the secure pairwise channels can prevent this attack, but it doesn’t scale as the group size increases [1, 2].

When sending a message to a group, the Signal app sends a copy of the message for each member to its delivery API, which then stores and forwards the messages to the recipients.

Asynchronous Ratcheting Trees (ART)

Asynchronous Ratcheting Trees (ART) offer confidentiality, authenticity and strong security (PCS) with better scalability as group size increases. The ART protocol trusts the initiator and, in order to support asynchronicity, it accepts weaker security when members are offline, similar to the zero round-trip mode of TLS 1.3. It uses a signature to authenticate the initial group message and a MAC to authenticate subsequent updates. A tree-based group DH protocol is used to manage group members, and a member updates personal keys in this tree structure to guarantee PCS [2].

The Messaging Layer Security (MLS) Protocol for Groups

The MLS protocol is an IETF standard for providing asynchronous end-to-end encrypted messaging. The MLS protocol provides more efficient key changes for end-to-end encryption with forward secrecy and PCS for group messaging, where the group size can scale up to thousands of members [4]. The MLS protocol uses a ratchet tree, which is a left-balanced binary tree whose leaves represent members and whose intermediate nodes carry a Diffie-Hellman public key and private key [5].

Each member retains a copy of the ratchet tree with the public keys, while private keys are shared according to the ratchet-tree property:

If a member M is a descendant of intermediate node N, then M knows the private key of N.

The root is labeled with a shared symmetric key known to all users. The sender keys are derived using a key-derivation function (KDF) from the root node’s key, and private keys are distributed to copath nodes when a member is removed [2]. Unlike the Signal protocol, which results in O(N^2) messages after a membership change, MLS only costs O(N) messages for initially adding users and O(N log N) thereafter when keys are refreshed [7].
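A small Go sketch shows where the logarithmic cost comes from: refreshing a member’s leaf key only touches the nodes on its direct path to the root. This assumes a complete binary tree with a power-of-two number of leaves, which is a simplification of MLS’s left-balanced trees:

package main

import "fmt"

// directPath returns the ancestors of a leaf in a complete binary tree with
// numLeaves leaves, using array indexing (root at 1, children of i at 2i and
// 2i+1). A member refreshing its leaf key must generate fresh secrets only for
// these ~log2(numLeaves) nodes, which is why a TreeKEM update costs O(log N).
func directPath(leaf, numLeaves int) []int {
	node := numLeaves + leaf // leaves occupy indices numLeaves..2*numLeaves-1
	var path []int
	for node > 1 {
		node /= 2
		path = append(path, node)
	}
	return path
}

func main() {
	// For a 16-member group, a key refresh touches only 4 intermediate nodes.
	fmt.Println(directPath(5, 16)) // [10 5 2 1]
}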

Each group in MLS is identified by a unique ID, and the group keeps an epoch number, or version, to track changes to the membership. The epoch number is incremented upon each change to the membership, e.g. when a member is added or removed.

The MLS protocol guarantees membership consistency by including the group membership in the shared key derivation during key negotiations. For example, a group operation includes the group identifier, the epoch number that represents the version, and the content-type that determines the ordering requirements [4].

Causal TreeKEM

Causal TreeKEM [7] uses CRDTs (Conflict-free Replicated Data Types) to improve TreeKEM by eliminating the need for a linear order on changes to group membership. CRDTs allow replication of mutable data among a group of users with the guarantee that all users see the same state regardless of the order in which updates are received. This allows building collaborative applications with end-to-end encryption. CRDTs require causal order, i.e. a partial order per sender, rather than a linear order. However, users cannot delete old keys until they have received all concurrent state-change messages, which diminishes the guarantees of forward secrecy.

Essential Features and Design Goals of a New Messaging System

Based on the evaluation of the current landscape of messaging apps and the lack of adoption of open standards and decentralized architecture, this section recommends essential features and design goals for a new messaging system that can address these limitations and provide better support for decentralization, trustless security, scalability of group conversations, minimization of data collection, censorship resistance, etc. The following sections delineate these features and design goals:

Open Protocols and Standards

The Internet was designed by ARPA in the 1960s to replace centralized network switches with a network of nodes. The Internet defined open protocols such as TCP/UDP/IP for network communication, RIP/BGP for routing, DNS for domain name lookup, TELNET/FTP for remote access, POP3/IMAP/SMTP for email, IRC for chat and NNTP for usenet groups. Sir Tim Berners-Lee used similar design principles when he designed the HTTP protocol for the world-wide-web. These protocols were designed with a decentralized architecture to withstand partial failures, such as attacks with atomic bombs. For example, the DNS architecture is designed as a hierarchical tree where root level nodes manage the DNS nodes for top-level-domains (TLD), authoritative TLD DNS nodes maintain the list of authoritative DNS nodes for each domain and lower level nodes maintain subdomains.

Similarly, SMTP for email delivery uses MX records in DNS to find the Message Transfer Agent (MTA) server and delivers the message. In addition, SMTP uses a relay protocol when the target MTA is not on the same network as the sender to forward the message to another MTA, which keeps forwarding the message until it reaches the target MTA.
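The first hop of that discovery can be reproduced with Go’s standard library; this sketch simply prints the MX hosts that an SMTP relay would consider (the domain is a placeholder):

package main

import (
	"fmt"
	"net"
)

// Prints the mail exchangers for a domain in preference order, i.e. the first
// hop an SMTP relay resolves before forwarding a message toward the recipient.
func main() {
	mxRecords, err := net.LookupMX("example.org") // placeholder domain
	if err != nil {
		panic(err)
	}
	for _, mx := range mxRecords {
		fmt.Printf("host=%s preference=%d\n", mx.Host, mx.Pref)
	}
}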

The proposed messaging system will instead use open standards and protocols similar to Email/SMTP and HTTP(S), as advocated by Protocols, Not Platforms: A Technological Approach to Free Speech. In the absence of an open standard, de-facto industry standards such as the Signal protocol will be used.

Decentralized Architecture

The previous section discussed deficiencies of centralized messaging systems such as FB Messenger, WhatsApp, Signal, Telegram, etc. The proposed messaging system will use decentralized or federated servers, similar to the Email/SMTP design, that can relay messages from one server to another or deliver messages to recipients.

Baran, P. (1964). On Distributed Communications, Memorandum RM-3420-PR.

The decentralized architecture may need a peer-to-peer communication layer such as Tor or I2P (used by Briar), libp2p (used by the Status.im app) or a network of nodes with onion routing (used by the Session app).

Open Source

The proposed messaging system will use open source libraries such as libsodium for encryption, olm/megolm for the double-ratchet implementation, the MLS standard for group chat and Matrix for bridging with existing messaging systems.

Asynchronicity

The asynchronicity feature requires that users are able to send messages, share keys or manage group members when other users are offline. Decentralized storage is used as a temporary queue to hold encrypted messages that are yet to be delivered. Also, users may share large attachments or media files such as pictures/videos/audios, which can be stored on the decentralized storage, after which a link to the attachment is shared with other users via secure messaging. This will be further protected by symmetric encryption and decentralized identity claims to provide safe access to the shared file. A number of decentralized storage services such as IPFS, Storj, Sia, Skynet, etc. are already available that can be used to build decentralized messaging.

Secrecy and Confidentiality

The proposed messaging system will guarantee secrecy and confidentiality so that only the recipient can compute the plaintext of the message, whether communicating in a one-to-one conversation or in a group conversation.

Authenticity with Decentralized Identity

The proposed messaging system will use decentralized identity and will allow participants to validate the identity of other members of a conversation.

Authentication, Deniability and Non-repudiation

Deniability, or deniable authentication, allows conversation participants to authenticate the origin of a message but prevents either party from proving the origin of the message to a third party after the conversation. Non-repudiation is the opposite of deniability; it is a legal term for providing proof of the origin and integrity of data. Centralized messengers such as FB Messenger use authentication to prevent impersonation and provide non-repudiation, whereas public/private keypair based authentication supports deniability, which is considered a feature in protocols such as OTR, mpOTR and WhatsApp. One way to provide deniability is to use ephemeral signature keys that are signed by medium-term signature keys [7].

Data Integrity

Data integrity guarantees that data is not tampered with or modified during transmission. The messaging system will use a Message Authentication Code (MAC) to validate message integrity and detect any modification to the message.
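As a sketch, message integrity can be validated with HMAC-SHA256 and a constant-time comparison; the function name and key handling here are illustrative:

package integrity

import (
	"crypto/hmac"
	"crypto/sha256"
)

// VerifyMAC recomputes the HMAC-SHA256 tag over the message and compares it in
// constant time, rejecting any message that was modified in transit.
func VerifyMAC(key, message, tag []byte) bool {
	mac := hmac.New(sha256.New, key)
	mac.Write(message)
	return hmac.Equal(tag, mac.Sum(nil))
}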

Forward Secrecy and Post-Compromise Security

Forward secrecy protects former messages from being read when long-term key information is exposed. Similarly, post-compromise security guarantees that if an adversary gets hold of the keys, they cannot be used to decrypt messages indefinitely after the compromise is healed. The proposed messaging system will use the Signal and Messaging Layer Security (MLS) protocols to guarantee forward secrecy and post-compromise security. These protocols use a Diffie-Hellman exchange to establish pre-computed/ephemeral DH prekeys for each conversation. Forward secrecy for each message is achieved by additionally using deterministic key ratcheting that derives the shared key using a chain of hash functions and deletes keys after use [10].

Minimize Metadata Collection

The decentralized architecture will prevent metadata collection by centralized servers, and the proposed messaging system will further reduce exposure of metadata by using design principles from the Session app, such as self-hosted infrastructure for storing attachments, onion routing to hide the IP-addresses of users in the transport message-exchange layer and a public/private keypair for identity as opposed to a phone number or email address [19].

Network Anonymity

The proposed messaging system may use P2P standards such as Tor or I2P to protect metadata and routing data. For example, Pond, Ricochet and Briar provide network anonymity by using the Tor network, but they don’t hide metadata such as sender, receiver, timestamp, etc. The Session app protects metadata such as the IP-addresses of senders and receivers using an onion network [19, 23].

Message Acknowledgement

The proposed messaging system will use end-to-end encrypted messages for acknowledgements so that they cannot be intercepted or forged by someone with access to a centralized server [18].

Turn off Backup

In order to guarantee robust data privacy, the messaging app will turn off any backup of chat history offered by iOS or Android APIs.

Access Control for Group Administration

A member in a group will be able to perform the following actions:

  • Create a group and invite others to join
  • Update personal keys
  • Request to join an existing group
  • Leave an existing group
  • Send a message to members of the group
  • Receive a message from another member in the group

The creator of the group will automatically become an admin of the group, but additional admins may be added who can perform other actions such as:

  • Add member(s) to join an existing group
  • Remove member(s) from an existing group
  • Kick a member from an existing group

A group will define a set of administrators who can update the membership, and membership changes will be signed by the administrator’s group signature key. Further, a group secret or ticket will be used as proof of membership so that each member can verify the ticket and guest list of the group [18].

Group Communication

The proposed messaging system will use the Messaging Layer Security (MLS) protocol and TreeKEM based ratchet trees to manage group keys in a scalable fashion and to support large groups. This will provide better support for forward-secrecy in a group conversation, where keys can be refreshed with O(N log N) messages as opposed to O(N^2) messages in the Signal protocol.

Multiple Devices

A user can use multiple devices, where each device is considered a new client. A user can add a new device, but that device won’t be able to access the message history. The multiple devices of the same user can be represented by a sub-tree under the user’s node in TreeKEM. The user can publish the public key of their sub-tree as an ephemeral prekey and hide the actual identity of the device so that the device that triggered an update is not revealed [2].

Account Recovery

The messaging app will use a 12-24 word mnemonic seed to generate the long-term identity keypair, similar to the BIP39 standard for generating deterministic wallets for Bitcoin. Alternatively, the messaging system may implement social recovery similar to the Argent and Loopring wallets [61].
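As a sketch of the recovery flow: BIP39 specifies that the seed is derived from the mnemonic with PBKDF2-HMAC-SHA512 (2048 rounds, salt "mnemonic" plus an optional passphrase). The Go code below then uses the first 32 bytes of that seed as an Ed25519 identity key, which is an assumption of this design rather than part of the standard:

package main

import (
	"crypto/ed25519"
	"crypto/sha512"
	"fmt"

	"golang.org/x/crypto/pbkdf2"
)

// seedFromMnemonic derives the 64-byte seed from a mnemonic phrase as
// specified by BIP39: PBKDF2-HMAC-SHA512 with 2048 rounds and the salt
// "mnemonic" concatenated with an optional passphrase.
func seedFromMnemonic(mnemonic, passphrase string) []byte {
	return pbkdf2.Key([]byte(mnemonic), []byte("mnemonic"+passphrase), 2048, 64, sha512.New)
}

func main() {
	// Illustrative phrase (a published BIP39 test vector); a real app would
	// generate the mnemonic from fresh entropy using the BIP39 word list.
	mnemonic := "legal winner thank year wave sausage worth useful legal winner thank yellow"
	seed := seedFromMnemonic(mnemonic, "")

	// Using the first 32 bytes of the seed as an Ed25519 identity key is an
	// assumption of this sketch, not part of the BIP39 standard.
	identityKey := ed25519.NewKeyFromSeed(seed[:32])
	fmt.Printf("recovered identity public key: %x\n", identityKey.Public())
}

Because the derivation is deterministic, re-entering the same mnemonic on a new device reproduces the same identity keypair.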

Monetary Incentive

A decentralized architecture requires new infrastructure for peer-to-peer communication, decentralized identity, key management, etc. Public peer-to-peer infrastructures are not designed to handle the throughput of modern messaging, i.e. 100B+ messages/day. In order to scale this infrastructure, node-runners will require a monetary incentive to expand it. For example, a number of blockchain based projects such as Sia, Storj and Serto use crypto rewards for providing storage or network services.

Disappearing Messages

This feature will be implemented by the mobile messaging app to automatically delete old messages in order to bolster security in case the phone is seized or stolen.

Contacts Discovery

As contacts on the phone are generally searchable by phone number or email, they can be readily used for connecting with others on messaging systems that use a phone number or email for identity. Decentralized messaging systems that use a local public/private keypair for identity, such as Status and Session, cannot use this feature. Instead, they rely on a different lookup service, e.g. Session uses Loki Name Service and Status uses Ethereum Name Service to register usernames that can be used to connect with friends.

Attachments/Multimedia Messages

Small attachments and multimedia files can be embedded within messages using the same end-to-end encryption, but large attachments and multimedia files can be shared via a decentralized storage system. Large files can be uploaded with symmetric encryption and made available for a limited time, and the symmetric password can be shared via an exchange of encrypted messages.

Compromise of User Devices

Due to its decentralized nature, this messaging system has no central authentication service whose compromise would put all users at risk. All user data and keys will be stored locally, and if an attacker gets hold of a user’s or group’s private keys, he may be able to send messages impersonating the user. However, the attacker won’t be able to access future messages once the user’s or group’s keys are refreshed and old keys are deleted from the device.

Data Privacy Regulations

This messaging system will provide better support for data privacy regulations such as the General Data Protection Regulation (GDPR) from the EU or the California Consumer Privacy Act, which allow users to control their data and privacy.

Offline Members

As forward-secrecy and post-compromise-security rely on updating member keys, an offline member will be holding old keys and thus may be susceptible to compromise.

Proposed Messaging Architecture

The architecture for building a new messaging system is largely inspired by the design philosophy of the Internet, which was decentralized to withstand a nuclear attack, and by Sir Tim Berners-Lee’s vision of open protocols such as SMTP, FTP and HTTP(S). The data privacy architecture is influenced by the Web 3.0 architecture for building decentralized applications (DApps) that keep user data private and securely communicate with other systems over encrypted channels in a trustless fashion. As users carry increasingly powerful devices with high-bandwidth networks, this architecture leverages edge computing to bring computation closer to the user’s device, where private data is securely stored, as opposed to storing this data in cloud services. However, this does not preclude storing user data remotely as long as it’s encrypted and the owner of the data can control who can access it based on privacy preferences.

The following building blocks are defined for designing this messaging system based on open standards and decentralized architecture.

High level components

Decentralized Services

The proposed messaging architecture uses the following decentralized services:

Decentralized Identity

The Decentralized Identity Foundation (DIF) and the W3C Credentials Community Group are standards bodies that are building open standards for decentralized identity, such as Decentralized Identifiers (DID), which use a public/private keypair for generating user identities. Users hold their identities in a digital wallet that can be used to control access to personal data or identity hubs, form relationships with other entities and communicate with other users. A user identity contains a decentralized identifier (DID) that resembles a Uniform Resource Identifier (URI) and specifies the location of the service that hosts the DID document, which includes additional information for identity verification [8]. A user manages access control via private keys accessible only to the user. A user may also create multiple identities for different service providers using multiple DID based identifiers, and user consent is required to access claims, providing granular access control [9].

Though using this standard is not a strict requirement for this architecture, it will be beneficial to describe user identity in a self-describing format so that dependent services can handle identity consistently.

Decentralized Key Exchange Servers

The key-exchange servers are needed to store and disseminate public keys or prekey bundles for each user based on their identity. Each user uploads their public keys/prekey bundles when they join and updates those prekey bundles periodically when new ephemeral keys are generated. However, such a service can be breached, or a malicious directory service may impersonate a user, so a public log such as Key Transparency may be needed to publish the binding between identities and keys.

Decentralized Storage

As described under Asynchronicity, decentralized storage will be used as a temporary queue for encrypted messages that are yet to be delivered, as well as for large attachments or media files, which are protected by symmetric encryption and decentralized identity claims and shared via links in secure messages. A number of decentralized storage services such as IPFS, Storj, Sia, Skynet, etc. are already available that can be used to build decentralized messaging.

Federation and Peer to Peer Communication

A decentralized system may need a peer-to-peer communication layer such as libp2p or I2P. Alternatively, it may use federated servers, similar to Email/SMTP, that can route messages from one server to another or deliver a message to a local user.

Mobile App / Desktop Client

The mobile app or desktop client will use the following components to provide secure messaging:

Encryption and MLS Libraries

The mobile app will use open source encryption libraries such as libsodium for encryption, olm/megolm for the double-ratchet implementation and MLS for group communication.

Digital Wallet for Keys

The digital wallet behaves as a registry of keys; it will be created after app installation and will securely store the key-pairs and key-bundles of the user, contacts and groups. The wallet will be encrypted using the user’s master password and a local device password, which is automatically generated. The wallet is updated when the user updates ephemeral keys or other members update their key-bundles. In order to satisfy post-compromise security, it removes old ephemeral keys and the keys of deleted group members. The messaging app will use open standards such as BIP39/BIP43 to create a deterministic identity from a mnemonic seed phrase and BIP32/BIP44 as the hierarchical deterministic format for storing keys so that they can be easily exported to another device by the owner. Another benefit of using these standards is that this hierarchy allows storing crypto-keys for crypto-assets and using them for payments.

Group Management

This component provides group administration actions such as:

  • Create a group and invite others to join
  • Join an existing group
  • Leave an existing group
  • Add member(s) to join an existing group
  • Remove member(s) from an existing group

Status / Presence

It’s hard to sync contacts with status and presence in a decentralized architecture because no single server maintains all contacts. However, this architecture may use a gossip protocol that periodically exchanges presence with a subset of contacts, along with the list of other contacts that are known to be online.
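Here is a minimal Go sketch of such a gossip exchange, assuming presence is just a last-seen timestamp per contact and peers merge views by keeping the freshest timestamp; the type and method names are illustrative:

package presence

import (
	"math/rand"
	"sync"
	"time"
)

// Gossip tracks a last-seen timestamp per contact and periodically exchanges
// its view with a few random peers. Merging by the freshest timestamp makes
// every node converge toward the same presence picture without a central server.
type Gossip struct {
	mu       sync.Mutex
	lastSeen map[string]time.Time // contact ID -> last time the contact was seen online
}

func NewGossip() *Gossip {
	return &Gossip{lastSeen: make(map[string]time.Time)}
}

// Merge folds a peer's view into ours, keeping the freshest timestamp per contact.
func (g *Gossip) Merge(peerView map[string]time.Time) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for id, ts := range peerView {
		if ts.After(g.lastSeen[id]) {
			g.lastSeen[id] = ts
		}
	}
}

// PickPeers selects up to n random contacts to gossip with in this round.
func (g *Gossip) PickPeers(n int) []string {
	g.mu.Lock()
	defer g.mu.Unlock()
	ids := make([]string, 0, len(g.lastSeen))
	for id := range g.lastSeen {
		ids = append(ids, id)
	}
	rand.Shuffle(len(ids), func(i, j int) { ids[i], ids[j] = ids[j], ids[i] })
	if len(ids) > n {
		ids = ids[:n]
	}
	return ids
}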

Local Push Notification

Push notification is also hard to implement in a decentralized architecture; decentralized messaging apps such as Session support push notifications only in foreground mode. Alternatively, the mobile app can wake up periodically to poll for messages in the background and use a local notification to notify users when new messages arrive. However, periodically waking up the app may drain the phone battery if it runs too often.

Local Authentication

A unique device password will be automatically generated upon installation, and the user will then create a master password, which will be used for local authentication. The device and master passwords will be used to encrypt the identity, signature and ephemeral keypairs in the digital wallet. The user will be required to authenticate locally before using the app to decrypt these keys, which in turn will be used to decrypt outgoing or incoming messages.

Local Delivery Service for Message Broadcast

The delivery service in this decentralized architecture will run locally on the client side to send outgoing messages. It will use the transport layer to send messages asynchronously, which may use the protocol-bridge to integrate with other messaging systems or decentralized storage to store messages when recipients are offline. The delivery service interacts with the local digital wallet to encrypt outgoing messages using the Signal/MLS protocols. The delivery service routes both user-messages and meta-messages, which include user invitations, changes to group membership, etc. The local delivery service also receives incoming messages and decrypts them using the double-ratchet algorithm and the user’s private keys in the key registry. The incoming messages are then stored in the local message store for direct access from the messaging app.

Transport Message-Exchange

The transport message-exchange is a network layer for sending and receiving messages. This component only handles encrypted outgoing and incoming messages, without access to private keys. If message recipients are not online, it will use decentralized storage to store and forward messages. It may use the protocol bridge to support other protocols such as email/SMTP, XMPP, Matrix, Gossip Protocol, P2P, etc. to synchronize message datasets directly [13, 14].

Protocol Bridge

This component will provide interoperability with other messaging protocols and may use Matrix protocol to bridge with other messaging apps [36, 65].

Local Message Store

The local message-store stores all outgoing and incoming messages, which can be viewed by the user in the messaging app. The local message store keeps messages secured with symmetric encryption using the user’s master password and a device specific password. The messages will only be displayed after local authentication. The local message-store may subscribe to the local delivery service to display new messages as they are received. The user may also specify a retention policy for messages to archive or remove old messages.

Local Directory of Contacts

The local directory of contacts keeps metadata about contacts and groups such as name, avatar, etc.

Messaging Adapter

The messaging adapter abstracts the core messaging functionality and provides high-level APIs that the UI can use to manage contacts and groups or send/receive messages.
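A hypothetical Go interface sketches what such an adapter could look like; all names here are illustrative, not part of any existing library:

package messaging

import "context"

type (
	GroupID     string
	MessageID   string
	Unsubscribe func()

	Message struct {
		From, To string // decentralized identifiers (DIDs) of sender and recipient
		Body     []byte // decrypted plaintext
	}
)

// MessagingAdapter is a facade that hides the digital wallet, local delivery
// service and transport behind a few high-level calls for the UI layer.
type MessagingAdapter interface {
	AddContact(ctx context.Context, did string) error
	CreateGroup(ctx context.Context, name string, memberDIDs []string) (GroupID, error)
	Send(ctx context.Context, to string, body []byte) (MessageID, error)
	Subscribe(ctx context.Context, onReceive func(Message)) (Unsubscribe, error)
}

Keeping the UI behind such an interface allows the underlying transport or encryption protocol to be swapped without changing application code.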

Self-Destruct Button

In order to protect the user’s private data when their phone is stolen or seized, the messaging app may allow users to delete all data, triggered remotely via a special poison message, a self-destruct timer or a poison PIN [39].

High-level Applications

Decentralized messaging with end-to-end encryption is an indispensable component for building decentralized applications. A message can represent data, an event or a command that is communicated with end-to-end encryption, and the integrity of messages can be validated using digital signatures based on public/private keypairs. The following section describes a few examples of such decentralized applications that can be built on top of secure messaging:

Social platforms

Social platforms such as Facebook, YouTube and Twitter face increasing demands to address widespread hatred, fake news and conspiracy theories on their platforms. The Communications Decency Act, Section 230, protects these platforms from being liable for user contents, but tech companies face increasing pressure to police their contents. These problems were highlighted by Mike Masnick in his article Protocols, Not Platforms: A Technological Approach to Free Speech [6]. He advocated building protocols like the Internet protocols SMTP, IRC, NNTP and HTTP as opposed to platforms controlled by tech giants. This delegates the responsibility of tolerating speech to end users and moves control of contents to the ends of the network. For example, ActivityPub and OpenSocial are open specifications for social network platforms, and a number of decentralized social platforms are already growing, such as Scuttlebutt, Gurlic, Mastodon, PeerTube, Serto, Fediverse, Pixelfed, WriteFreely, Plume, Funkwhale, etc.

Source Code Management

Source code management systems such as Git or Mercurial were designed with a distributed and decentralized architecture, but commercial offerings such as GitHub, Atlassian and GitLab hijacked them by packaging them with additional features such as code-reviews, release management, continuous integration, etc. However, as with any centrally controlled system, such services are subject to censorship, e.g. GitHub has restricted accounts in countries under US sanctions.

The decentralized identity, key management and storage systems discussed in this paper can help revive the original vision of these systems. As decentralized systems require smart clients or edge computing, many additional features such as code-reviews can be built directly into the clients or other decentralized applications. The messaging protocol can be used to trigger an event for integration with other systems, e.g. when source code is checked in, it fires a message to the build system that kicks off build/integration, which in turn can send message(s) to the developer group or other systems upon completion or failure.

Audio and Video Communication

A messaging app would be incomplete without audio and video communication, and these features can be built on top of messaging protocols with end-to-end encryption.

Collaborative Tools

Communication and collaborative tools such as Google Docs can be built using a layer of Conflict-free Replicated Data Types (CRDTs) on top of end-to-end encryption [7].

Document Signing

Document signing features such as those offered by DocuSign can be implemented using the digital identity and signature primitives of secure messaging to sign documents safely and share them with other stakeholders.

IoT Software/Firmware Upgrades

Despite the widespread adoption of IoT devices, their software/firmware rarely uses strong encryption for upgrades and can be susceptible to hacking. Instead, end-to-end encryption, vendor-signed messages and attachments can be used to securely upgrade software.

Health Privacy

Health privacy laws such as HIPAA can benefit from strong end-to-end encryption so that patients and doctors can safely communicate and share medical records.

Edge Computing

Modern consumer and IoT devices such as smart phones and tablets provide powerful processing and network capabilities that can be used to move computation closer to the devices where user data is stored safely. Edge computing will improve interaction with applications because data is available locally, and it will provide stronger security.

Conclusion

Secure and confidential messaging is an indispensable necessity for communicating with your family, friends and work colleagues safely. However, most popular messaging apps use a centralized architecture to control access to their services. These messaging apps lack trustless security, whereas truly secure systems don’t require trust. The tech companies running these services use the network effect as a moat to prevent competition: people want to use the messaging platform that their friends use, so these platforms grow gigantic with time. Further, a number of messaging apps collect metadata that is integrated with other products for tracking users, for marketing or for surveillance.

This paper reviewed open standards and open source libraries that can be used to build a decentralized messaging system with better support for data privacy, censorship resistance, anonymity, deniability and confidentiality. It advocates using protocols rather than building platforms. Unfortunately, protocols are difficult to monetize and slow to adopt new changes, so most messaging apps use centralized servers, proprietary APIs and custom security protocols. Using open standards and libraries will minimize security issues due to bad implementations or customized security protocols. It also helps interoperability so that users can choose any messaging app and still securely communicate with their friends. A decentralized architecture prevents reliance on a single company that may cut off your access or disrupt the service due to an outage. The open standards also help build other decentralized applications such as communication, collaboration and payments on top of messaging. This architecture makes use of the powerful devices in the hands of most people to store private data instead of storing it in the cloud.

The Internet was built with decentralized protocols such as Email/SMTP and DNS that allow you to set up a federated server with your domain to process your requests or relay/delegate requests to other federated servers. Messaging apps such as Briar, Matrix, Session and Status already support decentralized architecture, however they have gained little adoption due to missing features such as VoIP and difficulty of use. This is why open standards and interoperability are critical: new messaging apps can be developed with better user experience and features. This will be challenging because decentralized systems are harder to build than centralized ones, and security protocols or messaging standards such as the Signal protocol or MLS are not specifically designed with decentralized architecture in mind. But the benefits of decentralized architecture outweigh these hardships. Just as you can choose your email provider, whether it be GMail, Outlook or another ISP, and use any email client to read/write emails, open standards and decentralization empower users to choose any messaging provider or client. In the end, decentralized messaging systems with end-to-end encryption provide trustless security, confidentiality, better protection of data privacy and the freedom to switch messaging providers or clients when trust in a service is broken, such as after the recent changes to WhatsApp’s privacy policies.

References

  1. Katriel Cohn-Gordon, Cas Cremers, and Luke Garratt. 2016. On post-compromise security. In Computer Security Foundations Symposium (CSF), 2016 IEEE 29th.IEEE, 164–178.
  2. Katriel Cohn-Gordon, Cas Cremers, and Luke Garratt. On Ends-to-Ends Encryption Asynchronous Group Messaging with Strong Security Guarantees.
  3. Ian Goldberg, Berkant Ustaoglu, Matthew Van Gundy, and Hao Chen. 2009. Multi-party off-the-record messaging. In ACM CCS 09. Ehab Al-Shaer, Somesh Jha, and Angelos D. Keromytis, (Eds.) ACM Press, (Nov. 2009), 358–368.
  4. The Messaging Layer Security (MLS) Protocol. https://datatracker.ietf.org/doc/draft-ietf-mls-protocol.
  5. Better Encrypted Group Chat. https://blog.trailofbits.com/2019/08/06/better-encrypted-group-chat/.
  6. Protocols, Not Platforms: A Technological Approach to Free Speech, August 21, 2019. https://knightcolumbia.org/content/protocols-not-platforms-a-technological-approach-to-free-speech.
  7. Matthew A. Weidner, Churchill College. Group Messaging for Secure Asynchronous Collaboration (dissertation), June 6, 2019. https://mattweidner.com/acs-dissertation.pdf.
  8. Decentralized Identifiers (DIDs) v1.0. https://w3c.github.io/did-core/.
  9. Microsoft Decentralized identity. https://www.microsoft.com/en-us/security/business/identity/own-your-identity
  10. Vinnie Moscaritolo, Gary Belvin, and Phil Zimmermann. Silent circle instant messaging protocol protocol specification. Technical report, 2012.
  11. The Double Ratchet Algorithm https://www.signal.org/docs/specifications/doubleratchet/.
  12. The X3DH Key Agreement Protocol https://www.signal.org/docs/specifications/x3dh/.
  13. Prince Mahajan, Srinath Setty, Sangmin Lee, Allen Clement, Lorenzo Alvisi, Mike Dahlin, and Michael Walfish. Depot: Cloud storage with minimal trust. ACM Trans. Comput. Syst., 29(4):12:1–12:38, December 2011.
  14. Joel Reardon, Alan Kligman, Brian Agala, Ian Goldberg, and David R. Cheriton. KleeQ: Asynchronous key management for dynamic ad-hoc networks. Tech report, 2007.
  15. WhatsApp. WhatsApp encryption overview, 2017. https://www.whatsapp.com/security/WhatsApp-Security-Whitepaper.pdf.
  16. Leaking Data on Intel CPUs via Cache Evictions. https://arxiv.org/pdf/2006.13353.pdf.
  17. How SGX Fails in Practice. https://sgaxe.com/files/SGAxe.pdf.
  18. Paul Rösler, Christian Mainka, and Jörg Schwenk. More is less: On the end-to-end security of group chats in Signal, WhatsApp, and Threema. In 2018 IEEE European Symposium on Security and Privacy (EuroS&P), pages 415–429, April 2018. https://eprint.iacr.org/2017/713.pdf.
  19. Kee Jefferys, Maxim Shishmarev, Simon Harman. Session: A Model for End-To-End Encrypted Conversations With Minimal Metadata Leakage. February 11, 2020. https://getsession.org/wp-content/uploads/2020/02/Session_Whitepaper.pdf.
  20. Cryptonote 2.0. https://cryptonote.org/whitepaper.pdf.
  21. Large-scale SIM swap fraud. https://securelist.com/large-scale-sim-swap-fraud/90353/.
  22. WhatsApp updates its Terms and Privacy Policy to mandate data-sharing with Facebook. https://www.xda-developers.com/whatsapp-updates-terms-privacy-policy-mandate-data-sharing-facebook/.
  23. Kseniia Ermoshina, Francesca Musiani, Harry Halpin. End-to-End Encrypted Messaging Protocols: An Overview. Third International Conference, INSCI 2016 – Internet Science, Sep 2016, Florence, Italy. pp. 244–254. https://hal.inria.fr/hal-01426845/file/paper_21.pdf.
  24. A look at Matrix.org’s OLM | MEGOLM encryption protocol. https://blog.jabberhead.tk/2019/03/10/a-look-at-matrix-orgs-olm-megolm-encryption-protocol/.
  25. Telegram publishes users’ locations online. https://blog.ahmed.nyc/2021/01/if-you-use-this-feature-on-telegram.html.
  26. Five Reasons You Should Delete Telegram from Your Phone. https://www.vice.com/en/article/jgqqv8/five-reasons-you-should-delete-telegram-from-your-phone.
  27. Why is Signal asking users to set a PIN, or “A few thoughts on Secure Value Recovery” https://blog.cryptographyengineering.com/2020/07/10/a-few-thoughts-about-signals-secure-value-recovery/.
  28. German police can access any WhatsApp message without any malware. https://androidrookies.com/german-police-can-access-any-whatsapp-message-without-any-malware/.
  29. What’s wrong with WhatsApp. https://www.theguardian.com/technology/2020/jul/02/whatsapp-groups-conspiracy-theories-disinformation-democracy.
  30. Signal secure messaging can now identify you without a phone number. https://nakedsecurity.sophos.com/2020/05/22/signal-secure-messaging-can-now-identify-you-without-a-phone-number/.
  31. WhatsApp gives users an ultimatum: Share data with Facebook or stop using the app. https://arstechnica.com/tech-policy/2021/01/whatsapp-users-must-share-their-data-with-facebook-or-stop-using-the-app/.
  32. Attack of the Week: Group Messaging in WhatsApp and Signal. https://blog.cryptographyengineering.com/2018/01/10/attack-of-the-week-group-messaging-in-whatsapp-and-signal/.
  33. Kevin Lee, Ben Kaiser, Jonathan Mayer. An Empirical Study of Wireless Carrier Authentication for SIM Swaps, January 10, 2020. https://www.issms2fasecure.com/assets/sim_swaps-01-10-2020.pdf.
  34. Study finds five major US carriers vulnerable to SIM-swapping tactics. https://www.engadget.com/2020-01-12-princeton-study-sim-swap.html.
  35. Reverse Engineering WhatsApp Encryption For Chat Manipulation And More. https://i.blackhat.com/USA-19/Wednesday/us-19-Zaikin-Reverse-Engineering-WhatsApp-Encryption-For-Chat-Manipulation-And-More.pdf.
  36. Matrix protocol. https://matrix.org/docs/spec/.
  37. Analyzing WhatsApp Calls with Wireshark, radare2 and Frida. https://medium.com/@schirrmacher/analyzing-whatsapp-calls-176a9e776213.
  38. WhatsApp white paper. https://www.whatsapp.com/security/.
  39. Every secure messaging app needs a self-destruct button. https://techcrunch.com/2019/06/12/every-secure-messaging-app-needs-a-self-destruct-button/.
  40. Chinese Cyberattack Hits Telegram, App Used by Hong Kong Protesters. https://www.nytimes.com/2019/06/13/world/asia/hong-kong-telegram-protests.html.
  41. Stop Using Encrypted Email. https://latacora.micro.blog/2020/02/19/stop-using-encrypted.html.
  42. Session: An Open Source Private Messenger That Doesn’t Need Your Phone Number. https://itsfoss.com/session-messenger/.
  43. Status.im Whitepaper. https://status.im/whitepaper.pdf.
  44. Peer-to-Peer Messaging – Where Whisper Falls Short and Waku Picks Up. https://our.status.im/peer-to-peer-messaging-where-whisper-falls-short-and-waku-picks-up/.
  45. WhatsApp Security Flaws Could Allow Snoops to Slide Into Group Chats. https://www.wired.com/story/whatsapp-security-flaws-encryption-group-chats/.
  46. Status.im Specs. https://status.im/research/secure_messaging_compendium.html.
  47. Germany’s data chief tells ministries WhatsApp is a no-go. https://www.dw.com/en/germanys-data-chief-tells-ministries-whatsapp-is-a-no-go/a-53474413.
  48. Abusing WebRTC to Reveal Coarse Location Data in Signal. https://medium.com/tenable-techblog/turning-signal-app-into-a-coarse-tracking-device-643eb4298447.
  49. We Chat, They Watch. https://citizenlab.ca/2020/05/we-chat-they-watch/.
  50. WhatsApp Beaten By Apple’s New iMessage Privacy Update. https://www.forbes.com/sites/zakdoffman/2021/01/03/whatsapp-beaten-by-apples-new-imessage-update-for-iphone-users.
  51. Briar Messaging app. https://briarproject.org/how-it-works/.
  52. I don’t trust Signal. https://drewdevault.com/2018/08/08/Signal.html.
  53. Hardware Security Module (HSM). https://copperhead.co/blog/secure-phone-series-device-security/.
  54. The Great iPwn Journalists Hacked with Suspected NSO Group iMessage ‘Zero-Click’ Exploit. https://citizenlab.ca/2020/12/the-great-ipwn-journalists-hacked-with-suspected-nso-group-imessage-zero-click-exploit/.
  55. Status Perfect Forward Secrecy Whitepaper. https://status.im/technical/pfs.html.
  56. Peer to Peer Communication Library. https://libp2p.io/.
  57. Session Protocol: Technical implementation details. https://getsession.org/session-protocol-technical-information/.
  58. Signal: Private Group Messaging. https://signal.org/blog/private-groups/.
  59. Apple’s privacy labels reveals Whatsapp and Facebook Messenger’s hunger for user data. https://www.techradar.com/news/apples-privacy-labels-reveals-whatsapp-and-facebook-messengers-hunger-for-user-data.
  60. Authoritarianism Through Coding: Signal. https://www.oyd.org.tr/en/articles/signal/.
  61. Why we need wide adoption of social recovery wallets. https://vitalik.ca/general/2021/01/11/recovery.html.
  62. Cryptography Dispatches: The Most Backdoor-Looking Bug I’ve Ever Seen. https://buttondown.email/cryptography-dispatches/archive/cryptography-dispatches-the-most-backdoor-looking/.
  63. On Privacy versus Freedom. https://matrix.org/blog/2020/01/02/on-privacy-versus-freedom.
  64. Michael Egorov, MacLane Wilkison, David Nunez. NuCypher KMS: Decentralized key management system, November 15, 2017. https://arxiv.org/pdf/1707.06140.pdf.
  65. Bridging Matrix with WhatsApp running on a VM. https://matrix.org/docs/guides/whatsapp-bridging-mautrix-whatsapp.
  66. Issue 1936: Signal: RTP is processed before call is answered. https://bugs.chromium.org/p/project-zero/issues/detail?id=1936.
  67. Apple can’t read your device data, but it can read your backups. https://www.theverge.com/2020/1/21/21075033/apple-icloud-end-to-end-encryption-scrapped-fbi-reuters-report.
  68. Signal outage is keeping messages from sending. https://www.theverge.com/2021/1/15/22232993/signal-outage-new-users-messages-not-sending.
  69. We can do better than Signal. https://icyphox.sh/blog/signal.
  70. Fediverse. https://medium.com/@VirtualAdept/a-friendly-introduction-to-the-fediverse-5b4ef3f8ed0e.
  71. Centralisation is a danger to democracy. https://redecentralize.org/blog/2021/01/18/centralization-is-a-danger-to-democracy.
  72. The Secure Messaging App Conundrum: Signal vs. Telegram. https://cqi.inf.usi.ch/publications/telegram_vs_signal.pdf.
  73. The battle inside Signal. https://www.platformer.news/p/-the-battle-inside-signal.
  74. The State of State Machines. https://googleprojectzero.blogspot.com/2021/01/the-state-of-state-machines.html.
  75. Signal’s TLS Proxy Failed to be Probing Resistant. https://github.com/net4people/bbs/issues/60.
  76. Signal’s proxy implementation. https://github.com/net4people/bbs/issues/63.
  77. Can The FBI Hack Into Private Signal Messages On A Locked iPhone? Evidence Indicates Yes. https://www.forbes.com/sites/thomasbrewster/2021/02/08/can-the-fbi-can-hack-into-private-signal-messages-on-a-locked-iphone-evidence-indicates-yes/.
  78. Signal Server is effectively closed source software right now. https://lemmy.ml/post/55595.
  79. WhatsApp and most alternatives share the same problem. https://stuker.com/2021/whatsapp-and-most-alternatives-share-the-same-problem/.
  80. Signal is a government op. https://yasha.substack.com/p/signal-is-a-government-op-85e.
  81. Cryptographers unearth vulnerabilities in Telegram’s encryption protocol. https://www.cyberscoop.com/telegram-app-security-encryption.
  82. Improved E2E Encryption. https://www.groupsapp.online/post/improved-e2e-encryption.
  83. Why not Signal? https://dessalines.github.io/essays/why_not_signal.html.
  84. Interview with Signal’s New President. https://www.schneier.com/blog/archives/2022/10/interview-with-signals-new-president.html/#comment-411335.
  85. I don’t trust Signal. https://blog.dijit.sh/i-don-t-trust-signal.

December 26, 2020

Applying Structured Concurrency patterns in GO with case-study of Gitlab-runner

Filed under: Concurrency,GO — admin @ 9:31 pm

I recently wrote a series of blogs on structured concurrency (Part-I, Part-II, Part-III, Part-IV) and how it improves readability, concurrency-scope, composition and flow-control of concurrent code and adds better support for error-handling, cancellation and timeout. I have been using GO on a number of projects over the last few years, and I will share a few concurrency patterns that I have used or seen in other projects such as gitlab-runner. I demonstrated in the above series how structured concurrency considers the GO statement harmful, similar to GOTO in structured programming. Just as structured programming replaced GOTO with control-flow primitives such as single-entry/exit, if-then, loops and function calls, structured concurrency provides a scope of concurrency where the parent waits for all asynchronous code. I will show how common concurrency patterns in GO can take advantage of structured concurrency.

Asynchronous Tasks

The primary purpose of goroutines is to perform asynchronous tasks where you might be requesting data from a remote API or database. For example, here is sample code from gitlab-runner that uses a goroutine to wait for a remote SSH command to finish:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
	waitCh := make(chan error)
	go func() {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
		waitCh <- err
	}()

	select {
	case <-ctx.Done():
		_ = session.Signal(ssh.SIGKILL)
		_ = session.Close()
		return <-waitCh

	case err := <-waitCh:
		return err
	}
}

Here is async/await based syntax, based on async_await.go, that performs a similar task using structured concurrency:

func (s *Client) Run(ctx context.Context, cmd Command) error {
...
    timeout := ...
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		err := session.Wait()
		if _, ok := err.(*ssh.ExitError); ok {
			err = &ExitError{Inner: err}
		}
        return nil, err
    }
    abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
		_ = session.Signal(ssh.SIGKILL)
		_ = session.Close()
        return nil, nil
    }
    _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
    return err
}

The above code defines the scope of concurrency and adds support for timeout while making the code easier to comprehend.

Note: due to the lack of generics in GO, interface{} is used to accept different types that may be passed to asynchronous tasks.

Racing Asynchronous Tasks

A common use of goroutines is spawning multiple asynchronous tasks and taking the result of the first task that completes. For example, here is sample code from gitlab-runner that uses goroutines to copy a container’s stdout/stderr and stdin streams:

	stdoutErrCh := make(chan error)
	go func() {
		_, errCopy := stdcopy.StdCopy(output, output, hijacked.Reader)
		stdoutErrCh <- errCopy
	}()

	// Write the input to the container and close its STDIN to get it to finish
	stdinErrCh := make(chan error)
	go func() {
		_, errCopy := io.Copy(hijacked.Conn, input)
		_ = hijacked.CloseWrite()
		if errCopy != nil {
			stdinErrCh <- errCopy
		}
	}()

	// Wait until either:
	// - the job is aborted/cancelled/deadline exceeded
	// - stdin has an error
	// - stdout returns an error or nil, indicating the stream has ended and
	//   the container has exited
	select {
	case <-ctx.Done():
		err = errors.New("aborted")
	case err = <-stdinErrCh:
	case err = <-stdoutErrCh:
	}

The above code creates a stdoutErrCh channel to capture errors from stdout and a stdinErrCh channel to capture errors from stdin, and then waits for either to finish.

Here is equivalent code that uses structured concurrency with async/await primitives from async_racer.go:

    ctx := context.Background()
    timeout := ...
    handler1 := func(ctx context.Context) (interface{}, error) {
        _, err := stdcopy.StdCopy(output, output, hijacked.Reader)
        return nil, err
    }
    handler2 := func(ctx context.Context) (interface{}, error) {
        defer hijacked.CloseWrite()
        _, err := io.Copy(hijacked.Conn, input)
        return nil, err
    }
    future, _ := async.ExecuteRacer(ctx, handler1, handler2)
    _, err := future.Await(ctx, timeout)

The above code uses async/await syntax to define the scope of concurrency and clarifies the intent of the business logic without the distraction of concurrency logic.

Performing cleanup when task is aborted or cancelled

If a goroutine spawns an external process for background work, you may need to kill that process in case the goroutine’s task is cancelled or times out. For example, here is sample code from gitlab-runner that calls the KillAndWait function to terminate an external process when context.Done() fires:

   func (c *command) Run() error {
       err := c.cmd.Start()
       if err != nil {
           return fmt.Errorf("failed to start command: %w", err)
       }

       go c.waitForCommand()

       select {
       case err = <-c.waitCh:
           return err

       case <-c.context.Done():
           return newProcessKillWaiter(c.logger, c.gracefulKillTimeout, c.forceKillTimeout).
               KillAndWait(c.cmd, c.waitCh)
       }
   }

In the above code, the Run method starts a command, waits for its completion in another goroutine, and then listens to the waitCh and context.Done() channels.

Here is how the async/await structure from async_await.go can apply structured concurrency to the above code:

   func (c *command) Run() error {
       timeout := ...
       ctx := context.Background()
       handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
         return nil, c.cmd.Run()
       }
       abort := func(ctx context.Context, payload interface{}) (interface{}, error) {
         return nil, KillAndWait(c.cmd, c.waitCh)
       }
       _, err := async.Execute(ctx, handler, abort, nil).Await(ctx, timeout)
       return err
   }

Using GO Channels as data pipe/queue

GO channels are designed after CSP rendezvous primitives where both sender and receiver have to wait to exchange messages. However, you can add buffering to use these channels as a bounded queue (for back-pressure). Here is example code from gitlab-runner that uses channels to stream log messages:

 func (l *kubernetesLogProcessor) scan(ctx context.Context, logs io.Reader) (*bufio.Scanner, <-chan string) {
     logsScanner := bufio.NewScanner(logs)

     linesCh := make(chan string)
     go func() {
         defer close(linesCh)

         // This goroutine will exit when the calling method closes the logs stream or the context is cancelled
         for logsScanner.Scan() {
             select {
             case <-ctx.Done():
                 return
             case linesCh <- logsScanner.Text():
             }
         }
     }()

     return logsScanner, linesCh
 }

The above code creates the linesCh channel without any buffer and then creates a goroutine where logs are read and sent to the linesCh channel. As mentioned above, this blocks the sender until the receiver is ready to receive the log messages, and you may lose log messages if the receiver is slow and the goroutine is killed before the logs can be read. Structured concurrency cannot help in this case, but we can simply use an event-bus or a local message-queue to stream these logs (a buffered-channel variant is sketched below).
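
For illustration, here is a minimal sketch (my own code, not from gitlab-runner) of using a buffered channel as a bounded queue, assuming it is acceptable to drop log lines when the buffer is full:

package main

import "fmt"

// Minimal sketch: a buffered channel acts as a bounded queue so a slow
// receiver exerts back-pressure without blocking the producer forever;
// here the producer drops lines when the buffer is full.
func main() {
	linesCh := make(chan string, 4) // bounded buffer of 4 lines
	for i := 0; i < 8; i++ {
		line := fmt.Sprintf("log line %d", i)
		select {
		case linesCh <- line: // enqueue if buffer has room
		default:
			fmt.Println("dropped:", line) // buffer full: drop instead of blocking
		}
	}
	close(linesCh)
	for line := range linesCh {
		fmt.Println("received:", line)
	}
}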

WaitGroup to wait for completion of goroutines

GO supports sync.WaitGroup to wait for the completion of goroutines, but it's redundant if you are also using channels to receive replies. For example, here is sample code from gitlab-runner that uses WaitGroup to wait for the completion of goroutines:

 func (e *executor) waitForServices() {
...
     // wait for all services to come up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
         wg := sync.WaitGroup{}
         for _, service := range e.services {
             wg.Add(1)
             go func(service *types.Container) {
                 _ = e.waitForServiceContainer(service, time.Duration(waitForServicesTimeout)*time.Second)
                 wg.Done()
             }(service)
         }
         wg.Wait()
     }
 }

Here is how the above code can be replaced by async/await syntax from my async_await.go:

 func (e *executor) waitForServices() {
...
     // wait for all services to come up
     if waitForServicesTimeout > 0 && len(e.services) > 0 {
        ctx := context.Background()
        timeout := time.Duration(waitForServicesTimeout) * time.Second
        futures := make([]async.Awaiter, len(e.services))
        handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
          return nil, e.waitForServiceContainer(payload.(*types.Container), timeout)
        }
        for i := 0; i < len(e.services); i++ {
          futures[i] = async.Execute(ctx, handler, async.NoAbort, e.services[i])
        }
        async.AwaitAll(ctx, timeout, futures...)
     }
 }

The above code is not shorter than the original, but it is more readable and eliminates subtle bugs where you might use WaitGroup incorrectly and end up with a deadlock, as the sketch below shows.
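
Here is a minimal hypothetical sketch of such a bug, where a mismatched Add/Done count blocks the program forever:

package main

import "sync"

// Minimal sketch of a subtle WaitGroup bug: the counter is incremented twice
// but only one goroutine calls Done, so Wait blocks forever and the GO
// runtime reports "all goroutines are asleep - deadlock!".
func main() {
	var wg sync.WaitGroup
	wg.Add(2) // claims two tasks...
	go func() {
		defer wg.Done()
		// ... do some work ...
	}()
	wg.Wait() // ...but only one goroutine was started, so this never returns
}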

Fork-Join based Asynchronous Tasks

One common use of goroutines is to spawn multiple asynchronous tasks and wait for their completion, similar to the fork-join pattern. For example, here is sample code from gitlab-runner that uses goroutines to perform cleanup of multiple services and sends back results via a channel:

func (s *executor) cleanupServices() {
	ch := make(chan serviceDeleteResponse)
	var wg sync.WaitGroup
	wg.Add(len(s.services))

	for _, service := range s.services {
		go s.deleteKubernetesService(service.ObjectMeta.Name, ch, &wg)
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	for res := range ch {
		if res.err != nil {
			s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.serviceName, res.err))
		}
	}
}

func (s *executor) deleteKubernetesService(serviceName string, ch chan<- serviceDeleteResponse, wg *sync.WaitGroup) {
	defer wg.Done()

	err := s.kubeClient.CoreV1().
		Services(s.configurationOverwrites.namespace).
		Delete(serviceName, &metav1.DeleteOptions{})
	ch <- serviceDeleteResponse{serviceName: serviceName, err: err}
}

The cleanupServices method above goes through a collection of services and calls deleteKubernetesService in a goroutine, which invokes the Kubernetes API to remove the given service. It waits for all background tasks using a WaitGroup, then receives any errors from the channel and logs them.

Here is how you can use async/await code from async_await.go that applies structured concurrency by abstracting low-level goroutines and channels:

func (s *executor) cleanupServices() {
    ctx := context.Background()
    timeout := time.Duration(5 * time.Second)
    futures := make([]async.Awaiter, len(s.services))
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
         serviceName := payload.(string)
         err := s.kubeClient.CoreV1().
             Services(s.configurationOverwrites.namespace).
             Delete(serviceName, &metav1.DeleteOptions{})
         return serviceName, err // return the service name so errors can be attributed
    }
    for i := 0; i < len(s.services); i++ {
        futures[i] = async.Execute(ctx, handler, async.NoAbort, s.services[i].ObjectMeta.Name)
    }
    results := async.AwaitAll(ctx, timeout, futures...)
    for _, res := range results {
        if res.Err != nil {
            s.Errorln(fmt.Sprintf("Error cleaning up the pod service %q: %v", res.Result, res.Err))
        }
    }
}

The above code applies structured concurrency by defining the scope of asynchronous code in cleanupServices and adds better support for cancellation, timeout and error handling. Also, you no longer need a WaitGroup to wait for completion.

Polling Asynchronous Task

In some cases, you may need to poll a background task to check its status or wait for its completion, e.g., here is sample code from gitlab-runner that waits for a pod until it's running:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
	pollInterval := config.GetPollInterval()
	pollAttempts := config.GetPollAttempts()
	for i := 0; i <= pollAttempts; i++ {
		select {
		case r := <-triggerPodPhaseCheck(c, pod, out):
			if !r.done {
				time.Sleep(time.Duration(pollInterval) * time.Second)
				continue
			}
			return r.phase, r.err
		case <-ctx.Done():
			return api.PodUnknown, ctx.Err()
		}
	}
	return api.PodUnknown, errors.New("timed out waiting for pod to start")
}

func triggerPodPhaseCheck(c *kubernetes.Clientset, pod *api.Pod, out io.Writer) <-chan podPhaseResponse {
	errc := make(chan podPhaseResponse)
	go func() {
		defer close(errc)
		errc <- getPodPhase(c, pod, out)
	}()
	return errc
}

The waitForPodRunning method above repeatedly calls triggerPodPhaseCheck, which creates a goroutine and then invokes the Kubernetes API to get the pod status. It then returns the pod status on a channel that waitForPodRunning listens to.

Here is equivalent code using async/await from async_polling.go:

func waitForPodRunning(
	ctx context.Context,
	c *kubernetes.Clientset,
	pod *api.Pod,
	out io.Writer,
	config *common.KubernetesConfig,
) (api.PodPhase, error) {
    timeout := config.GetPodRunningTimeout()
    pollInterval := config.GetPollInterval()
    handler := func(ctx context.Context, payload interface{}) (bool, interface{}, error) {
         r := getPodPhase(c, pod, out)
         return r.done, r.phase, r.err
    }
    phase, err := async.ExecutePolling(ctx, handler, async.NoAbort, 0, pollInterval).
                      Await(ctx, timeout)
    if err != nil {
        return api.PodUnknown, err
    }
    return phase.(api.PodPhase), nil
}

The above code removes the complexity of manually polling and managing goroutines/channels.

Background Task with watchdog

Another concurrency pattern in GO involves starting a background task and then launching another background goroutine to monitor the task or its runtime environment, so that the background task can be terminated if the watchdog finds any errors. For example, here is sample code from gitlab-runner that executes a command inside a kubernetes pod and launches another goroutine to monitor the pod status, i.e.,

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
    containerName = ...
    containerCommand = ...
...

	podStatusCh := s.watchPodStatus(ctx)

	select {
	case err := <-s.runInContainer(containerName, containerCommand):
		s.Debugln(fmt.Sprintf("Container %q exited with error: %v", containerName, err))
		var terminatedError *commandTerminatedError
		if err != nil && errors.As(err, &terminatedError) {
			return &common.BuildError{Inner: err, ExitCode: terminatedError.exitCode}
		}

		return err
	case err := <-podStatusCh:
		return &common.BuildError{Inner: err}
	case <-ctx.Done():
		return fmt.Errorf("build aborted")
	}
}

func (s *executor) watchPodStatus(ctx context.Context) <-chan error {
	// Buffer of 1 in case the context is cancelled while the timer tick case is being executed
	// and the consumer is no longer reading from the channel while we try to write to it
	ch := make(chan error, 1)

	go func() {
		defer close(ch)

		t := time.NewTicker(time.Duration(s.Config.Kubernetes.GetPollInterval()) * time.Second)
		defer t.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				err := s.checkPodStatus()
				if err != nil {
					ch <- err
					return
				}
			}
		}
	}()

	return ch
}

func (s *executor) runInContainer(name string, command []string) <-chan error {
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)

		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		err := retryable.Run()
		if err != nil {
			errCh <- err
		}

		exitStatus := <-s.remoteProcessTerminated
		if *exitStatus.CommandExitCode == 0 {
			errCh <- nil
			return
		}

		errCh <- &commandTerminatedError{exitCode: *exitStatus.CommandExitCode}
	}()

	return errCh
}

In the above code, runWithAttach calls watchPodStatus to monitor the status of the pod in a background goroutine and then executes the command via runInContainer.

Here is equivalent code using async/await from async_watchdog.go:

func (s *executor) runWithAttach(cmd common.ExecutorCommand) error {
...
	ctx, cancel := context.WithCancel(cmd.Context)
	defer cancel()
    containerName = ...
    containerCommand = ...

...
    timeout := ...
    poll := ...
    handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
      return nil, s.runInContainer(containerName, containerCommand)
    }
    watchdogHandler := func(ctx context.Context, payload interface{}) (interface{}, error) {
         return nil, s.checkPodStatus()
    }

    _, err := async.ExecuteWatchdog(ctx, handler, watchdogHandler, async.NoAbort, nil, poll).
                  Await(ctx, timeout)
	if err != nil {
		return &common.BuildError{Inner: err}
	}
	return nil
}

func (s *executor) runInContainer(name string, command []string) error {
		...
		retryable := retry.New(retry.WithBuildLog(&attach, &s.BuildLogger))
		return retryable.Run()
}

The above code removes the extraneous complexity of concurrency logic interleaved with functional code and makes it easier to comprehend, with improved support for concurrency scope, error handling, timeout and cancellation.

Other accidental complexity in Gitlab-Runner

Besides concurrency, here are a few other design choices that add accidental complexity to gitlab-runner:

Abstraction

The primary goal of gitlab-runner is to abstract the executor framework so that it can use different platforms such as Docker, Kubernetes, SSH, Shell, etc. to execute processes. However, it doesn't define an interface for common behaviors such as managing runtime containers or executing commands across these executors, e.g., something like the sketch below.
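
Here is a minimal sketch of such an interface; the type and method names are my assumptions for illustration, not gitlab-runner's actual API:

package executor

import "context"

// JobConfig and Command are hypothetical placeholder types for illustration.
type JobConfig struct{ Image string }
type Command struct{ Script []string }

// ContainerExecutor sketches a common contract that Docker, Kubernetes, SSH
// and Shell executors could all implement.
type ContainerExecutor interface {
	Prepare(ctx context.Context, config JobConfig) error // provision the runtime environment
	Run(ctx context.Context, cmd Command) error          // execute a command in that environment
	Cleanup(ctx context.Context) error                   // release containers, volumes, etc.
}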

Adapter/Gateway pattern

A common pattern to abstract a third-party library or API is to use an adapter or gateway, but gitlab-runner mixes external APIs with internal executor logic. The kubernetes executor in gitlab-runner defines logic both for interacting with the external Kubernetes server and for managing Kubernetes Pods or executing processes inside those pods. For example, my initial intent in looking at gitlab-runner was to adopt its APIs for interacting with Kubernetes, but I could not reuse any code as a library and instead had to copy the relevant code for my use-case. Such a gateway could look like the sketch below.
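
Here is a minimal gateway sketch; the names are illustrative assumptions, not real gitlab-runner types:

package gateway

import "context"

// PodSpec is a deliberately minimal, executor-facing description of a pod so
// that callers never depend on k8s.io/api types; it is a hypothetical example.
type PodSpec struct {
	Name  string
	Image string
	Cmd   []string
}

// PodGateway sketches an adapter that hides the Kubernetes client behind a
// narrow interface so executor logic stays free of Kubernetes internals.
type PodGateway interface {
	CreatePod(ctx context.Context, spec PodSpec) (name string, err error)
	PodPhase(ctx context.Context, name string) (string, error) // e.g. "Running"
	DeletePod(ctx context.Context, name string) error
}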

Separation of Concerns

The gitlab-runner is not only riddled with concurrency-related primitives such as goroutines and channels, but it also mixes in other aspects such as configuration, feature-flags, logging, monitoring, etc. For example, it uses configuration for defining containers, services and volumes for Kubernetes, but it hard-codes various internal configurations for the build, helper and monitoring containers instead of injecting them via external configuration. Similarly, it hard-codes volumes for repo, cache, logging, etc. A common design pattern in building software is to use layers of abstraction or separation of concerns, where each layer or module is responsible for a single concern (the single-responsibility principle). For example, you can divide executors into three layers: an adapter layer, a middle layer and a high-level layer, where the adapter layer strictly interacts with the underlying platform such as Kubernetes and no other layer needs to know the internal types or APIs of Kubernetes. The middle layer uses the adapter layer to delegate any executor-specific behavior and defines methods to configure containers. The high-level layer is responsible for injecting dependent services, containers and configurations to execute jobs within the gitlab environment. I understand some of these abstractions might become leaky if there are significant differences among executors, but such a design allows broader reuse of the lower-level layers in other contexts.

Summary

Though modern scalable and performant applications demand concurrency, sprinkling low-level concurrency patterns all over your code adds significant accidental or extraneous complexity. In this blog, I demonstrated how encapsulating concurrent code with a library of structured concurrency patterns can simplify your code. Concurrency is hard, especially with lower-level primitives such as WaitGroup, Mutex, goroutines and channels in GO, and incorrect use of these primitives can lead to deadlocks or race conditions. Using a small library that abstracts common concurrency patterns can reduce the probability of concurrency-related bugs. Finally, due to the lack of immutability, strong ownership, generics or concurrency scope in GO, you still have to guard against race conditions and analyze your code carefully for deadlocks, but applying structured concurrency can help you manage concurrent code better.

November 17, 2020

Structured Concurrency in modern programming languages – Part-IV

Filed under: Computing,Kotlin,Languages,Swift,Technology — admin @ 12:58 pm

In this fourth part of the series on structured concurrency (Part-I, Part-II, Part-III, Swift-Followup), I will review Kotlin and Swift languages for writing concurrent applications and their support for structured concurrency:

Kotlin

Kotlin is a JVM language created by JetBrains with improved support for functional and object-oriented features such as extension functions, nested functions, data classes, lambda syntax, etc. Kotlin also uses nullable/optional types instead of null references, similar to Rust and Swift, to remove null-pointer errors. Kotlin provides native OS-threads similar to Java and coroutines with async/await syntax similar to Rust. Kotlin brings first-class support for structured concurrency with its support for concurrency scope, composition, error handling, timeout/cancellation and context for coroutines.

Structured Concurrency in Kotlin

Kotlin provides the following primitives for concurrency support:

suspend functions

Kotlin adds the suspend keyword to annotate a function that will be used by a coroutine, and the compiler automatically adds continuation behavior so that instead of returning a value directly, the compiled code invokes a continuation callback.

Launching coroutines

A coroutine can be launched using launch, async or runBlocking, which defines the scope for structured concurrency. The lifetime of child coroutines is attached to this scope, which can be used to cancel them. The async call returns a Deferred (future) object that extends Job, and you call the await method on the Deferred instance to get the results.

Dispatcher

Kotlin defines CoroutineDispatcher to determine the thread(s) for running a coroutine. Kotlin provides three types of dispatchers: Default – used for CPU-intensive asynchronous tasks; IO – for tasks that perform blocking IO/network operations; and Main – which uses the main thread (e.g., the Android UI thread).

Channels

Kotlin uses channels for communication between coroutines. It defines three channel interfaces: SendChannel, ReceiveChannel, and Channel. The channels can be rendezvous, buffered, unlimited or conflated, where rendezvous and buffered channels behave like GO's channels and suspend the send or receive operation if the other coroutine is not ready or the buffer is full. The unlimited channel behaves like a queue, and the conflated channel overwrites the previous value when a new value is sent. The producer can close the send channel to indicate the end of work.

Using async/await in Kotlin

Following code shows how to use async/await to build the toy web crawler:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerWithAsync(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithAsync::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls using async/await
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()

        withTimeout(timeout) {
            val jobs = mutableListOf<Deferred<Int>>()
            for (u in urls) {
                jobs.add(async {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                })
            }
            for (j in jobs) {
                j.await()
            }
        }
        return res.completed(size.get())
    }
}

In the above example, the CrawlerWithAsync class defines a timeout parameter for the crawler. The public crawl function takes a list of URLs to crawl and defines the high-level scope of concurrency using runBlocking. The private crawl method is declared as suspend so that it can be used as a continuation. It uses async within withTimeout to start background tasks and uses await to collect the results, calling handleCrawl and then recursively crawling the child URLs.

Following unit tests show how to test the above crawl method:

package concurrency

import org.junit.Test
import org.slf4j.LoggerFactory
import kotlin.test.assertEquals

class CrawlerAsynTest {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutinesTest::class.java)
    val urls = listOf("a.com", "b.com", "c.com", "d.com", "e.com", "f.com",
            "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com")

    @Test
    fun testCrawl() {
        val crawler = CrawlerWithAsync(4, 1000L)
        val started = System.currentTimeMillis()
        val res = crawler.crawl(urls);
        val duration = System.currentTimeMillis() - started
        logger.info("CrawlerAsync - crawled %d urls in %d milliseconds".format(res.childURLs, duration))
        assertEquals(19032, res.childURLs)
    }

    @Test(expected = Exception::class)
    fun testCrawlWithTimeout() {
        val crawler = CrawlerWithAsync(1000, 100L)
        crawler.crawl(urls);
    }
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/kot_pool.

Following are the major benefits of using this approach to implement the crawler, with its support for structured concurrency:

  • The main crawl method defines the high-level scope of concurrency and waits for the completion of child tasks.
  • Kotlin supports cancellation and timeout APIs, and the crawl method will fail with a timeout error if crawling exceeds the time limit.
  • The crawl method captures errors from the async response and returns them so that client code can perform error handling.
  • The async syntax in Kotlin allows easy composition of asynchronous code.
  • Kotlin allows a customized dispatcher for more control over asynchronous behavior.

Following are the shortcomings of this approach for structured concurrency and general design:

  • As Kotlin doesn’t enforce immutability by default, you will need synchronization to protect shared state.
  • Async/await support is still new in Kotlin and lacks stability and thorough documentation.
  • The above design creates a new coroutine for crawling each URL, which can strain expensive network and IO resources, so it’s not practical for a real-world implementation.

Using coroutines in Kotlin

Following code uses coroutine syntax to implement the web crawler:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerWithCoroutines(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutines::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls using coroutines
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()
        withTimeout(timeout) {
            for (u in urls) {
                coroutineScope {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                }
            }
        }
        return res.completed(size.get())
    }
}

The above example uses coroutine-scope syntax instead of async/await, but its behavior is similar to the async/await implementation.

Following example shows how async coroutines can be cancelled:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerCancelable(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutines::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls to show cancel operation
    // internal method will call cancel instead of await so this method will
    // fail.
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    ////////////////// Internal methods
    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()

        withTimeout(timeout) {
            val jobs = mutableListOf<Deferred<Int>>()
            for (u in urls) {
                jobs.add(async {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                })
            }
            for (j in jobs) {
                j.cancel()
            }
        }
        return res.completed(size.get())
    }
}

You can download the above examples from https://github.com/bhatti/concurency-katas/tree/main/kot_pool.

Swift

Swift was developed by Apple to replace Objective-C and offers modern features such as closures, optionals instead of null-pointers (similar to Rust and Kotlin), optional chaining, guards, value types, generics, protocols, algebraic data types, etc. It uses the same runtime system as Objective-C, uses automatic reference counting (ARC) for memory management and grand-central-dispatch for concurrency, and provides integration with Objective-C code and libraries.

Structured Concurrency in Swift

I discussed concurrency support in Objective-C such as NSThread, NSOperationQueue and Grand Central Dispatch (GCD) in an old blog post, and since then GCD has improved launching asynchronous tasks using background queues with timeout/cancellation support. However, much of the Objective-C and Swift code still suffers from the callback and promise hell discussed in Part-I. Chris Lattner and Joe Groff wrote a proposal to add async/await and the actor model to Swift and provide first-class support for structured concurrency. As this work is still in progress, I wasn't able to test it, but here are the major features of this proposal:

Coroutines

Swift will adopt coroutines as the building blocks of concurrency and asynchronous code. It will add syntactic sugar for completion handlers using the async and yield keywords.

Async/Await

Swift will provide async/await syntactic sugar on top of coroutines to mark asynchronous behavior. The async code will use continuations similar to Kotlin so that it suspends itself and schedules execution by controlling the context. It will use futures (similar to Deferred in Kotlin) to await the results (or errors). This syntax will work with normal error handling in Swift so that errors from asynchronous code are automatically propagated to the calling function.

Actor model

Swift will adopt an actor model with value-based messages (copy-on-write) to manage concurrent objects that can receive messages asynchronously; an actor keeps its internal state isolated, eliminating race conditions.

Kotlin and Swift are very similar in design, and both have first-class support for structured concurrency such as concurrency scope, composition, error handling, cancellation/timeout, value types, etc. Both use continuations for async behavior, so await suspends execution and passes control to the execution context, which runs the code asynchronously and passes control back at the end of execution.

Structured Concurrency Comparison

Following table summarizes the support for structured concurrency discussed in this blog series:

Feature | Typescript (NodeJS) | Erlang | Elixir | GO | Rust | Kotlin | Swift
Structured scope | Built-in | Manually | Manually | Manually | Built-in | Built-in | Built-in
Asynchronous Composition | Yes | No | No | No | Yes | Yes | Yes
Error Handling | Natively using Exceptions | Manually storing errors in Response | Manually storing errors in Response | Manually storing errors in Response | Manually using Result ADT | Natively using Exceptions | Natively using Exceptions
Cancellation | Cooperative Cancellation | Built-in Termination or Cooperative Cancellation | Built-in Termination or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation
Timeout | No | Yes | Yes | Yes | Yes | Yes | Yes
Customized Execution Context | No | No | No | No | No | Yes | Yes
Race Conditions | No, due to NodeJS architecture | No, due to Actor model | No, due to Actor model | Possible due to shared state | No, due to strong ownership | Possible due to shared state | Possible due to shared state
Value Types | No | Yes | Yes | Yes | Yes | Yes | Yes
Concurrency paradigms | Event loop | Actor model | Actor model | Go-routine, CSP channels | OS-Thread, coroutine | OS-Thread, coroutine, CSP channels | OS-Thread, GCD queues, coroutine, Actor model
Type Checking | Static | Dynamic | Dynamic | Static but lacks generics | Strongly static types with generics | Strongly static types with generics | Strongly static types with generics
Suspends Async code using Continuations | No | No | No | No | Yes | Yes | Yes
Zero-cost based abstraction (async) | No | No | No | No | Yes | No | No
Memory Management | GC | GC (process-scoped) | GC (process-scoped) | GC | (Automated) Reference counting, Boxing | GC | Automated reference counting

Performance Comparison

Following table summarizes the runtime of various implementations of the web crawler when crawling 19K URLs, which resulted in about 76K messages to the asynchronous methods/coroutines/actors discussed in this blog series:

Language | Design | Runtime (secs)
Typescript | Async/Await | 0.638
Erlang | Spawning Process | 4.636
Erlang | PMAP | 4.698
Elixir | Spawning OTP Children | 43.5
Elixir | Task async/await | 187
Elixir | Worker-pool with queue | 97
GO | Go-routine/channels | 1.2
Rust | Async/Await | 4.3
Kotlin | Async/Await | 0.736
Kotlin | Coroutine | 0.712
Swift | Async/Await | 63
Swift | Actors/Async/Await | 65
Note: The purpose of the above results is not to provide micro-benchmarks but to show the rough cost of spawning thousands of asynchronous tasks.

Summary

Overall, Typescript/NodeJS provides a simpler model for concurrency but lacks proper timeout/cancellation support, and it's not suitable for highly concurrent applications that require blocking APIs. The actor-based concurrency model in Erlang/Elixir supports a high level of concurrency, error handling and cancellation, and isolates process state to prevent race conditions, but it lacks native composition of asynchronous behavior; you can, however, compose Erlang processes with a parent-child hierarchy and easily start or stop these processes. GO supports concurrency via go-routines and channels with built-in cancellation and timeout APIs, but the unscoped go statement is considered harmful by structured-concurrency proponents, and GO doesn't protect against race conditions due to mutable state. Erlang and GO are the only languages here that were designed from the ground up with support for actors and coroutines, and their schedulers have strong support for asynchronous IO and non-blocking APIs. Erlang also offers process-scoped garbage collection to clean up related data easily, as opposed to the global GC in other languages. The async/await support in Rust is still immature and lacks proper cancellation support, but the strong ownership properties of Rust eliminate race conditions and allow safe concurrency. Rust, Kotlin and Swift use continuations for async/await, which allows multiple async calls to be composed and chained together; for example, instead of await (await download()).parse() in Javascript/Typescript/C#, you can use await download().parse(). The async/await changes are still new in Kotlin and lack stability, whereas Swift has not yet shipped these changes in an official release. As Kotlin, Rust and Swift built coroutines or async/await on top of existing runtimes and virtual machines, their green-thread schedulers are not as optimal as the schedulers in Erlang or GO and may exhibit limitations on concurrency and scalability.

Finally, structured concurrency helps your code structure with improved data/control flow, concurrency scope, error handling, cancellation/timeout and composition, but it won't solve data races if multiple threads/coroutines access mutable shared data concurrently, so you will still need synchronization mechanisms to protect critical sections.

Note: You can download all examples in this series from https://github.com/bhatti/concurency-katas.

November 10, 2020

Structured Concurrency in modern programming languages – Part-III

Filed under: Computing,Languages,Technology — admin @ 4:23 pm

In this third part of the series on structured concurrency (Part-I, Part-II, Part-IV, Swift-Followup), I will review GO and Rust languages for writing concurrent applications and their support for structured concurrency:

GO

GO language was created by Robert Griesemer, Rob Pike, and Ken Thompson and uses light-weight go-routines for asynchronous processing. GO uses channels for communication, which are designed after Tony Hoare's rendezvous-style communicating sequential processes (CSP), where the sender cannot send a message until the receiver is ready to accept it. GO supports buffering for channels so that the sender/receiver don't have to wait if buffer space is available, but channels are not designed to be used as a mailbox or message queue. Channels can be shared by multiple go-routines, and messages can be transmitted by value or by reference. GO doesn't protect against race conditions, so shared state must be protected when it's accessed by multiple go-routines. Also, if a go-routine receives a message by reference, the send must be treated as a transfer of ownership, otherwise it can lead to race conditions, as illustrated below. Finally, unlike Erlang, you cannot monitor the lifetime of other go-routines, so you won't be notified if a go-routine exits unexpectedly.
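
Here is a minimal sketch (my own example) of treating a send over a channel as a transfer of ownership:

package main

import "fmt"

type job struct{ id int }

// After the send, only the receiver may mutate the job; if the sender kept
// mutating it, both goroutines would race on the same data.
func main() {
	ch := make(chan *job)
	done := make(chan struct{})
	go func() {
		j := <-ch // the receiver now owns the job
		j.id = 42 // safe: the sender no longer touches it
		fmt.Println("processed job", j.id)
		close(done)
	}()
	j := &job{id: 1}
	ch <- j // ownership transferred: don't mutate j after this point
	// j.id = 7 // uncommenting this would be a data race
	<-done
}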

Following is the high-level architecture of scheduling and go-routines in a GO process:

Using go-routines/channels in GO

GO doesn’t support async/await syntax, but it can be simulated via go-routines and channels. As the cost of each go-routine is very small, you can use one for each background task.

Following code shows how to use go-routines and channels to build the toy web crawler:

package async

import (
	"context"
	"fmt"
	"time"
)

// type of async function
type Handler func(ctx context.Context, request interface{}) (interface{}, error)

// type of abortHandler function that is called if async operation is cancelled
type AbortHandler func(ctx context.Context, request interface{}) (interface{}, error)

func NoAbort(ctx context.Context, request interface{}) (interface{}, error) {
	return nil, nil
}

// Awaiter - defines method to wait for result
type Awaiter interface {
	Await(ctx context.Context, timeout time.Duration) (interface{}, error)
	IsRunning() bool
}

// task - submits task asynchronously
type task struct {
	handler      Handler
	abortHandler AbortHandler
	request      interface{}
	resultQ      chan Response
	running      bool
}

// Response encapsulates results of async task
type Response struct {
	Result interface{}
	Err    error
}

// Execute executes a long-running function in background and returns a future to wait for the response
func Execute(
	ctx context.Context,
	handler Handler,
	abortHandler AbortHandler,
	request interface{}) Awaiter {
	task := &task{
		request:      request,
		handler:      handler,
		abortHandler: abortHandler,
		resultQ:      make(chan Response, 1),
		running:      true,
	}
	go task.run(ctx) // run handler asynchronously
	return task
}

// IsRunning checks if task is still running
func (t *task) IsRunning() bool {
	return t.running
}

// Await waits for completion of the task
func (t *task) Await(
	ctx context.Context,
	timeout time.Duration) (result interface{}, err error) {
	result = nil
	select {
	case <-ctx.Done():
		err = ctx.Err()
	case res := <-t.resultQ:
		result = res.Result
		err = res.Err
	case <-time.After(timeout):
		err = fmt.Errorf("async task timedout %v", timeout)
	}
	if err != nil {
		go t.abortHandler(ctx, t.request) // abortHandler operation
	}
	return
}

// AwaitAll waits for completion of multiple tasks
func AwaitAll(
	ctx context.Context,
	timeout time.Duration,
	all ...Awaiter) []Response {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	results := make([]Response, 0)
	for _, next := range all {
		res, err := next.Await(ctx, timeout)
		results = append(results, Response{Result: res, Err: err})
	}
	return results
}

////////////////////////////////////// PRIVATE METHODS ///////////////////////////////////////
func (t *task) run(ctx context.Context) {
	go func() {
		result, err := t.handler(ctx, t.request)
		t.resultQ <- Response{Result: result, Err: err} // out channel is buffered by 1
		t.running = false
		close(t.resultQ)
	}()
}

In the above example, the Execute method takes a handler function to invoke in the background and creates a buffered channel for the reply. It runs the handler in a go-routine and sends the reply back on the channel. The client uses the Awaiter (future) returned by Execute to wait for the response, and the Await method takes a timeout to bound the maximum wait time. Note: the Await method listens to ctx.Done() in addition to the response channel, which notifies it if the client cancelled the task or if it timed out due to higher-level settings.

Following code shows how the crawler can use these primitives to define its background tasks:

package crawler

import (
	"context"
	"errors"
	"sync/atomic"
	"time"

	"plexobject.com/crawler/async"
	"plexobject.com/crawler/domain"
	"plexobject.com/crawler/utils"
)

// MaxDepth max depth of crawling
const MaxDepth = 4

// MaxUrls max number of child urls to crawl
const MaxUrls = 11

// Crawler is used for crawing URLs
type Crawler struct {
	crawlHandler      async.Handler
	downloaderHandler async.Handler
	rendererHandler   async.Handler
	indexerHandler    async.Handler
	totalMessages     uint64
}

// New Instantiates new crawler
func New(ctx context.Context) *Crawler {
	crawler := &Crawler{totalMessages: 0}
	crawler.crawlHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		req := payload.(*domain.Request)
		return crawler.handleCrawl(ctx, req)
	}
	crawler.downloaderHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		// TODO check robots.txt and throttle policies
		// TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
		return utils.RandomString(100), nil
	}
	crawler.rendererHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		// for SPA apps that use javascript for rendering contents
		return utils.RandomString(100), nil
	}
	crawler.indexerHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		return 0, nil
	}
	return crawler
}

// Crawl - crawls list of URLs with specified depth
func (c *Crawler) Crawl(ctx context.Context, urls []string, timeout time.Duration) (int, error) {
	// Boundary for concurrency and it will not return until all
	// child URLs are crawled up to MaxDepth limit.
	return c.crawl(ctx, urls, 0, timeout)
}

// TotalMessages - total number of messages processed
func (c *Crawler) TotalMessages() uint64 {
	return c.totalMessages
}

// handles crawl
func (c *Crawler) handleCrawl(ctx context.Context, req *domain.Request) (*domain.Result, error) {
	atomic.AddUint64(&c.totalMessages, 1)
	timeout := time.Duration(req.Timeout * time.Second)
	res := domain.NewResult(req)
	if contents, err := async.Execute(ctx, c.downloaderHandler, async.NoAbort, req.URL).Await(ctx, timeout); err != nil {
		res.Failed(err)
	} else {
		if newContents, err := async.Execute(ctx, c.rendererHandler, async.NoAbort, [...]string{req.URL, contents.(string)}).Await(ctx, timeout); err != nil {
			res.Failed(err)
		} else {
			if hasContentsChanged(ctx, req.URL, newContents.(string)) && !isSpam(ctx, req.URL, newContents.(string)) {
				async.Execute(ctx, c.indexerHandler, async.NoAbort, [...]string{req.URL, newContents.(string)}).Await(ctx, timeout)
				urls := parseURLs(ctx, req.URL, newContents.(string))
				if childURLs, err := c.crawl(ctx, urls, req.Depth+1, req.Timeout); err != nil {
					res.Failed(err)
				} else {
					res.Succeeded(childURLs + 1)
				}
			} else {
				res.Failed(errors.New("contents didn't change"))
			}
		}
	}

	return res, nil
}

/////////////////// Internal private methods ///////////////////////////
// Crawls list of URLs with specified depth
func (c *Crawler) crawl(ctx context.Context, urls []string, depth int, timeout time.Duration) (int, error) {
	if depth < MaxDepth {
		futures := make([]async.Awaiter, 0)
		for i := 0; i < len(urls); i++ {
			futures = append(futures, async.Execute(ctx, c.crawlHandler, async.NoAbort, domain.NewRequest(urls[i], depth, timeout)))
		}
		sum := 0
		var savedError error
		for i := 0; i < len(futures); i++ {
			res, err := futures[i].Await(ctx, timeout)
			if err != nil {
				savedError = err // returning only a single error
			}
			if res != nil {
				sum += res.(*domain.Result).ChildURLs
			}
		}

		return sum, savedError
	}
	return 0, nil
}

func parseURLs(ctx context.Context, url string, contents string) []string {
	// tokenize contents and extract href/image/script urls
	urls := make([]string, 0)
	for i := 0; i < MaxUrls; i++ {
		urls = append(urls, utils.RandomChildUrl(url))
	}
	return urls
}

func hasContentsChanged(ctx context.Context, url string, contents string) bool {
	return true
}

func isSpam(ctx context.Context, url string, contents string) bool {
	return false
}

In the above implementation, the crawler defines background tasks for crawling, downloading, rendering and indexing. The Crawl method defines the concurrency boundary and waits until all child tasks are completed. GO provides first-class support for cancellation and timeout via context.Context, but you have to listen to the special ctx.Done() channel.

Following unit tests show examples of cancellation, timeout and normal processing:

package crawler

import (
	"context"
	"log"
	"testing"
	"time"
)

const EXPECTED_URLS = 19032

func TestCrawl(t *testing.T) {
	rootUrls := []string{"https://a.com", "https://b.com", "https://c.com", "https://d.com", "https://e.com", "https://f.com", "https://g.com", "https://h.com", "https://i.com", "https://j.com", "https://k.com", "https://l.com", "https://n.com"}
	started := time.Now()
	timeout := time.Duration(8 * time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	crawler := New(ctx)
	received, err := crawler.Crawl(ctx, rootUrls, timeout)
	elapsed := time.Since(started)
	log.Printf("Crawl took %s to process %v messages -- %v", elapsed, received, crawler.TotalMessages())
	if crawler.totalMessages != EXPECTED_URLS {
		t.Errorf("Expected %v urls but was %v", EXPECTED_URLS, crawler.totalMessages)
	}
	if err != nil {
		t.Errorf("Unexpected error %v", err)
	} else if EXPECTED_URLS != received {
		t.Errorf("Expected %v urls but was %v", EXPECTED_URLS, received)
	}
}

func TestCrawlWithTimeout(t *testing.T) {
	started := time.Now()
	timeout := time.Duration(4 * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	crawler := New(ctx)
	received, err := crawler.Crawl(ctx, []string{"a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"}, timeout)
	if err == nil {
		t.Errorf("Expecting timeout error")
	}
	elapsed := time.Since(started)
	log.Printf("Timedout took %s to process %v messages -- %v - %v", elapsed, received, crawler.TotalMessages(), err)
}

func TestCrawlWithCancel(t *testing.T) {
	started := time.Now()
	timeout := time.Duration(3 * time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	crawler := New(ctx)
	var err error
	var received int
	go func() {
		// calling asynchronously
		received, err = crawler.Crawl(ctx, []string{"a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"}, timeout)
	}()
	time.Sleep(5 * time.Millisecond)
	cancel()
	time.Sleep(50 * time.Millisecond)
	if err == nil {
		t.Errorf("Expecting cancel error")
	}
	elapsed := time.Since(started)
	log.Printf("Cancel took %s to process %v messages -- %v - %v", elapsed, received, crawler.TotalMessages(), err)
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/go_pool.

Following are the major benefits of using this approach to implement the crawler, with its support for structured concurrency:

  • The main Crawl method defines the high-level scope of concurrency and waits for the completion of child tasks.
  • GO supports cancellation and timeout APIs, and the Crawl method passes a timeout parameter so that crawling all URLs must complete within the time period.
  • The Crawl method captures errors from async responses and returns them so that client code can perform error handling.

Following are the shortcomings of this approach for structured concurrency and general design:

  • You can’t monitor the lifetime of go-routines, and you won’t get any errors if a background task dies unexpectedly.
  • The cancellation API returns without cancelling the underlying operation, so you will need to implement cooperative cancellation to persist any state or clean up underlying resources.
  • GO doesn’t support specifying an execution context for go-routines; all asynchronous code is automatically scheduled by the GO runtime.
  • GO go-routines are not easily composable because they don’t have any parent/child relationship, as opposed to async methods that can invoke other async methods in Typescript, Rust or other languages supporting async/await.
  • As GO doesn’t enforce immutability, you will need a mutex to protect shared state. Also, the mutex implementation in GO is not re-entrant, so you can’t use it in recursive methods that acquire the same lock (see the sketch after this list).
  • The above code creates a new go-routine for crawling each URL, and though the overhead of each go-routine is small, it may strain other expensive resources such as network connections.

Using worker-pool in GO

As opposed to creating a new go-routine per task, we can use a worker-pool of go-routines to perform background tasks so that we can manage external resource dependencies more easily.

Following code shows an implementation of worker-pool in GO:

package pool

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
)

const BUFFER_CAPACITY = 2 // allow buffering to support asynchronous behavior  as by default sender will be blocked

type Handler func(ctx context.Context, payload interface{}) (interface{}, error)

type Awaiter interface {
	Await(ctx context.Context, timeout time.Duration) (interface{}, error)
}

// Request encapsulates request to process
type Request struct {
	id      string
	payload interface{}
	outQ    chan Result
}

// Result encapsulates results
type Result struct {
	id      string
	payload interface{}
	err     error
}

// Worker structure defines inbound channel to receive request and lambda function to execute
type Worker struct {
	id                   int
	handler              Handler
	workerRequestChannel chan *Request
}

// NewWorker creates new worker
func NewWorker(id int, handler Handler) Worker {
	return Worker{
		id:                   id,
		handler:              handler,
		workerRequestChannel: make(chan *Request),
	}
}

func (w Worker) start(ctx context.Context, workersReadyPool chan chan *Request, done chan bool) {
	go func(w Worker) {
		for {
			// register the current worker into the worker queue.
			workersReadyPool <- w.workerRequestChannel

			select {
			case <-ctx.Done():
				return // note: a bare break here would only exit the select, not the loop
			case req := <-w.workerRequestChannel:
				payload, err := w.handler(ctx, req.payload)
				req.outQ <- Result{id: req.id, payload: payload, err: err} // out channel is buffered by 1
				close(req.outQ)
			case <-done:
				return
			}
		}
	}(w)
}

// WorkPool - pool of workers
type WorkPool struct {
	size                int
	workersReadyPool    chan chan *Request
	pendingRequestQueue chan *Request
	done                chan bool
	handler             Handler
}

// New Creates new async structure
func New(handler Handler, size int) *WorkPool {
	async := &WorkPool{
		size:                size,
		workersReadyPool:    make(chan chan *Request, BUFFER_CAPACITY),
		pendingRequestQueue: make(chan *Request, BUFFER_CAPACITY),
		done:                make(chan bool),
		handler:             handler}
	return async
}

// Start - starts up workers and internal goroutine to receive requests
func (p *WorkPool) Start(ctx context.Context) {
	for w := 1; w <= p.size; w++ {
		worker := NewWorker(w, p.handler)
		worker.start(ctx, p.workersReadyPool, p.done)
	}
	go p.dispatch(ctx)
}

// Add request to process
func (p *WorkPool) Add(ctx context.Context, payload interface{}) Awaiter {
	// Adding request to process
	req := &Request{id: uuid.New().String(), payload: payload, outQ: make(chan Result, 1)}
	go func() {
		p.pendingRequestQueue <- req
	}()
	return req
}

// Await for reply -- you can only call this once
func (r Request) Await(ctx context.Context, timeout time.Duration) (payload interface{}, err error) {
	select {
	case <-ctx.Done():
		err = errors.New("async_cancelled")
	case res := <-r.outQ:
		payload = res.payload
		err = res.err
	case <-time.After(timeout):
		payload = nil
		err = fmt.Errorf("async_timedout %v", timeout)
	}

	return
}

// Stop - stops thread pool
func (p *WorkPool) Stop() {
	close(p.pendingRequestQueue)
	go func() {
		p.done <- true
	}()
}

// Receiving requests from inbound channel and forward it to the worker's workerRequestChannel
func (p *WorkPool) dispatch(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-p.done:
			return
		case req := <-p.pendingRequestQueue:
			go func(req *Request) {
				// Find next ready worker
				workerRequestChannel := <-p.workersReadyPool
				// dispatch the request to next ready worker
				workerRequestChannel <- req
			}(req)
		}
	}
}
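
The pool above isn't shown in use, so here is a minimal usage sketch (my own code, with an assumed import path following the repo layout): it starts a pool of four workers, enqueues ten requests, and awaits each reply with a timeout.

package main

import (
	"context"
	"fmt"
	"time"

	"plexobject.com/crawler/pool" // assumed import path, not confirmed by the repo
)

func main() {
	ctx := context.Background()
	// handler simply echoes the payload to keep the sketch self-contained
	handler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		return fmt.Sprintf("processed %v", payload), nil
	}
	p := pool.New(handler, 4) // pool of 4 workers
	p.Start(ctx)
	defer p.Stop()

	futures := make([]pool.Awaiter, 0)
	for i := 0; i < 10; i++ {
		futures = append(futures, p.Add(ctx, i)) // enqueue requests
	}
	for _, f := range futures {
		res, err := f.Await(ctx, time.Second) // wait for each reply with a timeout
		fmt.Println(res, err)
	}
}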

You can download the above examples from https://github.com/bhatti/concurency-katas/tree/main/go_pool.

Rust

Rust was designed by Mozilla Research to provide better performance, type safety, strong memory ownership and safe concurrency. With its strong ownership and lifetime scoping, Rust minimizes race conditions because each object can only have one owner that can update its value. Further, strong typing, traits/structured types, the absence of null references, and immutability by default eliminate most common bugs in the code.

Rust uses OS-threads for multi-threading but has recently added support for coroutines and async/await. Rust uses futures for asynchronous behavior, but unlike other languages it doesn't bundle a runtime environment for async/await. Two popular runtime systems available for Rust are https://tokio.rs/ and https://github.com/async-rs/async-std. Also, unlike other languages, async/await in Rust is a zero-cost abstraction where async just creates a future without scheduling it until await is invoked. Runtime systems such as async-std and tokio provide an executor that polls the future until it returns a value.

Following example shows how async/await can be used to implement the toy web crawler:

extern crate rand;
use std::{error::Error, fmt};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rand::Rng;
use rand::distributions::Alphanumeric;
use rand::seq::SliceRandom;
use futures::future::{Future, join_all, BoxFuture};
use futures::stream::{FuturesUnordered};
use futures::executor;
use async_std::{task, future};
use async_std::future::timeout;

const MAX_DEPTH: u8 = 4;
const MAX_URLS: u8 = 11;

// Request encapsulates details of url to crawl
#[derive(Debug, Clone, PartialEq)]
pub struct Request {
    pub url: String,
    pub depth: u8,
    pub timeout: Duration,
    pub created_at: u128,
}

impl Request {
    pub fn new(url: String, depth: u8, timeout: Duration) -> Request {
        let epoch = SystemTime::now().duration_since(UNIX_EPOCH).expect("epoch failed").as_millis();
        Request{url: url.to_string(), depth: depth, timeout: timeout, created_at: epoch}
    }
}

#[derive(Debug, Copy, Clone)]
pub enum CrawlError {
    Unknown,
    MaxDepthReached,
    DownloadError,
    ParseError,
    IndexError,
    ContentsNotChanged,
    Timedout,
}

impl Error for CrawlError {}

impl fmt::Display for CrawlError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            CrawlError::MaxDepthReached => write!(f, "MaxDepthReached"),
            CrawlError::DownloadError => write!(f, "DownloadError"),
            CrawlError::ParseError => write!(f, "ParseError"),
            CrawlError::IndexError => write!(f, "IndexError"),
            CrawlError::ContentsNotChanged => write!(f, "ContentsNotChanged"),
            CrawlError::Timedout=> write!(f, "Timedout"),
            CrawlError::Unknown => write!(f, "Unknown"),
        }
    }
}


//////// PUBLIC METHODS
// crawling a collection of urls
pub fn crawl(urls: Vec<String>, timeout_dur: Duration) -> Result<usize, CrawlError> {
    // Boundary for concurrency and it will not return until all
    // child URLs are crawled up to MAX_DEPTH limit.
    //
    match task::block_on(
        timeout(timeout_dur, async {
            do_crawl(urls, timeout_dur, 0)
        })
    ) {
        Ok(res) => res,
        Err(_err) => Err(CrawlError::Timedout),
    }
}

//////// PRIVATE METHODS
fn do_crawl(urls: Vec<String>, timeout_dur: Duration, depth: u8) -> Result<usize, CrawlError> {
    if depth >= MAX_DEPTH {
        return Ok(0)
    }

    let mut futures = Vec::new();
    let mut size = 0;

    for u in urls {
        size += 1;
        futures.push(async move {
            let child_urls = match handle_crawl(Request::new(u, depth, timeout_dur)) {
                Ok(urls) => urls,
                Err(_err) => [].to_vec(),
            };
            if child_urls.len() > 0 {
                do_crawl(child_urls, timeout_dur, depth+1)
            } else {
                Ok(0)
            }
        });
    }
    task::block_on(
        async {
            let res: Vec<Result<usize, CrawlError>> = join_all(futures).await;
            let sizes: Vec<usize> = res.iter().map(|r| r.map_or(0, |n|n)).collect::<Vec<usize>>();
            size += sizes.iter().fold(0usize, |sum, n| n+sum);
        }
    );
    Ok(size)
}

// method to crawl a single url
fn handle_crawl(req: Request) -> Result<Vec<String>, CrawlError> {
    let res: Result<Vec<String>, CrawlError> = task::block_on(
        async {
            let contents = match download(&req.url).await {
                Ok(data) => data,
                Err(_err) => return Err(CrawlError::DownloadError),
            };

            if has_contents_changed(&req.url, &contents) && !is_spam(&req.url, &contents) {
                let urls = match index(&req.url, &contents).await {
                    Ok(_) =>
                        match parse_urls(&req.url, &contents) {
                            Ok(urls) => urls,
                            Err(_err) => return Err(CrawlError::ParseError),
                        },
                    Err(_err) => return Err(CrawlError::IndexError),
                };
                return Ok(urls)
            } else {
                return Err(CrawlError::ContentsNotChanged)
            }
        }
    );
    match res {
        Ok(list) => return Ok(list),
        Err(err) => return Err(err),
    }
}

async fn download(url: &str) -> Result<String, CrawlError> {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    // invoke jsrender to generate dynamic content
    jsrender(url, &random_string(100)).await
}

async fn jsrender(_url: &str, contents: &str) -> Result<String, CrawlError> {
    // for SPA apps that use javascript for rendering contents
    Ok(contents.to_string())
}

async fn index(_url: &str, _contents: &str) -> Result<bool, CrawlError> {
    // apply standardize, stem, ngram, etc for indexing
    Ok(true)
}

fn parse_urls(_url: &str, _contents: &str) -> Result<Vec<String>, CrawlError> {
    // tokenize contents and extract href/image/script urls
    Ok((0..MAX_URLS).into_iter().map(|i| random_url(i)).collect())
}

fn has_contents_changed(_url: &str, _contents: &str) -> bool {
    true
}

fn is_spam(_url: &str, _contents: &str) -> bool {
    false
}

fn random_string(max: usize) -> String {
    rand::thread_rng().sample_iter(&Alphanumeric).take(max).collect::<String>()
}

fn random_url(i: u8) -> String {
    let domains = vec!["ab.com", "bc.com", "cd.com", "de.com", "ef.com", "fg.com", "gh.com", "hi.com", "ij.com", "jk.com", "kl.com", "lm.com", "mn.com",
		"no.com", "op.com", "pq.com", "qr.com", "rs.com", "st.com", "tu.com", "uv.com", "vw.com", "wx.com", "xy.com", "yz.com"];
    let domain = domains.choose(&mut rand::thread_rng()).unwrap();
    format!("https://{}/{}_{}", domain, random_string(20), i)
}

The crawl method defines the scope of concurrency and asynchronously crawls each URL recursively, with the parent URL waiting until its child URLs are crawled. The async-std runtime provides support for timeouts so that an asynchronous task can fail early if it doesn't complete within the bounded time frame. However, it doesn't provide cancellation support, so you have to rely on cooperative cancellation.
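
For instance, a cancellation flag can be polled between steps of the crawl. The following is a minimal sketch, assuming a hypothetical CancelToken type and crawl_one function that are not part of the crawler above:

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Illustrative cooperative cancellation token shared between parent and children.
#[derive(Clone)]
pub struct CancelToken(Arc<AtomicBool>);

impl CancelToken {
    pub fn new() -> Self {
        CancelToken(Arc::new(AtomicBool::new(false)))
    }
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

// A child task checks the flag at each natural checkpoint and exits early,
// which gives it a chance to persist state or release resources before returning.
async fn crawl_one(url: String, token: CancelToken) -> Result<usize, CrawlError> {
    if token.is_cancelled() {
        return Err(CrawlError::Timedout);
    }
    // ... download, parse and recurse here, re-checking the token between steps ...
    let _ = url;
    Ok(1)
}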

The following unit test and main routine show an example of crawling a list of URLs:

use std::time::Duration;
use futures::prelude::*;
use std::time::{Instant};
use crate::crawler::crawler::*;

mod crawler;

fn main() {
    let _ = do_crawl(8000);
}

fn do_crawl(timeout: u64) -> Result<usize, CrawlError> {
    let start = Instant::now();
    let urls = vec!["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"].into_iter().map(|s| s.to_string()).collect();
    let res = crawl(urls, Duration::from_millis(timeout));
    let duration = start.elapsed();
    println!("Crawled {:?} urls in () is: {:?}", res, duration);
    res
}

#[cfg(test)]
mod tests {
    use super::do_crawl;
    #[test]
    fn crawl_urls() {
        match do_crawl(8000) {
            Ok(size) => assert_eq!(size, 19032),
            Err(err) => panic!("Unexpected error {:?}", err),
        }
    }
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/rust_async.

Following are the major benefits of using this approach to implement the crawler and its support of structured concurrency:

  • The main crawl method defines the high-level scope of concurrency and waits for the completion of child tasks.
  • The async-std runtime supports timeout APIs, and the crawl method takes a timeout parameter so that crawling all URLs must complete within the time period.
  • The crawl method captures errors from the async response and returns them so that client code can perform error handling.
  • The async methods in the above implementation show how asynchronous code can be easily composed.

Following are the shortcomings of using this approach for structured concurrency and the general design:

  • Rust's async/await APIs don't provide native support for cancellation, so you will need to implement cooperative cancellation to persist state or clean up underlying resources.
  • Rust's async/await APIs don't allow you to specify the execution context for asynchronous code.
  • The async/await support in Rust is relatively new and has not matured yet. It also requires a separate runtime environment, and there are a few differences among those implementations.
  • The above crawler design is not very practical because it creates an asynchronous task for each URL being crawled, which may strain network or IO resources.

Overall, Go provides decent support for low-level concurrency, but its complexity can create subtle bugs, and incorrect use of goroutines can result in deadlocks. It's also prone to data races due to mutable shared state. Just as structured programming considered GOTO statements harmful and recommended if-then, loops, and function calls for control flow, structured concurrency considers unscoped go statements harmful and recommends that the parent wait for the completion of its children and that errors propagate from children to the parent. Rust offers async/await syntax for defining a concurrency scope and supports composition and error propagation, with strong ownership semantics that reduce the chance of data races. Also, Rust uses continuations by suspending async blocks, and the async keyword just creates a future without starting execution, so chained async code performs well. However, async/await is still in its inception phase in Rust and lacks proper support for cancellation and customized execution contexts.

November 4, 2020

Structured Concurrency in modern programming languages – Part-II

Filed under: Computing,Erlang,Languages — admin @ 8:46 pm

In this second part of the series on structured concurrency (Part-I, Part-III, Part-IV, Swift-Followup), I will review Elixir and Erlang languages for writing concurrent applications and their support for structured concurrency:

Erlang

The Erlang language was created by the late Joe Armstrong while he worked at Ericsson, and it is designed for massive concurrency by means of very lightweight processes that are based on actors. Each process has its own mailbox for storing incoming messages of various kinds. The receive block in Erlang is triggered upon new message arrival, and the message is removed and executed when it matches a specific pattern. The Erlang language uses supervisors for monitoring processes and an immutable functional paradigm for writing robust concurrent systems. Following is the high-level architecture of an Erlang system:

As the cost of each process or actor is only a few hundred bytes, you can create millions of these processes for writing highly scalable concurrent systems. Erlang is a functional language where all data is immutable by default, and the state within each actor is held private, so there is no shared state or race condition.

An actor keeps a mailbox for incoming messages and processes one message at a time using the receive API. Erlang doesn't provide native async/await primitives, but you can simulate async by sending an asynchronous message to an actor, which can then reply to the sender using its process-id. The requester process can then block using the receive API until the reply is received. The Erlang process model has better support for timeouts with the receive API, exiting early if no response arrives within a time period. Erlang systems use the mantra of "let it crash" for building fault-tolerant applications, and you can terminate a process along with all its connected child processes.
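
As a minimal sketch of this request/reply pattern (the module and message shapes here are illustrative, not part of the crawler below):

-module(ask).
-export([call/3]).

%% Simulate async/await: send an asynchronous message to an actor and
%% block with receive until it replies or the timeout expires.
call(Pid, Request, Timeout) ->
    Ref = erlang:make_ref(),
    Pid ! {request, self(), Ref, Request},
    receive
        {reply, Ref, Response} -> {ok, Response}
    after Timeout ->
        {error, timeout}
    end.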

Using actor model in Erlang

Following code shows how native send and receive primitives can be used to build the toy web crawler:

-module(erlcrawler).

-export([start_link/0, crawl_urls/3, total_crawl_urls/1]).

-record(request, {clientPid, ref, url, depth, timeout, created_at=erlang:system_time(millisecond)}).
-record(result, {url, status=pending, child_urls=0, started_at=erlang:system_time(millisecond), completed_at, error}).

-define(MAX_DEPTH, 4).
-define(MAX_URL, 11).
-define(DOMAINS, [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "yz.com"]).

make_request(ClientPid, Ref, Url, Depth, Timeout) ->
    #request{clientPid=ClientPid, ref=Ref, url=Url, depth=Depth, timeout=Timeout}.

make_result(Req) ->
    Url = Req#request.url,
    #result{url=Url}.

%%% Client API
start_link() ->
    spawn_link(fun init/0).

%%%%%%%%%%%% public method for crawling %%%%%%%%%%%%
%%% calling private method for crawling
%%% Pid - process-id of actor
%%% 0 - current depth
%%% Urls - list of urls to crawl
%%% Timeout - max timeout
crawl_urls(Pid, Urls, Timeout) when is_pid(Pid), is_list(Urls)  ->
    %% Boundary for concurrency and it will not return until all
    %% child URLs are crawled up to MAX_DEPTH limit.
    do_crawl_urls(Pid, 0, Urls, [], Timeout, 0).

total_crawl_urls(Pid) when is_pid(Pid) ->
    Self = self(),
    Pid ! {total, Self},
    receive {total_reply, Self, N} ->
        N
    end.

%%% Server functions
init() ->
    {ok, DownloaderPid} = downloader:start_link(),
    {ok, IndexerPid} = indexer:start_link(),
    loop(DownloaderPid, IndexerPid, 0).

%%% Main server loop
loop(DownloaderPid, IndexerPid, N) ->
    receive
        {crawl, Req} ->
            CrawlerPid = self(),
            spawn_link(fun() -> handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) end),
            debug_print(N),
            loop(DownloaderPid, IndexerPid, N+1);
        {total, Pid} ->
            Pid ! {total_reply, Pid, N},
            loop(DownloaderPid, IndexerPid, N);
        terminate ->
            ok
    end.


%%% Internal client functions
debug_print(N) when N rem 10000 == 0 ->
    io:format("~p...~n", [{N}]);
debug_print(_) ->
    ok.

%% Go through URLs to crawl, send asynchronous request to crawl and
%% then add request to a list to monitor that will be used to receive
%% reply back from the crawling actor.
do_crawl_urls(_, _, [], [], _, ChildURLs) ->
    ChildURLs; % all done
do_crawl_urls(_, ?MAX_DEPTH, _, _, _, _) ->
    0; % reached max depth, stop more crawling
do_crawl_urls(Pid, Depth, [Url|T], SubmittedRequests, Timeout, 0) when is_pid(Pid), is_integer(Depth), is_integer(Timeout) ->
    %%% monitoring actor so that we are notified when actor process dies
    Ref = erlang:monitor(process, Pid),
    %%% crawling next url to process
    Req = make_request(self(), Ref, Url, Depth, Timeout),
    Pid ! {crawl, Req},
    do_crawl_urls(Pid, Depth, T, SubmittedRequests ++ [Req], Timeout, 0);
do_crawl_urls(Pid, Depth, [], [Req|T], Timeout, ChildURLs) when is_pid(Pid) ->
    %%% receiving response from the requests that were previously stored
    Ref = Req#request.ref,
    receive
        {crawl_done, Ref, Res} ->
            erlang:demonitor(Ref, [flush]),
            do_crawl_urls(Pid, Depth, [], T, Timeout, Res#result.child_urls+ChildURLs+1);
        {'DOWN', Ref, process, Pid, Reason} ->
            erlang:error(Reason)
    after Timeout ->
        erlang:error({crawl_timeout, Timeout})
    end.


%%% Internal server functions called by actor to process the crawling request
handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) ->
    Res = make_result(Req),
    ClientPid = Req#request.clientPid,
    Url = Req#request.url,
    Ref = Req#request.ref,
    Depth = Req#request.depth,
    Timeout = Req#request.timeout,

    case downloader:download(DownloaderPid, Url) of
        {ok, Contents} ->
            {ok, Contents1} = downloader:jsrender(DownloaderPid, Url, Contents),
            Changed = has_content_changed(Url, Contents1),
            Spam = is_spam(Url, Contents1),
            if Changed and not Spam ->
                indexer:index(IndexerPid, Url, Contents1), % asynchronous call
                Urls = parse_urls(Url, Contents1),
                %% Crawling child urls synchronously before returning
                ChildURLs = do_crawl_urls(CrawlerPid, Depth+1, Urls, [], Timeout, 0) + 1,
                Res1 = Res#result{completed_at=erlang:system_time(millisecond), child_urls=ChildURLs},
                ClientPid ! {crawl_done, Ref, Res1};
            true ->
                Res1 = Res#result{completed_at=erlang:system_time(millisecond)},
                ClientPid ! {crawl_done, Ref, Res1}
            end;
        Err ->
            Res1 = Res#result{completed_at=erlang:system_time(millisecond), error = Err},
            ClientPid ! {crawl_done, Ref, Res1}
    end,
    ok.

%%%%%%%%%%%%%%% INTERNAL METHODS FOR CRAWLING %%%%%%%%%%%%%%%%
parse_urls(_Url, _Contents) ->
    % tokenize contents and extract href/image/script urls
    random_urls(?MAX_URL).

random_urls(N) ->
    [random_url() || _ <- lists:seq(1, N)].

has_content_changed(_Url, _Contents) ->
     % calculate hash digest and compare it with last digest
    true.

is_spam(_Url, _Contents) ->
     % apply standardize, stem, ngram, etc for indexing
    false.

random_url() ->
    "https://" ++ random_domain() ++ "/" ++ random_string(20).

random_domain() ->
    lists:nth(random:uniform(length(?DOMAINS)), ?DOMAINS).

random_string(Length) ->
    AllowedChars = "abcdefghijklmnopqrstuvwxyz",
    lists:foldl(fun(_, Acc) -> [lists:nth(random:uniform(length(AllowedChars)), AllowedChars)] ++ Acc end, [], lists:seq(1, Length)).

In the above implementation, the crawl_urls method takes a list of URLs and a timeout and waits until all URLs are crawled. It uses spawn_link to create a process, which invokes the handle_crawl method to process requests concurrently. The handle_crawl method recursively crawls the URL and its children up to the MAX_DEPTH limit. This implementation uses separate Erlang OTP processes for downloading, rendering and indexing contents. The handle_crawl method sends back the response with the number of child URLs that it crawled.

-module(erlcrawler_test).
-include_lib("eunit/include/eunit.hrl").

-define(ROOT_URLS, ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]).

crawl_urls_test() ->
    {spawn, {timeout,30, do_crawl_urls(10000)}}.

%% Testing timeout and by default, it will terminate the test process so we will instead convert
%% kill signal into a message using erlang:exit
crawl_urls_with_timeout_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Timeout = 10, % We know that processing takes longer than 10 milliseconds
    Pid = erlcrawler:start_link(),
    process_flag(trap_exit, true),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout)
    end),
    {{crawl_timeout, _}, _} = receive
        {'EXIT', _, Reason} -> Reason
    after 1000 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_timeout_test: timedout as expected in millis ~p ~n", [{Elapsed}]).

%% Testing terminate/cancellation and killing a process will kill all its children
crawl_urls_with_terminate_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, 1000) % crawl_urls is synchronous method so calling in another process
    end),
    receive
    after 15 -> % waiting for a bit before terminating (canceling) process
        exit(Pid, {test_terminated})
    end,
    {test_terminated} = receive
        {'EXIT', Pid, Reason} -> Reason
    after 200 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_terminate_test: terminated as expected in millis ~p ~n", [{Elapsed}]).

do_crawl_urls(Timeout) ->
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    N = erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout),
    N1 = erlcrawler:total_crawl_urls(Pid),
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("do_crawl_urls: Crawled URLs in millis: ~p ~n", [{N, N1, Elapsed}]),
    ?assertEqual(N1, 19032).

The above tests show three ways to exercise the crawl_urls API. The first test, crawl_urls_test, tests the happy case of crawling URLs within 10 seconds. The crawl_urls_with_timeout_test tests the timeout behavior to make sure a proper error message is returned and all Erlang processes are terminated. The crawl_urls_with_terminate_test tests cancellation behavior by terminating the main crawling process. You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/erl_actor.

Following are the major benefits of using this process model to implement structured concurrency:

  • The main crawl_urls method defines the high-level scope of concurrency and waits for the completion of child tasks.
  • The crawl_urls method takes a timeout parameter so that crawling all URLs must complete within the time period.
  • Erlang allows a parent-child relationship between processes where you can monitor child processes and get notified when a child process dies. You can use this feature to cancel an asynchronous task; however, it will abruptly end all processes, and all state within the processes will be lost.
  • The Erlang implementation captures the error within the response so the client can perform all error handling using pattern matching or another approach common in Erlang applications.

Following are the shortcomings of using this approach for structured concurrency:

  • The terminate API is not suitable for clean cancellation, so you will need to implement cooperative cancellation to persist state or clean up underlying resources.
  • Though you can combine processes into groups or parent-child relationships manually, Erlang doesn't give you much flexibility to specify the execution context.
  • Unlike async declared methods in Typescript, Erlang code is not easily composable, but you can write client code that wraps send/receive messages so that high-level code can be comprehended easily. Also, Erlang processes can be connected with parent-child relationships, and you can manage composition via a process-supervisor hierarchy.
  • The above code creates a new process for crawling each URL, and though the overhead of each process is small, it may consume other expensive resources such as network connections. We wouldn't use such an approach for a real crawler, as it would strain the resources of the websites being crawled. Instead, we may need to limit how many concurrent requests can be sent to a given website or maintain a delay between successive requests.

Using pmap in Erlang

We can generalize the above approach into a general-purpose pmap that processes a collection concurrently (similar to the map function in functional languages) and then waits for the responses, such as:

-module(pmap).

-export([pmap/3]).

pmap(F, Es, Timeout) ->
   Parent = self(),
   Running = [exec(Parent, F, E) || E <- Es],
   collect(Running, Timeout).

exec(Parent, F, E) ->
    spawn_monitor(fun() -> Parent ! {self(), F(E)} end).

collect([], _Timeout) -> [];
collect([{Pid, MRef} | Next], Timeout) ->
  receive
    {Pid, Res} ->
      erlang:demonitor(MRef, [flush]),
      [{ok, Res} | collect(Next, Timeout)];
    {'DOWN', MRef, process, Pid, Reason} ->
      [{error, Reason} | collect(Next, Timeout)]
  after Timeout ->
    erlang:error({pmap_timeout, Timeout})
  end.

You can download full pmap example from https://github.com/bhatti/concurency-katas/tree/main/erl_pmap.

Elixir

The Elixir language is built upon the Erlang BEAM VM and was created by Jose Valim to improve the usability of Erlang and introduce a Ruby-like syntax in place of Erlang's Prolog-like syntax. It also removes some of the boilerplate that you needed in Erlang and adds higher-level abstractions for writing highly concurrent, distributed and fault-tolerant applications.

Using a worker-pool and OTP in Elixir

As Elixir uses the Erlang VM and runtime system, the application behavior will be similar to Erlang applications, but the following approach uses a worker-pool design where the parent process keeps a list of child processes and delegates the crawling work to them in a round-robin fashion:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  # {:ok, pid} = Crawler.start_link(100000)
  def start_link(size) when is_integer(size) do
    GenServer.start_link(__MODULE__, size)
  end

  def total_crawl_urls(pid) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, 30000)
  end

  ### Public client APIs
  def crawl_urls(pid, urls) when is_pid(pid) and is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self())
  end

  ### Internal client APIs
  def crawl_urls(pid, urls, depth, clientPid) when is_pid(pid) and is_list(urls) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ## init method create pool of workers based on given size
  def init(size) when is_integer(size) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    pids = Map.keys(pid_to_workers)
    {:ok, {pid_to_workers, pids, 0}}
  end

  ## handles crawling
  def handle_cast({:crawl, request}, {pid_to_workers, [pid|rest], total_in}) do
    GenServer.cast(pid, {:crawl, request}) # send request to workers in round-robin fashion
    {:noreply, {pid_to_workers, rest ++ [pid], total_in+1}}
  end

  def handle_call({:total_crawl_urls}, _from, {_, _, total_in} = state) do
    {:reply, total_in, state}
  end

  ## OTP Callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, _, total_in}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)
    pids = Map.keys(new_pid_to_workers)
    {:noreply, {new_pid_to_workers, pids, total_in}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

The parent process in the above example defines the crawl_urls method for crawling URLs, which is implemented as an asynchronous API (handle_cast) that forwards each request to the next worker. Following is the implementation of the worker:

defmodule Worker do
  @moduledoc """
  Documentation for crawling worker.
  """
  @max_url 11
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"

  use GenServer

  # Client APIs
  def start_link(crawler_pid) when is_pid(crawler_pid) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {crawler_pid, downloader_pid, indexer_pid})
  end

  @doc """
  Crawls web url asynchronously
  """
  def handle_cast({:crawl, request}, {crawler_pid, downloader_pid, indexer_pid}=state) do
    handle_crawl(crawler_pid, downloader_pid, indexer_pid, request)
    {:noreply, state}
  end

  def init(crawler_pid) do
      {:ok, crawler_pid}
  end

  # Internal private methods
  defp handle_crawl(crawler_pid, downloader_pid, indexer_pid, req) do
    res = Result.new(req)
    contents = Downloader.download(downloader_pid, req.url)
    new_contents = Downloader.jsrender(downloader_pid, req.url, contents)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(indexer_pid, req.url, new_contents)
      urls = parse_urls(req.url, new_contents)
      Crawler.crawl_urls(crawler_pid, urls, req.depth+1, req.clientPid)
      send req.clientPid, {:crawl_done, Result.completed(res)}
    else
      send req.clientPid, {:crawl_done, Result.failed(req, :skipped_crawl)}
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

The worker process starts downloader and indexer processes upon startup and crawls the URL upon receiving the next request. It then sends the response back to the originator of the request using the process-id in the request. The following unit test verifies the normal crawling behavior:

defmodule CrawlerTest do
  use ExUnit.Case
  doctest Crawler
  @max_processes 10000
  @max_wait_messages 19032
  @root_urls ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]

  test "test crawling urls" do
    started = System.system_time(:millisecond)
    {:ok, pid} = Crawler.start_link(@max_processes)
    Crawler.crawl_urls(pid, @root_urls)
    wait_until_total_crawl_urls(pid, @max_wait_messages, started)
  end

  defp wait_until_total_crawl_urls(pid, 0, started) do
    n = Crawler.total_crawl_urls(pid)
    elapsed = System.system_time(:millisecond) - started
    IO.puts("Crawled URLs in millis: #{n} #{elapsed}")
    assert n >= @max_wait_messages
  end

  defp wait_until_total_crawl_urls(pid, max, started) do
    if rem(max, 1000) == 0 do
      IO.puts("#{max}...")
    end
    receive do
      {:crawl_done, _} -> wait_until_total_crawl_urls(pid, max-1, started)
    end
  end

end

Following are the major benefits of this approach and its support of structured concurrency:

  • The crawl_urls method in the parent process defines the high-level scope of concurrency and waits for the completion of child tasks.
  • The above implementation also uses a timeout, similar to the Erlang example, to ensure tasks complete within the given time period.
  • The above implementation also captures errors within the response, similar to Erlang, for error handling.
  • This approach addresses one of the shortcomings of the previous Erlang implementation, where a new process was created for each request. Instead, a pool of processes is used to manage the capacity of resources.

Following are the shortcomings of using this approach for structured concurrency:

  • This approach suffers the same drawbacks as the Erlang approach regarding cancellation behavior, and you will need to implement cooperative cancellation to clean up resources properly.
  • Similar to Erlang, Elixir doesn't give you much flexibility to specify the execution context, and it's not easily composable.

Using async-await in Elixir

Elixir abstracts the Erlang process with a Task when you only need to execute a single action throughout its lifetime. Here is an example that combines Task async/await with a pmap implementation:

defmodule Parallel do
  def pmap(collection, func, timeout) do
    collection
    |> Enum.map(&(Task.async(fn -> func.(&1) end)))
    |> Enum.map(fn t -> Task.await(t, timeout) end)
  end
end
defmodule Crawler do
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "ef.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"
  @max_depth 4
  @max_url 11

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def crawl_urls(urls, timeout) when is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    ## Starting external services using OTP for downloading and indexing
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    res = crawl_urls(urls, downloader_pid, indexer_pid, 0, timeout)
    ## Stopping external services using OTP for downloading and indexing
    Process.exit(downloader_pid, :normal)
    Process.exit(indexer_pid, :normal)
    res
  end

  def crawl_urls(urls, downloader_pid, indexer_pid, depth, timeout) when is_list(urls) and is_pid(downloader_pid) and is_pid(indexer_pid) and is_integer(depth) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, downloader_pid, indexer_pid, depth, timeout)))
      Parallel.pmap(requests, &(handle_crawl/1), timeout)
    else
      []
    end
  end

  # Internal private methods
  defp handle_crawl(req) do
    {:ok, contents} = Downloader.download(req.downloader_pid, req.url, req.timeout)
    {:ok, new_contents} = Downloader.jsrender(req.downloader_pid, req.url, contents, req.timeout)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(req.indexer_pid, req.url, new_contents, req.timeout)
      urls = parse_urls(req.url, new_contents)
      res = Crawler.crawl_urls(urls, req.downloader_pid, req.indexer_pid, req.depth+1, req.timeout)
      Enum.reduce(res, 0, &(&1 + &2)) + 1
    else
      0
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

The above example is a bit shorter due to the high-level Task abstraction, but its design has similar pros and cons as the actor and pmap implementations of the Erlang example. You can download the full source code for this implementation from https://github.com/bhatti/concurency-katas/tree/main/elx_pmap.

Using Queue in Elixir

The following example shows the web crawler implementation using a queue:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def start_link(size) when is_integer(size) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {size, downloader_pid, indexer_pid})
  end

  ## crawl list of url
  def crawl_urls(pid, urls, timeout) when is_pid(pid) and is_list(urls) and is_integer(timeout) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self(), timeout)
  end

  # returns number of urls crawled
  def total_crawl_urls(pid, timeout) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, timeout)
  end

  ## dequeue pops the top request from the queue and returns it
  def dequeue(pid) when is_pid(pid) do
    GenServer.call(pid, {:dequeue})
  end

  ###########################################
  ## internal api to crawl urls
  def crawl_urls(pid, urls, depth, clientPid, timeout) when is_pid(pid) and is_list(urls) and is_pid(clientPid) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid, timeout)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ###########################################
  ## init method create pool of workers based on given size
  def init({size, downloader_pid, indexer_pid}) when is_integer(size) and is_pid(downloader_pid) and is_pid(indexer_pid) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    {:ok, {pid_to_workers, :queue.new, 0, 0, downloader_pid, indexer_pid}}
  end

  ## asynchronous server handler for adding request to crawl in the queue
  def handle_cast({:crawl, request}, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    new_queue = :queue.in(request, queue)
    {:noreply, {pid_to_workers, new_queue, total_in+1, total_out, downloader_pid, indexer_pid}}
  end

  ## synchronous server handler for returning total urls crawled
  def handle_call({:total_crawl_urls}, _from, {_, _, _total_in, total_out, _, _} = state) do
    {:reply, total_out, state}
  end

  ## synchronous server handler to pop top request from the queue and returning it
  def handle_call({:dequeue}, _from, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    {head, new_queue} = :queue.out(queue)
    if head == :empty do
      {:reply, {head, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out, downloader_pid, indexer_pid}}
    else
      if rem(:queue.len(queue), 1000) == 0 or rem(total_out+1, 1000) == 0 do
        IO.puts("#{total_out+1}...")
      end
      {:value, req} = head
      {:reply, {req, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out+1, downloader_pid, indexer_pid}}
    end
  end

  ## OTP helper callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)

    {:noreply, {new_pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

You can download full source code of this example from https://github.com/bhatti/concurency-katas/tree/main/elx_queue.

Using Actor model as Abstract Data Structure

As the cost of actors is very small, you can also use them as abstract data structures or objects that maintain internal state. Alan Kay, the pioneer of object-oriented programming, described message-passing, isolation and state encapsulation as the foundation of object-oriented design, and Joe Armstrong described Erlang as the only object-oriented language. For example, suppose you need to create a cache of stock quotes using a dictionary data structure that is updated from another source and provides easy access to the latest quotes. In a multi-threaded environment, you would need to protect access to this shared data with synchronization. With an actor-based design, however, you may define an actor for each stock symbol that keeps the latest value internally and provides an API to access or update the quote. This design removes the need to synchronize the shared data structure and results in better performance.
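
As a minimal sketch of this idea, one Elixir Agent (a thin wrapper over an Erlang process) per stock symbol can own the latest quote; the StockQuote module below is illustrative, not part of the crawler code above:

defmodule StockQuote do
  # One actor (Agent) per stock symbol owns the latest price, so no
  # shared dictionary or lock is needed; the mailbox serializes access.
  def start_link(symbol) do
    Agent.start_link(fn -> nil end, name: via(symbol))
  end

  def update(symbol, price) do
    Agent.update(via(symbol), fn _old -> price end)
  end

  def latest(symbol) do
    Agent.get(via(symbol), fn price -> price end)
  end

  defp via(symbol), do: {:global, {:stock_quote, symbol}}
end

# Usage:
# {:ok, _} = StockQuote.start_link("AAPL")
# StockQuote.update("AAPL", 151.25)
# StockQuote.latest("AAPL") # => 151.25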

Overall, the Erlang process model is a bit low-level compared to async/await syntax and lacks composition in asynchronous code, but it can be designed to provide structured scope, error handling and termination. Further, immutable data structures and message passing obviate the need for locks to protect shared state. Another benefit of Erlang/Elixir is its support for distributed services, so tasks can be distributed to remote machines seamlessly.

October 29, 2020

Structured Concurrency in modern programming languages – Part-I

Filed under: Computing,Languages,Technology — admin @ 11:01 pm

Herb Sutter wrote about fifteen years ago that the free performance lunch is over and that you need to leverage concurrency to build high-performance applications on modern multi-core machines. Unfortunately, adding concurrency support is not simple: low-level concurrency primitives in many languages can lead to buggy code with potential deadlocks, and concurrent code can be hard to understand. Over the last few years, a number of programming languages have been improving support for concurrency, and in this series of blogs (Part-II, Part-III, Part-IV, Swift-Followup), I will review some of the programming languages that I have used in current or past projects, such as Typescript, Elixir, Erlang, GO, Kotlin, and Rust. In particular, I will examine how these languages support structured concurrency so that concurrent code looks like sequential code and can be reasoned about easily. Strictly speaking, concurrency relates to application behavior where modern operating systems use context switching to interleave the execution of multiple tasks, whereas parallelism allows those tasks to execute simultaneously. I will evaluate concurrency support in the context of multi-core hardware, where we can guarantee correct behavior with preemptive/cooperative multi-tasking and gain parallelism by utilizing multiple cores. Parallelism across multiple machines, i.e. distributed computing, is out of scope for this discussion.

Pitfalls with Concurrency

Before diving into the structured concurrency support, let’s review a few barriers that make writing concurrent code hard such as:

The control and data flow

Sequential code is easier to understand because you can predict the order of execution, and though compilers or runtime environments may optimize it with a slightly different order, the top-down structure remains intact. In contrast, concurrent code using threads/executors is disconnected from the main control and data flow, which makes composition, error handling, timeout or cancellation in asynchronous code much harder. In addition, concurrent code requires coordination between threads with some overhead, and it requires synchronization to prevent data races, which is hard to get right and results in obscure and brittle code.

Race conditions

A race condition occurs when the application behavior is determined by the timing and interleaving of execution steps across multiple threads. Race conditions cause faulty behavior when shared state or a critical section is not properly guarded in a multi-threaded environment. You can eliminate race conditions by removing shared state, using immutable objects, or protecting critical sections using synchronization, mutexes or locks.
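
Even in single-threaded Javascript, the same check-then-act hazard shows up across await points; the following contrived Typescript sketch illustrates the interleaving:

let balance = 100;
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));

// check-then-act across a suspension point: two concurrent calls can both
// pass the check before either subtracts, leaving the balance negative
async function withdraw(amount: number): Promise<void> {
  if (balance >= amount) {
    await sleep(10); // another withdraw may interleave here
    balance -= amount;
  }
}

Promise.all([withdraw(80), withdraw(80)]).then(() => console.log(balance)); // -60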

Mutual Exclusion

Low-level locking primitives such as mutexes and read/write/re-entrant locks are difficult to work with and add considerable complexity to your code. A buggy or incorrect implementation can lead to starvation, deadlocks or faulty behavior. Some libraries provide lock-free or concurrent data structures using atomic compare-and-swap (CAS), but they can still be prone to contention when accessed from multiple threads.

Deadlocks

You may need locks to protect critical sections of the code and avoid race conditions, but an incorrect implementation can lead to deadlocks, where a thread can't make any progress because it's waiting for a resource held by another thread. In addition, concurrent code may experience starvation, when a thread can't make progress, or livelock, when multiple threads keep stepping on each other. In order to circumvent deadlocks and livelocks, you can avoid nested locks, reduce the number of locks and reduce the scope of critical sections. You can also use re-entrant locks or fair locks that favor the thread waiting the longest.

Callback Hell

A common pattern in many languages when calling an asynchronous method is to pass a callback function or lambda that is invoked when the background task completes. However, this structure devolves into a complete mess when multiple asynchronous methods are chained, e.g.

class Crawler {
    crawl(url) {
        download(url, (contents) => {
            render(url, contents, (rendered) => {
                index(url, contents, (data) => { // index could have been running in parallel to parse
                    parse(contents, (urls) => {
                        urls.forEach((u) => crawl(u))
                    } // parse
                }) // index
            }) // render
        }) // download
    }

    download(url, cb) {

        ....
            cb(result)
    }

    render(url, contents, cb) {
        ....
            cb(result)
    }

    index(url, contents, cb) {
        ....
            cb(result)
    }

    parse(url, contents, cb) {
        ....
            cb(result)
    }
}

As you can see, the callback pattern quickly devolves into an unwieldy mess, and it's hard to manage results and error handling from within the nested scope. Each callback is called upon completion of the previous operation, so you can't easily use callbacks for concurrent operations.

Promise Hell

Another common pattern for invoking an asynchronous method is to use promise or future objects such as:

class Crawler {
    crawl(url) {
        return download(url)
            .then((contents) =>
                render(url, contents)
            ).then((rendered) => {
                const indexPromise = index(url, rendered)
                const parsePromise = parse(url, rendered)
                return Promise.all([indexPromise, parsePromise]) // run both tasks in parallel
            }).then(([indexResult, urls]) => {
                return Promise.all(urls.map((u) => crawl(u))) // crawl child urls recursively
            }).catch((e) => {
                // error handling
            })
    }

    download(url) {
        return new Promise((resolve, reject) => {
            // async operation
            // ....
            if (ok) {
                resolve(result)
            } else {
                reject(new Error('failed'))
            }
        })
    }

    render(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    index(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    parse(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }
}

Though the promise model is a bit improved and you can manage concurrent tasks better using Promise.all, you still have to nest operations, and error handling requires two separate mechanisms to catch errors (promise catch blocks and native try/catch). Another gotcha in dynamic languages such as Javascript is forgetting to return the promise from the last statement of a then block.
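
The missing-return gotcha looks like this (a contrived sketch; download and render are assumed to return promises):

declare function download(url: string): Promise<string>;
declare function render(url: string, contents: string): Promise<string>;

download('https://ab.com')
  .then((contents) => {
    render('https://ab.com', contents); // BUG: promise not returned, so the
                                        // next .then fires before rendering
  })
  .then((rendered) => console.log(rendered)); // prints undefined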

Error Handling

The above examples show that error handling in asynchronous code is tricky and error-prone. For example, when using callbacks you can pass two callback methods, one for valid results and another for errors, but the nested scope makes it hard to handle those errors. Similarly, when using catch blocks in promises, it's not clear which operation failed, and it adds substantial complexity if you need to recover from some errors or perform an alternate operation based on conditional logic. You also have to combine promise-specific catch blocks with normal try/catch blocks, and it's easy to miss proper error checking.

Cancellation and Timeout

As asynchronous code runs in a separate thread of execution, cancelling or timing out requires coordination between threads and can be hard to implement in the absence of library or language support. For example, a thread in an expensive computation or database query can't be cancelled if it blocks until that operation is completed. Some libraries provide APIs to stop threads, but that can leave the process in an unpredictable state; other libraries use signals to notify threads about termination. In order to cancel properly, you need a non-blocking and cooperative model where the detached task checks for a cancellation request periodically. Optimally, cancellation also cancels the underlying operation so that the application state remains consistent. Timeout is just an extension of cancellation, where the asynchronous task is cancelled if it's not completed within a specified time bound.
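
For example, a cooperative cancellation token can be polled between chunks of work; the following Typescript sketch (with an illustrative CancelToken class) shows cancellation and its timeout extension:

class CancelToken {
  cancelled = false;
  cancel() { this.cancelled = true; }
}

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Long-running task that checks the token between chunks of work.
async function longTask(token: CancelToken): Promise<number> {
  let done = 0;
  for (let i = 0; i < 100; i++) {
    if (token.cancelled) {
      throw new Error('cancelled'); // exit early, leaving state consistent
    }
    await sleep(10); // stand-in for one chunk of real work
    done++;
  }
  return done;
}

// Timeout as an extension of cancellation: cancel if not done in time.
const token = new CancelToken();
setTimeout(() => token.cancel(), 250);
longTask(token).then(console.log).catch((e) => console.error(e.message));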

Debugging/Stack-traces

Asynchronous code makes it hard to see the call graph or stack traces from the caller's perspective due to execution in a separate thread. For example, you may see the following stack trace in case of a database error on NodeJS, where the root cause is not easily apparent:

at new QueryResultError (node_modules/pg-promise/lib/errors/queryResult.js:122:24)
at Query.ctx.db.client.query (node_modules/pg-promise/lib/query.js:192:41)
at Query.handleReadyForQuery (node_modules/pg/lib/query.js:126:10)
at Connection.<anonymous> (node_modules/pg/lib/client.js:163:19)
at Socket.<anonymous> (node_modules/pg/lib/connection.js:118:12)
at addChunk (_stream_readable.js:288:12)
at readableAddChunk (_stream_readable.js:269:11)
at Socket.Readable.push (_stream_readable.js:224:10)
at TCP.onStreamRead [as onread] (internal/stream_base_commons.js:94:17)

Out of scope

Note: you may also need to handle other concerns such as retries with exponential back-off, circuit breakers, or idempotent behavior with asynchronous code in distributed systems, but they won't be discussed here.

Concurrency Constructs

I have discussed some concurrency primitives in my old blogs [1634, 1638, 1621], but following are the common constructs or building blocks that are used for implementing concurrency support:

Threads

A thread defines the smallest unit of execution, and multi-threading allows executing concurrent operations. There are two types of threads:

Native/OS-Thread

Native threads are tied to kernel threads and are scheduled using preemptive multi-tasking on modern operating systems. The operating system preempts a native thread upon IO operations, wait/sleep, hardware interrupts or context switching. Native threads have a high cost due to their stack size (about 256KB) and system overhead. As a result, a thread pool of limited size is often used to share system resources for background processing.

Green/User-space Thread

Green threads use cooperative scheduling, where a user-space scheduler performs context switching, so the overhead of spawning new threads is very small. User-space schedulers use an M:N model for mapping M green threads onto N native threads, such as:

As green threads use cooperative scheduling, they generally require a yield to allow other threads to proceed. User-space schedulers are not suitable for blocking operations such as sleep or blocking IO. For example, Java initially supported green threads but replaced them with native threads to support preemptive multi-tasking. Similarly, an earlier version of Rust used green threads with blocking IO, which resulted in slow performance, so Rust replaced green threads with native threads in a later version. Thus, green threads are generally used with non-blocking IO or waits that automatically preempt the thread, save its stack and resume another thread. As a general rule, native threads work better if an application is CPU-bound or requires real-time priority, while green threads/coroutines provide better concurrency for IO-bound applications.

Structured Concurrency

Structured concurrency was envisioned by Martin Sústrik to simplify writing concurrent applications. Following are the building blocks of structured concurrency that remedy the concurrency-related issues discussed above:

Concurrency Scope

Structured concurrency defines a single entry and exit, similar to top-down structured programming. It defines a concurrency scope or boundary where all asynchronous tasks must complete by the end of the scope. The scope may optionally define the context or queue where the tasks will run. This model simplifies the semantics of asynchronous behavior because the lifetime of child tasks is tied to the parent scope, and the parent scope automatically waits until all child tasks are completed.
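
A concurrency scope can be approximated in a few lines; the following Typescript sketch (the scope helper is illustrative) waits for every child task before returning:

// All child tasks are started inside the scope and must settle before
// the scope function returns, so no task outlives its parent.
async function scope<T>(children: Array<() => Promise<T>>): Promise<T[]> {
  return Promise.all(children.map((child) => child()));
}

// Usage: the caller never observes a state where some children are still running.
// const results = await scope([() => fetchA(), () => fetchB()]);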

Execution Context

Structured concurrency allows you to specify the context, threads or queues where asynchronous code will run so that you can manage related asynchronous code and the underlying resources easily.

Cancellation and Timeout

Structured concurrency provides first-class support for cancellation and timeout, though it still requires that child tasks support cooperative cancellation, as blocking operations cannot be easily interrupted.

Error Handling

Errors from child tasks are automatically propagated to the parent scope, where they can be handled in a consistent way using language-provided syntax.

Immutability or value semantics

Structured concurrency encourages the use of immutable objects or pass-by-value semantics to avoid race conditions or the need for locks to protect critical sections.

Composition

Structured concurrency allows composing asynchronous code within another asynchronous function so that data and control flow can be managed easily. It allows errors to be propagated from the nested asynchronous code to the calling function, and it cancels all child tasks if the parent task is cancelled.

What’s missing from Structured Concurrency

Structured concurrency doesn’t prescribe an exact concurrency mechanism such as threads, coroutines, fibers, generators or actors; instead it focuses on concurrency scope, data/control flow, error handling, cancellation/timeout and composition. Thus, it won’t solve data races if your application requires mutable shared data that is accessed from multiple threads or coroutines concurrently, so you will still need synchronization primitives to protect critical sections.

Toy Web Crawler

I will use a toy implementation of a simple web crawler to show support of structured concurrency in some of my preferred languages. Following is a pseudocode of sequential version of this crawler:

MAX_DEPTH = 5
class URLRequest: 
  url, status, depth, error, created_at, started_at, completed_at

class WebCrawler:
  def crawl_all(root_urls):
	# priority such as page-rank
	pq = PriorityQueue()
	# track duplicate urls
	visited = Set()
	# add root urls to queue
	for url in root_urls:
		pq.add(URLRequest(url, pending, 0))

	# crawl urls using BFS
	total = 0
	while not pq.isEmpty():
		total+=1
		request = pq.pop()
		visited.add(request.url)
		urls = crawl(request)
		for url in urls:
			if not visited.contains(url) and 
            not is_spam(url) and request.depth+1 < MAX_DEPTH:
				pq.add(URLRequest(url, pending, request.depth+1))
	# all done
	print total

  # download, parse and index given url in the request
  def crawl(request):
	urls = []
	try:
		request.started_at = Date()
		contents = download(request.url)
		contents = jsrender(request.url, contents)
		if has_content_changed(request.url, contents):
			index(request.url, contents)
			urls = parse_urls(request.url, contents)
		request.status = completed
	except: err
		request.status = failed
		request.error = err
	# mark request completion time
	request.completed_at = Date()
	return urls

  def download(url):
	# check robots.txt and throttle policies
 	# may need timeout for slow websites and linearize 
    # requests to the same domain to prevent denial of service attack

  def jsrender(url, contents):
 	# for SPA apps that use javascript for rendering contents
	return contents

  def index(parent_url, contents):
	# apply standardize, stem, ngram, etc for indexing

  def parse_urls(parent_url, contents):
	# tokenize contents and extract href/image/script urls
	return urls

  def is_spam(url):
	# check spam or low-quality urls
	return false

  def has_content_changed(url, contents):
	# calculate hash digest and compare it with last digest
	return true

The above example defines a crawl_all method to crawl a list of root URLs, which recursively invokes the crawl method using breadth-first search. The crawl method invokes stubbed-out methods for downloading the url, parsing the contents and indexing the contents.

Typescript

Typescript/Javascript on NodeJS platform offers a unique design for managing concurrency where only a single thread processes requests from event queue. Following is high-level architecture of NodeJS:

The NodeJS process uses a single thread that takes the next operation to execute from the event queue and executes it in an event loop. It delegates some system calls and asynchronous code to a small thread pool and uses non-blocking APIs when performing disk or network IO. This architecture eliminates the need to synchronize shared data, as only a single thread accesses application state at a time (similar to the actor model).

Using async/await in Typescript

Following is an implementation of the web crawler using the async-await syntax in Typescript:

import { Request, Response } from '../types/index';

const MAX_DEPTH = 4;
const MAX_URLS = 11;
const DOMAINS = [
  'ab.com',
  'bc.com',
  'cd.com',
  'yz.com',
];

export class Crawler {
  async crawl(urls: string[], timeoutMillis: number): Promise<number> {
    // Main scope of concurrency begin
    const res = await doCrawl(urls, 0, timeoutMillis);
    return res.childURLs;
    // Main scope of concurrency end    
  }
}

///////////////// PRIVATE METHODS ////////////////
const doCrawl = async (
  urls: string[],
  depth: number,
  timeoutMillis: number
): Promise<Response> => {
  const res = new Response();
  if (depth >= MAX_DEPTH) {
    res.failed('max-depth');
    return res;
  }
  const requests = urls.map((u) => new Request(u, depth, timeoutMillis));
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

  const childURLs : number = results.reduce((total: number, r: Response) => total + r.childURLs, 0);
  res.succeeded(childURLs);
  return res;
};


const handleCrawl = async (req: Request): Promise<Response> => {
  const res = new Response();
  const contents = await download(req.url);
  const newContents = await jsrender(req.url, contents);
  if (
    hasContentsChanged(req.url, newContents) &&
    !isSpam(req.url, newContents)
  ) {
    await index(req.url, newContents);
    const urls = await parseURLs(req.url, newContents);
    const childResp = await doCrawl(urls, req.depth + 1, req.timeoutMillis);
    res.succeeded(childResp.childURLs + 1);
  } else {
    res.failed("contents didn't change");
  }
  return res;
};

const download = async (url: string): Promise<string> => {
  // TODO check robots.txt and throttle policies
  // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
  return randomString(100);
};

const jsrender = async (url: string, contents: string): Promise<string> => {
  // for SPA apps that use javascript for rendering contents
  return contents;
};

const index = async (url: string, contents: string) => {
  // apply standardize, stem, ngram, etc for indexing
};

const parseURLs = (url: string, contents: string): string[] => {
  // tokenize contents and extract href/image/script urls
  const urls = [];
  for (var i = 0; i < MAX_URLS; i++) {
    urls.push(randomUrl());
  }
  return urls;
};

const hasContentsChanged = (url: string, contents: string): boolean => {
  return true;
};

const isSpam = (url: string, contents: string): boolean => {
  return false;
};

const randomUrl = (): string => {
  const i = Math.floor(Math.random() * DOMAINS.length);
  return 'https://' + DOMAINS[i] + '/' + randomString(20);
};

const randomString = (n: number): string => {
  let letters =
    'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
  let text = '';
  for (let i = 0; i < n; i++) {
    text += letters.charAt(Math.floor(Math.random() * letters.length));
  }
  return text;
};

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
	  () => {
		reject(new Error(`Timed out ${ms}`))
	  }, ms));
};

The async-await syntax in Typescript is syntactic sugar on top of promises. An async function returns a promise and automatically wraps its return value in a Promise.
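
For example (a trivial illustration, not part of the crawler):

async function fortyTwo(): Promise<number> {
  return 42; // automatically wrapped as Promise.resolve(42)
}

// Equivalent consumption styles:
fortyTwo().then((n) => console.log(n)); // prints 42
// or inside another async function: const n = await fortyTwo();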

The crawl method takes a list of URLs and a timeout and invokes doCrawl, which crawls the URLs in parallel and waits for the results using the await keyword. The doCrawl method recursively crawls child URLs up to the MAX_DEPTH limit. The main crawl method defines the boundary for concurrency and returns the count of child URLs.

Following is a unit test for the behavior of the async/await based crawler:

import { Crawler } from '../lib/index';
import { expect } from 'chai';

const EXPECTED_URLS = 19032;
const ROOT_URLS = [
  'a.com',
  'b.com',
  'c.com',
  'd.com',
  'e.com',
  'f.com',
  'g.com',
  'h.com',
  'i.com',
  'j.com',
  'k.com',
  'l.com',
  'n.com',
];

describe('crawler', async () => {
  it('crawling urls with nesting', async () => {
    const started = new Date().getTime();
    const timeout = 5000;
    const crawler = new Crawler();
    const res = await crawler.crawl(ROOT_URLS, timeout);
    const elapsed = new Date().getTime() - started;
    console.log(`Crawl took ${elapsed}ms to process ${res} URLs`);
    expect(res).equal(EXPECTED_URLS);
  });

});

Following are some of the concurrency features that Typescript supports:

  • Concurrency scope – Though Typescript doesn't support a concurrency scope as a first-class citizen or an option to specify a queue/context, you can manage the boundary using await in the main method.
  • Composition – The async methods in the above implementation show that asynchronous code can be easily composed.
  • Error handling – The async-await syntax uses normal try/catch for error checking instead of the specialized syntax of Promise or callback functions (see the sketch after this list).
  • Shared state – As NodeJS uses a single thread in an event loop, you don't need to worry about shared state being updated by multiple threads.
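
Here is a minimal sketch of that uniform error handling, assuming the Crawler class from the unit test above (which returns the count of crawled URLs); a timeout rejection from Promise.race surfaces in the same catch block as any other error:

const safeCrawl = async (urls: string[], timeoutMillis: number): Promise<number> => {
  try {
    return await new Crawler().crawl(urls, timeoutMillis);
  } catch (err) {
    // timeout rejections and download errors land here uniformly
    console.error(`crawl failed: ${err}`);
    return 0;
  }
};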

Following are the major shortcomings in Typescript for its support of structured concurrency:

  • Typescript doesn’t support value semantics and objects are passed by reference except primitive types.
  • As NodeJS uses a single thread for even loop with small thread pool for asynchronous operation, it limits the tasks that you can run in parallel on multi-core hardware.
  • Typescript doesn’t support cancellation and timeout natively so you have to rely on cooperative cancellation. You can implement timeout partially using Promise.race but it’s not a reliable way to handle timeouts, e.g.
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
    () => {
      reject(new Error(`Timed out ${ms}`));
    }, ms));
};
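
One cooperative workaround is an AbortController (available in modern Node versions); the sketch below assumes a fetch implementation that honors AbortSignal, so the underlying request is actually torn down on timeout rather than racing on:

const downloadWithTimeout = async (url: string, ms: number): Promise<string> => {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  try {
    // fetch observes the signal and rejects with an AbortError on timeout
    const response = await fetch(url, { signal: controller.signal });
    return await response.text();
  } finally {
    clearTimeout(timer); // release the timer once the request settles
  }
};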

You can download full code for Typescript example from https://github.com/bhatti/concurency-katas/tree/main/node.

Overall, Typescript/NodeJS is suitable for IO-bound applications where little time is spent on each operation so that the event loop can switch to the next task, but it's not suitable when the application has blocking/CPU-bound operations, requires many background tasks or requires a high level of concurrency and parallelism.
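
For CPU-bound work, one escape hatch is the worker_threads module; a minimal sketch (assuming the file is compiled to JavaScript before running, since workers load modules by path):

import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

if (isMainThread) {
  // spawn a worker so the event loop stays free for IO-bound tasks
  const worker = new Worker(__filename, { workerData: 40 });
  worker.on('message', (result) => console.log(`fib(40) = ${result}`));
} else {
  // the CPU-bound computation runs on a separate thread
  const fib = (n: number): number => (n < 2 ? n : fib(n - 1) + fib(n - 2));
  parentPort?.postMessage(fib(workerData as number));
}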

September 6, 2020

Review of “Software Engineering at Google”

Filed under: Computing,Technology — admin @ 5:09 pm

In “Software Engineering at Google”, engineers from Google share practices from the software development life-cycle at Google. Here are a few lessons from the book that are applicable to most engineers, omitting Google's unique internal practices and tools:

Software Engineering

Hyrum’s law

With a sufficient number of users of an API, it does not matter what you promise in the contract: all observable behavior of your system will be depended on by somebody.

Shifting Left

Finding problems earlier in the developer workflow usually reduces costs.

Hiding considered Harmful

Share ideas early to prevent personal missteps and vet your ideas.

Bus Factor

Disperse knowledge to reduce the bus factor.

3 Pillars of social interaction

  • Humility (lose the ego and learn to give/take criticism)
  • Respect
  • Trust (fail fast and iterate)

Blameless Post-Mortem Culture

  • Summary
  • Timeline of events
  • Primary cause
  • Impact/damage assessment
  • Actions for a quick fix
  • Actions to prevent recurrence
  • Lessons learned

Knowledge sharing

  • Psychological safety
  • Respect
  • Recognition
  • Developer guides
  • Static analysis
  • Newsletters
  • Readability certification – each changelist (CL) requires approval from a readability-certified engineer

Leadership

Servant leadership

  • Create an atmosphere of humility and trust
  • Helping a team achieve consensus and serve your team

Antipatterns

  • Hire pushovers
  • Ignore low performers
  • Ignore human issues
  • Be everyone’s friend
  • Compromise the hiring bar
  • Treat your team like children.

Positive patterns

  • Lose the ego – ownership, accountability, responsibility
  • Find people who can give constructive feedback
  • Be a zen master – leader is always on stage, maintain calmness, ask questions
  • Be a catalyst (build consensus)
  • Remove roadblocks
  • Be a teacher and mentor
  • Set clear goals – create mission statement for the team
  • Be honest (give hard feedback without using compliment sandwich)
  • Track happiness – recognition
  • Delegate but get your hands dirty
  • Seek to replace yourself
  • Know when to make waves
  • Shield your team from chaos
  • Give your team air cover – defend team from uncertainty and frivolous demands
  • Let your team know when they are doing well.

Leading

  • Always be deciding (weigh trade-offs)
  • Identify the blinders
  • Identify the key trade-offs
  • Decide/Iterate (e.g. trade-offs within web search are latency, quality and capacity – pick two)
  • Always be leaving
  • Build a self-driving team
  • Divide the problem space (delegate sub-problems to leaders)
  • Anchoring a team’s identity (rather than putting a team in charge of a specific product/solution, anchor team to the problem)
  • Always be scaling
  • Cycle of success: analysis (trade-off/consensus) -> struggle (fake it) -> traction (progress) -> Reward (solves new problem)
  • Important vs Urgent (delegate urgent things, dedicate time, tools such as GTD)
  • Learn to drop balls (split tasks into the top 20%, bottom 20%, and middle 60% – drop the bottom 80%)
  • Protecting your energy (vacation/breaks).

Measurement

Goals:

A goal is a desired result/property stated without reference to a metric – QUANTS: quality of the code, attention from engineers, intellectual complexity, tempo/velocity, and satisfaction.

Signal:

Signals are things that need to be measured but may not themselves be directly measurable, e.g. if the goal is learning from readability, a signal can be engineers reporting what they learned from the readability process.

Metrics:

A metric is a proxy for a signal, e.g. a quantitative metric such as a readability survey measuring how the readability process has improved code quality. Each metric should be traceable.

Style guide principles

  • Rules must pull their weight
  • Optimize for the reader
  • Be consistent (scaling, minimize ramp-up, resilience to time)
  • Setting the standard (coding conventions)
  • Avoiding error-prone/surprising constructs
  • Concede to practicalities
  • Use tools such as error checkers, code formatters, etc.

Code Review

  • Correctness & comprehension
  • Approval from OWNERS (stored as a regular file in repository)
  • Approval from readability
  • Knowledge sharing
  • Be polite & professional
  • Write small changes
  • Write good change description
  • Keep reviewers to a minimum
  • Automate where possible

Documentation

  • Know your audience (experience level, domain knowledge, purpose)
  • Documentation types:
    • Reference documentation
    • Design documents
    • Tutorials
    • Conceptual documentation
    • Landing pages
  • Documentation philosophy (WHO, WHAT, WHEN, WHERE and WHY).

Testing

Benefits

  • Less debugging
  • Increased confidence in changes
  • Improved documentation
  • Simpler reviews
  • Thoughtful design, fast/high quality releases
  • Test scope – 5% functional, 15% integration, 80% unit
  • Beyonce Rule – If you liked it, then you shoulda put a test on it
  • Pitfalls of large test suites (no more mocks)
  • Test certified (project health pH tool to gather metrics such as test coverage, test latency, etc)

Unit testing

  • Prevent brittle tests
  • Strive for unchanging tests (pure refactoring, new features, bug fixes, behavioral changes)
  • Test via public APIs (to avoid brittle tests)
  • Test State, Not Interactions (avoid mock objects)
  • Write clear tests; make tests complete and concise (using helper methods, object factories, and assertions)
  • Test behavior, Not methods (assert each behavior in separate method instead of testing each method per test, e.g. display_showName, display_showWarning)
  • Structure tests to emphasize behavior (see the sketch after this list)
    • comment Given/When/Then
    • use And to break it further
    • can have multiple combinations of When/Then for dependent behavior
  • Name tests after the behavior being tested
  • Don’t put logic in tests
  • Write clear failure messages
  • Test and code sharing: DAMP (descriptive and meaningful phrases), Not DRY (duplicating some construction logic of objects instead of helper methods)
  • Shared Values – use builder methods to construct objects instead of static constants
  • Shared setup (be careful with too many dependencies in common setup)
  • Share helpers and validations/assertions
  • Define test infrastructure – sharing code across multiple test suites
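
A minimal sketch of these conventions with mocha/chai (the Cart class and names are hypothetical):

import { expect } from 'chai';

class Cart {
  private items: string[] = [];
  add(item: string): void { this.items.push(item); }
  get count(): number { return this.items.length; }
}

describe('cart', () => {
  // named after the behavior under test, not the method
  it('addingItem_increasesCount', () => {
    // Given an empty cart
    const cart = new Cart();
    // When an item is added
    cart.add('book');
    // Then the count reflects the new item
    expect(cart.count).equal(1);
  });
});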

Test Doubles

  • Testable code
  • Applicability to avoid brittleness and complexity
  • Fidelity in how close the behavior mimics real implementation
  • Avoid mocking frameworks if possible
  • Seams – Use Dependency injection to make the code testable

Techniques

  • Faking – lightweight implementation but low fidelity (see the sketch after this list)
  • Stubbing – specify the expected behavior with Mocks
  • Interaction testing – verifying method is called properly but it can lead to complex tests so avoid it
  • Real implementation – high fidelity and give more confidence but evaluate based on execution time, determinism
  • Prefer State testing over interaction testing and use interaction testing only for state changing functions
  • Avoid over specification.
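
A minimal sketch of a seam via dependency injection with a hand-written fake (all names hypothetical):

interface Clock { now(): Date; }

// real implementation with high fidelity
class SystemClock implements Clock {
  now(): Date { return new Date(); }
}

// fake: lightweight and deterministic, but low fidelity
class FakeClock implements Clock {
  constructor(private fixed: Date) {}
  now(): Date { return this.fixed; }
}

class Greeter {
  constructor(private clock: Clock) {} // seam: the dependency is injected
  greet(): string {
    return this.clock.now().getHours() < 12 ? 'Good morning' : 'Good afternoon';
  }
}

// state-based test against the fake, no mocking framework needed
const greeter = new Greeter(new FakeClock(new Date('2020-01-01T09:00:00')));
console.assert(greeter.greet() === 'Good morning');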

Large functional tests

  • Obtain a system under test, seed data, perform actions, verify behavior
  • You may use multiple tests in chains and store intermediate data so that output of one test is used as input to another
  • Each SUT is judged based on hermeticity (the SUT's isolation from usage of and interactions with other components) and fidelity (the SUT's accuracy in reflecting the production environment). For example, staging tests use the staging deployment, but that requires the code to be deployed there first. Avoid third-party dependencies in the SUT environment and use doubles to fake them
  • You can also use record/replay proxies or a consumer-driven contract that defines the contract for the client and the provider of the service (Pact contract testing)

Test data

  • Seeded data
  • Test traffic
  • Domain data – pre-populated data in database
  • Realistic baseline/data
  • Seeding API.

Verification

  • Manual
  • Assertions

Types of larger tests

  • Functional testing
  • Browser and device testing
  • Performance
  • Load and stress testing
  • Deployment configuration testing
  • Exploratory testing (manual)
  • Bug bashes
  • A/B diff regression testing
  • UAT, Probes and canary analysis (in prod)
  • Disaster recovery and chaos engineering
  • User evaluation

Version Control and Build System

Google uses a mono-repo with a one-version rule for version control to avoid confusing choices, and a task-based build system. All dependencies also follow the one-version rule to simplify deployment. Google also uses static analysis/linters to provide feedback on code before it's committed.

Dependency management

Google uses semantic versioning for managing dependencies. You can think of dependencies as a directed graph with requirements as edges. The process of finding a mutually compatible set of dependencies is akin to SAT solving. Minimum version selection (MVS) can be used to pick the next higher version that makes dependencies compatible, since a semantic version alone is not a reliable way to trust backward compatibility.
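
A minimal sketch of the MVS idea (with hypothetical data shapes and integer versions standing in for semantic versions): each consumer states the minimum version it needs, and the resolver picks the highest of those minimums rather than the latest release:

type Requirement = { module: string; minVersion: number };

// for each module, select the maximum of the minimum versions requested
const minimumVersionSelection = (requirements: Requirement[]): Map<string, number> => {
  const selected = new Map<string, number>();
  for (const req of requirements) {
    const current = selected.get(req.module) ?? 0;
    selected.set(req.module, Math.max(current, req.minVersion));
  }
  return selected;
};

// two consumers asking for lib >= 2 and lib >= 3 resolve to version 3
const resolved = minimumVersionSelection([
  { module: 'lib', minVersion: 2 },
  { module: 'lib', minVersion: 3 },
]);
console.log(resolved.get('lib')); // 3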

Continuous integration

The code goes through edit/compile -> pre-submit -> post-submit -> release-candidate -> rc-promotion -> final-rc phases over time. The CI provides fast feedback using automated and continuous builds and delivery. Pre-submit runs fast unit tests, and post-submit runs large tests (hermetic tests against a test environment with greater determinism and isolation) to verify changes and SLOs.

Continuous delivery

Google uses idioms of agility (small batches), automation, isolation, reliability, data-driven decision making, and phased rollout for continuous delivery. It uses shifting left to identify problems earlier and ships only what gets used.

August 19, 2020

Review of “Building Secure and Reliable Systems”

Filed under: Computing,Technology — admin @ 5:33 pm

The “Building Secure and Reliable Systems” book shares best practices from Google’s security and SRE engineers. Here is a summary of these best practices:

The first chapter discusses the tradeoff between security and reliability, e.g. reliability protects against non-malicious failure but may expand the security surface via redundancy, whereas security risk comes from adversarial attacks. Both reliability and security need confidentiality, integrity and availability, but with different perspectives. Complex systems are difficult to reason about, so you must apply “Defense in depth”, “Principle of least privilege” and “Distinct failure domains” to limit the blast radius of failure. For example, Google uses geographic regions to limit the scope of credentials.

The second chapter focuses on “security adversaries” and “attack motives”; attackers may be hobbyists, hacktivists, researchers, criminals, cyber-warfare actors, insiders, or from other backgrounds. You can apply CAPTCHAs, automation/AI, zero trust, multi-party authorization, auditing/detection and recoverability to protect against these attacks.

The third chapter begins the second section of the book, which focuses on designing secure and reliable systems. It introduces safe proxies in the production environment that enforce authentication, multi-party authorization (MPA), auditing, rate limiting, zero touch, access control, etc. For example, Google uses a CLI proxy to execute commands that are controlled via security policy and MPA, and that provides auditing/logs.

Chapter four examines security tradeoffs and reviews product features that may include functional and non-functional requirements (e.g. security, reliability, SLOs, dev velocity). Reliability and security are also considered emergent properties of system design that encompass entire products and services. The chapter also gives an example of a design document template that includes sections for scalability, redundancy/reliability, dependencies, data integrity, SLA, and security/privacy.

Chapter five discusses designing for least privilege using authentication and authorization. It also examines zero-trust networks, which grant no implicit access, and zero-touch interfaces, where all access is automated. It recommends writing small functions so that access control can be clearly defined, breaking glass in an emergency to bypass certain authorization systems, auditing, testing for least privilege, multi-party authorization (MPA), three-factor authorization (3FA, where access is approved from two platforms), business justifications, temporary access, proxies, etc. The chapter also discusses tradeoffs of complex security with other factors such as company culture, data quality, user productivity, and development complexity.

Chapter six focuses on designing for understandability, which reduces the likelihood of security vulnerabilities and increases confidence in the system's security. It defines a system invariant, a property that is always true and can be used to assert security and reliability properties. It suggests using mental models to understand complex security systems and explains identities, authentication, and access-control concepts. When breaking a system into smaller components, the chapter recommends using a trusted computing base (TCB) to create a security boundary that enforces security policies. To provide access from one TCB to another, you may issue an end-user context ticket (EUC) that grants temporary access. To standardize security policies, you may use a common framework for request dispatching, input sanitization, authentication, authorization, auditing, logging, monitoring, quota, load balancing, configuration, testing, dashboards, alerting, etc.

Chapter seven focuses on extensibility and new changes, for example keeping dependencies up-to-date, automated testing, releasing frequently, and using containers and microservices.

Chapter eight focuses on resilience, the system's ability to hold out against a major malfunction or disruption. It encourages designing the system with independent layers, modularization, redundancy, automation, defense in depth, controlled degradation (partial failure), load shedding, throttling, and automated response. You will need to consider tradeoffs between reliability and security, e.g. failing safe vs failing secure, where reliability/safety may require an “allow-all” ACL but security may require a “deny-all” ACL. You can segment your network and compartmentalize your system to reduce the blast radius. With a micro-service architecture, you can assign distinct roles to each service and add geographic location or time as a scope of access. The chapter then defines a failure domain, a type of blast-radius control that creates isolation by partitioning a system into multiple equivalent but completely independent copies, each with its own data. Any individual partition can take over for the entire system during an outage and help protect systems from global impact. You can validate the system continuously for failures using fuzzing and other types of testing.

Chapter nine discusses recoverability from random, accidental, and software failures and errors. The chapter recommends designing the emergency push system to simply be your regular push system turned up to maximum for recovering from failure. In order to prevent rollbacks to undesirable older versions, you can collect those versions into a deny list or use an allow-list of permitted versions, which the release system uses for verification. Also, you can maintain security version numbers (SVNs) and minimum acceptable security version numbers (MASVNs) and rotate signing keys, e.g.

ComponentState[DenyList] = ComponentState[DenyList].union(self[DenyList])
ComponentState[MASVN] = max(self[MASVN], ComponentState[MASVN])

def IsUpdateAllowed(self, Release, ComponentState, KeyDatabase):
  assert Release[Version] not in ComponentState[DenyList]
  assert Release[SVN] >= ComponentState[MASVN]
  assert VerifySignature(Release, KeyDatabase)

Chapter ten explains how to mitigate DoS attacks, where an attacker may compromise vulnerable machines or launch amplification attacks. This chapter suggests using edge routers to throttle high-bandwidth attacks and eliminate attack traffic as early as possible. For example, you can use network and application load balancers to continually monitor incoming traffic. Other mitigating techniques include caching proxies, minimizing network requests (e.g. using spriting), minimizing egress bandwidth, CAPTCHAs, rate limits, monitoring/alerting (MTTD mean-time-to-detect, MTTR mean-time-to-repair), graceful degradation, exponential backoff, jitter, etc.
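
A minimal sketch of exponential backoff with full jitter for retrying failed requests (names and limits are illustrative):

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const retryWithBackoff = async <T>(op: () => Promise<T>, maxAttempts = 5): Promise<T> => {
  for (let attempt = 0; ; attempt++) {
    try {
      return await op();
    } catch (err) {
      if (attempt + 1 >= maxAttempts) throw err;
      // exponential backoff capped at 10s, with full jitter to avoid thundering herds
      const backoff = Math.min(10_000, 100 * 2 ** attempt);
      await sleep(Math.random() * backoff);
    }
  }
};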

Chapter eleven begins the third section of the book and focuses on maintaining a trusted CA. For example, you can use secure and memory-safe languages to parse certificates or CSR requests. You may need to use third-party libraries, but you can add testing for validation.

Chapter twelve focuses on writing code, e.g. using frameworks that enforce security and reliability. You can use RPC frameworks that may provide logging, authentication, authorization, and rate limiting. This chapter covers OWASP top vulnerabilities such as SQL injection, which can be prevented by using parameterized SQL, and XSS, which can be prevented by sanitizing user input (safeHTML) and incremental rollout. Other coding techniques include simplicity, minimizing multi-level nesting/cyclomatic complexity, eliminating YAGNI smells, paying down tech debt, and refactoring. The chapter also suggests using memory-safe and strongly/statically typed languages.
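
A minimal sketch of a parameterized query, assuming a node-postgres style client where $1 is bound as data rather than spliced into the SQL string:

// vulnerable: attacker-controlled input concatenated into the SQL
//   client.query(`SELECT * FROM users WHERE name = '${name}'`)

// safe: the driver binds the parameter, so input is never parsed as SQL
const findUser = async (
  client: { query: (sql: string, params: unknown[]) => Promise<unknown> },
  name: string
) => {
  return client.query('SELECT * FROM users WHERE name = $1', [name]);
};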

Chapter thirteen examines testing code using unit and integration tests. It also introduces other testing techniques such as fuzz testing, chaos engineering, static program analysis, code-inspection tools (Error Prone for Java and Clang-Tidy), and formal methods.

Chapter fourteen describes the deployment phase of software development, which may include pushing code, downloading a new binary, updating configuration, migrating a database, etc. The chapter reviews threat models to prevent bad deployments such as accidental changes, malicious changes, bad configuration, stolen integrity keys, deploying older versions, and backdoors. It suggests best practices such as code reviews, automation, verifying artifacts, validating configuration, and binary provenance. Binary provenance verifies the inputs to the artifact and validates the transformation and the entity that performed the build. The provenance fields include authenticity (signature), output, input (source and dependencies), command, environment, input metadata, debug info, and versioning. A build is considered verifiable if the binary provenance produced by the build is trustworthy. Verifiable build architectures include trusted build services, hermetic builds, and reproducible builds; however, you may need a break-glass mechanism that bypasses the policy in case of outage. You can add post-deployment verification to validate the deployment.
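
For illustration, the provenance fields listed above could be modeled as a record like the following (field names are illustrative, not from the book):

interface BinaryProvenance {
  signature: string;                               // authenticity of the provenance itself
  output: string;                                  // digest of the produced artifact
  input: { sources: string[]; metadata?: string }; // source and dependency digests
  command: string;                                 // build command that was run
  environment: Record<string, string>;             // build environment
  debugInfo?: string;
  version: string;
}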

Chapter fifteen shows how to investigate systems using debug flags, verifying data corruption, reviewing logs, and designing for safety.

Chapter sixteen begins section four, which focuses on disaster planning. This chapter introduces best practices to address short- and long-term recovery, such as performing an analysis of potential disasters, establishing a response time, creating response plans/playbooks, configuring systems, testing procedures/systems, and incorporating feedback from tests and evaluations. It shows how to set up an incident response (IR) team that may include an incident commander, SREs, customer support, legal, forensics, and security/privacy engineers. IR teams can use a severity model to categorize incidents based on the severity of their impact on the organization and a priority model to define response time. The response plan includes incident reporting, triage, SLOs, roles/responsibilities, and communications. You also need to test systems and response plans and audit automated systems. Red team testing can help simulate how the system reacts to an attack.

Chapter seventeen reviews crisis management, starting with determining whether a security incident is a crisis. This can be evaluated in triage, which determines the severity of the incident and whether it is the result of a system bug or a compromise yet to be discovered. In the context of crisis management, operational security (OpSec) refers to the practice of keeping your response activity secret. For example, common OpSec mistakes include documenting the incident in email, logging into the compromised systems, locking accounts/changing passwords prematurely, and taking systems offline. The chapter instead suggests meeting in person, using key-based access (without login), etc. You can apply forensics processes to investigate the security compromise. The chapter ends with a summary of best practices that include triage, declaring an incident, communicating with executives and SecOps, creating IR and forensics teams, preparing communications and remediation, and closure.

Chapter eighteen reviews recovery and the aftermath of a security incident. You can establish the recovery timeline based on whether the incident affected mission-critical systems.

The goal of your recovery effort is to mitigate the attack and return your systems to their normal routine state; however, complex security events may require parallelizing incident management and response execution. In order to return your systems to normal, you need a complete list of the systems, networks, and data affected by the attack. You also need sufficient information about the attacker's tactics, techniques, and procedures (TTPs) to identify any related resources that may be impacted. There are several considerations before recovery, such as:

  • How will your attacker respond to your recovery effort?
  • Is your recovery infrastructure or tooling compromised?
  • What variants of the attack exist?
  • Will your recovery reintroduce attack vectors?
  • What are your mitigation options?

The recovery checklist includes:

  • Isolating assets (quarantine)
  • System rebuilds and software upgrades
  • Data sanitization
  • Data recovery
  • Credential rotation
  • Postmortems

Chapter nineteen begins section five of the book, which offers advice on making security part of the organizational culture. It suggests making security a team responsibility, providing security to users, designing for defense in depth, and being transparent with the community.

Chapter twenty describes roles and responsibilities for security and reliability. For example, security experts implement security-specific technologies, SREs develop centralized infrastructure, and security specialists devise best practices. You can embed security experts with the development teams or have them review/audit security practices. Organizations can create a red team that focuses on offensive exercises simulating attacks, and a blue team for assessing and hardening software and infrastructure.

Chapter twenty-one shows how to build a culture of security and reliability. It suggests an organizational culture of by-default security and reliability, and encourages employees to discuss these topics early in the project life-cycle. The chapter also suggests a culture of review, where peer reviews ensure that code implements least privilege and other security considerations. The culture should include awareness of security aspects, sustainability, transparency, and communication.
