Shahzad Bhatti Welcome to my ramblings and rants!

November 17, 2020

Structured Concurrency in modern programming languages – Part-IV

Filed under: Computing,Kotlin,Languages,Swift,Technology — admin @ 12:58 pm

In this fourth part of the series on structured concurrency (Part-I, Part-II, Part-III), I will review Kotlin and Swift languages for writing concurrent applications and their support for structured concurrency:

Kotlin

Kotlin is a JVM language that was created by JetBrains with improved support of functional and object-oriented features such as extension functions, nested functions, data classes, lambda syntax, etc. Kotlin also uses optional types instead of null references similar to Rust and Swift to remove null-pointer errors. Kotlin provides native OS-threads similar to Java and coroutines with async/await syntax similar to Rust. Kotlin brings first-class support for structured concurrency with its support for concurrency scope, composition, error handling, timeout/cancellation and context for coroutines.

Structured Concurrency in Kotlin

Kotlin provides following primitives for concurrency support:

suspend functions

Kotlin adds suspend keyword to annotate a function that will be used by coroutine and it automatically adds continuation behavior when code is compiled so that instead of return value, it calls the continuation callback.

Launching coroutines

A coroutine can be launched using launch, async or runBlocking, which defines scope for structured concurrency. The lifetime of children coroutines is attached to this scope that can be used to cancel children coroutines. The async returns a Deferred (future) object that extends Job. You use await method on the Deferred instance to get the results.

Dispatcher

Kotlin defines CoroutineDispatcher to determine thread(s) for running the coroutine. Kotlin provides three types of dispatchers: Default – that are used for long-running asynchronous tasks; IO – that may use IO/network; and Main – that uses main thread (e.g. on Android UI).

Channels

Kotlin uses channels for communication between coroutines. It defines three types of channels: SendChannel, ReceiveChannel, and Channel. The channels can be rendezvous, buffered, unlimited or conflated where rendezvous channels and buffered channels behave like GO’s channels and suspend send or receive operation if other go-routine is not ready or buffer is full. The unlimited channel behave like queue and conflated channel overwrites previous value when new value is sent. The producer can close send channel to indicate end of work.

Using async/await in Kotlin

Following code shows how to use async/await to build the toy web crawler:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerWithAsync(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutines::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls using async/await
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()

        withTimeout(timeout) {
            val jobs = mutableListOf<Deferred<Int>>()
            for (u in urls) {
                jobs.add(async {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                })
            }
            for (j in jobs) {
                j.await()
            }
        }
        return res.completed(size.get())
    }
}

In above example, CrawlerWithAsync class defines timeout parameter for crawler. The crawl function takes list of URLs to crawl and defines high-level scope of concurrency using runBlocking. The private crawl method is defined as suspend so that it can be used as continuation. It uses async with timeout to start background tasks and uses await to collect results. This method recursively calls handleCrawl to crawl child URLs.

Following unit tests show how to test above crawl method:

package concurrency

import org.junit.Test
import org.slf4j.LoggerFactory
import kotlin.test.assertEquals

class CrawlerAsynTest {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutinesTest::class.java)
    val urls = listOf("a.com", "b.com", "c.com", "d.com", "e.com", "f.com",
            "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com")

    @Test
    fun testCrawl() {
        val crawler = CrawlerWithAsync(4, 1000L)
        val started = System.currentTimeMillis()
        val res = crawler.crawl(urls);
        val duration = System.currentTimeMillis() - started
        logger.info("CrawlerAsync - crawled %d urls in %d milliseconds".format(res.childURLs, duration))
        assertEquals(19032, res.childURLs)
    }

    @Test(expected = Exception::class)
    fun testCrawlWithTimeout() {
        val crawler = CrawlerWithAsync(1000, 100L)
        crawler.crawl(urls);
    }
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/kot_pool.

Following are major benefits of using this approach to implement crawler and its support of structured concurrency:

  • The main crawl method defines high level scope of concurrency and it waits for the completion of child tasks.
  • Kotlin supports cancellation and timeout APIs and the crawl method will fail with timeout error if crawling exceeds the time limit.
  • The crawl method captures error from async response and returns so that client code can perform error handling.
  • The async syntax in Kotlin allows easy composition of asynchronous code.
  • Kotlin allows customized dispatcher for more control on the asynchronous behavior.

Following are shortcomings using this approach for structured concurrency and general design:

  • As Kotlin doesn’t enforce immutability by default, you will need synchronization to protect shared state.
  • Async/Await support is still new in Kotlin and lacks stability and proper documentation.
  • Above design creates a new coroutine for crawling each URL and it can strain expensive network and IO resources so it’s not practical for real-world implementation.

Using coroutines in Kotlin

Following code uses coroutine syntax to implement the web crawler:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerWithCoroutines(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutines::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls using coroutines
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()
        withTimeout(timeout) {
            for (u in urls) {
                coroutineScope {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                }
            }
        }
        return res.completed(size.get())
    }
}

Above example is similar to async/await but uses coroutine syntax and its behavior is similar to async/await implementation.

Following example shows how async coroutines can be cancelled:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerCancelable(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutines::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls to show cancel operation
    // internal method will call cancel instead of await so this method will
    // fail.
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    ////////////////// Internal methods
    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()

        withTimeout(timeout) {
            val jobs = mutableListOf<Deferred<Int>>()
            for (u in urls) {
                jobs.add(async {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                })
            }
            for (j in jobs) {
                j.cancel()
            }
        }
        return res.completed(size.get())
    }
}

You can download above examples from https://github.com/bhatti/concurency-katas/tree/main/kot_pool.

Swift

Swift was developed by Apple to replace Objective-C and offer modern features such as closures, optionals instead of null-pointers (similar to Rust and Kotlin), optionals chaining, guards, value types, generics, protocols, algebraic data types, etc. It uses same runtime system as Objective-C and uses automatic-reference-counting (ARC) for memory management, grand-central-dispatch for concurrency and provides integration with Objective-C code and libraries.

Structured Concurrency in Swift

I discussed concurrency support in Objective-C in my old blog [1685] such as NSThread, NSOperationQueue, Grand Central Dispatch (GCD), etc and since then GCD has improved launching asynchronous tasks using background queues with timeout/cancellation support. However, much of the Objective-C and Swift code still suffers from callbacks and promises hell discussed in Part-I. Chris Lattner and Joe Groff wrote a proposal to add async/await and actor-model to Swift and provide first-class support for structured concurrency. As this work is still in progress, I wasn’t able to test it but here are major features of this proposal:

Coroutines

Swift will adapt coroutines as building blocks of concurrency and asynchronous code. It will add syntactic sugar for completion handlers using async or yield keywords.

Async/Await

Swift will provide async/await syntactic sugar on top of coroutines to mark asynchronous behavior. The async code will use continuations similar to Kotlin so that it suspends itself and schedules execution by controlling context. It will use Futures (similar to Deferred in Kotlin) to await for the results (or errors). This syntax will work with normal error handling in Swift so that errors from asynchronous code are automatically propagated to the calling function.

Actor model

Swift will adopt actor-model with value based messages (copy-on-write) to manage concurrent objects that can receive messages asynchronously and the actor can keep internal state and eliminate race conditions.

Kotlin and Swift are very similar in design and both have first-class support of structured concurrency such as concurrency scope, composition, error handling, cancellation/timeout, value types, etc. Both Kotlin and Swift use continuation for async behavior so that async keyword suspends the execution and passes control to the execution context so that it can be executed asynchronously and control is passed back at the end of execution.

Structured Concurrency Comparison

Following table summarizes support of structured concurrency discussed in this blog series:

FeatureTypescript (NodeJS)ErlangElixirGORustKotlinSwift
Structured scopeBuilt-inmanuallymanuallymanuallyBuilt-inBuilt-inBuilt-in
Asynchronous CompositionYesNoNoNoYesYesYes
Error HandlingNatively using ExceptionsManually storing errors in ResponseManually storing errors in ResponseManually storing errors in ResponseManually using Result ADTNatively using ExceptionsNatively using Exceptions
CancellationCooperative CancellationBuilt-in Termination or Cooperative CancellationBuilt-in Termination or Cooperative CancellationBuilt-in Cancellation or Cooperative CancellationBuilt-in Cancellation or Cooperative CancellationBuilt-in Cancellation or Cooperative CancellationBuilt-in Cancellation or Cooperative Cancellation
TimeoutNoYesYesYesYesYesYes
Customized Execution ContextNoNoNoNoNoYesYes
Race ConditionsNo due to NodeJS architectureNo due to Actor modelNo due to Actor modelPossible due to shared stateNo due to strong ownershipPossible due to shared statePossible due to shared state
Value TypesNoYesYesYesYesYesYes
Concurrency paradigmsEvent loopActor modelActor modelGo-routine, CSP channelsOS-Thread, coroutineOS-Thread, coroutine, CSP channelsOS-Thread,
GCD queues, coroutine, Actor model
Type CheckingStaticDynamicDynamicStatic but lacks genericsStrongly static types with genericsStrongly static types with genericsStrongly static types with generics
Suspends Async code using Continuations NoNoNoNoYesYesYes
Zero-cost based abstraction ( async)NoNoNoNoYesNoNo
Memory ManagementGCGC
(process-scoped)
GC (process-scoped)GC(Automated) Reference counting, BoxingGCAutomated reference counting

Performance Comparison

Following table summarizes runtime of various implementation of web crawler when crawling 19K URLs that resulted in about 76K messages to asynchronous methods/coroutines/actors discussed in this blog series:

LanguageDesignRuntime (secs)
TypescriptAsync/Await0.638
ErlangSpawning Process4.636
ErlangPMAP4.698
ElixirSpawning OTP Children43.5
ElixirTask async/await187
ElixirWorker-pool with queue97
GOGo-routine/channels1.2
RustAsync/Await4.3
KotlinAsync/Await0.736
KotlinCoroutine0.712
Note: The purpose of above results was not to run micro-benchmarks but to show rough cost of spawning thousands of asynchronous tasks.

Summary

Overall, Typescript/NodeJS provides a simpler model for concurrency but lacks proper timeout/cancellation support and it’s not suitable for highly concurrent applications that require blocking APIs. The actor based concurrency model in Erlang/Elixir supports high-level of concurrency, error handling, cancellation and isolates process state to prevent race conditions but it lacks composing asynchronous behavior natively. Though, you can compose Erlang processes with parent-child hierarchy and easily start or stop these processes. GO supports concurrency via go-routines and channels with built-in cancellation and timeout APIs but GO statements are considered harmful by structured concurrency and it doesn’t protect against race conditions due to mutable state. Erlang and GO are the only languages that were designed from ground-up with support for actors and coroutines and their schedulers have strong support for asynchronous IO and non-blocking APIs. Erlang also offers process-scoped garbage collection to clean up related data easily as opposed to global GC in other languages. The async/await support in Rust is still immature and lacks proper support of cancellation but strong ownership properties of Rust eliminate race conditions and allows safe concurrency. Rust, Kotlin and Swift uses continuation for async/await that allows composition for multiple async/await chained together. For example, instead of using await (await download()).parse() in Javascript/Typescript/C#, you can use await download().parse(). The async/await changes are still new in Kotlin and lack stability whereas Swift has not yet released these changes as part of official release. As Kotlin, Rust, and Swift built coroutines or async/await on top of existing runtime and virtual machine, their green-thread schedulers are not as optimal as schedulers in Erlang or GO and may exhibit limitations on concurrency and scalability.

Finally, structured concurrency helps your code structure with improved data/control flow, concurrency scope, error handling, cancellation/timeout and composition but it won’t solve data races if multiple threads/coroutines access mutable shared data concurrently so you will need to rely on synchronization mechanisms to protect the critical section.

Note: You can download all examples in this series from https://github.com/bhatti/concurency-katas.

November 10, 2020

Structured Concurrency in modern programming languages – Part-III

Filed under: Computing,Languages,Technology — admin @ 4:23 pm

In this third part of the series on structured concurrency (Part-I, Part-II, Part-IV), I will review GO and Rust languages for writing concurrent applications and their support for structured concurrency:

GO

GO language was created by Rob Pike, and Ken Thompson and uses light-weight go-routines for asynchronous processing. Go uses channels for communication that are designed after Tony Hoare’s rendezvous style communicating sequential processes (CSP) where the sender cannot send the message until receiver is ready to accept it. Though, GO supports buffering for channels so that sender/receiver don’t have to wait if buffer is available but channels are not designed to be used as mailbox or message queue. The channels can be shared by multiple go-routines and the messages can be transmitted by value or by reference. GO doesn’t protect against race conditions and shared state must be protected when it’s accessed in multiple go-routines. Also, if a go-routine receives a message by reference, it must be treated as transfer of ownership otherwise it can lead to race conditions. Also, unlike Erlang, you cannot monitor lifetime of other go-routines so you won’t be notified if a go-routine exits unexpectedly.

Following is high-level architecture of scheduling and go-routines in GO process:

Using go-routines/channels in GO

GO doesn’t support async/await syntax but it can be simulated via go-routine and channels. As the cost of each go-routine is very small, you can use them for each background task.

Following code shows how to use go-routines and channels to build the toy web crawler:

package async

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
)

type AsyncHandler func(ctx context.Context, payload interface{}) (interface{}, error)

type AsyncAwaiter interface {
	Await(ctx context.Context, timeout time.Duration) (interface{}, error)
}

// Future encapsulates request to process asynchronously
type Future struct {
	id      string
	payload interface{}
	outQ    chan Result
}

// Result encapsulates results
type Result struct {
	id      string
	payload interface{}
	err     error
}

// AsyncTask - processes data asynchronously
type AsyncTask struct {
	handler AsyncHandler
}

func New(handler AsyncHandler) *AsyncTask {
	async := &AsyncTask{handler: handler}
	return async
}

func (a *AsyncTask) Async(ctx context.Context, payload interface{}) AsyncAwaiter {
	future := &Future{id: uuid.New().String(), payload: payload, outQ: make(chan Result, 1)}
	go future.run(ctx, a.handler) // run handler asynchronously
	return future
}

func (f Future) run(ctx context.Context, handler AsyncHandler) {
	go func() {
		payload, err := handler(ctx, f.payload)
		f.outQ <- Result{id: f.id, payload: payload, err: err} // out channel is buffered by 1
		close(f.outQ)
	}()
}

func (f Future) Await(ctx context.Context, timeout time.Duration) (payload interface{}, err error) {
	payload = nil
	select {
	case <-ctx.Done():
		err = errors.New("async_cancelled")
	case res := <-f.outQ:
		payload = res.payload
		err = res.err
	case <-time.After(timeout):
		err = errors.New(fmt.Sprintf("async_timedout %v", timeout))
	}

	return
}

In above example, Async method takes a function to invoke in background and creates a channel for reply. It then executes the function and sends back reply to the channel. The client uses the future object return by Async method to wait for the response. The Await method provides timeout to specify the max wait time for response. Note: The Await method listens to ctx.Done() in addition to the response channel that notifies it if client canceled the task or if it timed out by high-level settings.

Following code shows how crawler can use these primitives to define background tasks for crawler:

package crawler

import (
	"context"
	"errors"
	"sync/atomic"
	"time"

	"plexobject.com/crawler/async"
	"plexobject.com/crawler/domain"
	"plexobject.com/crawler/utils"
)

const MAX_DEPTH = 4
const MAX_URLS = 11

// Crawler is used for crawing URLs
type Crawler struct {
	crawlTask     *async.AsyncTask
	downloader    *async.AsyncTask
	jsrenderer    *async.AsyncTask
	indexer       *async.AsyncTask
	totalMessages uint64
}

// Instantiates new crawler
func New(ctx context.Context) *Crawler {
	crawler := &Crawler{totalMessages: 0}
	crawlHandler := func(ctx context.Context, payload interface{}) (interface{}, error) {
		req := payload.(*domain.Request)
		return crawler.handleCrawl(ctx, req)
	}
	downloader := func(ctx context.Context, payload interface{}) (interface{}, error) {
		// TODO check robots.txt and throttle policies
		// TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
		return utils.RandomString(100), nil
	}
	renderer := func(ctx context.Context, payload interface{}) (interface{}, error) {
		// for SPA apps that use javascript for rendering contents
		return utils.RandomString(100), nil
	}
	indexer := func(ctx context.Context, payload interface{}) (interface{}, error) {
		return 0, nil
	}
	crawler.crawlTask = async.New(crawlHandler)
	crawler.downloader = async.New(downloader)
	crawler.jsrenderer = async.New(renderer)
	crawler.indexer = async.New(indexer)
	return crawler
}

// Crawls list of URLs with specified depth
func (c *Crawler) Crawl(ctx context.Context, urls []string, timeout time.Duration) (int, error) {
	// Boundary for concurrency and it will not return until all
	// child URLs are crawled up to MAX_DEPTH limit.
	return c.crawl(ctx, urls, 0, timeout)
}

func (c *Crawler) TotalMessages() uint64 {
	return c.totalMessages
}

// handles crawl
func (c *Crawler) handleCrawl(ctx context.Context, req *domain.Request) (*domain.Result, error) {
	atomic.AddUint64(&c.totalMessages, 1)
	timeout := time.Duration(req.Timeout * time.Second)
	res := domain.NewResult(req)
	if contents, err := c.downloader.Async(ctx, req.URL).Await(ctx, timeout); err != nil {
		res.Failed(err)
	} else {
		if newContents, err := c.jsrenderer.Async(ctx, [...]string{req.URL, contents.(string)}).Await(ctx, timeout); err != nil {
			res.Failed(err)
		} else {
			if hasContentsChanged(ctx, req.URL, newContents.(string)) && !isSpam(ctx, req.URL, newContents.(string)) {
				c.indexer.Async(ctx, [...]string{req.URL, newContents.(string)}).Await(ctx, timeout)
				urls := parseURLs(ctx, req.URL, newContents.(string))
				if childURLs, err := c.crawl(ctx, urls, req.Depth+1, req.Timeout); err != nil {
					res.Failed(err)
				} else {
					res.Succeeded(childURLs + 1)
				}
			} else {
				res.Failed(errors.New("contents didn't change"))
			}
		}
	}

	return res, nil
}

/////////////////// Internal private methods ///////////////////////////
// Crawls list of URLs with specified depth
func (c *Crawler) crawl(ctx context.Context, urls []string, depth int, timeout time.Duration) (int, error) {
	if depth < MAX_DEPTH {
		futures := make([]async.AsyncAwaiter, 0)
		for i := 0; i < len(urls); i++ {
			futures = append(futures, c.crawlTask.Async(ctx, domain.NewRequest(urls[i], depth, timeout)))
		}
		sum := 0
		var savedError error
		for i := 0; i < len(futures); i++ {
			res, err := futures[i].Await(ctx, timeout)
			if err != nil {
				savedError = err // returning only a single error
			}
			if res != nil {
				sum += res.(*domain.Result).ChildURLs
			}
		}

		return sum, savedError
	} else {
		return 0, nil
	}
}

func parseURLs(ctx context.Context, url string, contents string) []string {
	// tokenize contents and extract href/image/script urls
	urls := make([]string, 0)
	for i := 0; i < MAX_URLS; i++ {
		urls = append(urls, utils.RandomChildUrl(url))
	}
	return urls
}

func hasContentsChanged(ctx context.Context, url string, contents string) bool {
	return true
}

func isSpam(ctx context.Context, url string, contents string) bool {
	return false
}

In above implementation, crawler defines background tasks for crawling, downloading, rendering and indexing. The Crawl defines concurrency boundary and waits until all child tasks are completed. Go provides first class support for cancellation and timeout via context.Context, but you have to listen special ctx.Done() channel.

Following unit tests show examples of cancellation, timeout and normal processing:

package crawler

import (
	"context"
	"log"
	"testing"
	"time"
)

const EXPECTED_URLS = 19032

func TestCrawl(t *testing.T) {
	rootUrls := []string{"https://a.com", "https://b.com", "https://c.com", "https://d.com", "https://e.com", "https://f.com", "https://g.com", "https://h.com", "https://i.com", "https://j.com", "https://k.com", "https://l.com", "https://n.com"}
	started := time.Now()
	timeout := time.Duration(8 * time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	crawler := New(ctx)
	received, err := crawler.Crawl(ctx, rootUrls, timeout)
	elapsed := time.Since(started)
	log.Printf("Crawl took %s to process %v messages -- %v", elapsed, received, crawler.TotalMessages())
	if crawler.totalMessages != EXPECTED_URLS {
		t.Errorf("Expected %v urls but was %v", EXPECTED_URLS, crawler.totalMessages)
	}
	if err != nil {
		t.Errorf("Unexpected error %v", err)
	} else if EXPECTED_URLS != received {
		t.Errorf("Expected %v urls but was %v", EXPECTED_URLS, received)
	}
}

func TestCrawlWithTimeout(t *testing.T) {
	started := time.Now()
	timeout := time.Duration(4 * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	crawler := New(ctx)
	received, err := crawler.Crawl(ctx, []string{"a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"}, timeout)
	if err == nil {
		t.Errorf("Expecting timeout error")
	}
	elapsed := time.Since(started)
	log.Printf("Timedout took %s to process %v messages -- %v - %v", elapsed, received, crawler.TotalMessages(), err)
}

func TestCrawlWithCancel(t *testing.T) {
	started := time.Now()
	timeout := time.Duration(3 * time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	crawler := New(ctx)
	var err error
	var received int
	go func() {
		// calling asynchronously
		received, err = crawler.Crawl(ctx, []string{"a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"}, timeout)
	}()
	time.Sleep(5 * time.Millisecond)
	cancel()
	time.Sleep(50 * time.Millisecond)
	if err == nil {
		t.Errorf("Expecting cancel error")
	}
	elapsed := time.Since(started)
	log.Printf("Cancel took %s to process %v messages -- %v - %v", elapsed, received, crawler.TotalMessages(), err)
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/go_pool.

Following are major benefits of using this approach to implement crawler and its support of structured concurrency:

  • The main Crawl method defines high level scope of concurrency and it waits for the completion of child tasks.
  • Go supports cancellation and timeout APIs and the Crawl method passes timeout parameter so that the crawling all URLs must complete with the time period.
  • The Crawl method captures error from async response and returns so that client code can perform error handling.

Following are shortcomings using this approach for structured concurrency and general design:

  • You can’t monitor life-time of go-routines and you won’t get any errors if background task dies unexpectedly.
  • The cancellation API returns without cancelling underlying operation so you will need to implement a cooperative cancellation to persist any state or clean up underlying resources.
  • Go doesn’t support specifying execution context for go-routines and all asynchronous code is automatically scheduled by GO (G0 go-routines).
  • GO go-routines are not easily composable because they don’t have any parent/child relationship as opposed to async methods that can invoke other async methods in Typescript, Rust or other languages supporting async/await.
  • As Go doesn’t enforce immutability so you will need mutex to protect shared state. Also, mutex implementation in GO is not re-entrant aware so you can’t use for any recursive methods where you are acquiring locks.
  • Above code creates a new go-routine for crawling each URL and though the overhead of each process is small but it may use other expensive resources such as network resource.

Using worker-pool in GO

As opposed to creating new go-routine, we can use worker-pool of go-routines to perform background tasks so that we can manage external resource dependencies easily.

Following code shows an implementation of worker-pool in GO:

package pool

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
)

const BUFFER_CAPACITY = 2 // allow buffering to support asynchronous behavior  as by default sender will be blocked

type Handler func(ctx context.Context, payload interface{}) (interface{}, error)

type Awaiter interface {
	Await(ctx context.Context, timeout time.Duration) (interface{}, error)
}

// Request encapsulates request to process
type Request struct {
	id      string
	payload interface{}
	outQ    chan Result
}

// Result encapsulates results
type Result struct {
	id      string
	payload interface{}
	err     error
}

// Worker structure defines inbound channel to receive request and lambda function to execute
type Worker struct {
	id                   int
	handler              Handler
	workerRequestChannel chan *Request
}

// NewWorker creates new worker
func NewWorker(id int, handler Handler) Worker {
	return Worker{
		id:                   id,
		handler:              handler,
		workerRequestChannel: make(chan *Request),
	}
}

func (w Worker) start(ctx context.Context, workersReadyPool chan chan *Request, done chan bool) {
	go func(w Worker) {
		for {
			// register the current worker into the worker queue.
			workersReadyPool <- w.workerRequestChannel

			select {
			case <-ctx.Done():
				break
			case req := <-w.workerRequestChannel:
				payload, err := w.handler(ctx, req.payload)
				req.outQ <- Result{id: req.id, payload: payload, err: err} // out channel is buffered by 1
				close(req.outQ)
			case <-done:
				return
			}
		}
	}(w)
}

// WorkPool - pool of workers
type WorkPool struct {
	size                int
	workersReadyPool    chan chan *Request
	pendingRequestQueue chan *Request
	done                chan bool
	handler             Handler
}

// New Creates new async structure
func New(handler Handler, size int) *WorkPool {
	async := &WorkPool{
		size:                size,
		workersReadyPool:    make(chan chan *Request, BUFFER_CAPACITY),
		pendingRequestQueue: make(chan *Request, BUFFER_CAPACITY),
		done:                make(chan bool),
		handler:             handler}
	return async
}

// Start - starts up workers and internal goroutine to receive requests
func (p *WorkPool) Start(ctx context.Context) {
	for w := 1; w <= p.size; w++ {
		worker := NewWorker(w, p.handler)
		worker.start(ctx, p.workersReadyPool, p.done)
	}
	go p.dispatch(ctx)
}

// Add request to process
func (p *WorkPool) Add(ctx context.Context, payload interface{}) Awaiter {
	// Adding request to process
	req := &Request{id: uuid.New().String(), payload: payload, outQ: make(chan Result, 1)}
	go func() {
		p.pendingRequestQueue <- req
	}()
	return req
}

// Await for reply -- you can only call this once
func (r Request) Await(ctx context.Context, timeout time.Duration) (payload interface{}, err error) {
	select {
	case <-ctx.Done():
		err = errors.New("async_cancelled")
	case res := <-r.outQ:
		payload = res.payload
		err = res.err
	case <-time.After(timeout):
		payload = nil
		err = fmt.Errorf("async_timedout %v", timeout)
	}

	return
}

// Stop - stops thread pool
func (p *WorkPool) Stop() {
	close(p.pendingRequestQueue)
	go func() {
		p.done <- true
	}()
}

// Receiving requests from inbound channel and forward it to the worker's workerRequestChannel
func (p *WorkPool) dispatch(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-p.done:
			return
		case req := <-p.pendingRequestQueue:
			go func(req *Request) {
				// Find next ready worker
				workerRequestChannel := <-p.workersReadyPool
				// dispatch the request to next ready worker
				workerRequestChannel <- req
			}(req)
		}
	}
}

You can download above examples from https://github.com/bhatti/concurency-katas/tree/main/go_pool.

Rust

Rust was designed by Mozilla Research to provide better performance, type safety, strong memory ownership and safe concurrency. With its strong ownership and lifetime scope, Rust minimizes race conditions because each object can only have one owner that can update the value. Further, strong typing, traits/structured-types, abstinence of null references, immutability by default eliminates most of common bugs in the code.

Rust uses OS-threads for multi-threading but has added support for coroutines and async/await recently. Rust uses futures for asynchronous behavior but unlike other languages, it doesn’t provide runtime environment for async/await. Two popular runtime systems available for Rust are https://tokio.rs/ and https://github.com/async-rs/async-std. Also, unlike other languages, async/await in Rust uses zero-cost abstraction where async just creates a future without scheduling until await is invoked. The runtime systems such as async-std and tokio provides executor that polls future until it returns a value.

Following example shows how async/await can be used to implement

extern crate rand;
use std::{error::Error, fmt};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rand::Rng;
use rand::distributions::Alphanumeric;
use rand::seq::SliceRandom;
use futures::future::{Future, join_all, BoxFuture};
use futures::stream::{FuturesUnordered};
use futures::executor;
use async_std::{task, future};
use async_std::future::timeout;

const MAX_DEPTH: u8 = 4;
const MAX_URLS: u8 = 11;

// Request encapsulates details of url to crawl
#[derive(Debug, Clone, PartialEq)]
pub struct Request {
    pub url: String,
    pub depth: u8,
    pub timeout: Duration,
    pub created_at: u128,
}

impl Request {
    pub fn new(url: String, depth: u8, timeout: Duration) -> Request {
        let epoch = SystemTime::now().duration_since(UNIX_EPOCH).expect("epoch failed").as_millis();
        Request{url: url.to_string(), depth: depth, timeout: timeout, created_at: epoch}
    }
}

#[derive(Debug, Copy, Clone)]
pub enum CrawlError {
    Unknown,
    MaxDepthReached,
    DownloadError,
    ParseError,
    IndexError,
    ContentsNotChanged,
    Timedout,
}

impl Error for CrawlError {}

impl fmt::Display for CrawlError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            CrawlError::MaxDepthReached => write!(f, "MaxDepthReached"),
            CrawlError::DownloadError => write!(f, "DownloadError"),
            CrawlError::ParseError => write!(f, "ParseError"),
            CrawlError::IndexError => write!(f, "IndexError"),
            CrawlError::ContentsNotChanged => write!(f, "ContentsNotChanged"),
            CrawlError::Timedout=> write!(f, "Timedout"),
            CrawlError::Unknown => write!(f, "Unknown"),
        }
    }
}


//////// PUBLIC METHODS
// crawling a collection of urls
pub fn crawl(urls: Vec<String>, timeout_dur: Duration) -> Result<usize, CrawlError> {
    // Boundary for concurrency and it will not return until all
    // child URLs are crawled up to MAX_DEPTH limit.
    //
    match task::block_on(
        timeout(timeout_dur, async {
            do_crawl(urls, timeout_dur, 0)
        })
    ) {
        Ok(res) => res,
        Err(_err) => Err(CrawlError::Timedout),
    }
}

//////// PRIVATE METHODS
fn do_crawl(urls: Vec<String>, timeout_dur: Duration, depth: u8) -> Result<usize, CrawlError> {
    if depth >= MAX_DEPTH {
        return Ok(0)
    }

    let mut futures = Vec::new();
    let mut size = 0;

    for u in urls {
        size += 1;
        futures.push(async move {
            let child_urls = match handle_crawl(Request::new(u, depth, timeout_dur)) {
                Ok(urls) => urls,
                Err(_err) => [].to_vec(),
            };
            if child_urls.len() > 0 {
                do_crawl(child_urls, timeout_dur, depth+1)
            } else {
                Ok(0)
            }
        });
    }
    task::block_on(
        async {
            let res: Vec<Result<usize, CrawlError>> = join_all(futures).await;
            let sizes: Vec<usize> = res.iter().map(|r| r.map_or(0, |n|n)).collect::<Vec<usize>>();
            size += sizes.iter().fold(0usize, |sum, n| n+sum);
        }
    );
    Ok(size)
}

// method to crawl a single url
fn handle_crawl(req: Request) -> Result<Vec<String>, CrawlError> {
    let res: Result<Vec<String>, CrawlError> = task::block_on(
        async {
            let contents = match download(&req.url).await {
                Ok(data) => data,
                Err(_err) => return Err(CrawlError::DownloadError),
            };

            if has_contents_changed(&req.url, &contents) && !is_spam(&req.url, &contents) {
                let urls = match index(&req.url, &contents).await {
                    Ok(_) =>
                        match parse_urls(&req.url, &contents) {
                            Ok(urls) => urls,
                            Err(_err) => return Err(CrawlError::ParseError),
                        },
                    Err(_err) => return Err(CrawlError::IndexError),
                };
                return Ok(urls)
            } else {
                return Err(CrawlError::ContentsNotChanged)
            }
        }
    );
    match res {
        Ok(list) => return Ok(list),
        Err(err) => return Err(err),
    }
}

async fn download(url: &str) -> Result<String, CrawlError> {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    // invoke jsrender to generate dynamic content
    jsrender(url, &random_string(100)).await
}

async fn jsrender(_url: &str, contents: &str) -> Result<String, CrawlError> {
    // for SPA apps that use javascript for rendering contents
    Ok(contents.to_string())
}

async fn index(_url: &str, _contents: &str) -> Result<bool, CrawlError> {
    // apply standardize, stem, ngram, etc for indexing
    Ok(true)
}

fn parse_urls(_url: &str, _contents: &str) -> Result<Vec<String>, CrawlError> {
    // tokenize contents and extract href/image/script urls
    Ok((0..MAX_URLS).into_iter().map(|i| random_url(i)).collect())
}

fn has_contents_changed(_url: &str, _contents: &str) -> bool {
    true
}

fn is_spam(_url: &str, _contents: &str) -> bool {
    false
}

fn random_string(max: usize) -> String {
    rand::thread_rng().sample_iter(&Alphanumeric).take(max).collect::<String>()
}

fn random_url(i: u8) -> String {
    let domains = vec!["ab.com", "bc.com", "cd.com", "de.com", "ef.com", "fg.com", "gh.com", "hi.com", "ij.com", "jk.com", "kl.com", "lm.com", "mn.com",
		"no.com", "op.com", "pq.com", "qr.com", "rs.com", "st.com", "tu.com", "uv.com", "vw.com", "wx.com", "xy.com", "yz.com"];
    let domain = domains.choose(&mut rand::thread_rng()).unwrap();
    format!("https://{}/{}_{}", domain, random_string(20), i)
}

The crawl method defines scope of concurrency and asynchronously crawls each URL recursively but the parent URL waits until child URLs are crawled. The async-std provides support for timeout so that asynchronous task can fail early if it’s not completed within the bounded time-frame. However, it doesn’t provide cancellation support so you have to rely on cooperative cancellation.

Following unit-test and main routine shows example of crawling a list of URLs:

use std::time::Duration;
use futures::prelude::*;
use std::time::{Instant};
use crate::crawler::crawler::*;

mod crawler;

fn main() {
    let _ = do_crawl(8000);
}

fn do_crawl(timeout: u64) -> Result<usize, CrawlError> {
    let start = Instant::now();
    let urls = vec!["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"].into_iter().map(|s| s.to_string()).collect();
    let res = crawl(urls, Duration::from_millis(timeout));
    let duration = start.elapsed();
    println!("Crawled {:?} urls in () is: {:?}", res, duration);
    res
}

#[cfg(test)]
mod tests {
    use super::do_crawl;
    #[test]
    fn crawl_urls() {
        match do_crawl(8000) {
            Ok(size) => assert_eq!(size, 19032),
            Err(err) => assert!(false, format!("Unexpected error {:?}", err)),
        }
    }
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/rust_async.

Following are major benefits of using this approach to implement crawler and its support of structured concurrency:

  • The main crawl method defines high level scope of concurrency and it waits for the completion of child tasks.
  • The async-std runtime environment supports timeout APIs and the crawl method takes the timeout parameter so that the crawling all URLs must complete with the time period.
  • The crawl method captures error from async response and returns the error so that client code can perform error handling.
  • The async declared methods in above implementation shows asynchronous code can be easily composed.

Following are shortcomings using this approach for structured concurrency and general design:

  • Rust async/await APIs doesn’t support native support for cancellation so you will need to implement a cooperative cancellation to persist any state or clean up underlying resources.
  • Rust async/await APIs doesn’t allow you to specify execution context for asynchronous code.
  • The async/await support in Rust is relatively new and has not matured yet. Also, it requires separate runtime environment and there are a few differences in these implementations.
  • Above design for crawler is not very practical because it creates a asynchronous task for each URL that is crawled and it may strain network or IO resources.

Overall, GO provides decent support for low-level concurrency but its complexity can create subtle bugs and incorrect use of go-routines can result in deadlocks. Also, it’s prone to data races due to mutable shared state. Just like structured programming considered GOTO statements harmful and recommended if-then, loops, and function calls for control flow, structured concurrency considers GO statements harmful and recommends parent waits for children completion and supports propagating errors from children to parent. Rust offers async/await syntax for concurrency scope and supports composition and error propagation with strong ownership that reduces chance of data races. Also, Rust uses continuations by suspending async block and async keyword just creates a future and does not start execution so it results in better performance when async code is chained together. However, async/await is still in its inception phase in Rust and lacks proper support for cancellation and customized execution context.

November 4, 2020

Structured Concurrency in modern programming languages – Part-II

Filed under: Computing,Erlang,Languages — admin @ 8:46 pm

In this second part of the series on structured concurrency (Part-I, Part-III, Part-IV), I will review Elixir and Erlang languages for writing concurrent applications and their support for structured concurrency:

Erlang

The Erlang language was created by late Joe Armstrong when he worked at Ericsson and it is designed for massive concurrency by means of very light weight processes that are based on actors. Each process has its own mailbox for storing incoming messages of various kinds. The receive block in Erlang is triggered upon new message arrival and the message is removed and executed when it matches specific pattern match. The Erlang language uses supervisors for monitoring processes and immutable functional paradigm for writing robust concurrent systems. Following is high-level architecture of Erlang system:

As the cost of each process or actor is only few hundred bytes, you can create millions of these processes for writing highly scalable concurrent systems. Erlang is a functional language where all data is immutable by default and the state within each actor is held private so there is no shared state or race conditions.

An actor keeps a mailbox for incoming messages and processes one message at a time using the receive API. Erlang doesn’t provide native async/await primitives but you can simulate async by sending an asynchronous message to an actor, which can then reply back to the sender using its process-id. The requester process can then block using receive API until reply is received. Erlang process model has better support for timeouts with receive API to exit early if it doesn’t receive response within a time period. Erlang system uses the mantra of let it crash for building fault tolerant applications and you can terminate a process and all children processes connected.

Using actor model in Erlang

Following code shows how native send and receive primitives can be used to build the toy web crawler:

-module(erlcrawler).

-export([start_link/0, crawl_urls/3, total_crawl_urls/1]).

-record(request, {clientPid, ref, url, depth, timeout, created_at=erlang:system_time(millisecond)}).
-record(result, {url, status=pending, child_urls=0, started_at=erlang:system_time(millisecond), completed_at, error}).

-define(MAX_DEPTH, 4).
-define(MAX_URL, 11).
-define(DOMAINS, [
  "ab.com",
  "bc.com",
  "cd.com",
  "de.com",
  "ef.com",
  "fg.com",
  "yz.com"]).

make_request(ClientPid, Ref, Url, Depth, Timeout) ->
    #request{clientPid=ClientPid, ref=Ref, url=Url, depth=Depth, timeout=Timeout}.

make_result(Req) ->
    Url = Req#request.url,
    #result{url=Url}.

%%% Client API
start_link() ->
    spawn_link(fun init/0).

%%%%%%%%%%%% public method for crawling %%%%%%%%%%%%
%%% calling private method for crawling
%%% Pid - process-id of actor
%%% 0 - current depth
%%% Urls - list of urls to crawl
%%% Timeout - max timeout
crawl_urls(Pid, Urls, Timeout) when is_pid(Pid), is_list(Urls)  ->
    %% Boundary for concurrency and it will not return until all
    %% child URLs are crawled up to MAX_DEPTH limit.
    do_crawl_urls(Pid, 0, Urls, [], Timeout, 0).

total_crawl_urls(Pid) when is_pid(Pid) ->
    Self = self(),
    Pid ! {total, Self},
    receive {total_reply, Self, N} ->
        N
    end.

%%% Server functions
init() ->
    {ok, DownloaderPid} = downloader:start_link(),
    {ok, IndexerPid} = indexer:start_link(),
    loop(DownloaderPid, IndexerPid, 0).

%%% Main server loop
loop(DownloaderPid, IndexerPid, N) ->
    receive
        {crawl, Req} ->
            CrawlerPid = self(),
            spawn_link(fun() -> handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) end),
            debug_print(N),
            loop(DownloaderPid, IndexerPid, N+1);
        {total, Pid} ->
            Pid ! {total_reply, Pid, N},
            loop(DownloaderPid, IndexerPid, N);
        terminate ->
            ok
    end.


%%% Internal client functions
debug_print(N) when N rem 10000 == 0 ->
    io:format("~p...~n", [{N}]);
debug_print(_) ->
    ok.

%% Go through URLs to crawl, send asynchronous request to crawl and
%% then add request to a list to monitor that will be used to receive
%% reply back from the crawling actor.
do_crawl_urls(_, _, [], [], _, ChildURLs) ->
    ChildURLs; % all done
do_crawl_urls(_, ?MAX_DEPTH, _, _, _, _) ->
    0; % reached max depth, stop more crawling
do_crawl_urls(Pid, Depth, [Url|T], SubmittedRequests, Timeout, 0) when is_pid(Pid), is_integer(Depth), is_integer(Timeout) ->
    %%% monitoring actor so that we are notified when actor process dies
    Ref = erlang:monitor(process, Pid),
    %%% crawling next url to process
    Req = make_request(self(), Ref, Url, Depth, Timeout),
    Pid ! {crawl, Req},
    do_crawl_urls(Pid, Depth, T, SubmittedRequests ++ [Req], Timeout, 0);
do_crawl_urls(Pid, Depth, [], [Req|T], Timeout, ChildURLs) when is_pid(Pid) ->
    %%% receiving response from the requests that were previously stored
    Ref = Req#request.ref,
    receive
        {crawl_done, Ref, Res} ->
            erlang:demonitor(Ref, [flush]),
            do_crawl_urls(Pid, Depth, [], T, Timeout, Res#result.child_urls+ChildURLs+1);
        {'DOWN', Ref, process, Pid, Reason} ->
            erlang:error(Reason)
    after Timeout ->
        erlang:error({crawl_timeout, Timeout})
    end.


%%% Internal server functions called by actor to process the crawling request
handle_crawl(CrawlerPid, Req, DownloaderPid, IndexerPid) ->
    Res = make_result(Req),
    ClientPid = Req#request.clientPid,
    Url = Req#request.url,
    Ref = Req#request.ref,
    Depth = Req#request.depth,
    Timeout = Req#request.timeout,

    case downloader:download(DownloaderPid, Url) of
        {ok, Contents} ->
        {ok, Contents1} = downloader:jsrender(DownloaderPid, Url, Contents),
        Changed = has_content_changed(Url, Contents1),
        Spam = is_spam(Url, Contents1),
        if Changed and not Spam ->
            indexer:index(IndexerPid, Url, Contents1), % asynchronous call
        Urls = parse_urls(Url, Contents1),
                %% Crawling child urls synchronously before returning
                ChildURLs = do_crawl_urls(CrawlerPid, Depth+1, Urls, [], Timeout, 0) + 1,
                Res1 = Res#result{completed_at=erlang:system_time(millisecond), child_urls=ChildURLs},
                ClientPid ! {crawl_done, Ref, Res1};
            true ->
                Res1 = Res#result{completed_at=erlang:system_time(millisecond)},
                ClientPid ! {crawl_done, Ref, Res1}
            end;
        Err ->
            Res1 = Res#result{completed_at=erlang:system_time(millisecond), error = Err},
            ClientPid ! {crawl_done, Ref, Res1}
        end,
    ok.

%%%%%%%%%%%%%%% INTERNAL METHODS FOR CRAWLING %%%%%%%%%%%%%%%%
parse_urls(_Url, _Contents) ->
    % tokenize contents and extract href/image/script urls
    random_urls(?MAX_URL).

random_urls(N) ->
    [random_url() || _ <- lists:seq(1, N)].

has_content_changed(_Url, _Contents) ->
     % calculate hash digest and compare it with last digest
    true.

is_spam(_Url, _Contents) ->
     % apply standardize, stem, ngram, etc for indexing
    false.

random_url() ->
    "https://" ++ random_domain() ++ "/" ++ random_string(20).

random_domain() ->
    lists:nth(random:uniform(length(?DOMAINS)), ?DOMAINS).

random_string(Length) ->
    AllowedChars = "abcdefghijklmnopqrstuvwxyz",
    lists:foldl(fun(_, Acc) -> [lists:nth(random:uniform(length(AllowedChars)), AllowedChars)] ++ Acc end, [], lists:seq(1, Length)).

In above implementation, crawl_urls method takes list of URLs and time out and waits until all URLs are crawled. It uses spawn_link to create a process, which invokes handle_crawl method to process requests concurrently. The handle_crawl method recursively crawl the URL and its children up to MAX_DEPTH limit. This implementation uses separate Erlang OTP processes for downloading, rendering and indexing contents. The handle_crawl sends back the response with number of child URLs that it crawled.

-module(erlcrawler_test).
-include_lib("eunit/include/eunit.hrl").

-define(ROOT_URLS, ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]).

crawl_urls_test() ->
    {spawn, {timeout,30, do_crawl_urls(10000)}}.

%% Testing timeout and by default, it will terminate the test process so we will instead convert
%% kill signal into a message using erlang:exit
crawl_urls_with_timeout_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Timeout = 10, % We know that processing takes longer than 10 milliseconds
    Pid = erlcrawler:start_link(),
    process_flag(trap_exit, true),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout)
    end),
    {{crawl_timeout, _}, _} = receive
        {'EXIT', _, Reason} -> Reason
    after 1000 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_timeout_test: timedout as expected in millis ~p ~n", [{Elapsed}]).

%% Testing terminate/cancellation and killing a process will kill all its children
crawl_urls_with_terminate_test() ->
    %%% crawling next url to process
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    spawn_link(fun() ->
        erlcrawler:crawl_urls(Pid, ?ROOT_URLS, 1000) % crawl_urls is synchronous method so calling in another process
    end),
    receive
    after 15 -> % waiting for a bit before terminating (canceling) process
        exit(Pid, {test_terminated})
    end,
    {test_terminated} = receive
        {'EXIT', Pid, Reason} -> Reason
    after 200 ->
        erlang:error(unexpected_timeout)
    end,
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("crawl_urls_with_terminate_test: terminated as expected in millis ~p ~n", [{Elapsed}]).

do_crawl_urls(Timeout) ->
    Started = erlang:system_time(millisecond),
    Pid = erlcrawler:start_link(),
    N = erlcrawler:crawl_urls(Pid, ?ROOT_URLS, Timeout),
    N1 = erlcrawler:total_crawl_urls(Pid),
    Elapsed = erlang:system_time(millisecond) - Started,
    ?debugFmt("do_crawl_urls: Crawled URLs in millis: ~p ~n", [{N, N1, Elapsed}]),
    ?assertEqual(N1, 19032).

Above tests show three ways to try out the crawl_urls API. First test crawl_urls_test tests happy case of crawling URLs within 10 seconds. The crawl_urls_with_timeout_test tests the timeout behavior to make sure proper error message is returned and all Erlang processes are terminated. The crawl_urls_with_terminate_test tests cancellation behavior by terminating the main crawling process. You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/erl_actor.

Following are major benefits of using this process model to implement structured concurrency:

  • The main crawl_urls method defines high level scope of concurrency and it waits for the completion of child tasks.
  • crawl_urls method takes a timeout parameter so that the crawling all URLs must complete with the time period.
  • Erlang allows parent-child relationship between processes where you can monitor child processes and get notified when a child process dies. You can use this feature to cancel the asynchronous task. However, it will abruptly end all processes and all state within the process will be lost.
  • Erlang implementation captures the error within the response so the client can handle all error handling using pattern matching or other approach common in Erlang applications.

Following are shortcomings using this approach for structured concurrency:

  • The terminate API is not suitable for clean cancellation so you will need to implement a cooperative cancellation to persist any state or clean up underlying resources.
  • Though, you can combine processes in groups or parent child relationships manually but Erlang doesn’t give you a lot of flexibility to specify the context for execution.
  • Unlike async declared methods in Typescript, Erlang code is not easily composable but you can define client code to wrap send/receive messages so that high level code can be comprehended easily. Also, Erlang processes can be connected with parent-child relationships and you can manage composition via process-supervisor hierarchy.
  • Above code creates a new process for crawling each URL and though the overhead of each process is small but it may use other expensive resources such as network resource. We won’t use such approach for real crawler as it will strain the resources on the website being crawled. Instead, we may need to limit how many concurrent requests can be sent to a given website or maintain delay between successive requests.

Using pmap in Erlang

We can generalize above approach into a general purpose pmap that processes an array (similar to map function in functional languages) concurrently and then waits for their response such as:

-module(pmap).

-export([pmap/3]).

pmap(F, Es, Timeout) ->
   Parent = self(),
   Running = [exec(Parent, F, E) || E <- Es],
   collect(Running, Timeout).

exec(Parent, F, E) ->
    spawn_monitor(fun() -> Parent ! {self(), F(E)} end).

collect([], _Timeout) -> [];
collect([{Pid, MRef} | Next], Timeout) ->
  receive
    {Pid, Res} ->
      erlang:demonitor(MRef, [flush]),
      [{ok, Res} | collect(Next, Timeout)];
    {'DOWN', MRef, process, Pid, Reason} ->
      [{error, Reason} | collect(Next, Timeout)]
  after Timeout ->
    erlang:error({pmap_timeout, Timeout})
  end.

You can download full pmap example from https://github.com/bhatti/concurency-katas/tree/main/erl_pmap.

Elixir

The Elixir language is built upon Erlang BEAM VM and was created by Jose Valim to improve usability of Erlang language and introduce Rubyist syntax instead of Prologist syntax in Erlang language. It also removes some of the boilerplate that you needed in Erlang language and adds higher level abstractions for writing highly concurrent, distributed and fault tolerant applications.

Using a worker-pool and OTP in Elixir

As Elixir uses Erlang VM and runtime system, the application behavior will be similar to Erlang applications but following approach uses a worker pool design where the parent process keeps a list of child-processes and delegates the crawling work to child processes in a round-robin fashion:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  # {:ok, pid} = Crawler.start_link(100000)
  def start_link(size) when is_integer(size) do
    GenServer.start_link(__MODULE__, size)
  end

  def total_crawl_urls(pid) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, 30000)
  end

  ### Public client APIs
  def crawl_urls(pid, urls) when is_pid(pid) and is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self())
  end

  ### Internal client APIs
  def crawl_urls(pid, urls, depth, clientPid) when is_pid(pid) and is_list(urls) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ## init method create pool of workers based on given size
  def init(size) when is_integer(size) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    pids = Map.keys(pid_to_workers)
    {:ok, {pid_to_workers, pids, 0}}
  end

  ## handles crawling
  def handle_cast({:crawl, request}, {pid_to_workers, [pid|rest], total_in}) do
    GenServer.cast(pid, {:crawl, request}) # send request to workers in round-robin fashion
    {:noreply, {pid_to_workers, rest ++ [pid], total_in+1}}
  end

  def handle_call({:total_crawl_urls}, _from, {_, _, total_in} = state) do
    {:reply, total_in, state}
  end

  ## OTP Callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, _, total_in}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)
    pids = Map.keys(new_pid_to_workers)
    {:noreply, {new_pid_to_workers, pids, total_in}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

The parent process in above example defines crawl_urls method for crawling URLs, which is defined as an asynchronous API (handle_cast) and forwards the request to next worker. Following is implementation of the worker:

defmodule Worker do
  @moduledoc """
  Documentation for crawling worker.
  """
  @max_url 11
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"

  use GenServer

  # Client APIs
  def start_link(crawler_pid) when is_pid(crawler_pid) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {crawler_pid, downloader_pid, indexer_pid})
  end

  @doc """
  Crawls web url asynchronously
  """
  def handle_cast({:crawl, request}, {crawler_pid, downloader_pid, indexer_pid}=state) do
    handle_crawl(crawler_pid, downloader_pid, indexer_pid, request)
    {:noreply, state}
  end

  def init(crawler_pid) do
      {:ok, crawler_pid}
  end

  # Internal private methods
  defp handle_crawl(crawler_pid, downloader_pid, indexer_pid, req) do
    res = Result.new(req)
    contents = Downloader.download(downloader_pid, req.url)
    new_contents = Downloader.jsrender(downloader_pid, req.url, contents)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(indexer_pid, req.url, new_contents)
      urls = parse_urls(req.url, new_contents)
      Crawler.crawl_urls(crawler_pid, urls, req.depth+1, req.clientPid)
      send req.clientPid, {:crawl_done, Result.completed(res)}
    else
      send req.clientPid, {:crawl_done, Result.failed(req, :skipped_crawl)}
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

The worker process starts downloader and indexer processes upon start and crawls the URL upon receiving the next request. It then sends back the response to the originator of request using process-id in the request. Following unit tests are used to test the behavior of normal processing, timeouts and cancellation:

defmodule CrawlerTest do
  use ExUnit.Case
  doctest Crawler
  @max_processes 10000
  @max_wait_messages 19032
  @root_urls ["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"]

  test "test crawling urls" do
    started = System.system_time(:millisecond)
    {:ok, pid} = Crawler.start_link(@max_processes)
    Crawler.crawl_urls(pid, @root_urls)
    wait_until_total_crawl_urls(pid, @max_wait_messages, started)
  end

  defp wait_until_total_crawl_urls(pid, 0, started) do
    n = Crawler.total_crawl_urls(pid)
    elapsed = System.system_time(:millisecond) - started
    IO.puts("Crawled URLs in millis: #{n} #{elapsed}")
    assert n >= @max_wait_messages
  end

  defp wait_until_total_crawl_urls(pid, max, started) do
    if rem(max, 1000) == 0 do
      IO.puts("#{max}...")
    end
    receive do
      {:crawl_done, _} -> wait_until_total_crawl_urls(pid, max-1, started)
    end
  end

end

Following are major benefits of this approach for its support of structured concurrency:

  • The crawl_urls method in parent process defines high level scope of concurrency and it waits for the completion of child tasks.
  • Above implementation also uses timeout similar to the Erlang example to ensure task is completed within given time period.
  • Above implementation also captures the error within the response similar to Erlang for error handling.
  • This approach addresses some of the shortcomings of previous approach in Erlang implementation where a new process was created for each request. Instead a pool of process is used to manage the capacity of resources.

Following are shortcomings using this approach for structured concurrency:

  • This approach also suffers the same drawbacks as Erlang approach regarding cancellation behavior and you will need to implement a cooperative cancellation to cleanup the resources properly.
  • Similar to Erlang, Elixir also doesn’t give you a lot of flexibility to specify the context for execution and it’s not easily composable.

Using async-await in Elixir

Elixir defines abstracts Erlang process with Task when you only need to execute a single action throughout its lifetime. Here is an example that combines Task async/await with pmap implementation:

defmodule Parallel do
  def pmap(collection, func, timeout) do
    collection
    |> Enum.map(&(Task.async(fn -> func.(&1) end)))
    |> Enum.map(fn t -> Task.await(t, timeout) end)
  end
end
defmodule Crawler do
  @domains [
    "ab.com",
    "bc.com",
    "cd.com",
    "de.com",
    "ef.com",
    "yz.com"]
  @allowed_chars "abcdefghijklmnopqrstuvwxyz"
  @max_depth 4
  @max_url 11

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def crawl_urls(urls, timeout) when is_list(urls) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    ## Starting external services using OTP for downloading and indexing
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    res = crawl_urls(urls, downloader_pid, indexer_pid, 0, timeout)
    ## Stopping external services using OTP for downloading and indexing
    Process.exit(downloader_pid, :normal)
    Process.exit(indexer_pid, :normal)
    res
  end

  def crawl_urls(urls, downloader_pid, indexer_pid, depth, timeout) when is_list(urls) and is_pid(downloader_pid) and is_pid(indexer_pid) and is_integer(depth) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, downloader_pid, indexer_pid, depth, timeout)))
      Parallel.pmap(requests, &(handle_crawl/1), timeout)
    else
      []
    end
  end

  # Internal private methods
  defp handle_crawl(req) do
    {:ok, contents} = Downloader.download(req.downloader_pid, req.url, req.timeout)
    {:ok, new_contents} = Downloader.jsrender(req.downloader_pid, req.url, contents, req.timeout)
    if has_content_changed(req.url, new_contents) and !is_spam(req.url, new_contents) do
      Indexer.index(req.indexer_pid, req.url, new_contents, req.timeout)
      urls = parse_urls(req.url, new_contents)
      res = Crawler.crawl_urls(urls, req.downloader_pid, req.indexer_pid, req.depth+1, req.timeout)
      Enum.reduce(res, 0, &(&1 + &2)) + 1
    else
      0
    end
  end

  defp parse_urls(_Url, _Contents) do
    # tokenize contents and extract href/image/script urls
    random_urls(@max_url)
  end

  defp random_urls(n) do
    1..n |> Enum.map(&(random_url/1))
  end

  defp has_content_changed(_url, _contents) do
    # calculate hash digest and compare it with last digest
    true
  end

  defp is_spam(_url, _contents) do
    # apply standardize, stem, ngram, etc for indexing
    false
  end

  defp random_url(_) do
    "https://" <> random_domain() <> "/" <> random_string(20)
  end

  defp random_domain() do
    Enum.random(@domains)
  end

  defp random_string(n) do
    1..n
    |> Enum.reduce([], fn(_, acc) -> [Enum.random(to_charlist(@allowed_chars)) | acc] end)
    |> Enum.join("")
  end
end

Above example is a bit shorter due to the high level Task abstraction but its design has similar pros/cons as actor and pmap implementation of Erlang example. You can download full source code for this implementation from https://github.com/bhatti/concurency-katas/tree/main/elx_pmap.

Using Queue in Elixir

Following example shows web crawler implementation using queue:

defmodule Crawler do
  @max_depth 4

  @moduledoc """
  Documentation for Crawler.
  """

  ## Client API
  def start_link(size) when is_integer(size) do
    {:ok, downloader_pid} = Downloader.start_link()
    {:ok, indexer_pid} = Indexer.start_link()
    GenServer.start_link(__MODULE__, {size, downloader_pid, indexer_pid})
  end

  ## crawl list of url
  def crawl_urls(pid, urls, timeout) when is_pid(pid) and is_list(urls) and is_integer(timeout) do
    ## Boundary for concurrency and it will not return until all
    ## child URLs are crawled up to MAX_DEPTH limit.
    crawl_urls(pid, urls, 0, self(), timeout)
  end

  # returns number of urls crawled
  def total_crawl_urls(pid, timeout) when is_pid(pid) do
    GenServer.call(pid, {:total_crawl_urls}, timeout)
  end

  ## dequeue returns pops top request from the queue and returns it
  def dequeue(pid) when is_pid(pid) do
    GenServer.call(pid, {:dequeue})
  end

  ###########################################
  ## internal api to crawl urls
  def crawl_urls(pid, urls, depth, clientPid, timeout) when is_pid(pid) and is_list(urls) and is_pid(clientPid) and is_integer(timeout) do
    if depth < @max_depth do
      requests = urls |> Enum.map(&(Request.new(&1, depth, clientPid, timeout)))
      requests |> Enum.map(&(GenServer.cast(pid, {:crawl, &1})))
    else
      :max_depth_exceeded
    end
  end

  ###########################################
  ## init method create pool of workers based on given size
  def init({size, downloader_pid, indexer_pid}) when is_integer(size) and is_pid(downloader_pid) and is_pid(indexer_pid) do
    Process.flag(:trap_exit, true)
    pid_to_workers = 0..size |> Enum.map(&child_spec/1)
    |> Enum.map(&start_child/1)
    |> Enum.into(%{})
    {:ok, {pid_to_workers, :queue.new, 0, 0, downloader_pid, indexer_pid}}
  end

  ## asynchronous server handler for adding request to crawl in the queue
  def handle_cast({:crawl, request}, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    new_queue = :queue.in(request, queue)
    {:noreply, {pid_to_workers, new_queue, total_in+1, total_out, downloader_pid, indexer_pid}}
  end

  ## synchronous server handler for returning total urls crawled
  def handle_call({:total_crawl_urls}, _from, {_, _, _total_in, total_out, _, _} = state) do
    {:reply, total_out, state}
  end

  ## synchronous server handler to pop top request from the queue and returning it
  def handle_call({:dequeue}, _from, {pid_to_workers, queue, total_in, total_out, downloader_pid, indexer_pid}) do
    {head, new_queue} = :queue.out(queue)
    if head == :empty do
      {:reply, {head, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out, downloader_pid, indexer_pid}}
    else
      if rem(:queue.len(queue), 1000) == 0 or rem(total_out+1, 1000) == 0do
        IO.puts("#{total_out+1}...")
      end
      {:value, req} = head
      {:reply, {req, downloader_pid, indexer_pid}, {pid_to_workers, new_queue, total_in, total_out+1, downloader_pid, indexer_pid}}
    end
  end

  ## OTP helper callbacks
  def handle_info({:EXIT, dead_pid, _reason}, {pid_to_workers, queue, total_in, total_out}) do
    # Start new process based on dead_pid spec
    {new_pid, child_spec} = pid_to_workers
    |> Map.get(dead_pid)
    |> start_child()

    # Remove the dead_pid and insert the new_pid with its spec
    new_pid_to_workers = pid_to_workers
    |> Map.delete(dead_pid)
    |> Map.put(new_pid, child_spec)

    {:noreply, {new_pid_to_workers, queue, total_in, total_out}}
  end

  ## Defines spec for worker
  defp child_spec(_) do
    {Worker, :start_link, [self()]}
  end

  ## Dynamically create child
  defp start_child({module, function, args} = spec) do
    {:ok, pid} = apply(module, function, args)
    Process.link(pid)
    {pid, spec}
  end

end

You can download full source code of this example from https://github.com/bhatti/concurency-katas/tree/main/elx_queue.

Using Actor model as Abstract Data Structure

As the cost of actors is very small, you can also use it as an abstract data structure or objects that maintains internal state. Alan Kay, the pioneer in object-oriented programming described message-passing, isolation and state encapsulation as foundation of object-oriented design and Joe Armstrong described Erlang as the only object-oriented language. For example, let’s say you need to create a cache of stock quotes using dictionary data structure, which is updated from another source and provides easy access to the latest quotes. You would need to protect access to shared data in multi-threaded environment with synchronization. However, with actor-based design, you may define an actor for each stock symbol that keeps latest value internally and provides API to access or update quote data. This design will remove the need to synchronize shared data structure and will result in better performance.

Overall, Erlang process model is a bit low-level compared to async/await syntax and lacks composition in asynchronous code but it can be designed to provide structured scope, error handling and termination. Further, immutable data structures and message passing obviates the need for locks to protect shared state. Another benefit of Erlang/Elixir is its support of distributed services so it can be used for automatically distributing tasks to remote machines seamlessly.

October 29, 2020

Structured Concurrency in modern programming languages – Part-I

Filed under: Computing,Languages,Technology — admin @ 11:01 pm

Herb Sutter wrote about fifteen years ago how free performance lunch is over and you need to leverage concurrency to build high performance applications on modern multi-core machines. Unfortunately, adding concurrency support is not simple and low-level concurrency primitives in many languages can lead to buggy code with potential deadlocks and concurrent code can be hard to understand. Over the last few years, a number of programming languages have been improving support for concurrency and in this series of blogs (Part-II, Part-III, Part-IV), I will review some of programming languages that I have used in my current or past project such as Typescript, Elixir, Erlang, GO, Kotlin, and Rust. In particular, I will examine how these languages support structured concurrency so that concurrent code looks like sequential code and can be reasoned easily. Strictly speaking, concurrency relates to application behavior when modern operating systems use context switching to interleave execution of multiple tasks, whereas parallelism allow those tasks to be executed simultaneously. I will evaluate concurrency support in the context of multi-core hardware where we can guarantee correct behavior with preemptive/collaborative based multi-tasking and gain parallelism by utilizing multiple cores. The parallelism across multiple machines or distributed computing will be out of scope for this discussion.

Pitfalls with Concurrency

Before diving into the structured concurrency support, let’s review a few barriers that make writing concurrent code hard such as:

The control and data flow

The sequential code is easier to understand because you can predict the order of execution and though compilers or runtime environments may optimize that code with slightly different order but the top-down structure would remain intact. As opposed, concurrent code using threads/executors is disconnected from the main control and data flow that makes composition, error handling, timeout or cancellation in asynchronous code much harder. In addition, concurrent code requires coordination between threads with some overhead and requires synchronization to prevent data races that is hard to get right resulting in obscure and brittle code.

Race conditions

The race conditions is caused when the application behavior is determined by timing and interleaving of execution steps by multiple threads. The race conditions cause faulty behavior when the shared state or critical section is not properly guarded in a multi-threaded environment. You can eliminate race conditions by removing the shared state, using immutable objects or protecting critical sections using synchronization, mutex or locks.

Mutual Exclusion

The low-level locking primitives such as mutex, read/write/re-entrant locks are difficult to work with and add considerable complexity to your code. The buggy or incorrect implementation can lead to starvation, deadlocks or faulty behavior. Some libraries provide lock-free or concurrent data structures using atomic compare-and-swap (CAS) but they can still prone to contention when accessing from multiple threads.

Deadlocks

You may need to use locks to protect critical sections of the code and avoid race condition but incorrect implementation can lead to deadlocks where a threads can’t make any progress because it’s waiting for the resource held by another thread. In addition, the concurrent code may experience starvation when a thread can’t make a progress or livelock when multiple threads are stepping on each other. In order to circumvent deadlocks and livelocks, you can avoid nested locks, reduce number of locks and reduce scope of critical section. Instead, you can use re-entrant lock or fair locks to favor thread waiting for the longest time.

Callback Hell

A common pattern in many languages when calling an asynchronous method is to pass a callback function or lambda that is invoked when background task is completed. However, this structure devolves into complete mess when multiple asynchronous methods are chained, e.g.

class Crawler {
    crawl(url) {
        download(url, (contents) => {
            render(url, contents, (rendered) => {
                index(url, contents, (data) => { // index could have been running in parallel to parse
                    parse(contents, (urls) => {
                        urls.forEach((u) => crawl(u))
                    } // parse
                }) // index
            }) // render
        }) // download
    }

    download(url, cb) {

        ....
            cb(result)
    }

    render(url, contents, cb) {
        ....
            cb(result)
    }

    index(url, contents, cb) {
        ....
            cb(result)
    }

    parse(url, contents, cb) {
        ....
            cb(result)
    }
}

As you can see, the callback pattern quickly divulges into unwieldy mess and it’s hard to manage the results and error handling from within nested scope. Each callback is called upon completion of previous operation and you can’t use these callbacks for concurrent operations easily.

Promise Hell

Another common pattern for invoking an asynchronous method is to use promise or future objects such as:

class Crawler {
    crawl(url) {
        return download(url)
            .then((contents) =>
                render(url, contents)
            ).then((rendered) => {
                const indexPromise = index(url, contents)
                const parsePromise = parse(url, contents)
                return Promise.all([indexPromise, parsePromise]) // run both tasks in parallel
            }).then([indexResult, urls] => {
                return parse(contents, (urls) => {
                    urls.forEach((u) => crawl(u))
                } // parse
            }).catch((e) => {
                // error handling
            })
        }) // download
    }

    download(url) {
        return new Promise((resolve, reject) => {
            // async operation
            // ....
            if (ok) {
                resolve(result)
            } else {
                reject(new Error('failed'))
            }
        })
    }

    render(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    index(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    parse(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }
}

Though, promise model is a bit improved and you can manage concurrent tasks better using Promise.all but you still have to nest operations and error handling requires two separate ways to catch errors (using catch blocks and native try/catch). Another gotcha in dynamic languages such as Javascript is forgetting return in the last statement of promise.

Error Handling

Above examples show that error handling in asynchronous code is tricky and error prone. For example, when using callbacks, you can pass two callback methods, one for valid results and another for error but the nested scope makes it hard to handle these errors. Similarly, when using catch blocks in promises, it’s not clear which operation failed and adds substantial complexity if you need to recover some errors or perform an alternate operation based on conditional logic. You also have to combine promise specific catch blocks with normal try/catch blocks and it’s easy to miss proper error checking.

Cancellation and Timeout

As asynchronous code is run in a separate thread of execution, the cancelling or timing out requires some coordination between threads and it can be hard to implement in absence of library or language support. For example, a thread in expensive computation or database query can’t be cancelled if it’s blocking until that operation is completed. Some libraries support APIs to stop threads but that can leave process in unpredictable state, other libraries use signals to notify threads about termination. In order to properly cancel, you need non-blocking and cooperative model where the detached task checks for cancellation request periodically. Optimally, cancellation needs to cancel underlying operation so that application state remains consistent. Timeout is just an extension of cancellation behavior where asynchronous task is cancelled if it’s not completed within a specified time bound.

Debugging/Stack-traces

Asynchronous code makes it hard to see the call graph or stack traces from caller’s perspective due to execution in a separate thread. For example, you may see following stack trace in case of a database error on NodeJS where root-cause is not easily apparent:

at new QueryResultError (node_modules/pg-promise/lib/errors/queryResult.js:122:24)
at Query.ctx.db.client.query (node_modules/pg-promise/lib/query.js:192:41)
at Query.handleReadyForQuery (node_modules/pg/lib/query.js:126:10)
at Connection.<anonymous> (node_modules/pg/lib/client.js:163:19)
at Socket.<anonymous> (node_modules/pg/lib/connection.js:118:12)
at addChunk (_stream_readable.js:288:12)
at readableAddChunk (_stream_readable.js:269:11)
at Socket.Readable.push (_stream_readable.js:224:10)
at TCP.onStreamRead [as onread] (internal/stream_base_commons.js:94:17)

Out of scope

Note: you may need to handle other concerns such as retries with exponential back-off, circuit-breakers, or idempotence behavior with asynchronous code in distributed systems but it won’t be discussed here.

Concurrency Constructs

I have discussed some of concurrency primitives in my old blogs [1634, 1638, 1621] but following are common constructs or building blocks that are used for implementing concurrency support:

Threads

A thread defines a smallest unit of execution and multi-threading allows executing concurrent operations. There are two types of threads:

Native/OS-Thread

The native threads are tied with kernel threads and are scheduled using preemptive multi-tasking on modern operating systems. The operating system preempts native thread upon IO operation, wait/sleep, hardware interrupts or context switching. Native threads have high cost due to its stack size (about 256KB) and system overhead. As a result, a thread-pool of limited size is often used to share system resources for background processing.

Green/User-space Thread

The green threads use cooperative scheduling where a user-space scheduler performs context switching thus the overhead of spawning new threads is very small. The user-space schedulers use M:N model for mapping M schedulers to N native threads such as:

As green threads use cooperative scheduling, they generally require yield to allow other threads to proceed. The user-space schedulers are not suitable for blocking operations such as sleep or blocking IO. For example, Java initially supported green threads but it replaced with native threads to support preemptive multi-tasking. Similarly, earlier version of Rust used green threads with blocking IO that resulted in slow performance so it replaced green threads with native threads in later version. Thus, green threads are generally used with non-blocking IO or waits that automatically preempts the thread, saves its stack and resumes another thread. As a general rule, native threads work better if an application is CPU bound or requires real-time priority and green threads/coroutines provide better concurrency with IO bound applications.

Structured Concurrency

The structured concurrency was envisioned by Martin Sústrik to simplify writing concurrent applications. Following are building blocks of structured concurrency that remedy concurrency related issues discussed above:

Concurrency Scope

The structured concurrency defines a single entry and exit similar to top-down structured programming. It defines a concurrency scope or boundary where all asynchronous tasks must complete by the end of scope. The scope may optionally define the context or queue where the tasks will be run. This model simplifies semantics of asynchronous behavior because the lifetime of child tasks is tied with the parent scope. The parent scope automatically waits until all child tasks are completed.

Execution Context

The structured concurrency allows you to specify context, threads or queues where asynchronous code will run so that you can manage related asynchronous code and underlying resources easily.

Cancellation and Timeout

The structured concurrency provides first-class support for cancellation and timeout though it still requires that child tasks support cooperative cancellation as blocking operations cannot be easily interrupted.

Error Handling

The errors from child tasks are automatically propagated to the parent scope where they can be handled in a consistent way using language provided syntax.

Immutability or value semantics

The structured concurrency encourages use of immutable objects or pass by value semantics to avoid race condition or need for locks to protect the critical section.

Composition

The structured concurrency allows composing asynchronous code within another asynchronous function so that data and control flow can be managed easily. It allows errors to be propagated from the nested asynchronous code to calling function and it cancels all child asynchronous tasks if parent task is cancelled.

What’s missing from Structured Concurrency

The structured concurrency doesn’t specify exact paradigm of concurrency mechanism such as threads, coroutines, fibers, generators, actors, etc and instead it focuses on concurrency scope, data/control flow, error handling, cancellation/timeout and composition. Thus, it won’t solve data races if your application requires mutable shared data, which is accessed from multiple threads or coroutines concurrently so you will need to rely on synchronization primitives to protect the critical section.

Toy Web Crawler

I will use a toy implementation of a simple web crawler to show support of structured concurrency in some of my preferred languages. Following is a pseudocode of sequential version of this crawler:

MAX_DEPTH = 5
class URLRequest: 
  url, status, depth, error, created_at, started_at, completed_at

class WebCrawler:
  def crawl_all(root_urls):
	# priority such as page-rank
	pq = PriorityQueue()
	# track duplicate urls
	visited = Set()
	# add root urls to queue
	for url in root_urls:
		pq.add(URLRequest(url, pending, 0))

	# crawl urls using BFS
	total = 0
	while not pq.isEmpty():
		total+=1
		request = pq.pop()
		visited.add(request.url)
		urls = crawl(request)
		for url in urls:
			if not visited.contains(url) and 
            not is_spam(url) and request.depth+1 < MAX_DEPTH:
				pq.add(URLRequest(url, pending, request.depth+1))
	# all done
	print total

  # download, parse and index given url in the request
  def crawl(request):
	urls = []
	try:
		rqeuest.started_at = Date()
		contents = download(request.url)
		contents = jsrender(request.url, contents)
		if has_content_changed(request.url, contents):
			index(request.url, contents)
			urls = parse_urls(request.url, contents)
		request.status = completed
	except: err
		request.status = failed
		request.error = err
	# mark request completion time
	request.completed_at = Date()
	return urls

  def download(url):
	# check robots.txt and throttle policies
 	# may need timeout for slow websites and linearize 
    # requests to the same domain to prevent denial of service attack

  def jsrender(url, contents):
 	# for SPA apps that use javascript for rendering contents
	return contents

  def index(parent_url, contents):
	# apply standardize, stem, ngram, etc for indexing

  def parse_urls(parent_url, contents):
	# tokenize contents and extract href/image/script urls
	return urls

  def is_spam(url):
	# check spam or low-quality urls
	return false

  def has_content_changed(url, contents):
	# calculate hash digest and compare it with last digest
	return true

Above example defines crawl_all method to crawl list of root-urls that recursively invokes crawl method using breadth-first-search. The crawl method invokes stub-out methods for downloading url, parsing contents and indexing the contents.

Typescript

Typescript/Javascript on NodeJS platform offers a unique design for managing concurrency where only a single thread processes requests from event queue. Following is high-level architecture of NodeJS:

NodeJS process uses a single thread that takes next operation to execute form the event queue and executes in an event loop. It delegates some of system calls and asynchronous code to a small thread pool and uses non-blocking API when performing disk or network IO operations. This architecture eliminates the need to synchronize shared data as there is only a single thread accessing application state at a time (similar to actor model).

Using async/await in Typescript

Following is an implementation of web crawler using async-await syntax in Typescript:

import { Request, Response } from '../types/index';

const MAX_DEPTH = 4;
const MAX_URLS = 11;
const DOMAINS = [
  'ab.com',
  'bc.com',
  'cd.com',
  'yz.com',
];

export class Crawler {
  async crawl(urls: string[], timeoutMillis: number): Promise<number> {
    // Main scope of concurrency begin
    const res = await doCrawl(urls, 0, timeoutMillis);
    return res.childURLs;
    // Main scope of concurrency end    
  }
}

///////////////// PRIVATE METHODS ////////////////
const doCrawl = async (
  urls: string[],
  depth: number,
  timeoutMillis: number
): Promise<Response> => {
  const res = new Response();
  if (depth >= MAX_DEPTH) {
    res.failed('max-depth');
    return res;
  }
  const requests = urls.map((u) => new Request(u, depth, timeoutMillis));
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

  const childURLs : number = results.reduce((total: number, r: Response) => total + r.childURLs, 0);
  res.succeeded(childURLs);
  return res;
};


const handleCrawl = async (req: Request): Promise<Response> => {
  const res = new Response();
  const contents = await download(req.url);
  const newContents = await jsrender(req.url, contents);
  if (
    hasContentsChanged(req.url, newContents) &&
    !isSpam(req.url, newContents)
  ) {
    await index(req.url, newContents);
    const urls = await parseURLs(req.url, newContents);
    const childResp = await doCrawl(urls, req.depth + 1, req.timeoutMillis);
    res.succeeded(childResp.childURLs + 1);
  } else {
    res.failed("contents didn't change");
  }
  return res;
};

const download = async (url: string): Promise<string> => {
  // TODO check robots.txt and throttle policies
  // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
  return randomString(100);
};

const jsrender = async (url: string, contents: string): Promise<string> => {
  // for SPA apps that use javascript for rendering contents
  return contents;
};

const index = async (url: string, contents: string) => {
  // apply standardize, stem, ngram, etc for indexing
};

const parseURLs = (url: string, contents: string): string[] => {
  // tokenize contents and extract href/image/script urls
  const urls = [];
  for (var i = 0; i < MAX_URLS; i++) {
    urls.push(randomUrl());
  }
  return urls;
};

const hasContentsChanged = (url: string, contents: string): boolean => {
  return true;
};

const isSpam = (url: string, contents: string): boolean => {
  return false;
};

const randomUrl = (): string => {
  const i = Math.floor(Math.random() * DOMAINS.length);
  return 'https://' + DOMAINS[i] + '/' + randomString(20);
};

const randomString = (n: number): string => {
  let letters =
    'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
  let text = '';
  for (let i = 0; i < n; i++) {
    text += letters.charAt(Math.floor(Math.random() * letters.length));
  }
  return text;
};

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
	  () => {
		reject(new Error(`Timed out ${ms}`))
	  }, ms));
};

The async-await syntax in Typescript is a syntactic sugar on top of promises. The async function returns a promise and automatically wraps return value with Promise.

The crawl method takes a list of URLs with timeout that invokes doCrawl, which crawls list of URLs in parallel and then waits for results using await keyword. The doCrawl method recursively crawls child URLs up to MAX_DEPTH limit. The main crawl method defines boundary for concurrency and returns count of child URLs.

Following is a unit test for testing the behavior of async/await based crawler:

import { Crawler } from '../lib/index';
import { expect } from 'chai';

const EXPECTED_URLS = 19032;
const ROOT_URLS = [
  'a.com',
  'b.com',
  'c.com',
  'd.com',
  'e.com',
  'f.com',
  'g.com',
  'h.com',
  'i.com',
  'j.com',
  'k.com',
  'l.com',
  'n.com',
];

describe('crawler', async () => {
  it('crawling urls with nesting', async () => {
    const started = new Date().getTime();
    const timeout = 5000;
    const crawler = new Crawler();
    const res = await crawler.crawl(ROOT_URLS, timeout);
    const elapsed = new Date().getTime() - started;
    console.log(`Crawl took ${elapsed} to process ${res}`);
    expect(res).equal(EXPECTED_URLS);
  });

});

Following are some of the concurrency support in Typescript:

  • Concurrency scope – Though, typescript doesn’t support concurrency scope as first class citizen or an option to specify queue/context but you can manage boundary using await in the main method.
  • The async declared methods in above implementation shows asynchronous code can be easily composed.
  • Error handling – Async-await syntax uses normal try/catch syntax for error checking instead of specialized syntax of Promise or callback functions.
  • As NodeJS uses a single thread in an event loop, you don’t need to worry about shared state being updated in multiple threads.

Following are the major shortcomings in Typescript for its support of structured concurrency:

  • Typescript doesn’t support value semantics and objects are passed by reference except primitive types.
  • As NodeJS uses a single thread for even loop with small thread pool for asynchronous operation, it limits the tasks that you can run in parallel on multi-core hardware.
  • Typescript doesn’t support cancellation and timeout natively so you have to rely on cooperative cancellation. You can implement timeout partially using Promise.race but it’s not a reliable way to handle timeouts, e.g.
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
	  () => {
		reject(new Error(`Timed out ${ms}`))
	  }, ms));
};

You can download full code for Typescript example from https://github.com/bhatti/concurency-katas/tree/main/node.

Overall, Typescript/NodeJS is suitable for IO-bound applications where a little time is spent on each operation so that event-loop can switch to next task but it’s not suitable when the application has blocking/CPU-bound operations, requires more background tasks or requires high level of concurrency and parallelism.

December 13, 2009

Dynamic Inheritance and Composition

Filed under: Languages — admin @ 3:55 pm

Static Inheritance

Inheritance is a core feature of object oriented languages that has been used to simulate real world by modeling closely related objects and to build reusable code. The inheritance relationship is defined statically in class specifications and it comes in various flavors such as:

Single Inheritance

It allows a class to be extended by just one other class.

Multiple Inheritance

It allows a class to be derived from multiple classes and historically has been difficult to maintain and has been source of diamond inheritance in C++, though other languages use order such as Method Resolution Order (MRO) in Python to avoid those issues.

Interfaces

The interfaces are used in C# and Java to define methods without implementation and a class can implement multiple interfaces without the downsides of multiple inheritance.

Mixins

The mixins are available in Ruby and D, that use mixins for code reuse. The mixins are similar to interfaces with implementations except they aggregate methods and attributes at runtime.

Traits

The traits are available in Squeak and Scala and are conceptually similar to Mixins except traits do not allow attributes.

Dynamic Inheritance

As opposed to static inheritance, dynamic inheritance can be added at runtime using Object Extension Pattern, which I first learned in Erich Gamma, et al’s Gof patterns. In late 90s, I used Voyager ORB for building distributed systems, which used this pattern. Following example shows how this pattern can be used:

Let’s define a marker interface Extension in Java such as:

 1 package ext;
 2 
 3 public interface Extension {
 4 
 5 }
 6 
 7 
 

Then create a factory class such as

 1 package ext;
 2 
 3 public class ExtensionsFactory {
 4     public void register(final Class subject, final Extension ... exts) {/* ... */}
 5     public <T> T get(final Object subject, final Class<T> extClass) { /* ... */ return ;}
 6 }
 7 
 8 
 

The subject is object that needs to extend extensions, e.g. let’s assume you have a User class and you need to add hobbies, you can do it as follows:

 1 package domain;
 2 
 3 public class User {
 4     //...
 5 }
 6 
 7 
 

And you then define Hobbies as follows:

 1 package domain;
 2 
 3 public class Hobbies implements ext.Extension {
 4     public Hobbies(User user) {
 5         // ...
 6     }
 7 }
 8 
 9 
 

At runtime, you can register Hobbies to User and use it as follows

  1 package test;
  2 
  3 public class Main {
  4     public static void main(String[] args) {
  5         ExtensionsFactory f = new ExtensionsFactory();
  6 
  7         f.register(User.class, Hobbies.class);
  8 
  9         //
 10         User user = new User();
 11         Hobbies hobbies = f.get(user, Hobbies.class);
 12     }
 13 
 14 }
 15 
 16 
 

The dynamic inheritance allows you to follow open-closed principle by extending classes without modifying existing classes and allows you to choose features that you need at runtime. Of course, dynamic languages such as Ruby make this a lot easier as you can extend classes or objects with modules at runtime, e.g.

  1 ### defining Hobbies extension
  2 module Hobbies
  3   def hobbies
  4   end
  5 end
  6 
  7 ### defining User class
  8 class User
  9 end
 10 
 11 user = User.new.extend(Hobbies)
 12 
 13 puts user.singleton_methods   #["hobbies"]
 14 
 15 ## or
 16 ### binding Hobbies with User at runtime
 17 class << User
 18   include Hobbies
 19 end
 20 puts User.singleton_methods   # ["hobbies"]
 21 
 22 
 23 
 

In real life, the inheritance relationship can be difficult to get right and often you have to use Liskov Substitution Principle to ensure base class can be replaced by derived class in all uses of the base class. However, dynamic inheritance acts more like Composition feature so above technique can also be used to implement dynamic composition. The dynamic inheritance or composition allows you to mix and match features you need at runtime and build extendable systems. This technique has been success key of evolution of Eclipse IDE. Also, this technique goes nicely with the Adaptive Object Modeling technique I described in my last post to build easily extendable systems.

March 25, 2009

When in Rome, code like how Romans code

Filed under: Languages — admin @ 12:11 pm

I have been programming for over twenty years and I have learned a number of programming languages over the years. One of recurring behavior I have seen in a lot of programmers is that they take a lot of programming habbits (good or bad) from old language(s) to the new language. This could be how you design the application, style of coding, naming conventions, etc. I remember when I switched from C to C++, I was used to procedural thinking and had to learn how to break the problem into classes and how to assign responsibility to different classes. Similarly, when I starting using Java back in 95-95, I had to learn about Java’s peculiar style. For example, I used to declare public methods in C++ at top and all private methods including attributes at the bottom. I also tended to use underscores to prefix member attributes. I slowly learned Java’s style of declaring class attributes at top, using all uppercase for constants, camel case, etc.

In early 2000s, I learned Ruby from PicAxe Ruby book that taught me Ruby from object oriented style and I missed all its functional or meta-programming features. I slowly learned more functional style of programming and meta programming. I even had to switched back to underscores as opposed to camel case. I read Ruby code of other programmers to learn how they code and what conventions they use. I did similar excercises when I learned Python, Erlang, Scala, Objective-C, etc, i.e., I tried to learn not only language itself, its core and third party libraries but how people write the code, package applications or create libraries. Though, I think it helps if there are examples of good usage or style for that language. For example, I have seen plenty of abuses of Javascript that misunderstood its prototype or functional roots and used it as either procedural or class oriented language.

At my work, we use code reviews before any code checkin and I see conventions and styles of other languages mixed in all the time. I think learning different styles of programming makes you a better programmer. For example, I learned from functional programming how immutability can make sure programs safer and I tend to use it more in other object oriented or multi-paradigm langauges that don’t enforce immutability. Though, in other cases it’s hard to force yourself to use features from one language to another when that feature isn’t available inherently. For example, I like mixins feature of Ruby or traits of Scala but I can’t really use them in langauges that support only single inheritance such as Java. So instead of jumping over hoops to use features from other language, I try to use the style suitable for that specific langauge such as using multiple interfaces. I have been learning iphone development and been reading iphone SDK book by Jonathan Zdziarski. One of peculiar thing about his coding examples is that he does not use Interface Builder and creates all UI components from the code. Though, such style is acceptable in many situations but I would prefer to use Interface Builder and follow path of least resistence.

In practice, you will often find multiple styles or approaches of doing a thing in a single language. For example, Ruby encourage multiple ways to do things that can be quite confusing. Though, I like Python’s philolsophy of only one way to do things, but there are plenty of divergent opinions in that language as well. Another somewhat related topic is how to pick a language as languages vary in their core areas of strength. For example, Java was originally marketed as language for Web platform but these days I tend to use Ruby or Python for web development and Java for system development. Also, I tend to use Erlang for network oriented or concurrent applications and use C/C++ where performance is critical. Last year, there was big hoopla over Erlang’s aweful performance for search engine that sparked WideFinder benchmarks but it missed the point that Erlang’s core strength is distributed/concurrent applications and not text searching. So in nutshell, I think it helps to pick a language based on the problem and take advantage of its strengths. Finally, stick to general style of coding and conventions of the language especially when working with large codebase or large number of programmers.

Powered by WordPress