Shahzad Bhatti Welcome to my ramblings and rants!

August 15, 2021

Structured Concurrency with Swift

Filed under: Concurrency,Uncategorized — Tags: , , , — admin @ 6:19 pm

I wrote about support of structured concurrency in Javascript/Typescript, Erlang/Elixir, Go, Rust, Kotlin and Swift last year (Part-I, Part-II, Part-III, Part-IV) but Swift language was still in development for async/await and actors support. The Swift 5.5 will finally have these new concurrency features available, which are described below:

Async/Await

As described in Part-IV, Swift APIs previously used completion handlers for asynchronous methods that suffered from:

  • Poor error handling because you could not use a single way to handle errors/exceptions instead separate callbacks for errors were needed
  • Difficult to cancel asynchronous operation or exit early after a timeout.
  • Requires a global reasoning of shared state in order to prevent race conditions.
  • Stack traces from the asynchronous thread don’t include the originating request so the code becomes hard to debug.
  • As Swift/Objective-C runtime uses native threads, creating a lot of background tasks results in expensive thread resources and may cause excessive context switching.
  • Nested use of completion handlers turn the code into a callback hell.

Following example shows poor use of control flow and deficient error handling when using completion handlers:

func fetchThumbnails(for ids: [String],
    completion handler: @escaping ([String: UIImage]?, Error?) -> Void) {
    guard let id = ids.first else { return handler([:], nil) }
    let request = thumbnailURLRequest(for: id)
    URLSession.shared.dataTask(with: request) { data, response, error in
        guard let response = response,
              let data = data else { return handler(nil, error) } // Poor error handling
        UIImage(data: data)?.prepareThumbnail(of: thumbSize) { image in
            guard let image = image else { return handler(nil, ThumbnailError()) }
        }
        fetchThumbnails(for: Arrays(ids.dropFirst()) { thumbnail, error in
            // cannot use loop
            ...
        }
    }
}

Though, use of Promise libraries help a bit but it still suffers from dichotomy of control flow and error handling. Here is equivalent code using async/await:

func fetchThumbnails(for ids: [String]) async throws -> [String: UIImage] {
	let thumbnails: [String: UIImage] = [:]
    for id in ids {
    	let request = thumbnailURLRequest(for: id)
        let (data, response) = try await URLSession.shared.dataTask(for: request)
        try validateResponse(response)
        guard let image = await UIImage(data: data)?.byPreparingThumbnail(ofSize: thumbSize) else { throw ThumbnailError()) }
        thumbnails[id] = image
    }
    return thumbnails
}

As you can see, above code not only improves control flow and adds uniform error handling but it also enhances readability by removing the nested structure of completion handlers.

Tasks Hierarchy, Priority and Cancellation

When a new task is created using async/await, it inherits the priority and local values of the parent task, which are then passed to the entire hierarchy of child tasks from the parent task. When a parent task is cancelled, the Swift runtime automatically cancels all child tasks, however Swift uses cooperative cancellation so child tasks must check for cancellation state otherwise they may continue to execute, however the results from cancelled tasks are discarded.

Continuations and Scheduling

Swift previously used native threads to schedule background tasks, where new threads were automatically created when a thread is blocked or waiting for another resource. The new Swift runtime creates native threads based on the number of cores and background tasks use continuations to schedule the background task on the native threads. When a task is blocked, its state is saved on the heap and another task is scheduled for processing on the thread. The await syntax suspends current thread and releases control until the child task is completed. This cooperative scheduling requires runtime support for non-blocking I/O operations and system APIs so that native threads are not blocked and continue to work on other background tasks. This also limits background tasks from using semaphores and locks, which are discussed below.

async function

In above example, when a thread is working on a background task “updateDatabase” that starts a child tasks “add” or “save”, it saves the tasks as continuations on heap. However, if current task is suspended then the thread can work on other tasks as shown below:

Multiple Asynchronous Tasks

The async/await in Swift also allows scheduling multiple asynchronous tasks and then awaiting for them later, e.g.

struct MarketData {
    let symbol: String
    let price: Int
    let volume: Int
}

struct HistoryData {
    let symbol: String
    let history: [Int]
    func sum() -> Int {
      history.reduce(0, +)
    }
}

func fetchMarketData(symbol: String) async throws -> MarketData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(MarketData(symbol: symbol, price: 10, volume: 200)))
        }
    }
}

func fetchHistoryData(symbol: String) async throws -> HistoryData {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(HistoryData(symbol: symbol, history: [5, 10, 15, 20])))
        }
    }
}

func getMovingAverage(symbol: String) async throws -> Int {
    async let marketData = fetchMarketData(symbol: symbol)
    async let historyData = fetchHistoryData(symbol: symbol)
    let sum = try await marketData.price + historyData.sum()
    return try await sum / (historyData.history.count+1)
}

The async let syntax is called concurrent binding where the child task executes in parallel to the parent task.

Task Groups

The task groups allow dispatching multiple background tasks that are executed concurrently in background and Swift automatically cancels all child tasks when a parent task is cancelled. Following example demonstrates use of group API:

func downloadImage(id: String) async throws -> UIImage {
    await withCheckedContinuation { c in
        DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
            c.resume(with: .success(UIImage(data: [])))
        }
    }
}

func downloadImages(ids: [String]) async throws -> [String: UIImage] {
    var images: [String: UIImage] = [:]
    try await withThrowingTaskGroup(of: (String, UIImage).self) { group in
        for id in ids {
            group.addTask(priority: .background) {
                return (id, try await downloadImage(id: id))
            }
        }
        for try await (id, image) in group {
            images[id] = image
        }
    }
    return images
}

As these features are still in development, Swift has recently changed group.async API to group.addTask. In above example, images are downloaded in parallel and then for try await loop gathers results.

Data Races

Swift compiler will warn you if you try to mutate a shared state from multiple background tasks. In above example, the asynchronous task returns a tuple of image-id and image instead of mutating shared dictionary. The parent task then mutates the dictionary using the results from the child task in for try await loop.

Cancellation

You can also cancel a background task using cancel API or cancel all child tasks of a group using group.cancelAll(), e.g.

group.cancelAll()

The Swift runtime automatically cancels all child tasks if any of the background task fails. You can store reference to a child task in an instance variable if you need to cancel a task in a different method, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]

    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
              ...
            }
    }
    func collectionView(_ collectionView: UICollectionView,
        didEndDisplaying cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
        imageTasks[item]?.cancel()
    }
}

As cancellation in Swift is cooperative, you must check cancellation state explicitly otherwise task will continue to execute but Swift will reject the results, e.g.

if Task.isCancelled {
    return // return early
}

Timeout

The task or async/await APIs don’t directly support timeout so you must implement it manually similar to cooperative cancellation.

Semaphores and Locks

Swift does not recommend using Semaphores and Locks with background tasks because they are suspended when waiting for an external resource and can be later resumed on a different thread. Following example shows incorrect use of semaphores with background tasks:

func updateDatabase(_ asyncUpdateDatabase: @Sendable @escaping () async -> Void {
  let semaphore = DispatchSemaphore(value: 0)
  Task {
    await asyncUpdateDatabase()
    semaphore.signal()
  }
  semaphore.wait() // Do not use unsafe primitives to wait across task boundaries
}

TaskLocal

You can annotate certain properties with TaskLocal, which are stored in the context of Task and is available to the task and all of its children, e.g.

enum TracingExample {
    @TaskLocal
    static let traceID: TraceID?
}
...
guard let traceID = TracingExample.traceID else {
  print("no trace id")
  return
}
print(traceID)

Detached Tasks (Unstructured)

Above tasks and async/await APIs are based on structured concurrency where parent task is not completed until all child background tasks are done with their work. However, Swift allows launching detached tasks that can continue to execute in background without waiting for the results, e.g.

class MyDelegate: UICollectionViewDelegate {
    var imageTasks: [IndexPath: Task<Void, Error>] = [:]
    func collectionView(_ collectionView: UICollectionView,
        willDisplay cell: UICollectionViewCell,
        forItemAt item: IndexPath) {
            let ids = getImageIDs(for: item)
            imageTasks[item] = Task {
                defer { imageTasks[item] = nil }
                let images = try await getImages(for: ids)
                Task.detached(priority: .background) {
                    await withThrowingTaskGroup(of: Void.self) { g in
                        g.addTask { try await addImageCache(for: images) }
                        g.addTask { try await logImages(for: images) }
                    }
                }
                display(images, in: cell)
            }
    }
}

Legacy APIs

The legacy code that use completion-handlers can use following continuation APIs to support async/await syntax:

func persistPosts() async throws -> [Post] {
    typealias PostContinuation = CheckedContinuation<[Post], Error>
    return try await withCheckedThrowingContinuation { (continuation: PostContinuation) in
        self.getPersistentPosts { posts, error in
            if let error = error {
                continuation.resume(throwing: error)
            } else {
                continuation.resume(returning: posts)
            }
        }
    }
}

In above example, the getPersistentPosts method used completion-handler and persistPosts method provides a bridge so that you can use async/await syntax. The resume method can only called once for the continuation. 

You may also save continuation in an instance variable when you need to resume in another method, e.g.

class MyViewController: UIViewController {
    private var activeContinuation: CheckedContinuation<[Post], Error>?
    func sharePostsFromPeer() async throws -> [Post] {
        try await withCheckedThrowingContinuation { continuation in
            self.activeContinuation = continuation
            self.peerManager.syncSharedPosts()
        }
    }
}
extension MyViewController: PeerSyncDelegate {
    func peerManager(_ manager: PeerManager, received posts: [Post]) {
        self.activeContinuation?.resume(returning: posts)
        self.activeContinuation = nil
    }
    func peerManager(_ manager: PeerManager, hadError error: Error) {
        self.activeContinuation?.resume(throwing: error)
        self.activeContinuation = nil
    }
}

Implementing WebCrawler Using Async/Await

Following example shows implementation of WebCrawler using async/await described in Part I of the concurrency series:

import Foundation
struct Request {
    let url: String
    let depth: Int
    let deadline: DispatchTime
}
enum CrawlError: Error {
    case timeoutError(String)
}
let MAX_DEPTH = 4
let MAX_URLS = 11
let DOMAINS = [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "gh.com",
  "hi.com",
  "ij.com",
  "jk.com",
  "kl.com",
  "lm.com",
  "mn.com",
  "no.com",
  "op.com",
  "pq.com",
  "qr.com",
  "rs.com",
  "st.com",
  "tu.com",
  "uv.com",
  "vw.com",
  "wx.com",
  "xy.com",
  "yz.com",
];
public func crawl(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawl(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}
public func crawlWithActors(urls: [String], deadline: DispatchTime) async throws -> Int {
    // Main scope of concurrency begin
    // TODO add timeout using race, e.g. await Task.WhenAny(crawlTask, Task.Delay(deadline)) == crawlTask
    return try await doCrawlWithActors(urls: urls, depth: 0, deadline: deadline)
    // Main scope of concurrency end
}

///////////////// PRIVATE METHODS ////////////////
func doCrawl(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    try await withThrowingTaskGroup(of: (Request, Int).self) { group in
        for req in requests {
	    group.addTask(priority: .background) {
	        return (req, try await handleRequest(req))
	    }
        }
        for try await (req, childURLs) in group {
	    if totalChildURLs % 10 == 0 {
		print("received request \(req)")
	    }
	    totalChildURLs += childURLs
        }
    }
    return totalChildURLs
}
func doCrawlWithActors(urls: [String], depth: Int, deadline: DispatchTime) async throws -> Int {
    if depth >= MAX_DEPTH {
	return 0
    }
    let requests = urls.map { Request(url: $0, depth: depth, deadline: deadline) }
    var totalChildURLs = 0
    let crawler = CrawlActor()
    for req in requests {
     	let childURLs = try await crawler.handle(req)
	totalChildURLs += childURLs
    }
    return totalChildURLs
}
func handleRequest(_ request: Request) async throws -> Int {
    let contents = try await download(request.url)
    let newContents = try await jsrender(request.url, contents)
    if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
        try await index(request.url, newContents)
        let urls = try await parseURLs(request.url, newContents)
        let childURLs = try await doCrawl(urls: urls, depth: request.depth + 1, deadline: request.deadline)
        return childURLs + 1
    } else {
        return 0
    }
}
func download(_ url: String) async throws -> String {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    return randomString(100)
}
func jsrender(_ url: String, _ contents: String) async throws -> String {
    // for SPA apps that use javascript for rendering contents
    return contents
}
func index(_ url: String, _ contents: String) async throws {
    // apply standardize, stem, ngram, etc for indexing
}
func parseURLs(_ url: String, _ contents: String) async throws -> [String] {
    // tokenize contents and extract href/image/script urls
    var urls = [String]()
    for _ in 0..<MAX_URLS {
        urls.append(randomUrl())
    }
    return urls
}
func hasContentsChanged(_ url: String, _ contents: String) -> Bool {
    return true
}
func isSpam(_ url: String, _ contents: String) -> Bool {
    return false
}
func randomUrl() -> String {
    let number = Int.random(in: 0..<WebCrawler.DOMAINS.count)
    return "https://" + WebCrawler.DOMAINS[number] + "/" + randomString(20)
}
func randomString(_ length: Int) -> String {
  let letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
  return String((0..<length).map{ _ in letters.randomElement()! })
}

The crawl method takes a list of URLs with timeout that invokes doCrawl, which crawls list of URLs in parallel and then waits for results using try await keyword. The doCrawl method recursively crawls child URLs up to MAX_DEPTH limit. The main crawl method defines boundary for concurrency and returns count of child URLs.

Following are major features of the structured concurrency in Swift:

  • Concurrency scope?—?The async/await defines scope of concurrency where all child background tasks must be completed before returning from the asynchronous function.
  • The async declared methods in above implementation shows asynchronous code can be easily composed.
  • Error handling?—?Async-await syntax uses normal try/catch syntax for error checking instead of specialized syntax of Promise or callback functions.
  • Swift runtime schedules asynchronous tasks on a fixed number of native threads and automatically suspends tasks when they wait for I/O or other resources.

Following are the major shortcomings in Swift for its support of structured concurrency:

  • The most glaring omission in above implementation is timeout, which is not supported in Swift’s implementation.
  • Swift runtime manages scheduling of tasks and you cannot pass your own execution dispatcher for scheduling background tasks.

Actors

Actor Model is a classic abstraction from 1970s for managing concurrency where an actor keeps its internal state private and uses message passing for interaction with its state and behavior. An actor can only work on one message at a time, thus it prevents any data races when accessing from multiple background tasks. I have previously written about actors and described them Part II of the concurrency series when covering Erlang and Elixir.

actor

Instead of creating a background task using serial queue such as:

final class Counter {
    private var queue = DispatchQueue(label: "counter.queue")
    private var _count : Int = 0
    var count: Int {
        queue.sync {
            _count
        }
    }

    func incr() {
        queue.async(flags: .barrier) {
            self._count += 1
        }
    }
    func decr() {
        queue.async(flags: .barrier) {
            self._count -= 1
        }
    }
}

The actor syntax simplifies such implementation and removes all boilerplate e.g.

actor Counter {
    var count: Int = 0
    func incr() {
        count += 1
    }
    func decr() {
        count -= 1
    }
}

Above syntax protects direct access to the internal state and you must use await syntax to access the state or behavior, e.g.

Task {
	let c = Counter()
    await withTaskGroup(of: Void.self) { group in
        for i in 0..<100 {
            group.async {
                await c.incr()
            }
        }
    }
    print("count \(await c.count)")
}

Priority Inversion Principle

The dispatch queue API applies priority inversion principle when a high priority task is behind low priority tasks, which bumps up the priority of low priority tasks ahead in the queue. The runtime environment then executes the high priority task after completing those low priority tasks. The actor API instead can choose high priority task directly from the actor’s queue without waiting for completion of the low priority tasks ahead in the queue.

Actor Reentrancy

If an actor invokes another actor or background task in its function, it may get suspended until the background task is completed. In the meantime, another client may invoke the actor and modify its state so you need to check assumptions when changing internal state. A continuation used for the background task may be scheduled on a different thread after resuming the work, you cannot rely on DispatchSemaphore, NSLock, NSRecursiveLock, etc. for synchronizations.

Following code from WWDC-2021 shows how reentrancy can be handled safely:

actor ImageDownloader {
    private enum CacheEntry {
        case inProgress(Task.Handle<Image, Error>)
        case ready(Image)
    }
    private var cache: [URL: CacheEntry] = [:]
    func downloadAndCache(from url: URL) async throws -> Image? {
        if let cached = cache[url] {
            switch cached {
            case .ready(let image):
                return image
            case .inProgress(let handle):
                return try await handle.get()
            }
        }
        let handle = async {
            try await downloadImage(from: url)
        }
        cache[url] = .inProgress(handle)
        do {
            let image = try await handle.get()
            cache[url] = .ready(image)
            return image
        } catch {
            cache[url] = nil
            throw error
        }
    }
}

The ImageDownloader actor in above example downloads and caches the image and while it’s downloading an image. The actor will be suspended while it’s downloding the image but another client can reenter the downloadAndCache method and download the same image. Above code prevents duplicate requests and reuses existing request to serve multiple concurrent clients.

Actor Isolation

The actors in Swift prevent invoking methods directly but you can annotate methods with nonisolated if you need to call them directly but those methods cannot mutate state, e.g.

actor Account {
    let id: String
    var balance: Double = 0
    init(id: String) {
        self.id = id
    }
}
extension Account: Hashable {
    nonisolated func hash(into hasher: inout Hasher) {
        hasher.combine(id)
   }
   static func == (lhs: Account, rhs: Account) -> Bool {
        return lhs.id == rhs.id
   }
}

Sendable

The actors requires that any data structure used in its internal state are thread safe and implement Sendable protocol such as:

  • Value types
  • Actors
  • Immutable classes
  • Synchronized classes
  • @Sendable Functions
struct Book: Sendable {
    var title: String
    var authors: [Author]
}

@MainActor

The UI apps require that all UI updates are performed on the main thread and previously you had to dispatch UI work to DispatchQueue.main queue. Swift now allows marking functions, classes or structs with a special annotations of @MainActor where the functions are automatically executed on the main thread, e.g.

@MainActor func checkedOut(_ books: [Book]) {
  booksView.checkedOutBooks = books
}
...
await checkedOut(booksOnLoan)

Following example shows how a view-controller can be annotated with the @MainActor annotations:

@MainActor class MyViewController: UIViewController {
  func onPress() .... // implicitly on main-thread
  nonisolated func fetch() async {
    ...

In above example, all methods for MyViewController are executed on the main thread, however you can exclude certain methods via nonisolated keyword.

@globalActor

The @globalActor annotation defines a singleton global actor and @MainActor is a kind of global actor. You can also define your own global actor such as:

@globalActor
public struct GlobalSettings {
  public actor SettingsActor {
     func rememberPassword() -> Bool {
        return UserDefaults.standard.bool(forKey: "rememberPassword")
     }
  }

  public static let shared = SettingsActor()
}

...
let rememberPassword = await GlobalSettings.shared.rememberPassword()

Message Pattern Matching

As actors in Swift use methods to invoke operations on actor, they don’t support pattern matching similar to Erlang/Elixir, which offer selecting next message to process by comparing one or more fields in the message.

Local only

Unlike actors in Erlang or Elixir, actors in Swift can only communicate with other actors in the same process or application and they don’t support distributed communication to remote actors.

Actor Executor/Dispatcher

The actor protocol defines following property to access the executor:

var unownedExecutor: UnownedSerialExecutor

However, unownedExecutor is a read-only property that cannot be changed at this time.

Implementing WebCrawler Using Actors and Tasks

Following example shows implementation of WebCrawler using actors and tasks described in Part I of the concurrency series:

import Foundation
actor CrawlActor {
    public func handle(_ request: Request) async throws -> Int {
	let contents = try await download(request.url)
	let newContents = try await jsrender(request.url, contents)
  	if hasContentsChanged(request.url, newContents) && !isSpam(request.url, newContents) {
    	    try await index(request.url, newContents)
    	    let urls = try await parseURLs(request.url, newContents)
    	    let childURLs = try await doCrawlWithActors(urls: urls, depth: request.depth + 1, deadline: request.deadline)
    	    return childURLs + 1
  	} else {
    	    return 0
  	}
    }
}

Above implementation uses actors for processing crawling requests but it shares other code for parsing and downloading web pages. As an actor provides a serialize access to its state and behavior, you can’t use a single actor to implement a highly concurrent web crawler. Instead, you may divide the web domain that needs to be crawled into a pool of actors that can share the work.

Performance Comparison

Following table from Part-IV summarizes runtime of various implementation of web crawler when crawling 19K URLs that resulted in about 76K messages to asynchronous methods/coroutines/actors discussed in this blog series:

LanguageDesignRuntime (secs)
TypescriptAsync/Await0.638
ErlangSpawning Process4.636
ErlangPMAP4.698
ElixirSpawning OTP Children43.5
ElixirTask async/await187
ElixirWorker-pool with queue97
GOGo-routine/channels1.2
RustAsync/Await4.3
KotlinAsync/Await0.736
KotlinCoroutine0.712
SwiftAsync/Await63
SwiftActors/Async/Await65
Note: The purpose of above results was not to run micro-benchmarks but to show rough cost of spawning thousands of asynchronous tasks.

You can download full code for Swift example from https://github.com/bhatti/concurency-katas/tree/main/swift.

Overall, Swift’s new features for structured concurrency including async/await and actors is a welcome addition to its platform. On the downside, Swift concurrency APIs lack support for timeouts, customized dispatcher/executors and micro benchmarks showed higher overhead than expected. However, on the positive side, the Swift runtime catches errors due to data races and the new async/await/actors syntax prevents bugs that were previously caused by incorrect use of completion handlers and error handling. This will help developers write more robust and responsive apps in the future.

Powered by WordPress