Shahzad Bhatti Welcome to my ramblings and rants!

October 29, 2020

Structured Concurrency in modern programming languages – Part-I

Filed under: Computing,Languages,Technology — admin @ 11:01 pm

Herb Sutter wrote about fifteen years ago how free performance lunch is over and you need to leverage concurrency to build high performance applications on modern multi-core machines. Unfortunately, adding concurrency support is not simple and low-level concurrency primitives in many languages can lead to buggy code with potential deadlocks and concurrent code can be hard to understand. Over the last few years, a number of programming languages have been improving support for concurrency and in this series of blogs (Part-II, Part-III, Part-IV), I will review some of programming languages that I have used in my current or past project such as Typescript, Elixir, Erlang, GO, Kotlin, and Rust. In particular, I will examine how these languages support structured concurrency so that concurrent code looks like sequential code and can be reasoned easily. Strictly speaking, concurrency relates to application behavior when modern operating systems use context switching to interleave execution of multiple tasks, whereas parallelism allow those tasks to be executed simultaneously. I will evaluate concurrency support in the context of multi-core hardware where we can guarantee correct behavior with preemptive/collaborative based multi-tasking and gain parallelism by utilizing multiple cores. The parallelism across multiple machines or distributed computing will be out of scope for this discussion.

Pitfalls with Concurrency

Before diving into the structured concurrency support, let’s review a few barriers that make writing concurrent code hard such as:

The control and data flow

The sequential code is easier to understand because you can predict the order of execution and though compilers or runtime environments may optimize that code with slightly different order but the top-down structure would remain intact. As opposed, concurrent code using threads/executors is disconnected from the main control and data flow that makes composition, error handling, timeout or cancellation in asynchronous code much harder. In addition, concurrent code requires coordination between threads with some overhead and requires synchronization to prevent data races that is hard to get right resulting in obscure and brittle code.

Race conditions

The race conditions is caused when the application behavior is determined by timing and interleaving of execution steps by multiple threads. The race conditions cause faulty behavior when the shared state or critical section is not properly guarded in a multi-threaded environment. You can eliminate race conditions by removing the shared state, using immutable objects or protecting critical sections using synchronization, mutex or locks.

Mutual Exclusion

The low-level locking primitives such as mutex, read/write/re-entrant locks are difficult to work with and add considerable complexity to your code. The buggy or incorrect implementation can lead to starvation, deadlocks or faulty behavior. Some libraries provide lock-free or concurrent data structures using atomic compare-and-swap (CAS) but they can still prone to contention when accessing from multiple threads.

Deadlocks

You may need to use locks to protect critical sections of the code and avoid race condition but incorrect implementation can lead to deadlocks where a threads can’t make any progress because it’s waiting for the resource held by another thread. In addition, the concurrent code may experience starvation when a thread can’t make a progress or livelock when multiple threads are stepping on each other. In order to circumvent deadlocks and livelocks, you can avoid nested locks, reduce number of locks and reduce scope of critical section. Instead, you can use re-entrant lock or fair locks to favor thread waiting for the longest time.

Callback Hell

A common pattern in many languages when calling an asynchronous method is to pass a callback function or lambda that is invoked when background task is completed. However, this structure devolves into complete mess when multiple asynchronous methods are chained, e.g.

class Crawler {
    crawl(url) {
        download(url, (contents) => {
            render(url, contents, (rendered) => {
                index(url, contents, (data) => { // index could have been running in parallel to parse
                    parse(contents, (urls) => {
                        urls.forEach((u) => crawl(u))
                    } // parse
                }) // index
            }) // render
        }) // download
    }

    download(url, cb) {

        ....
            cb(result)
    }

    render(url, contents, cb) {
        ....
            cb(result)
    }

    index(url, contents, cb) {
        ....
            cb(result)
    }

    parse(url, contents, cb) {
        ....
            cb(result)
    }
}

As you can see, the callback pattern quickly divulges into unwieldy mess and it’s hard to manage the results and error handling from within nested scope. Each callback is called upon completion of previous operation and you can’t use these callbacks for concurrent operations easily.

Promise Hell

Another common pattern for invoking an asynchronous method is to use promise or future objects such as:

class Crawler {
    crawl(url) {
        return download(url)
            .then((contents) =>
                render(url, contents)
            ).then((rendered) => {
                const indexPromise = index(url, contents)
                const parsePromise = parse(url, contents)
                return Promise.all([indexPromise, parsePromise]) // run both tasks in parallel
            }).then([indexResult, urls] => {
                return parse(contents, (urls) => {
                    urls.forEach((u) => crawl(u))
                } // parse
            }).catch((e) => {
                // error handling
            })
        }) // download
    }

    download(url) {
        return new Promise((resolve, reject) => {
            // async operation
            // ....
            if (ok) {
                resolve(result)
            } else {
                reject(new Error('failed'))
            }
        })
    }

    render(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    index(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    parse(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }
}

Though, promise model is a bit improved and you can manage concurrent tasks better using Promise.all but you still have to nest operations and error handling requires two separate ways to catch errors (using catch blocks and native try/catch). Another gotcha in dynamic languages such as Javascript is forgetting return in the last statement of promise.

Error Handling

Above examples show that error handling in asynchronous code is tricky and error prone. For example, when using callbacks, you can pass two callback methods, one for valid results and another for error but the nested scope makes it hard to handle these errors. Similarly, when using catch blocks in promises, it’s not clear which operation failed and adds substantial complexity if you need to recover some errors or perform an alternate operation based on conditional logic. You also have to combine promise specific catch blocks with normal try/catch blocks and it’s easy to miss proper error checking.

Cancellation and Timeout

As asynchronous code is run in a separate thread of execution, the cancelling or timing out requires some coordination between threads and it can be hard to implement in absence of library or language support. For example, a thread in expensive computation or database query can’t be cancelled if it’s blocking until that operation is completed. Some libraries support APIs to stop threads but that can leave process in unpredictable state, other libraries use signals to notify threads about termination. In order to properly cancel, you need non-blocking and cooperative model where the detached task checks for cancellation request periodically. Optimally, cancellation needs to cancel underlying operation so that application state remains consistent. Timeout is just an extension of cancellation behavior where asynchronous task is cancelled if it’s not completed within a specified time bound.

Debugging/Stack-traces

Asynchronous code makes it hard to see the call graph or stack traces from caller’s perspective due to execution in a separate thread. For example, you may see following stack trace in case of a database error on NodeJS where root-cause is not easily apparent:

at new QueryResultError (node_modules/pg-promise/lib/errors/queryResult.js:122:24)
at Query.ctx.db.client.query (node_modules/pg-promise/lib/query.js:192:41)
at Query.handleReadyForQuery (node_modules/pg/lib/query.js:126:10)
at Connection.<anonymous> (node_modules/pg/lib/client.js:163:19)
at Socket.<anonymous> (node_modules/pg/lib/connection.js:118:12)
at addChunk (_stream_readable.js:288:12)
at readableAddChunk (_stream_readable.js:269:11)
at Socket.Readable.push (_stream_readable.js:224:10)
at TCP.onStreamRead [as onread] (internal/stream_base_commons.js:94:17)

Out of scope

Note: you may need to handle other concerns such as retries with exponential back-off, circuit-breakers, or idempotence behavior with asynchronous code in distributed systems but it won’t be discussed here.

Concurrency Constructs

I have discussed some of concurrency primitives in my old blogs [1634, 1638, 1621] but following are common constructs or building blocks that are used for implementing concurrency support:

Threads

A thread defines a smallest unit of execution and multi-threading allows executing concurrent operations. There are two types of threads:

Native/OS-Thread

The native threads are tied with kernel threads and are scheduled using preemptive multi-tasking on modern operating systems. The operating system preempts native thread upon IO operation, wait/sleep, hardware interrupts or context switching. Native threads have high cost due to its stack size (about 256KB) and system overhead. As a result, a thread-pool of limited size is often used to share system resources for background processing.

Green/User-space Thread

The green threads use cooperative scheduling where a user-space scheduler performs context switching thus the overhead of spawning new threads is very small. The user-space schedulers use M:N model for mapping M schedulers to N native threads such as:

As green threads use cooperative scheduling, they generally require yield to allow other threads to proceed. The user-space schedulers are not suitable for blocking operations such as sleep or blocking IO. For example, Java initially supported green threads but it replaced with native threads to support preemptive multi-tasking. Similarly, earlier version of Rust used green threads with blocking IO that resulted in slow performance so it replaced green threads with native threads in later version. Thus, green threads are generally used with non-blocking IO or waits that automatically preempts the thread, saves its stack and resumes another thread. As a general rule, native threads work better if an application is CPU bound or requires real-time priority and green threads/coroutines provide better concurrency with IO bound applications.

Structured Concurrency

The structured concurrency was envisioned by Martin Sústrik to simplify writing concurrent applications. Following are building blocks of structured concurrency that remedy concurrency related issues discussed above:

Concurrency Scope

The structured concurrency defines a single entry and exit similar to top-down structured programming. It defines a concurrency scope or boundary where all asynchronous tasks must complete by the end of scope. The scope may optionally define the context or queue where the tasks will be run. This model simplifies semantics of asynchronous behavior because the lifetime of child tasks is tied with the parent scope. The parent scope automatically waits until all child tasks are completed.

Execution Context

The structured concurrency allows you to specify context, threads or queues where asynchronous code will run so that you can manage related asynchronous code and underlying resources easily.

Cancellation and Timeout

The structured concurrency provides first-class support for cancellation and timeout though it still requires that child tasks support cooperative cancellation as blocking operations cannot be easily interrupted.

Error Handling

The errors from child tasks are automatically propagated to the parent scope where they can be handled in a consistent way using language provided syntax.

Immutability or value semantics

The structured concurrency encourages use of immutable objects or pass by value semantics to avoid race condition or need for locks to protect the critical section.

Composition

The structured concurrency allows composing asynchronous code within another asynchronous function so that data and control flow can be managed easily. It allows errors to be propagated from the nested asynchronous code to calling function and it cancels all child asynchronous tasks if parent task is cancelled.

What’s missing from Structured Concurrency

The structured concurrency doesn’t specify exact paradigm of concurrency mechanism such as threads, coroutines, fibers, generators, actors, etc and instead it focuses on concurrency scope, data/control flow, error handling, cancellation/timeout and composition. Thus, it won’t solve data races if your application requires mutable shared data, which is accessed from multiple threads or coroutines concurrently so you will need to rely on synchronization primitives to protect the critical section.

Toy Web Crawler

I will use a toy implementation of a simple web crawler to show support of structured concurrency in some of my preferred languages. Following is a pseudocode of sequential version of this crawler:

MAX_DEPTH = 5
class URLRequest: 
  url, status, depth, error, created_at, started_at, completed_at

class WebCrawler:
  def crawl_all(root_urls):
	# priority such as page-rank
	pq = PriorityQueue()
	# track duplicate urls
	visited = Set()
	# add root urls to queue
	for url in root_urls:
		pq.add(URLRequest(url, pending, 0))

	# crawl urls using BFS
	total = 0
	while not pq.isEmpty():
		total+=1
		request = pq.pop()
		visited.add(request.url)
		urls = crawl(request)
		for url in urls:
			if not visited.contains(url) and 
            not is_spam(url) and request.depth+1 < MAX_DEPTH:
				pq.add(URLRequest(url, pending, request.depth+1))
	# all done
	print total

  # download, parse and index given url in the request
  def crawl(request):
	urls = []
	try:
		rqeuest.started_at = Date()
		contents = download(request.url)
		contents = jsrender(request.url, contents)
		if has_content_changed(request.url, contents):
			index(request.url, contents)
			urls = parse_urls(request.url, contents)
		request.status = completed
	except: err
		request.status = failed
		request.error = err
	# mark request completion time
	request.completed_at = Date()
	return urls

  def download(url):
	# check robots.txt and throttle policies
 	# may need timeout for slow websites and linearize 
    # requests to the same domain to prevent denial of service attack

  def jsrender(url, contents):
 	# for SPA apps that use javascript for rendering contents
	return contents

  def index(parent_url, contents):
	# apply standardize, stem, ngram, etc for indexing

  def parse_urls(parent_url, contents):
	# tokenize contents and extract href/image/script urls
	return urls

  def is_spam(url):
	# check spam or low-quality urls
	return false

  def has_content_changed(url, contents):
	# calculate hash digest and compare it with last digest
	return true

Above example defines crawl_all method to crawl list of root-urls that recursively invokes crawl method using breadth-first-search. The crawl method invokes stub-out methods for downloading url, parsing contents and indexing the contents.

Typescript

Typescript/Javascript on NodeJS platform offers a unique design for managing concurrency where only a single thread processes requests from event queue. Following is high-level architecture of NodeJS:

NodeJS process uses a single thread that takes next operation to execute form the event queue and executes in an event loop. It delegates some of system calls and asynchronous code to a small thread pool and uses non-blocking API when performing disk or network IO operations. This architecture eliminates the need to synchronize shared data as there is only a single thread accessing application state at a time (similar to actor model).

Using async/await in Typescript

Following is an implementation of web crawler using async-await syntax in Typescript:

import { Request, Response } from '../types/index';

const MAX_DEPTH = 4;
const MAX_URLS = 11;
const DOMAINS = [
  'ab.com',
  'bc.com',
  'cd.com',
  'yz.com',
];

export class Crawler {
  async crawl(urls: string[], timeoutMillis: number): Promise<number> {
    // Main scope of concurrency begin
    const res = await doCrawl(urls, 0, timeoutMillis);
    return res.childURLs;
    // Main scope of concurrency end    
  }
}

///////////////// PRIVATE METHODS ////////////////
const doCrawl = async (
  urls: string[],
  depth: number,
  timeoutMillis: number
): Promise<Response> => {
  const res = new Response();
  if (depth >= MAX_DEPTH) {
    res.failed('max-depth');
    return res;
  }
  const requests = urls.map((u) => new Request(u, depth, timeoutMillis));
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

  const childURLs : number = results.reduce((total: number, r: Response) => total + r.childURLs, 0);
  res.succeeded(childURLs);
  return res;
};


const handleCrawl = async (req: Request): Promise<Response> => {
  const res = new Response();
  const contents = await download(req.url);
  const newContents = await jsrender(req.url, contents);
  if (
    hasContentsChanged(req.url, newContents) &&
    !isSpam(req.url, newContents)
  ) {
    await index(req.url, newContents);
    const urls = await parseURLs(req.url, newContents);
    const childResp = await doCrawl(urls, req.depth + 1, req.timeoutMillis);
    res.succeeded(childResp.childURLs + 1);
  } else {
    res.failed("contents didn't change");
  }
  return res;
};

const download = async (url: string): Promise<string> => {
  // TODO check robots.txt and throttle policies
  // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
  return randomString(100);
};

const jsrender = async (url: string, contents: string): Promise<string> => {
  // for SPA apps that use javascript for rendering contents
  return contents;
};

const index = async (url: string, contents: string) => {
  // apply standardize, stem, ngram, etc for indexing
};

const parseURLs = (url: string, contents: string): string[] => {
  // tokenize contents and extract href/image/script urls
  const urls = [];
  for (var i = 0; i < MAX_URLS; i++) {
    urls.push(randomUrl());
  }
  return urls;
};

const hasContentsChanged = (url: string, contents: string): boolean => {
  return true;
};

const isSpam = (url: string, contents: string): boolean => {
  return false;
};

const randomUrl = (): string => {
  const i = Math.floor(Math.random() * DOMAINS.length);
  return 'https://' + DOMAINS[i] + '/' + randomString(20);
};

const randomString = (n: number): string => {
  let letters =
    'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
  let text = '';
  for (let i = 0; i < n; i++) {
    text += letters.charAt(Math.floor(Math.random() * letters.length));
  }
  return text;
};

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
	  () => {
		reject(new Error(`Timed out ${ms}`))
	  }, ms));
};

The async-await syntax in Typescript is a syntactic sugar on top of promises. The async function returns a promise and automatically wraps return value with Promise.

The crawl method takes a list of URLs with timeout that invokes doCrawl, which crawls list of URLs in parallel and then waits for results using await keyword. The doCrawl method recursively crawls child URLs up to MAX_DEPTH limit. The main crawl method defines boundary for concurrency and returns count of child URLs.

Following is a unit test for testing the behavior of async/await based crawler:

import { Crawler } from '../lib/index';
import { expect } from 'chai';

const EXPECTED_URLS = 19032;
const ROOT_URLS = [
  'a.com',
  'b.com',
  'c.com',
  'd.com',
  'e.com',
  'f.com',
  'g.com',
  'h.com',
  'i.com',
  'j.com',
  'k.com',
  'l.com',
  'n.com',
];

describe('crawler', async () => {
  it('crawling urls with nesting', async () => {
    const started = new Date().getTime();
    const timeout = 5000;
    const crawler = new Crawler();
    const res = await crawler.crawl(ROOT_URLS, timeout);
    const elapsed = new Date().getTime() - started;
    console.log(`Crawl took ${elapsed} to process ${res}`);
    expect(res).equal(EXPECTED_URLS);
  });

});

Following are some of the concurrency support in Typescript:

  • Concurrency scope – Though, typescript doesn’t support concurrency scope as first class citizen or an option to specify queue/context but you can manage boundary using await in the main method.
  • The async declared methods in above implementation shows asynchronous code can be easily composed.
  • Error handling – Async-await syntax uses normal try/catch syntax for error checking instead of specialized syntax of Promise or callback functions.
  • As NodeJS uses a single thread in an event loop, you don’t need to worry about shared state being updated in multiple threads.

Following are the major shortcomings in Typescript for its support of structured concurrency:

  • Typescript doesn’t support value semantics and objects are passed by reference except primitive types.
  • As NodeJS uses a single thread for even loop with small thread pool for asynchronous operation, it limits the tasks that you can run in parallel on multi-core hardware.
  • Typescript doesn’t support cancellation and timeout natively so you have to rely on cooperative cancellation. You can implement timeout partially using Promise.race but it’s not a reliable way to handle timeouts, e.g.
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
	  () => {
		reject(new Error(`Timed out ${ms}`))
	  }, ms));
};

You can download full code for Typescript example from https://github.com/bhatti/concurency-katas/tree/main/node.

Overall, Typescript/NodeJS is suitable for IO-bound applications where a little time is spent on each operation so that event-loop can switch to next task but it’s not suitable when the application has blocking/CPU-bound operations, requires more background tasks or requires high level of concurrency and parallelism.

No Comments

No comments yet.

RSS feed for comments on this post. TrackBack URL

Sorry, the comment form is closed at this time.

Powered by WordPress