Shahzad Bhatti Welcome to my ramblings and rants!

March 24, 2022

Architecture Patterns and Practices for Sustainable Software Delivery Pipelines

Filed under: Project Management, Technology — admin @ 10:31 pm

Abstract

Software is eating the world, and today’s businesses demand that software features ship at a higher velocity so that they can learn at a greater pace without compromising quality. However, each new feature increases the viscosity of the existing code, adding complexity and technical debt, so the time to market for new features grows longer. Sustaining the pace of software delivery requires continuous improvement to the software architecture and development practices.

Software Architecture

Software architecture defines the guiding principles and structure of software systems. It also covers quality attributes such as performance, sustainability, security, scalability and resiliency. The architecture is then continuously updated through an iterative development process and feedback from actual use in the production environment. An architecture that is ignored decays, which results in higher complexity and technical debt. To reduce technical debt, you can build a backlog of technical and architecture-related changes and prioritize it alongside product development. To keep the architecture consistent throughout your organization, you can document architecture principles that define high-level guidelines for best practices, documentation templates, the review process and guidance for architecture decisions.

Quality Attributes

Following are major quality attributes of the software architecture:

  • Availability — It defines the percentage of time the system is available, e.g. available-for-use-time / total-time. It is generally expressed as a number of nines such as 99.99%, which allows a downtime of about 52 minutes in a year. It can also be calculated from mean-time between failure (MTBF) and mean-time to recover (MTTR) using MTBF/(MTBF+MTTR). The availability depends not only on the service that you are providing but also on its dependent services, e.g. P-service * P-dep-service-1 * P-dep-service-2. You can improve availability with redundant services, in which case the combined availability is 1 - (1 - Service-availability) ** Redundancy-factor, with availability expressed as a fraction between 0 and 1. To further improve availability, you can detect faults and use redundancy and state synchronization for fault recovery. The system should also handle exceptions gracefully so that it doesn’t crash or go into a bad state.
  • Capacity — Capacity defines how the system scales by adding hardware resources.
  • Extensibility — Extensibility defines how the system meets future business requirements without significantly changing existing design and code.
  • Fault Tolerance — Fault tolerance prevents a single point of failure and allows the system to continue operating even when parts of the system fail.
  • Maintainability — Higher quality code allows building robust software with higher stability and availability. A modular and loosely coupled design also speeds up software delivery.
  • Performance — It is defined in terms of the latency of an operation under normal or peak load. Performance may degrade as resources are consumed, which affects the throughput and scalability of the system. You can measure user response time, throughput and utilization of computational resources by stress testing the system. A number of tactics can be used to improve performance such as prioritization, reducing overhead, rate-limiting, asynchronicity and caching. Performance testing can be integrated with the continuous delivery process, using load and stress tests to measure performance metrics and resource utilization.
  • Resilience — Resilience accepts the fact that faults and failures will occur, so system components resist them by retrying, restarting, limiting error propagation or other measures. A failure occurs when a system deviates from its expected behavior as a result of an accidental fault, misconfiguration, transient network issue or programming error. Two metrics related to resilience are mean-time between failure (MTBF) and mean-time to recover (MTTR); resilient systems pay more attention to recovery, i.e. a shorter MTTR.
  • Recovery — Recovery looks at system recovery in relation to availability and resilience. Two metrics related to recovery are recovery point objective (RPO) and recovery time objective (RTO), where RPO determines how much data can be lost in case of failure and RTO defines how long to wait for the system to recover.
  • Reliability — Reliability looks at the probability of failure or failure rate.
  • Reproducibility — Reproducibility uses version control for code, infrastructure, configuration so that you can track and audit changes easily.
  • Reusability — It encourages code reuse to improve reliability and productivity and to save the cost of duplicated effort.
  • Scalability — It defines the ability of the system to handle an increase in workload without performance degradation. It can be expressed in terms of vertical or horizontal scalability, where horizontal scaling reduces the impact of an isolated failure and improves workload availability. Cloud computing offers elastic and auto-scaling features for adding hardware when a higher request rate is detected by the load balancer.
  • Security — Security primarily looks at confidentiality, integrity and availability (CIA) and is critical in building distributed systems. Building secure systems depends on security practices such as strong identity management, defense in depth, zero trust networks, auditing, and protecting data in motion and at rest. You can adopt DevSecOps, which shifts security left to earlier in the software development lifecycle, with processes and threat-modeling frameworks such as Security by Design (SbD), STRIDE (Spoofing, Tampering, Repudiation, Information disclosure, Denial of service, Elevation of privilege), PASTA (Process for Attack Simulation and Threat Analysis), VAST (Visual, Agile and Simple Threat modeling), CAPEC (Common Attack Pattern Enumeration and Classification), and OCTAVE (Operationally Critical Threat, Asset, and Vulnerability Evaluation).
  • Testability — It encourages building systems in such a way that they are easier to test.
  • Usability — It defines user experience of user interface and information architecture.
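
The availability arithmetic from the list above can be sketched in Python as follows. This is only an illustration: the function names are hypothetical, and availabilities are assumed to be fractions between 0 and 1 rather than percentages.

```python
MINUTES_PER_YEAR = 365 * 24 * 60  # 525,600 minutes, ignoring leap years


def availability_from_mtbf(mtbf_hours: float, mttr_hours: float) -> float:
    """MTBF / (MTBF + MTTR)."""
    return mtbf_hours / (mtbf_hours + mttr_hours)


def serial_availability(*availabilities: float) -> float:
    """A service and the dependencies it cannot work without: multiply them."""
    result = 1.0
    for availability in availabilities:
        result *= availability
    return result


def redundant_availability(availability: float, replicas: int) -> float:
    """Redundant copies fail together only if every copy fails."""
    return 1.0 - (1.0 - availability) ** replicas


def downtime_minutes_per_year(availability: float) -> float:
    """Downtime budget implied by an availability target."""
    return (1.0 - availability) * MINUTES_PER_YEAR


four_nines_budget = downtime_minutes_per_year(0.9999)   # about 52.56 minutes
with_dependency = serial_availability(0.999, 0.999)     # about 0.998
two_replicas = redundant_availability(0.99, 2)          # about 0.9999
```

Note how a single hard dependency lowers the combined availability below that of either service, while redundancy raises it, assuming the replicas fail independently.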

Architecture Patterns

Following is a list of architecture patterns that help in building high-quality software:

Asynchronicity

Synchronous services are difficult to scale and to recover from failures because they require low latency and callers can easily overwhelm them. Asynchronous messaging based on point-to-point or publish/subscribe communication is more suitable for handling faults or high load. This improves resilience because service components can restart after a failure while messages remain in the queue.

Admission Control

Admission control adds an authentication, authorization or validation check in front of the event queue so that the service can handle the load and prevent overload when demand exceeds the server capacity.

Back Pressure

When a producer is generating workload faster than the server can process, it can result in long request queues. Back pressure signals clients that servers are overloaded and clients need to slow down. However, rogue clients may ignore these signals so servers often employ other tactics such as admission control, load shedding, rate limiting or throttling requests.
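
One simple way to produce a back-pressure signal is a bounded queue that refuses new work when it is full. The following Python sketch assumes a hypothetical in-process `BoundedInbox`; a real service would typically translate the refusal into a protocol-level response.

```python
import queue


class BoundedInbox:
    """Hypothetical request inbox that refuses work once it is full."""

    def __init__(self, capacity: int) -> None:
        self._queue = queue.Queue(maxsize=capacity)

    def offer(self, request: str) -> bool:
        """Return True if accepted, False as a slow-down signal to the client."""
        try:
            self._queue.put_nowait(request)
            return True
        except queue.Full:
            return False

    def take(self) -> str:
        """Remove the oldest request; raises queue.Empty if there is none."""
        return self._queue.get_nowait()


inbox = BoundedInbox(capacity=2)
accepted = [inbox.offer(r) for r in ("a", "b", "c")]  # third offer is refused
```

A well-behaved client treats a refused offer as a cue to slow down, and the server regains room as soon as a worker takes a request off the queue.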

Big fleet in front of small fleet

You should look at all transitive dependencies when scaling a service with a large fleet of hosts so that you don’t drive heavy network traffic into dependent services that run on a smaller fleet. You can use load testing to find the bottlenecks and update SLAs for the dependent services so that they are aware of the network load from your APIs.

Blast Radius

A blast radius defines the impact of a failure on the overall system when an error occurs. To limit the blast radius, the system should eliminate single points of failure, roll out changes gradually using canaries, and stop cascading failures using circuit breakers, retries and timeouts.

Bulkheads

Bulkheads isolate components so that a fault in one does not spread to another, e.g. you may use a different thread pool for each workload or use multiple regions/availability-zones to isolate failures in a specific datacenter.

Caching

Caching can be implemented at several layers to improve performance such as database-cache, application-cache, proxy/edge cache, pre-compute cache and client-side cache.

Circuit Breakers

The circuit breaker is defined as a state machine with three states: normal, checking and tripped (often called closed, half-open and open). It can be used to detect persistent failures in a dependent service and trip its state to temporarily disable invocation of the service, substituting some default behavior. It can later be moved to the checking state to probe for success, and it returns to normal after a successful invocation of the dependent service.
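
The state machine described above can be sketched in Python as follows. This is a minimal single-threaded illustration, not a production implementation: the class name, the `failure_threshold` and `reset_timeout` parameters, and the injectable `clock` are assumptions made for the example.

```python
import time


class CircuitBreaker:
    """Three states as in the text: normal, checking and tripped."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold  # failures before tripping
        self.reset_timeout = reset_timeout          # seconds to stay tripped
        self._clock = clock
        self.state = "normal"
        self._failures = 0
        self._tripped_at = 0.0

    def call(self, operation, fallback):
        if self.state == "tripped":
            if self._clock() - self._tripped_at < self.reset_timeout:
                return fallback()      # skip the dependency, use default behavior
            self.state = "checking"    # timeout elapsed: allow one trial call
        try:
            result = operation()
        except Exception:
            self._record_failure()
            return fallback()
        self.state = "normal"          # success closes the breaker again
        self._failures = 0
        return result

    def _record_failure(self):
        self._failures += 1
        # A failed trial call, or too many failures, trips the breaker.
        if self.state == "checking" or self._failures >= self.failure_threshold:
            self.state = "tripped"
            self._tripped_at = self._clock()
```

Passing the clock in as a parameter is a design choice for the sketch: it lets you test the tripped-to-checking transition without waiting for real time to pass.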

CQRS / Event Sourcing

Command and Query Responsibility Segregation (CQRS) separates read and update operations in the database. It’s often implemented using event-sourcing that records changes in an append-only store for maintaining consistency and audit trails.

Default Values

Default values provide a simple way to offer limited or degraded behavior when a dependent configuration or control service fails.

Disaster Recovery

Disaster recovery (DR) enables business continuity in the event of large-scale failure of data centers. Based on cost, availability and RTO/RPO constraints, you can deploy services to multiple regions for hot site; only replicate data from one region to another region while keeping servers as standby for warm site; or use backup/restore for cold site. It is essential to periodically test and verify these DR procedures and processes.

Distributed Saga

Maintaining data consistency in a distributed system where data is stored in multiple databases can be hard, and using 2-phase-commit may incur high complexity and performance overhead. You can use a distributed Saga to implement long-running transactions. It maintains the state of the transaction and applies compensating transactions in case of a failure.
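
The compensation idea can be sketched in Python as follows. This is a simplified in-process illustration: `run_saga` and the step structure are hypothetical, and a real saga would also persist its state so that it can resume after a crash.

```python
from typing import Callable, List, Tuple

# Each step pairs an action with the compensation that undoes it.
Step = Tuple[Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    completed: List[Callable[[], None]] = []
    for action, compensate in steps:
        try:
            action()
        except Exception:
            for undo in reversed(completed):
                undo()
            return False
        completed.append(compensate)
    return True
```

For example, if an order saga reserves inventory, charges a card and then fails to schedule shipping, the sketch refunds the charge and then releases the reservation, leaving the failed step itself uncompensated because it never completed.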

Failing Fast

You can fail fast if the workload cannot serve the request due to unavailability of resources or dependent services. In some cases, you can queue requests, however it’s best to keep those queues short so that you are not spending resources to serve stale requests.

Function as a Service

Function as a service (FaaS) offers serverless computing, which removes the need to manage physical resources. Cloud vendors offer FaaS platforms such as AWS Lambda, Google Cloud Functions and Azure Functions to build serverless applications for scalable workloads. These functions can be easily scaled to handle load spikes; however, you have to scale them carefully so that the services they depend on can support the workload. Each function should be designed with single responsibility, idempotency and shared-nothing principles so that it can be executed concurrently. Serverless applications generally use an event-based architecture for triggering functions, and because serverless functions are more granular, they incur more communication overhead. In addition, chaining functions within the code can result in tightly coupled applications; instead, use a state machine or a workflow to orchestrate the communication flow. There is also open source support for FaaS-based serverless computing such as OpenFaaS and OpenWhisk on top of Kubernetes or OpenShift, which avoids locking into a specific cloud provider.

Graceful Degradation

Instead of failing a request when dependent components are unhealthy, a service may use circuit-breaker pattern to return a predefined or default response.

Health Checks

Health checks run a dummy or synthetic transaction that performs the action without affecting real data to verify the system component and its dependencies.

Idempotency

An idempotent service completes an API request exactly once, so resending the same request due to retries has no additional side effect. Idempotent APIs typically use a client-generated identifier or token, and the idempotent service returns the same response if a duplicate request is received.
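
A minimal Python sketch of the client-generated token approach follows. The `IdempotentHandler` wrapper and its in-memory dictionary are assumptions for illustration; a real service would keep the responses in a durable, shared store and would need to handle concurrent duplicates.

```python
from typing import Any, Callable, Dict


class IdempotentHandler:
    """Remember the response for each idempotency key and replay it on retries."""

    def __init__(self, handler: Callable[[Any], Any]) -> None:
        self._handler = handler
        self._responses: Dict[str, Any] = {}

    def handle(self, idempotency_key: str, payload: Any) -> Any:
        # A duplicate request returns the stored response without side effects.
        if idempotency_key in self._responses:
            return self._responses[idempotency_key]
        response = self._handler(payload)
        self._responses[idempotency_key] = response
        return response
```

With this wrapper, a client that times out and resends a payment request with the same key gets the original response back, and the underlying handler runs only once.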

Layered Architecture

The layered architecture separates software into different concerns such as:

  • Presentation Layer
  • Business Logic Layer
  • Service Layer
  • Domain Model Layer
  • Data Access Layer

Load Balancer

A load balancer distributes traffic among groups of resources so that a single resource is not overloaded. Load balancers also monitor the health of servers, and you can set up a load balancer for each group of resources to ensure that requests are not routed to unhealthy or unavailable resources.

Load Shedding

Load shedding rejects work at the edge when the server side exceeds its capacity, e.g. a server may return an HTTP 429 error to signal clients that they can retry at a slower rate.

Loosely coupled dependencies

Queuing systems, streaming systems and workflows isolate the behavior of dependent components and increase resiliency through asynchronous communication.

Microservices

Microservices evolved from service oriented architecture (SOA) and support both point to point protocols such as REST/gRPC and asynchronous protocols based on messaging/event bus. You can apply bounded-context of domain-driven design (DDD) to design loosely coupled services.

Model-View-Controller

It decouples user interface from the data model and application functionality so that each component can be independently tested. Other variations of this pattern include model–view–presenter (MVP) and model–view–viewmodel (MVVM).

NoSQL

NoSQL database technology provides support for high availability and variable/write-heavy workloads that can be easily scaled with additional hardware. NoSQL databases optimize the CAP and PACELC tradeoffs of consistency, availability, partition tolerance and latency. A number of cloud vendors provide managed NoSQL database solutions; however, they can create latency issues if the services accessing these databases are not colocated.

No Single Point of Failure

In order to eliminate single points of failure and provide high availability and failover, you can deploy redundant services to multiple regions and availability zones.

Ports and Adapters

Ports and Adapters (Hexagonal architecture) separates the interface (ports) from the implementation (adapters). The business logic is encapsulated in the hexagon, which is invoked by the adapters when actors operate on capabilities offered by the ports.

Rate Limiting and Throttling

Rate-limiting defines the rate at which clients can access the services based on the license policy. Throttling can be used to restrict access as a result of an unexpected increase in demand. For example, the server can return HTTP 429 to notify clients that they should back off or retry at a slower rate.
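
One common way to enforce such a rate is a token bucket, which is not named in the text above and is used here only as an example technique. In this Python sketch the `TokenBucket` class, its parameters and the `handle_request` helper are hypothetical.

```python
import time


class TokenBucket:
    """Allow `rate_per_sec` requests on average with bursts of up to `burst`."""

    def __init__(self, rate_per_sec: float, burst: int, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = float(burst)
        self._clock = clock
        self._tokens = float(burst)   # start with a full bucket
        self._last = clock()

    def allow(self) -> bool:
        now = self._clock()
        # Refill in proportion to the elapsed time, up to the bucket capacity.
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now
        if self._tokens >= 1.0:
            self._tokens -= 1.0
            return True
        return False


def handle_request(bucket: TokenBucket) -> int:
    """Return the HTTP status a server might send for this request."""
    return 200 if bucket.allow() else 429
```

In practice you would keep one bucket per client or per tenant so that a single noisy caller cannot use up the allowance of the others.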

Retries with Backoff and Jitter

A remote operation can be retried if it fails due to a transient failure or a server overload; however, each retry should use a capped exponential backoff so that retries don’t cause additional load on the server. In a layered architecture, retries should be performed at a single point to avoid multiplying retries across layers. Retries can use circuit-breakers and rate-limiting to throttle requests. In some cases, requests may time out for clients but succeed on the server side, so the APIs must be designed with idempotency so that they are safe to retry. To avoid many clients retrying at the same time, a small random jitter can be added to the retry delay.
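
A capped exponential backoff with jitter can be sketched in Python as follows. The `TransientError` type, the default values and the injectable `sleep` and `rng` parameters are assumptions for illustration; the operation being retried is assumed to be idempotent.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical error type for failures that are worth retrying."""


def backoff_delay(attempt, base=0.1, cap=5.0, jitter=0.1, rng=random.random):
    """Capped exponential delay for a zero-based attempt, plus a small jitter."""
    capped = min(cap, base * (2 ** attempt))
    return capped + rng() * jitter


def retry(operation, max_attempts=5, base=0.1, cap=5.0, jitter=0.1, sleep=time.sleep):
    """Call `operation` until it succeeds or `max_attempts` is exhausted."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except TransientError:
            if attempt == max_attempts - 1:
                raise  # give up and surface the last error to the caller
            sleep(backoff_delay(attempt, base, cap, jitter))
```

Only `TransientError` is retried in this sketch, so programming errors and permanent failures surface immediately instead of being repeated.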

Rollbacks

The software should be designed with rollbacks in mind so that all code, database schema and configurations can be easily rolled back. A production environment might be running multiple versions of the same service, so care must be taken to design APIs that are both backward and forward compatible.

Stateless and Shared nothing

Shared nothing architecture helps in building stateless and loosely coupled services that can be easily horizontally scaled to provide high availability. This architecture allows recovering from isolated failures and supports auto-scaling by shrinking or expanding resources based on the traffic patterns.

Startup dependencies

When services start, they may need to connect to certain configuration or bootstrap services, so care must be taken to avoid thundering herd problems that can overwhelm those dependent services in the event of a region-wide outage.

Timeouts

Timeouts help in building resilient systems by bounding how long a caller waits on external services and preventing the thundering herd problem. Timeouts can also be used when retrying a failed operation after a transient failure or a server overload. A timeout can also add a small jitter to randomly spread the load on the server. Jitter can also be applied to timers of scheduled jobs or delayed work.

Watchdogs and Alerts

A watchdog monitors a system component for specific signals such as latency, traffic, errors, saturation and SLOs. It then sends an alert based on the monitoring configuration, which triggers an email, on-call paging or an escalation.

Virtualization and Containers

Virtualization allows abstracting computing resources using virtual machines or containers so that you don’t depend on the physical implementation. A virtual machine runs a complete operating system on top of a hypervisor whereas a container is an isolated, lightweight environment for running applications. Virtualization allows building immutable infrastructure that is specially designed to meet application requirements and can be easily deployed on a variety of hardware resources.

Architecture Practices

Following are best practices for sustainable software delivery:

Automation

Automation builds pipelines for continuous integration, continuous testing and continuous delivery to improve the speed and agility of software delivery. Any operational procedures for deployment and monitoring can be stored in version control and then automatically applied with CI/CD procedures. In addition, automated procedures can be defined to track failures based on key performance indicators and trigger recovery or repair for the erroneous components.

Automated Testing

Automated testing allows building software with a suite of unit, integration, functional, load and security tests that verify its behavior and ensure that it can meet production demand. These automated tests run as part of CI/CD pipelines and stop the deployment if any of the tests fail. In order to run end-to-end and load tests, the deployment scripts create a new environment and set up test data. These tests may replay synthetic transactions based on production traffic and benchmark the performance metrics.

Capacity Planning

Load testing and monitoring of production traffic patterns, demand and workload utilization help forecast the resources needed for future growth. This can be further strengthened with a capacity model that calculates the unit price of resources and the growth forecast so that you can automate addition or removal of resources based on demand.

Cloud Computing

Adopting cloud computing simplifies resource provisioning and its elasticity allows organizations to grow or shrink those resources based on the demand. You can also add automation to optimize utilization of the resources and reduce costs when allocating more resources.

Continuous Delivery

Continuous delivery automates production deployment of small and frequent changes by developers. It relies on continuous integration, which runs automated tests, and on automated deployment without any manual intervention. During the software development process, a developer picks a feature, works on the changes and then commits them to source control after peer code review. The automated build system runs a pipeline to create a container image based on the commit and then deploys it to a test or QA environment. The test environment runs automated unit, integration and regression tests using test data in the database. The code is then promoted to the main branch, and the automated build system tags and builds the image on the head commit of the main branch, which is then pushed to the container registry. The pre-prod environment pulls the image, restarts the pre-prod container and runs more comprehensive tests, including performance tests, with a larger set of test data in the database. You may need multiple stages of pre-prod deployment such as alpha, beta and gamma environments, where each environment may require deployment to a unique datacenter. After successful testing, the production systems are updated with the new image using rolling updates, blue/green deployments or canary deployments to minimize disruption to end users. The monitoring system watches for error rates at each stage of the deployment and automatically rolls back changes if a problem occurs.

Deploy over Multiple Zones and Regions

In order to provide high availability, compliance and reduced latency, you can deploy to multiple availability zones and regions. Global load balancers can be used to route traffic based on geographic proximity to the closest region. This also helps in implementing business continuity as applications can easily fail over to another region with minimal data loss.

Service Mesh

In order to easily build distributed systems, a number of platforms based on service-mesh pattern have emerged to abstract a common set of problems such as network communication, security, observability, etc:

Dapr – Distributed Application Runtime

The Distributed Application Runtime (Dapr) provides a variety of communication protocols, encryption, observability and secret management for building secured and resilient distributed services.

Envoy

Envoy is a service proxy for building cloud native applications with built-in support for networking protocols and observability.

Istio service mesh

Istio is built on top of Kubernetes and Envoy to build service mesh with builtin support for networking, traffic management, observability and security. A service mesh also addresses features such as A/B testing, canary deployments, rate limiting, access control, encryption, and end-to-end authentication.

Linkerd

Linkerd is a service mesh for Kubernetes that consists of a control-plane and a data-plane with built-in support for networking, observability and security. The control-plane allows controlling services, and the data-plane runs as sidecar containers that handle network traffic and communicate with the control-plane for configuration.

WebAssembly

The WebAssembly is a stack-based virtual machine that can run at the edge or in cloud. A number of WebAssembly platforms have adopted Actor model to build a platform for writing distributed applications such as wasmCloud and Lunatic.

Documentation

The architecture document defines goals and constraints of the software system and provides various perspectives such as use-cases, logical, data, processes, and physical deployment. It also includes non-functional or quality attributes such as performance, growth, scalability, etc. You can document these aspects using standards such as 4+1, C4, and ERD as well as document the broader enterprise architecture using methodologies like TOGAF, Zachman, and EA.

Incident management

Incident management defines the process of root-cause analysis and the actions that an organization can take when an incident affects the production environment. It defines best practices such as clear ownership, reducing time to detect/mitigate, blameless postmortems and prevention measures. The organization can then implement preventive measures and share lessons learned from all operational events and failures across teams. You can also use a pre-mortem to identify potential areas that can be improved or mitigated. Another way to simulate potential problems is to use chaos engineering or set up game days to test the workloads against various scenarios and outages.

Infrastructure as Code

Infrastructure as code uses a declarative language to define development, test and production environments, which are managed by source code management software. This provisioning and configuration logic can be used by CI/CD pipelines to automatically deploy and test environments. Following is a list of frameworks for building infrastructure from code:

Azure Resource Manager

Azure offers Azure Resource Manager (ARM) templates based on JSON format to declaratively define the infrastructure that you intend to deploy.

AWS Cloud Development Kit

The Cloud Development Kit (CDK) supports high-level programming languages to construct cloud resources on Amazon Web Services so that you can easily build cloud applications.

Hashicorp Terraform

Terraform uses HCL based configurations to describe computing resources that can be deployed to multiple cloud providers.

Monitoring

Monitoring measures key performance indicators (KPI) and service-level objectives (SLO) that are defined at the infrastructure, applications, services and end-to-end levels. These include both business and technical metrics such as number of errors, hot spots, call graphs, which are visible to the entire team for monitoring trends and reacting quickly to failures.

Multi-tenancy

If your system is consumed by different groups or tenants of users, you will need to design your system and services so that they isolate data and computing resources in a secure and reliable fashion. Each layer of the system can be designed to treat tenant context as a first-class construct, which is tied to the user identity. You can capture usage metrics per tenant to identify bottlenecks, estimate cost and analyze the resource utilization for capacity planning and growth projections. The operational dashboards can also use these metrics to construct tenant-based operational views and proactively respond to unexpected load.

Security Review

In order to minimize the security risk, the development teams can adopt shift-left on security and DevSecOps practices to closely collaborate with the InfoSec team and integrate security review into every phase of the software development lifecycle.

Version Control Systems

Version control systems such as Git or Mercurial help track code changes, configurations and scripts over time. You can adopt workflows such as gitflow or trunk-based development for check-in process. Other common practices include smaller commits, testing code and running static analysis or linters/profiling tools before checkin.

Summary

Software complexity is a major reason for missed deadlines and slow or buggy software. Some of this complexity is essential complexity within the business domain, but it’s often accidental complexity that results from technical debt, poor architecture and poor development practices. Another source of accidental complexity is distributed computing, where you need to handle security, rate-limiting, observability, etc., which must be applied consistently across the distributed systems. For example, virtualization helps in building immutable infrastructure and adopting infrastructure as code; functions as a service simplify building micro-services; and distributed platforms such as Istio and Linkerd remove a lot of cruft such as security, observability, traffic management and communication protocols when building distributed systems. The goal of a good architecture is to simplify building, testing, deploying and operating software. You need to continually improve the system architecture and its practices to build sustainable software delivery pipelines that can meet both current and future demands of users.

March 24, 2021

Task Scheduling Algorithms

Filed under: Computing,Technology — admin @ 8:16 pm

A task is defined as a unit of work, and a task scheduler is responsible for selecting the best task for execution based on various criteria. The criteria for selecting the best scheduling policy include:

  • Maximize resource utilization such as CPU or Memory but not exceeding a certain limit such as 80%.
  • Maximize throughput such as tasks per second.
  • Minimize turnaround or wall time from task submission to the completion time.
  • Minimize waiting time in ready queue before executing the task.

The above criteria assume that pending tasks are stored in a bounded ready-queue before execution, so no new tasks are accepted once the queue reaches its maximum capacity. In this context, a task is more coarse-grained and runs to completion, as compared to CPU scheduling, which may use preemptive or cooperative scheduling with context switching to dispatch different processes for execution.

Following is a list of common scheduling algorithms:

First-Come First-Serve (FCFS)

This algorithm simply uses a FIFO queue to process the tasks in the order they are queued. On the downside, a long-running task can block other tasks and yield very large average wait times in the queue.
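
To make the blocking effect concrete, here is a small Python sketch that computes the average wait time for tasks that run to completion in a given order. The `average_wait` helper and the runtimes are made up for illustration.

```python
from typing import Sequence


def average_wait(runtimes: Sequence[float]) -> float:
    """Average time tasks spend waiting when run to completion in the given order."""
    total_wait = 0.0
    elapsed = 0.0
    for runtime in runtimes:
        total_wait += elapsed   # this task waited for everything queued before it
        elapsed += runtime
    return total_wait / len(runtimes)


arrival_order = [24.0, 3.0, 3.0]               # one long task arrives first
fcfs_wait = average_wait(arrival_order)        # (0 + 24 + 27) / 3
shortest_first_wait = average_wait(sorted(arrival_order))  # (0 + 3 + 6) / 3
```

With these numbers, the two short tasks wait behind the long one under FCFS, and the average wait is several times higher than when the short tasks run first.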

Shortest Job-First (SJF)

This algorithm picks the smallest job that needs to be done and then the next smallest job. It results in the lowest average waiting time; however, it may not be possible to predict the runtime of a job before execution. You may need to pass hints about the job runtime or use historical data to predict the job runtime with an exponential average such as:

NewEstimatedRuntime = alpha * PreviousActualRuntime + (1.0 - alpha) * PreviousEstimatedRuntime

where alpha is between 0.0 and 1.0 and represents a weighting factor for the relative importance of recent data compared to older data, e.g. alpha with 0 value ignores previous actual time and 1.0 ignores past history of estimates.
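
The exponential average above can be sketched in Python as follows. The function names, the initial estimate and the observed runtimes are hypothetical values chosen for the example.

```python
from typing import Dict, List


def next_estimate(previous_actual: float, previous_estimate: float, alpha: float = 0.5) -> float:
    """NewEstimatedRuntime = alpha * actual + (1 - alpha) * previous estimate."""
    if not 0.0 <= alpha <= 1.0:
        raise ValueError("alpha must be between 0.0 and 1.0")
    return alpha * previous_actual + (1.0 - alpha) * previous_estimate


def shortest_job_first(estimates: Dict[str, float]) -> List[str]:
    """Order job names by their estimated runtime, smallest first."""
    return sorted(estimates, key=estimates.get)


# Start from a guess of 10 and fold in three observed runtimes.
estimate = 10.0
history = []
for actual in (6.0, 4.0, 6.0):
    estimate = next_estimate(actual, estimate, alpha=0.5)
    history.append(estimate)   # 8.0, then 6.0, then 6.0
```

With alpha at 0.5 the estimate moves halfway toward each new observation, so a single unusually slow run shifts the prediction without dominating it.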

Priority Scheduling

This algorithm picks the highest priority job that needs to be done and then the next highest priority job. It may result in starvation of low-priority tasks, which may wait indefinitely. You can fix this drawback by supporting aging, where the priority of old tasks is slowly bumped.
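
Aging can be sketched in Python by adding a bonus that grows with the time a task has waited. The `Task` fields, the linear `aging_rate` and the convention that larger numbers mean higher priority are assumptions for this example.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Task:
    name: str
    priority: int        # larger value means more important
    enqueued_at: float   # time the task entered the ready queue


def effective_priority(task: Task, now: float, aging_rate: float = 1.0) -> float:
    """Base priority plus a bonus proportional to the waiting time."""
    return task.priority + aging_rate * (now - task.enqueued_at)


def pick_next(ready: List[Task], now: float, aging_rate: float = 1.0) -> Task:
    """Remove and return the task with the highest effective priority."""
    chosen = max(ready, key=lambda task: effective_priority(task, now, aging_rate))
    ready.remove(chosen)
    return chosen
```

With an `aging_rate` of zero this reduces to plain priority scheduling; a larger rate lets long-waiting tasks overtake newer high-priority ones sooner.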

Earliest Deadline

This algorithm gives highest priority to the task with the earliest deadline, which is then used to pick the next task to execute. This scheduler improves resource utilization based on the estimated runtime and data requirements.
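
Picking the task with the earliest deadline maps naturally onto a min-heap. The following Python sketch uses the standard `heapq` module; the `EarliestDeadlineQueue` class and the numeric deadlines are hypothetical.

```python
import heapq
import itertools


class EarliestDeadlineQueue:
    """Ready queue that always hands out the task with the earliest deadline."""

    def __init__(self) -> None:
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps submission order

    def submit(self, name: str, deadline: float) -> None:
        heapq.heappush(self._heap, (deadline, next(self._counter), name))

    def next_task(self) -> str:
        """Pop the most urgent task; raises IndexError if the queue is empty."""
        _deadline, _order, name = heapq.heappop(self._heap)
        return name

    def __len__(self) -> int:
        return len(self._heap)
```

The counter in each heap entry ensures that tasks with equal deadlines are served in the order they were submitted.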

Speculative Scheduler

The speculative scheduler detects a slow-running task and starts another instance of the task as a backup to minimize the response time. This generally works for short jobs with homogeneous resources, but it doesn’t guarantee complete reliability.

Multilevel Queues

This algorithm allows categorizing tasks and then dispatching each task to a queue based on its category, e.g. you may define distinct queues for small/medium/large tasks based on runtime or define queues for interactive/background/system/batch tasks. In some cases, a task may need to be moved from one queue to another based on its runtime characteristics, and you can use aging and priority boosting to promote low-priority tasks.

Resource aware scheduler

This scheduler tracks provisioned resources and the utilization required by the tasks. The scheduler may simply allocate the resources that tasks need when scheduling them or use an admission-control algorithm to prevent certain tasks from starting until the required resources are available.

Matchmaking scheduler

This scheduler uses affinity-based scheduling so that it routes tasks to the workers or nodes where the data resides locally, which provides greater data locality.

Delay scheduler

This scheduler waits until the data required by the task is available on the node or worker where task will run to improve data locality. However, in order to prevent starvation, it allows scheduling tasks to another worker or node.

Capacity scheduler

This scheduler shares system resources among tasks where tasks specify the required resources such as memory and CPU. The scheduler tracks resource utilization and allocates resources specified by the tasks.

Fair scheduler

This scheduler ensures that common resources such as CPU, memory, disk and I/O are shared fairly among different tasks.
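Following is a minimal sketch of one way to define a fair split in GO. It uses max-min fairness, which is an assumption on my part as the description above does not name a specific policy, and the FairShare function is hypothetical:

```go
package main

import "fmt"

// FairShare splits capacity across the demands using max-min fairness:
// small demands are fully satisfied and the rest share what is left equally.
func FairShare(capacity float64, demands []float64) []float64 {
	alloc := make([]float64, len(demands))
	satisfied := make([]bool, len(demands))
	remaining := capacity
	pending := len(demands)
	for pending > 0 && remaining > 0 {
		share := remaining / float64(pending)
		progressed := false
		for i, d := range demands {
			if !satisfied[i] && d <= share {
				// The demand fits within an equal share, so grant it fully.
				alloc[i] = d
				satisfied[i] = true
				remaining -= d
				pending--
				progressed = true
			}
		}
		if !progressed {
			// Every pending demand exceeds the equal share, so cap each at the share.
			for i := range demands {
				if !satisfied[i] {
					alloc[i] = share
				}
			}
			remaining = 0
		}
	}
	return alloc
}

func main() {
	fmt.Println(FairShare(10, []float64{2, 8, 8})) // prints [2 4 4]
}
```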



November 17, 2020

Structured Concurrency in modern programming languages – Part-IV

Filed under: Computing,Kotlin,Languages,Swift,Technology — admin @ 12:58 pm

In this fourth part of the series on structured concurrency (Part-I, Part-II, Part-III, Swift-Followup), I will review Kotlin and Swift languages for writing concurrent applications and their support for structured concurrency:

Kotlin

Kotlin is a JVM language that was created by JetBrains with improved support for functional and object-oriented features such as extension functions, nested functions, data classes, lambda syntax, etc. Kotlin also uses nullable types instead of unchecked null references, similar to optionals in Rust and Swift, to prevent null-pointer errors. Kotlin provides native OS-threads similar to Java and coroutines with async/await syntax similar to Rust. Kotlin brings first-class support for structured concurrency with its support for concurrency scope, composition, error handling, timeout/cancellation and context for coroutines.

Structured Concurrency in Kotlin

Kotlin provides following primitives for concurrency support:

suspend functions

Kotlin adds the suspend keyword to mark a function that can be called from a coroutine. When the code is compiled, the compiler automatically adds continuation behavior so that, instead of returning a value directly, the function can suspend and later resume by calling the continuation callback.

Launching coroutines

A coroutine can be launched using launch, async or runBlocking, each of which defines a scope for structured concurrency. The lifetime of child coroutines is attached to this scope, which can be used to cancel them. The async builder returns a Deferred (future) object that extends Job. You use the await method on the Deferred instance to get the results.

Dispatcher

Kotlin defines CoroutineDispatcher to determine the thread(s) for running the coroutine. Kotlin provides several built-in dispatchers, including: Default, which is intended for CPU-intensive work; IO, which is intended for blocking IO/network calls; and Main, which uses the main thread (e.g. the UI thread on Android).

Channels

Kotlin uses channels for communication between coroutines. It defines three channel interfaces: SendChannel, ReceiveChannel, and Channel. Channels can be rendezvous, buffered, unlimited or conflated. Rendezvous and buffered channels behave like GO’s channels and suspend the send or receive operation if the other coroutine is not ready or the buffer is full. An unlimited channel behaves like a queue, and a conflated channel overwrites the previous value when a new value is sent. The producer can close the send channel to indicate the end of work.

Using async/await in Kotlin

Following code shows how to use async/await to build the toy web crawler:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerWithAsync(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithAsync::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls using async/await
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()

        withTimeout(timeout) {
            val jobs = mutableListOf<Deferred<Int>>()
            for (u in urls) {
                jobs.add(async {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                })
            }
            for (j in jobs) {
                j.await()
            }
        }
        return res.completed(size.get())
    }
}

In the above example, the CrawlerWithAsync class takes a timeout parameter for the crawler. The public crawl function takes a list of URLs to crawl and defines the high-level scope of concurrency using runBlocking. The private crawl method is declared with suspend so that it can be suspended and resumed as a continuation. It uses async inside withTimeout to start background tasks and uses await to collect the results. Each task calls handleCrawl and then recursively calls crawl for the child URLs.

Following unit tests show how to test above crawl method:

package concurrency

import org.junit.Test
import org.slf4j.LoggerFactory
import kotlin.test.assertEquals

class CrawlerAsynTest {
    private val logger = LoggerFactory.getLogger(CrawlerAsynTest::class.java)
    val urls = listOf("a.com", "b.com", "c.com", "d.com", "e.com", "f.com",
            "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com")

    @Test
    fun testCrawl() {
        val crawler = CrawlerWithAsync(4, 1000L)
        val started = System.currentTimeMillis()
        val res = crawler.crawl(urls);
        val duration = System.currentTimeMillis() - started
        logger.info("CrawlerAsync - crawled %d urls in %d milliseconds".format(res.childURLs, duration))
        assertEquals(19032, res.childURLs)
    }

    @Test(expected = Exception::class)
    fun testCrawlWithTimeout() {
        val crawler = CrawlerWithAsync(1000, 100L)
        crawler.crawl(urls);
    }
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/kot_pool.

Following are major benefits of using this approach to implement crawler and its support of structured concurrency:

  • The main crawl method defines high level scope of concurrency and it waits for the completion of child tasks.
  • Kotlin supports cancellation and timeout APIs and the crawl method will fail with timeout error if crawling exceeds the time limit.
  • The crawl method captures error from async response and returns so that client code can perform error handling.
  • The async syntax in Kotlin allows easy composition of asynchronous code.
  • Kotlin allows customized dispatcher for more control on the asynchronous behavior.

Following are shortcomings using this approach for structured concurrency and general design:

  • As Kotlin doesn’t enforce immutability by default, you will need synchronization to protect shared state.
  • Async/Await support is still new in Kotlin and lacks stability and proper documentation.
  • Above design creates a new coroutine for crawling each URL and it can strain expensive network and IO resources so it’s not practical for real-world implementation.

Using coroutines in Kotlin

Following code uses coroutine syntax to implement the web crawler:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerWithCoroutines(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerWithCoroutines::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls using coroutines
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()
        withTimeout(timeout) {
            for (u in urls) {
                coroutineScope {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                }
            }
        }
        return res.completed(size.get())
    }
}

The above example is similar to the async/await implementation, but it uses coroutineScope instead of async to run each crawl task.

Following example shows how async coroutines can be cancelled:

package concurrency

import concurrency.domain.Request
import concurrency.domain.Response
import concurrency.utils.CrawlerUtils
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger

class CrawlerCancelable(val maxDepth: Int, val timeout: Long) : Crawler {
    private val logger = LoggerFactory.getLogger(CrawlerCancelable::class.java)
    val crawlerUtils = CrawlerUtils(maxDepth)

    // public method for crawling a list of urls to show cancel operation
    // internal method will call cancel instead of await so this method will
    // fail.
    override fun crawl(urls: List<String>): Response {
        var res = Response()
        // Boundary for concurrency and it will not return until all
        // child URLs are crawled up to MAX_DEPTH limit.
        runBlocking {
            res.childURLs = crawl(urls, 0).childURLs
        }
        return res
    }

    ////////////////// Internal methods
    suspend private fun crawl(urls: List<String>, depth: Int): Response {
        var res = Response()
        if (depth >= maxDepth) {
            return res.failed("Max depth reached")
        }
        var size = AtomicInteger()

        withTimeout(timeout) {
            val jobs = mutableListOf<Deferred<Int>>()
            for (u in urls) {
                jobs.add(async {
                    val childURLs = crawlerUtils.handleCrawl(Request(u, depth))
                    // shared
                    size.addAndGet(crawl(childURLs, depth + 1).childURLs + 1)
                })
            }
            for (j in jobs) {
                j.cancel()
            }
        }
        return res.completed(size.get())
    }
}

You can download above examples from https://github.com/bhatti/concurency-katas/tree/main/kot_pool.

Swift

Swift was developed by Apple to replace Objective-C and offers modern features such as closures, optionals instead of null-pointers (similar to Rust and Kotlin), optional chaining, guards, value types, generics, protocols, algebraic data types, etc. It uses the same runtime system as Objective-C, uses automatic-reference-counting (ARC) for memory management and grand-central-dispatch for concurrency, and provides integration with Objective-C code and libraries.

Structured Concurrency in Swift

I discussed concurrency support in Objective-C, such as NSThread, NSOperationQueue and Grand Central Dispatch (GCD), in my old blog [1685]. Since then, GCD has improved launching asynchronous tasks using background queues with timeout/cancellation support. However, much of the Objective-C and Swift code still suffers from the callback and promise hell discussed in Part-I. Chris Lattner and Joe Groff wrote a proposal to add async/await and the actor model to Swift and provide first-class support for structured concurrency. As this work is still in progress, I wasn’t able to test it, but here are the major features of this proposal:

Coroutines

Swift will adopt coroutines as the building blocks of concurrency and asynchronous code. It will add syntactic sugar for completion handlers using the async or yield keywords.

Async/Await

Swift will provide async/await syntactic sugar on top of coroutines to mark asynchronous behavior. The async code will use continuations similar to Kotlin so that it suspends itself and schedules execution by the controlling context. It will use Futures (similar to Deferred in Kotlin) to await the results (or errors). This syntax will work with normal error handling in Swift so that errors from asynchronous code are automatically propagated to the calling function.

Actor model

Swift will adopt actor-model with value based messages (copy-on-write) to manage concurrent objects that can receive messages asynchronously and the actor can keep internal state and eliminate race conditions.

Kotlin and Swift are very similar in design and both have first-class support of structured concurrency such as concurrency scope, composition, error handling, cancellation/timeout, value types, etc. Both Kotlin and Swift use continuation for async behavior so that async keyword suspends the execution and passes control to the execution context so that it can be executed asynchronously and control is passed back at the end of execution.

Structured Concurrency Comparison

Following table summarizes support of structured concurrency discussed in this blog series:

| Feature | Typescript (NodeJS) | Erlang | Elixir | GO | Rust | Kotlin | Swift |
| --- | --- | --- | --- | --- | --- | --- | --- |
| Structured scope | Built-in | manually | manually | manually | Built-in | Built-in | Built-in |
| Asynchronous Composition | Yes | No | No | No | Yes | Yes | Yes |
| Error Handling | Natively using Exceptions | Manually storing errors in Response | Manually storing errors in Response | Manually storing errors in Response | Manually using Result ADT | Natively using Exceptions | Natively using Exceptions |
| Cancellation | Cooperative Cancellation | Built-in Termination or Cooperative Cancellation | Built-in Termination or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation | Built-in Cancellation or Cooperative Cancellation |
| Timeout | No | Yes | Yes | Yes | Yes | Yes | Yes |
| Customized Execution Context | No | No | No | No | No | Yes | Yes |
| Race Conditions | No due to NodeJS architecture | No due to Actor model | No due to Actor model | Possible due to shared state | No due to strong ownership | Possible due to shared state | Possible due to shared state |
| Value Types | No | Yes | Yes | Yes | Yes | Yes | Yes |
| Concurrency paradigms | Event loop | Actor model | Actor model | Go-routine, CSP channels | OS-Thread, coroutine | OS-Thread, coroutine, CSP channels | OS-Thread, GCD queues, coroutine, Actor model |
| Type Checking | Static | Dynamic | Dynamic | Static but lacks generics | Strongly static types with generics | Strongly static types with generics | Strongly static types with generics |
| Suspends Async code using Continuations | No | No | No | No | Yes | Yes | Yes |
| Zero-cost based abstraction (async) | No | No | No | No | Yes | No | No |
| Memory Management | GC | GC (process-scoped) | GC (process-scoped) | GC | (Automated) Reference counting, Boxing | GC | Automated reference counting |

Performance Comparison

Following table summarizes the runtime of the various implementations of the web crawler discussed in this blog series when crawling 19K URLs, which resulted in about 76K messages to asynchronous methods/coroutines/actors:

| Language | Design | Runtime (secs) |
| --- | --- | --- |
| Typescript | Async/Await | 0.638 |
| Erlang | Spawning Process | 4.636 |
| Erlang | PMAP | 4.698 |
| Elixir | Spawning OTP Children | 43.5 |
| Elixir | Task async/await | 187 |
| Elixir | Worker-pool with queue | 97 |
| GO | Go-routine/channels | 1.2 |
| Rust | Async/Await | 4.3 |
| Kotlin | Async/Await | 0.736 |
| Kotlin | Coroutine | 0.712 |
| Swift | Async/Await | 63 |
| Swift | Actors/Async/Await | 65 |

Note: The purpose of above results was not to run micro-benchmarks but to show rough cost of spawning thousands of asynchronous tasks.

Summary

Overall, Typescript/NodeJS provides a simpler model for concurrency but lacks proper timeout/cancellation support, and it’s not suitable for highly concurrent applications that require blocking APIs. The actor-based concurrency model in Erlang/Elixir supports a high level of concurrency, error handling and cancellation, and isolates process state to prevent race conditions, but it lacks native composition of asynchronous behavior. However, you can compose Erlang processes with a parent-child hierarchy and easily start or stop these processes. GO supports concurrency via go-routines and channels with built-in cancellation and timeout APIs, but GO statements are considered harmful by structured concurrency and GO doesn’t protect against race conditions due to mutable state. Erlang and GO are the only languages here that were designed from the ground up with support for actors and coroutines, and their schedulers have strong support for asynchronous IO and non-blocking APIs. Erlang also offers process-scoped garbage collection to clean up related data easily, as opposed to the global GC in other languages. The async/await support in Rust is still immature and lacks proper support for cancellation, but the strong ownership properties of Rust eliminate race conditions and allow safe concurrency. Rust, Kotlin and Swift use continuations for async/await, which allows composing multiple async/await calls chained together. For example, instead of using await (await download()).parse() in Javascript/Typescript/C#, you can use await download().parse(). The async/await changes are still new in Kotlin and lack stability, whereas Swift has not yet shipped these changes as part of an official release. As Kotlin, Rust, and Swift built coroutines or async/await on top of an existing runtime and virtual machine, their green-thread schedulers are not as optimal as the schedulers in Erlang or GO and may exhibit limitations on concurrency and scalability.

Finally, structured concurrency helps your code structure with improved data/control flow, concurrency scope, error handling, cancellation/timeout and composition but it won’t solve data races if multiple threads/coroutines access mutable shared data concurrently so you will need to rely on synchronization mechanisms to protect the critical section.

Note: You can download all examples in this series from https://github.com/bhatti/concurency-katas.

November 10, 2020

Structured Concurrency in modern programming languages – Part-III

Filed under: Computing,Languages,Technology — admin @ 4:23 pm

In this third part of the series on structured concurrency (Part-I, Part-II, Part-IV, Swift-Followup), I will review GO and Rust languages for writing concurrent applications and their support for structured concurrency:

GO

The GO language was created by Robert Griesemer, Rob Pike, and Ken Thompson, and it uses light-weight go-routines for asynchronous processing. Go uses channels for communication that are modeled on Tony Hoare’s rendezvous-style communicating sequential processes (CSP), where the sender cannot send the message until the receiver is ready to accept it. GO also supports buffered channels so that the sender/receiver don’t have to wait if buffer space is available, but channels are not designed to be used as a mailbox or message queue. The channels can be shared by multiple go-routines, and the messages can be transmitted by value or by reference. GO doesn’t protect against race conditions, and shared state must be protected when it’s accessed from multiple go-routines. Also, if a go-routine receives a message by reference, it must be treated as a transfer of ownership, otherwise it can lead to race conditions. Finally, unlike Erlang, you cannot monitor the lifetime of other go-routines, so you won’t be notified if a go-routine exits unexpectedly.
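Following is a minimal sketch that contrasts rendezvous (unbuffered) and buffered channels in GO. The TrySend helper is a hypothetical function added for illustration; it uses select with a default branch to check whether a send would block:

```go
package main

import "fmt"

// TrySend reports whether v could be sent on ch without blocking.
func TrySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	rendezvous := make(chan int)  // unbuffered: a send needs a ready receiver
	buffered := make(chan int, 1) // buffered: a send succeeds while space is left

	fmt.Println(TrySend(rendezvous, 1)) // false: no receiver is waiting
	fmt.Println(TrySend(buffered, 1))   // true: the buffer has one free slot
	fmt.Println(TrySend(buffered, 2))   // false: the buffer is full

	done := make(chan int)
	go func() { done <- <-rendezvous }() // a receiver becomes ready
	rendezvous <- 42                     // blocks until the receiver takes the value
	fmt.Println(<-done)                  // 42
}
```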

Following is high-level architecture of scheduling and go-routines in GO process:

Using go-routines/channels in GO

GO doesn’t support async/await syntax but it can be simulated via go-routine and channels. As the cost of each go-routine is very small, you can use them for each background task.

Following code shows how to use go-routines and channels to build the toy web crawler:

package async

import (
	"context"
	"fmt"
	"time"
)

// type of async function
type Handler func(ctx context.Context, request interface{}) (interface{}, error)

// type of abortHandler function that is called if async operation is cancelled
type AbortHandler func(ctx context.Context, request interface{}) (interface{}, error)

func NoAbort(ctx context.Context, request interface{}) (interface{}, error) {
	return nil, nil
}

// Awaiter - defines method to wait for result
type Awaiter interface {
	Await(ctx context.Context, timeout time.Duration) (interface{}, error)
	IsRunning() bool
}

// task - submits task asynchronously
type task struct {
	handler      Handler
	abortHandler AbortHandler
	request      interface{}
	resultQ      chan Response
	running      bool
}

// Response encapsulates results of async task
type Response struct {
	Result interface{}
	Err    error
}

// Execute executes a long-running function in background and returns a future to wait for the response
func Execute(
	ctx context.Context,
	handler Handler,
	abortHandler AbortHandler,
	request interface{}) Awaiter {
	task := &task{
		request:      request,
		handler:      handler,
		abortHandler: abortHandler,
		resultQ:      make(chan Response, 1),
		running:      true,
	}
	go task.run(ctx) // run handler asynchronously
	return task
}

// IsRunning checks if task is still running
func (t *task) IsRunning() bool {
	return t.running
}

// Await waits for completion of the task
func (t *task) Await(
	ctx context.Context,
	timeout time.Duration) (result interface{}, err error) {
	result = nil
	select {
	case <-ctx.Done():
		err = ctx.Err()
	case res := <-t.resultQ:
		result = res.Result
		err = res.Err
	case <-time.After(timeout):
		err = fmt.Errorf("async task timedout %v", timeout)
	}
	if err != nil {
		go t.abortHandler(ctx, t.request) // abortHandler operation
	}
	return
}

// AwaitAll waits for completion of multiple tasks
func AwaitAll(
	ctx context.Context,
	timeout time.Duration,
	all ...Awaiter) []Response {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	results := make([]Response, 0)
	for _, next := range all {
		res, err := next.Await(ctx, timeout)
		results = append(results, Response{Result: res, Err: err})
	}
	return results
}

////////////////////////////////////// PRIVATE METHODS ///////////////////////////////////////
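func (t *task) run(ctx context.Context) {
	// Execute already starts run in its own go-routine, so a nested go-routine is not needed.
	result, err := t.handler(ctx, t.request)
	t.running = false                               // mark completion before publishing the result
	t.resultQ <- Response{Result: result, Err: err} // out channel is buffered by 1
	close(t.resultQ)
}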

In the above example, the Execute function takes a function to invoke in the background and creates a channel for the reply. It then executes the function and sends the reply back to the channel. The client uses the future object returned by Execute to wait for the response. The Await method takes a timeout to specify the max wait time for the response. Note: The Await method listens to ctx.Done() in addition to the response channel, which notifies it if the client canceled the task or if it timed out due to higher-level settings.

Following code shows how crawler can use these primitives to define background tasks for crawler:

package crawler

import (
	"context"
	"errors"
	"sync/atomic"
	"time"

	"plexobject.com/crawler/async"
	"plexobject.com/crawler/domain"
	"plexobject.com/crawler/utils"
)

// MaxDepth max depth of crawling
const MaxDepth = 4

// MaxUrls max number of child urls to crawl
const MaxUrls = 11

// Crawler is used for crawling URLs
type Crawler struct {
	crawlHandler      async.Handler
	downloaderHandler async.Handler
	rendererHandler   async.Handler
	indexerHandler    async.Handler
	totalMessages     uint64
}

// New Instantiates new crawler
func New(ctx context.Context) *Crawler {
	crawler := &Crawler{totalMessages: 0}
	crawler.crawlHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		req := payload.(*domain.Request)
		return crawler.handleCrawl(ctx, req)
	}
	crawler.downloaderHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		// TODO check robots.txt and throttle policies
		// TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
		return utils.RandomString(100), nil
	}
	crawler.rendererHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		// for SPA apps that use javascript for rendering contents
		return utils.RandomString(100), nil
	}
	crawler.indexerHandler = func(ctx context.Context, payload interface{}) (interface{}, error) {
		return 0, nil
	}
	return crawler
}

// Crawl - crawls list of URLs with specified depth
func (c *Crawler) Crawl(ctx context.Context, urls []string, timeout time.Duration) (int, error) {
	// Boundary for concurrency and it will not return until all
	// child URLs are crawled up to MaxDepth limit.
	return c.crawl(ctx, urls, 0, timeout)
}

// TotalMessages - total number of messages processed
func (c *Crawler) TotalMessages() uint64 {
	return atomic.LoadUint64(&c.totalMessages) // read atomically to match atomic.AddUint64 in handleCrawl
}

// handles crawl
func (c *Crawler) handleCrawl(ctx context.Context, req *domain.Request) (*domain.Result, error) {
	atomic.AddUint64(&c.totalMessages, 1)
	timeout := req.Timeout // already a time.Duration; multiplying it by time.Second would inflate it
	res := domain.NewResult(req)
	if contents, err := async.Execute(ctx, c.downloaderHandler, async.NoAbort, req.URL).Await(ctx, timeout); err != nil {
		res.Failed(err)
	} else {
		if newContents, err := async.Execute(ctx, c.rendererHandler, async.NoAbort, [...]string{req.URL, contents.(string)}).Await(ctx, timeout); err != nil {
			res.Failed(err)
		} else {
			if hasContentsChanged(ctx, req.URL, newContents.(string)) && !isSpam(ctx, req.URL, newContents.(string)) {
				async.Execute(ctx, c.indexerHandler, async.NoAbort, [...]string{req.URL, newContents.(string)}).Await(ctx, timeout)
				urls := parseURLs(ctx, req.URL, newContents.(string))
				if childURLs, err := c.crawl(ctx, urls, req.Depth+1, req.Timeout); err != nil {
					res.Failed(err)
				} else {
					res.Succeeded(childURLs + 1)
				}
			} else {
				res.Failed(errors.New("contents didn't change"))
			}
		}
	}

	return res, nil
}

/////////////////// Internal private methods ///////////////////////////
// Crawls list of URLs with specified depth
func (c *Crawler) crawl(ctx context.Context, urls []string, depth int, timeout time.Duration) (int, error) {
	if depth < MaxDepth {
		futures := make([]async.Awaiter, 0)
		for i := 0; i < len(urls); i++ {
			futures = append(futures, async.Execute(ctx, c.crawlHandler, async.NoAbort, domain.NewRequest(urls[i], depth, timeout)))
		}
		sum := 0
		var savedError error
		for i := 0; i < len(futures); i++ {
			res, err := futures[i].Await(ctx, timeout)
			if err != nil {
				savedError = err // returning only a single error
			}
			if res != nil {
				sum += res.(*domain.Result).ChildURLs
			}
		}

		return sum, savedError
	}
	return 0, nil
}

func parseURLs(ctx context.Context, url string, contents string) []string {
	// tokenize contents and extract href/image/script urls
	urls := make([]string, 0)
	for i := 0; i < MaxUrls; i++ {
		urls = append(urls, utils.RandomChildUrl(url))
	}
	return urls
}

func hasContentsChanged(ctx context.Context, url string, contents string) bool {
	return true
}

func isSpam(ctx context.Context, url string, contents string) bool {
	return false
}

In the above implementation, the crawler defines background tasks for crawling, downloading, rendering and indexing. The Crawl method defines the concurrency boundary and waits until all child tasks are completed. Go provides first-class support for cancellation and timeout via context.Context, but you have to listen on the special ctx.Done() channel.
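Following is a minimal sketch of a handler that listens on ctx.Done() between units of work so that it stops soon after a timeout or cancellation. The Work and RunWithDeadline functions are hypothetical and are not part of the crawler code above:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Work simulates a long-running handler that checks ctx.Done() before each
// unit of work and returns how many units were completed.
func Work(ctx context.Context, units int) (int, error) {
	for done := 0; done < units; done++ {
		select {
		case <-ctx.Done():
			return done, ctx.Err() // stop cooperatively and report the reason
		default:
		}
		time.Sleep(time.Millisecond) // simulated unit of work
	}
	return units, nil
}

// RunWithDeadline runs Work under a context that expires after deadlineMillis.
func RunWithDeadline(units int, deadlineMillis int) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(deadlineMillis)*time.Millisecond)
	defer cancel()
	return Work(ctx, units)
}

func main() {
	done, err := RunWithDeadline(3, 5000)
	fmt.Println(done, err) // the deadline is generous, so all 3 units finish

	done, err = RunWithDeadline(100000, 20)
	fmt.Println(done < 100000, err) // the deadline expires before the work finishes
}
```

A handler that never checks ctx.Done() keeps running in the background even after the caller has given up waiting.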

Following unit tests show examples of cancellation, timeout and normal processing:

package crawler

import (
	"context"
	"log"
	"testing"
	"time"
)

const EXPECTED_URLS = 19032

func TestCrawl(t *testing.T) {
	rootUrls := []string{"https://a.com", "https://b.com", "https://c.com", "https://d.com", "https://e.com", "https://f.com", "https://g.com", "https://h.com", "https://i.com", "https://j.com", "https://k.com", "https://l.com", "https://n.com"}
	started := time.Now()
	timeout := time.Duration(8 * time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	crawler := New(ctx)
	received, err := crawler.Crawl(ctx, rootUrls, timeout)
	elapsed := time.Since(started)
	log.Printf("Crawl took %s to process %v messages -- %v", elapsed, received, crawler.TotalMessages())
	if crawler.totalMessages != EXPECTED_URLS {
		t.Errorf("Expected %v urls but was %v", EXPECTED_URLS, crawler.totalMessages)
	}
	if err != nil {
		t.Errorf("Unexpected error %v", err)
	} else if EXPECTED_URLS != received {
		t.Errorf("Expected %v urls but was %v", EXPECTED_URLS, received)
	}
}

func TestCrawlWithTimeout(t *testing.T) {
	started := time.Now()
	timeout := time.Duration(4 * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	crawler := New(ctx)
	received, err := crawler.Crawl(ctx, []string{"a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"}, timeout)
	if err == nil {
		t.Errorf("Expecting timeout error")
	}
	elapsed := time.Since(started)
	log.Printf("Timedout took %s to process %v messages -- %v - %v", elapsed, received, crawler.TotalMessages(), err)
}

func TestCrawlWithCancel(t *testing.T) {
	started := time.Now()
	timeout := time.Duration(3 * time.Second)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	crawler := New(ctx)
	var err error
	var received int
	done := make(chan struct{})
	go func() {
		defer close(done)
		// calling asynchronously
		received, err = crawler.Crawl(ctx, []string{"a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"}, timeout)
	}()
	time.Sleep(5 * time.Millisecond)
	cancel()
	// wait for Crawl to return so that err and received are read without a data race
	<-done
	if err == nil {
		t.Errorf("Expecting cancel error")
	}
	elapsed := time.Since(started)
	log.Printf("Cancel took %s to process %v messages -- %v - %v", elapsed, received, crawler.TotalMessages(), err)
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/go_pool.

Following are major benefits of using this approach to implement crawler and its support of structured concurrency:

  • The main Crawl method defines high level scope of concurrency and it waits for the completion of child tasks.
  • Go supports cancellation and timeout APIs, and the Crawl method passes a timeout parameter so that crawling all URLs must complete within that time period.
  • The Crawl method captures error from async response and returns so that client code can perform error handling.

Following are shortcomings using this approach for structured concurrency and general design:

  • You can’t monitor the lifetime of go-routines, and you won’t get any errors if a background task dies unexpectedly.
  • The cancellation API returns without cancelling the underlying operation, so you will need to implement cooperative cancellation to persist any state or clean up underlying resources.
  • Go doesn’t support specifying an execution context for go-routines, and all asynchronous code is automatically scheduled by GO (G0 go-routines).
  • GO go-routines are not easily composable because they don’t have any parent/child relationship, as opposed to async methods that can invoke other async methods in Typescript, Rust or other languages supporting async/await.
  • As Go doesn’t enforce immutability, you will need a mutex to protect shared state. Also, the mutex implementation in GO is not re-entrant, so you can’t use it in recursive methods that acquire the same lock.
  • The above code creates a new go-routine for crawling each URL, and though the overhead of each go-routine is small, it may use other expensive resources such as network resources.
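Following is a minimal sketch of how a parent can wait for its child go-routines and learn about their failures using only the standard library. The Scope type and the DemoScope function are hypothetical names; the sketch assumes that children report failures by returning an error, and it converts panics into errors so that a dying child is not silent:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Scope is a hypothetical helper that ties child go-routines to a parent.
type Scope struct {
	wg   sync.WaitGroup
	mu   sync.Mutex // protects errs, which is shared by all children
	errs []error
}

// Go starts fn as a child of the scope and records its error or panic.
func (s *Scope) Go(fn func() error) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		defer func() {
			if r := recover(); r != nil {
				s.record(fmt.Errorf("child panicked: %v", r))
			}
		}()
		if err := fn(); err != nil {
			s.record(err)
		}
	}()
}

func (s *Scope) record(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.errs = append(s.errs, err)
}

// Wait blocks until every child has finished and returns the collected errors.
func (s *Scope) Wait() []error {
	s.wg.Wait()
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.errs
}

// DemoScope starts three children (one succeeds, one fails, one panics)
// and returns the number of failures that the parent observed.
func DemoScope() int {
	var scope Scope
	scope.Go(func() error { return nil })
	scope.Go(func() error { return errors.New("download failed") })
	scope.Go(func() error { panic("unexpected state") })
	return len(scope.Wait())
}

func main() {
	fmt.Println(DemoScope()) // 2
}
```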

Using worker-pool in GO

Instead of creating a new go-routine for each task, we can use a worker-pool of go-routines to perform background tasks so that we can manage external resource dependencies easily.

Following code shows an implementation of worker-pool in GO:

package pool

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
)

const BUFFER_CAPACITY = 2 // allow buffering to support asynchronous behavior  as by default sender will be blocked

type Handler func(ctx context.Context, payload interface{}) (interface{}, error)

type Awaiter interface {
	Await(ctx context.Context, timeout time.Duration) (interface{}, error)
}

// Request encapsulates request to process
type Request struct {
	id      string
	payload interface{}
	outQ    chan Result
}

// Result encapsulates results
type Result struct {
	id      string
	payload interface{}
	err     error
}

// Worker structure defines inbound channel to receive request and lambda function to execute
type Worker struct {
	id                   int
	handler              Handler
	workerRequestChannel chan *Request
}

// NewWorker creates new worker
func NewWorker(id int, handler Handler) Worker {
	return Worker{
		id:                   id,
		handler:              handler,
		workerRequestChannel: make(chan *Request),
	}
}

func (w Worker) start(ctx context.Context, workersReadyPool chan chan *Request, done chan bool) {
	go func(w Worker) {
		for {
			// register the current worker into the worker queue.
			workersReadyPool <- w.workerRequestChannel

			select {
			case <-ctx.Done():
				return // a bare break would only leave the select, not the for loop
			case req := <-w.workerRequestChannel:
				payload, err := w.handler(ctx, req.payload)
				req.outQ <- Result{id: req.id, payload: payload, err: err} // out channel is buffered by 1
				close(req.outQ)
			case <-done:
				return
			}
		}
	}(w)
}

// WorkPool - pool of workers
type WorkPool struct {
	size                int
	workersReadyPool    chan chan *Request
	pendingRequestQueue chan *Request
	done                chan bool
	handler             Handler
}

// New Creates new async structure
func New(handler Handler, size int) *WorkPool {
	async := &WorkPool{
		size:                size,
		workersReadyPool:    make(chan chan *Request, BUFFER_CAPACITY),
		pendingRequestQueue: make(chan *Request, BUFFER_CAPACITY),
		done:                make(chan bool),
		handler:             handler}
	return async
}

// Start - starts up workers and internal goroutine to receive requests
func (p *WorkPool) Start(ctx context.Context) {
	for w := 1; w <= p.size; w++ {
		worker := NewWorker(w, p.handler)
		worker.start(ctx, p.workersReadyPool, p.done)
	}
	go p.dispatch(ctx)
}

// Add request to process
func (p *WorkPool) Add(ctx context.Context, payload interface{}) Awaiter {
	// Adding request to process
	req := &Request{id: uuid.New().String(), payload: payload, outQ: make(chan Result, 1)}
	go func() {
		p.pendingRequestQueue <- req
	}()
	return req
}

// Await for reply -- you can only call this once
func (r Request) Await(ctx context.Context, timeout time.Duration) (payload interface{}, err error) {
	select {
	case <-ctx.Done():
		err = errors.New("async_cancelled")
	case res := <-r.outQ:
		payload = res.payload
		err = res.err
	case <-time.After(timeout):
		payload = nil
		err = fmt.Errorf("async_timedout %v", timeout)
	}

	return
}

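// Stop - stops the worker pool; call it at most once
func (p *WorkPool) Stop() {
	// Closing done wakes the dispatcher and any worker waiting for a request.
	// The request queue is left open: closing it would make the dispatcher
	// receive nil requests and make a late Add panic on a closed channel.
	close(p.done)
}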

// Receiving requests from inbound channel and forward it to the worker's workerRequestChannel
func (p *WorkPool) dispatch(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-p.done:
			return
		case req := <-p.pendingRequestQueue:
			go func(req *Request) {
				// Find next ready worker
				workerRequestChannel := <-p.workersReadyPool
				// dispatch the request to next ready worker
				workerRequestChannel <- req
			}(req)
		}
	}
}

You can download above examples from https://github.com/bhatti/concurency-katas/tree/main/go_pool.
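When only the number of in-flight tasks matters, a buffered channel used as a counting semaphore is a lighter alternative to a full worker-pool. The sketch below is an illustration under that assumption; crawlBounded and its fetch parameter are hypothetical names, not part of the pool package above:

```go
package main

import (
	"fmt"
	"sync"
)

// crawlBounded calls fetch for every URL but allows at most limit calls
// to run at once. It returns the highest concurrency it observed.
func crawlBounded(urls []string, limit int, fetch func(string)) (peak int) {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex // protects inFlight and peak
	inFlight := 0
	for _, u := range urls {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(u string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			fetch(u)

			mu.Lock()
			inFlight--
			mu.Unlock()
		}(u)
	}
	wg.Wait() // parent waits for every child before returning
	return peak
}

func main() {
	urls := []string{"a.com", "b.com", "c.com", "d.com", "e.com"}
	peak := crawlBounded(urls, 2, func(u string) { /* download u here */ })
	fmt.Println("peak concurrency:", peak)
}
```

Unlike the worker-pool, this still creates one go-routine per URL; it only bounds how many of them hold the expensive resource at the same time.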

Rust

Rust was designed by Mozilla Research to provide better performance, type safety, strong memory ownership and safe concurrency. With its strong ownership and lifetime rules, Rust minimizes race conditions because each value can have only one owner that can update it. Further, strong typing, traits/structured-types, absence of null references and immutability by default eliminate most of the common bugs in the code.

Rust uses OS-threads for multi-threading but has recently added support for coroutines and async/await. Rust uses futures for asynchronous behavior but, unlike other languages, it doesn’t provide a runtime environment for async/await. Two popular runtime systems available for Rust are https://tokio.rs/ and https://github.com/async-rs/async-std. Also, unlike other languages, async/await in Rust is a zero-cost abstraction where async just creates a future that is not scheduled until await is invoked. Runtime systems such as async-std and tokio provide an executor that polls a future until it returns a value.

Following example shows how async/await can be used to implement the toy web crawler:

extern crate rand;
use std::{error::Error, fmt};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rand::Rng;
use rand::distributions::Alphanumeric;
use rand::seq::SliceRandom;
use futures::future::{Future, join_all, BoxFuture};
use futures::stream::{FuturesUnordered};
use futures::executor;
use async_std::{task, future};
use async_std::future::timeout;

const MAX_DEPTH: u8 = 4;
const MAX_URLS: u8 = 11;

// Request encapsulates details of url to crawl
#[derive(Debug, Clone, PartialEq)]
pub struct Request {
    pub url: String,
    pub depth: u8,
    pub timeout: Duration,
    pub created_at: u128,
}

impl Request {
    pub fn new(url: String, depth: u8, timeout: Duration) -> Request {
        let epoch = SystemTime::now().duration_since(UNIX_EPOCH).expect("epoch failed").as_millis();
        Request{url: url.to_string(), depth: depth, timeout: timeout, created_at: epoch}
    }
}

#[derive(Debug, Copy, Clone)]
pub enum CrawlError {
    Unknown,
    MaxDepthReached,
    DownloadError,
    ParseError,
    IndexError,
    ContentsNotChanged,
    Timedout,
}

impl Error for CrawlError {}

impl fmt::Display for CrawlError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            CrawlError::MaxDepthReached => write!(f, "MaxDepthReached"),
            CrawlError::DownloadError => write!(f, "DownloadError"),
            CrawlError::ParseError => write!(f, "ParseError"),
            CrawlError::IndexError => write!(f, "IndexError"),
            CrawlError::ContentsNotChanged => write!(f, "ContentsNotChanged"),
            CrawlError::Timedout=> write!(f, "Timedout"),
            CrawlError::Unknown => write!(f, "Unknown"),
        }
    }
}


//////// PUBLIC METHODS
// crawling a collection of urls
pub fn crawl(urls: Vec<String>, timeout_dur: Duration) -> Result<usize, CrawlError> {
    // Boundary for concurrency and it will not return until all
    // child URLs are crawled up to MAX_DEPTH limit.
    //
    match task::block_on(
        timeout(timeout_dur, async {
            do_crawl(urls, timeout_dur, 0)
        })
    ) {
        Ok(res) => res,
        Err(_err) => Err(CrawlError::Timedout),
    }
}

//////// PRIVATE METHODS
fn do_crawl(urls: Vec<String>, timeout_dur: Duration, depth: u8) -> Result<usize, CrawlError> {
    if depth >= MAX_DEPTH {
        return Ok(0)
    }

    let mut futures = Vec::new();
    let mut size = 0;

    for u in urls {
        size += 1;
        futures.push(async move {
            let child_urls = match handle_crawl(Request::new(u, depth, timeout_dur)) {
                Ok(urls) => urls,
                Err(_err) => [].to_vec(),
            };
            if child_urls.len() > 0 {
                do_crawl(child_urls, timeout_dur, depth+1)
            } else {
                Ok(0)
            }
        });
    }
    task::block_on(
        async {
            let res: Vec<Result<usize, CrawlError>> = join_all(futures).await;
            let sizes: Vec<usize> = res.iter().map(|r| r.map_or(0, |n|n)).collect::<Vec<usize>>();
            size += sizes.iter().fold(0usize, |sum, n| n+sum);
        }
    );
    Ok(size)
}

// method to crawl a single url
fn handle_crawl(req: Request) -> Result<Vec<String>, CrawlError> {
    let res: Result<Vec<String>, CrawlError> = task::block_on(
        async {
            let contents = match download(&req.url).await {
                Ok(data) => data,
                Err(_err) => return Err(CrawlError::DownloadError),
            };

            if has_contents_changed(&req.url, &contents) && !is_spam(&req.url, &contents) {
                let urls = match index(&req.url, &contents).await {
                    Ok(_) =>
                        match parse_urls(&req.url, &contents) {
                            Ok(urls) => urls,
                            Err(_err) => return Err(CrawlError::ParseError),
                        },
                    Err(_err) => return Err(CrawlError::IndexError),
                };
                return Ok(urls)
            } else {
                return Err(CrawlError::ContentsNotChanged)
            }
        }
    );
    match res {
        Ok(list) => return Ok(list),
        Err(err) => return Err(err),
    }
}

async fn download(url: &str) -> Result<String, CrawlError> {
    // TODO check robots.txt and throttle policies
    // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
    // invoke jsrender to generate dynamic content
    jsrender(url, &random_string(100)).await
}

async fn jsrender(_url: &str, contents: &str) -> Result<String, CrawlError> {
    // for SPA apps that use javascript for rendering contents
    Ok(contents.to_string())
}

async fn index(_url: &str, _contents: &str) -> Result<bool, CrawlError> {
    // apply standardize, stem, ngram, etc for indexing
    Ok(true)
}

fn parse_urls(_url: &str, _contents: &str) -> Result<Vec<String>, CrawlError> {
    // tokenize contents and extract href/image/script urls
    Ok((0..MAX_URLS).into_iter().map(|i| random_url(i)).collect())
}

fn has_contents_changed(_url: &str, _contents: &str) -> bool {
    true
}

fn is_spam(_url: &str, _contents: &str) -> bool {
    false
}

fn random_string(max: usize) -> String {
    rand::thread_rng().sample_iter(&Alphanumeric).take(max).collect::<String>()
}

fn random_url(i: u8) -> String {
    let domains = vec!["ab.com", "bc.com", "cd.com", "de.com", "ef.com", "fg.com", "gh.com", "hi.com", "ij.com", "jk.com", "kl.com", "lm.com", "mn.com",
		"no.com", "op.com", "pq.com", "qr.com", "rs.com", "st.com", "tu.com", "uv.com", "vw.com", "wx.com", "xy.com", "yz.com"];
    let domain = domains.choose(&mut rand::thread_rng()).unwrap();
    format!("https://{}/{}_{}", domain, random_string(20), i)
}

The crawl method defines the scope of concurrency and asynchronously crawls each URL recursively, but the parent URL waits until its child URLs are crawled. The async-std runtime provides support for timeout so that an asynchronous task can fail early if it’s not completed within the bounded time-frame. However, it doesn’t provide cancellation support so you have to rely on cooperative cancellation.

Following unit-test and main routine show an example of crawling a list of URLs:

use std::time::Duration;
use futures::prelude::*;
use std::time::{Instant};
use crate::crawler::crawler::*;

mod crawler;

fn main() {
    let _ = do_crawl(8000);
}

fn do_crawl(timeout: u64) -> Result<usize, CrawlError> {
    let start = Instant::now();
    let urls = vec!["a.com", "b.com", "c.com", "d.com", "e.com", "f.com", "g.com", "h.com", "i.com", "j.com", "k.com", "l.com", "n.com"].into_iter().map(|s| s.to_string()).collect();
    let res = crawl(urls, Duration::from_millis(timeout));
    let duration = start.elapsed();
    println!("Crawled {:?} urls in {:?}", res, duration);
    res
}

#[cfg(test)]
mod tests {
    use super::do_crawl;
    #[test]
    fn crawl_urls() {
        match do_crawl(8000) {
            Ok(size) => assert_eq!(size, 19032),
            Err(err) => panic!("Unexpected error {:?}", err),
        }
    }
}

You can download the full source code from https://github.com/bhatti/concurency-katas/tree/main/rust_async.

Following are the major benefits of this approach to implementing the crawler and of its support for structured concurrency:

  • The main crawl method defines the high-level scope of concurrency and waits for the completion of child tasks.
  • The async-std runtime environment supports timeout APIs, and the crawl method takes the timeout parameter so that crawling of all URLs must complete within that time period.
  • The crawl method captures errors from the async response and returns them so that client code can perform error handling.
  • The async-declared methods in the above implementation show that asynchronous code can be easily composed.

Following are the shortcomings of this approach for structured concurrency and general design:

  • Rust async/await APIs don’t provide native support for cancellation, so you need to implement cooperative cancellation to persist any state or clean up underlying resources.
  • Rust async/await APIs don’t allow you to specify an execution context for asynchronous code.
  • The async/await support in Rust is relatively new and has not matured yet. It also requires a separate runtime environment, and there are a few differences among these implementations.
  • The above design for the crawler is not very practical because it creates an asynchronous task for each URL that is crawled, which may strain network or IO resources.

Overall, Go provides decent support for low-level concurrency but its complexity can create subtle bugs, and incorrect use of go-routines can result in deadlocks. It’s also prone to data races due to mutable shared state. Just as structured programming considered GOTO statements harmful and recommended if-then, loops, and function calls for control flow, structured concurrency considers GO statements harmful and recommends that a parent wait for the completion of its children and that errors propagate from children to parent. Rust offers async/await syntax for concurrency scope and supports composition and error propagation, with strong ownership that reduces the chance of data races. Also, Rust implements continuations by suspending the async block, and the async keyword just creates a future without starting execution, so it results in better performance when async code is chained together. However, async/await is still in its early phase in Rust and lacks proper support for cancellation and customized execution context.
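Go has no built-in concurrency scope, but the parent-waits-for-children rule can be approximated by hand. The following is a minimal sketch using only the standard library; runScope and demoScope are hypothetical names introduced for illustration, and the golang.org/x/sync/errgroup package offers a more complete version of the same idea:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runScope starts every task in its own go-routine, waits for all of them,
// and returns the first error. The first failure cancels the shared context
// so that cooperative siblings can stop early.
func runScope(parent context.Context, tasks ...func(ctx context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var wg sync.WaitGroup
	var once sync.Once
	var firstErr error
	for _, task := range tasks {
		wg.Add(1)
		go func(task func(ctx context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					firstErr = err
					cancel()
				})
			}
		}(task)
	}
	wg.Wait() // the scope does not exit until every child has finished
	return firstErr
}

// demoScope runs one failing child and one child that waits for cancellation.
func demoScope() error {
	return runScope(context.Background(),
		func(ctx context.Context) error { return errors.New("download failed") },
		func(ctx context.Context) error {
			<-ctx.Done() // cooperative: stop when a sibling fails
			return nil
		},
	)
}

func main() {
	fmt.Println(demoScope())
}
```

Nothing in the language enforces this discipline; a task can still start a detached go-routine that outlives the scope.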

October 29, 2020

Structured Concurrency in modern programming languages – Part-I

Filed under: Computing,Languages,Technology — admin @ 11:01 pm

Herb Sutter wrote about fifteen years ago that the free performance lunch is over and that you need to leverage concurrency to build high-performance applications on modern multi-core machines. Unfortunately, adding concurrency support is not simple: low-level concurrency primitives in many languages can lead to buggy code with potential deadlocks, and concurrent code can be hard to understand. Over the last few years, a number of programming languages have been improving support for concurrency and in this series of blogs (Part-II, Part-III, Part-IV, Swift-Followup), I will review some of the programming languages that I have used in my current or past projects such as Typescript, Elixir, Erlang, GO, Kotlin, and Rust. In particular, I will examine how these languages support structured concurrency so that concurrent code looks like sequential code and can be reasoned about easily. Strictly speaking, concurrency relates to application behavior when modern operating systems use context switching to interleave execution of multiple tasks, whereas parallelism allows those tasks to be executed simultaneously. I will evaluate concurrency support in the context of multi-core hardware where we can guarantee correct behavior with preemptive/cooperative multi-tasking and gain parallelism by utilizing multiple cores. Parallelism across multiple machines, i.e. distributed computing, is out of scope for this discussion.

Pitfalls with Concurrency

Before diving into the structured concurrency support, let’s review a few barriers that make writing concurrent code hard such as:

The control and data flow

Sequential code is easier to understand because you can predict the order of execution; though compilers or runtime environments may optimize that code into a slightly different order, the top-down structure remains intact. In contrast, concurrent code using threads/executors is disconnected from the main control and data flow, which makes composition, error handling, timeout or cancellation in asynchronous code much harder. In addition, concurrent code requires coordination between threads, which adds overhead, and synchronization to prevent data races, which is hard to get right and results in obscure and brittle code.

Race conditions

A race condition occurs when the application behavior is determined by the timing and interleaving of execution steps by multiple threads. Race conditions cause faulty behavior when the shared state or critical section is not properly guarded in a multi-threaded environment. You can eliminate race conditions by removing the shared state, using immutable objects or protecting critical sections using synchronization, mutex or locks.
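As an illustration of guarding a critical section, here is a minimal Go sketch; the Counter type and countConcurrently function are hypothetical names introduced only for this example. Without the mutex, the read-modify-write in Inc could interleave across go-routines and lose updates:

```go
package main

import (
	"fmt"
	"sync"
)

// Counter guards its value with a mutex so concurrent increments are not lost.
type Counter struct {
	mu    sync.Mutex
	value int
}

// Inc performs the read-modify-write inside the critical section.
func (c *Counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// Value returns the current count.
func (c *Counter) Value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

// countConcurrently increments one shared counter from n go-routines.
func countConcurrently(n int) int {
	var c Counter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Inc()
		}()
	}
	wg.Wait()
	return c.Value()
}

func main() {
	fmt.Println(countConcurrently(1000))
}
```

Running a variant without the lock under `go run -race` is a quick way to see the data race reported.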

Mutual Exclusion

The low-level locking primitives such as mutex, read/write/re-entrant locks are difficult to work with and add considerable complexity to your code. A buggy or incorrect implementation can lead to starvation, deadlocks or faulty behavior. Some libraries provide lock-free or concurrent data structures using atomic compare-and-swap (CAS) but they can still be prone to contention when accessed from multiple threads.
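To make the CAS idea concrete, the sketch below shows a lock-free add built on Go's sync/atomic package. The function names addWithCAS and sumWithCAS are assumptions made for this example; the retry counter shows where contention appears even without locks:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// addWithCAS adds delta using a compare-and-swap retry loop and returns
// how many retries were needed because another go-routine changed the
// value between the load and the swap.
func addWithCAS(addr *int64, delta int64) (retries int) {
	for {
		old := atomic.LoadInt64(addr)
		if atomic.CompareAndSwapInt64(addr, old, old+delta) {
			return retries
		}
		retries++ // contention: try again with the fresh value
	}
}

// sumWithCAS increments a shared total from several go-routines without locks.
func sumWithCAS(workers, perWorker int) int64 {
	var total int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				addWithCAS(&total, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(sumWithCAS(8, 1000))
}
```

For a plain counter, atomic.AddInt64 does the same job in one call; the explicit loop is shown only to expose the retry behavior.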

Deadlocks

You may need to use locks to protect critical sections of the code and avoid race conditions, but an incorrect implementation can lead to deadlocks where a thread can’t make any progress because it’s waiting for a resource held by another thread. In addition, concurrent code may experience starvation when a thread can’t make progress or livelock when multiple threads are stepping on each other. In order to avoid deadlocks and livelocks, you can avoid nested locks, reduce the number of locks and reduce the scope of the critical section. You can also use re-entrant locks, or fair locks that favor the thread waiting for the longest time.
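When nested locks cannot be avoided, acquiring them in a fixed global order is a common way to rule out the circular wait. The following sketch assumes a hypothetical Account type with a unique id, and assumes that transfer is never called with the same account on both sides (Go's mutex is not re-entrant):

```go
package main

import (
	"fmt"
	"sync"
)

// Account is a hypothetical type; id provides the global lock order.
type Account struct {
	id      int
	mu      sync.Mutex
	balance int
}

// transfer locks both accounts in ascending id order, so two opposite
// transfers can never each hold one lock while waiting for the other.
// It assumes from and to are different accounts.
func transfer(from, to *Account, amount int) {
	first, second := from, to
	if to.id < from.id {
		first, second = to, from
	}
	first.mu.Lock()
	defer first.mu.Unlock()
	second.mu.Lock()
	defer second.mu.Unlock()
	from.balance -= amount
	to.balance += amount
}

// demoTransfers runs opposite transfers concurrently and returns both balances.
func demoTransfers(rounds int) (int, int) {
	a := &Account{id: 1, balance: 1000}
	b := &Account{id: 2, balance: 1000}
	var wg sync.WaitGroup
	for i := 0; i < rounds; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); transfer(a, b, 1) }()
		go func() { defer wg.Done(); transfer(b, a, 1) }()
	}
	wg.Wait()
	return a.balance, b.balance
}

func main() {
	fmt.Println(demoTransfers(500))
}
```

If each transfer locked from first and to second instead, the two go-routines in each round could block each other forever.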

Callback Hell

A common pattern in many languages when calling an asynchronous method is to pass a callback function or lambda that is invoked when the background task is completed. However, this structure devolves into a complete mess when multiple asynchronous methods are chained, e.g.

class Crawler {
    crawl(url) {
        this.download(url, (contents) => {
            this.render(url, contents, (rendered) => {
                this.index(url, rendered, (data) => { // index could have been running in parallel to parse
                    this.parse(url, rendered, (urls) => {
                        urls.forEach((u) => this.crawl(u))
                    }) // parse
                }) // index
            }) // render
        }) // download
    }

    download(url, cb) {
        // .... asynchronous work elided, produces result
        cb(result)
    }

    render(url, contents, cb) {
        // ....
        cb(result)
    }

    index(url, contents, cb) {
        // ....
        cb(result)
    }

    parse(url, contents, cb) {
        // ....
        cb(result)
    }
}

As you can see, the callback pattern quickly devolves into an unwieldy mess and it’s hard to manage the results and error handling from within the nested scope. Each callback is called upon completion of the previous operation and you can’t easily use these callbacks for concurrent operations.

Promise Hell

Another common pattern for invoking an asynchronous method is to use promise or future objects such as:

class Crawler {
    crawl(url) {
        return this.download(url)
            .then((contents) =>
                this.render(url, contents)
            ).then((rendered) => {
                const indexPromise = this.index(url, rendered)
                const parsePromise = this.parse(url, rendered)
                return Promise.all([indexPromise, parsePromise]) // run both tasks in parallel
            }).then(([indexResult, urls]) => {
                // crawl child urls and wait for all of them
                return Promise.all(urls.map((u) => this.crawl(u)))
            }).catch((e) => {
                // error handling
            })
    }

    download(url) {
        return new Promise((resolve, reject) => {
            // async operation
            // ....
            if (ok) {
                resolve(result)
            } else {
                reject(new Error('failed'))
            }
        })
    }

    render(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    index(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }

    parse(url, contents) {
        return new Promise((resolve, reject) => {
            // async operation
        })
    }
}

Though the promise model is an improvement and you can manage concurrent tasks better using Promise.all, you still have to nest operations, and error handling requires two separate ways to catch errors (using catch blocks and native try/catch). Another gotcha in dynamic languages such as Javascript is forgetting the return in the last statement of a promise.

Error Handling

The above examples show that error handling in asynchronous code is tricky and error prone. For example, when using callbacks, you can pass two callback methods, one for valid results and another for errors, but the nested scope makes it hard to handle these errors. Similarly, when using catch blocks in promises, it’s not clear which operation failed, and it adds substantial complexity if you need to recover from some errors or perform an alternate operation based on conditional logic. You also have to combine promise-specific catch blocks with normal try/catch blocks and it’s easy to miss proper error checking.

Cancellation and Timeout

As asynchronous code runs in a separate thread of execution, cancelling or timing out requires some coordination between threads and can be hard to implement in the absence of library or language support. For example, a thread in an expensive computation or database query can’t be cancelled if it’s blocked until that operation is completed. Some libraries support APIs to stop threads but that can leave the process in an unpredictable state; other libraries use signals to notify threads about termination. In order to cancel properly, you need a non-blocking and cooperative model where the detached task checks for a cancellation request periodically. Ideally, cancellation needs to cancel the underlying operation so that application state remains consistent. Timeout is just an extension of the cancellation behavior where an asynchronous task is cancelled if it’s not completed within a specified time bound.
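The cooperative model described above can be sketched in Go with the standard context package. The function names below (countUntilCancelled, demoTimeout) are assumptions for this example; the work is simulated with short waits, and the task checks for cancellation between steps:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// countUntilCancelled simulates a long computation split into small steps.
// Checking ctx between steps is what makes the task cancellable.
func countUntilCancelled(ctx context.Context, steps int, stepTime time.Duration) (int, error) {
	for i := 0; i < steps; i++ {
		select {
		case <-ctx.Done():
			// cleanup or state persistence would go here
			return i, ctx.Err()
		case <-time.After(stepTime):
			// one unit of work finished
		}
	}
	return steps, nil
}

// demoTimeout runs the task with a deadline much shorter than the total work.
func demoTimeout() (stoppedEarly bool, timedOut bool) {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	done, err := countUntilCancelled(ctx, 1000, 10*time.Millisecond)
	return done < 1000, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoTimeout())
}
```

Here the timeout is simply a cancellation triggered by a deadline, which matches the description of timeout as an extension of cancellation.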

Debugging/Stack-traces

Asynchronous code makes it hard to see the call graph or stack traces from the caller’s perspective because it executes in a separate thread. For example, you may see the following stack trace for a database error on NodeJS, where the root cause is not readily apparent:

at new QueryResultError (node_modules/pg-promise/lib/errors/queryResult.js:122:24)
at Query.ctx.db.client.query (node_modules/pg-promise/lib/query.js:192:41)
at Query.handleReadyForQuery (node_modules/pg/lib/query.js:126:10)
at Connection.<anonymous> (node_modules/pg/lib/client.js:163:19)
at Socket.<anonymous> (node_modules/pg/lib/connection.js:118:12)
at addChunk (_stream_readable.js:288:12)
at readableAddChunk (_stream_readable.js:269:11)
at Socket.Readable.push (_stream_readable.js:224:10)
at TCP.onStreamRead [as onread] (internal/stream_base_commons.js:94:17)

Out of scope

Note: you may need to handle other concerns such as retries with exponential back-off, circuit-breakers, or idempotence behavior with asynchronous code in distributed systems but it won’t be discussed here.

Concurrency Constructs

I have discussed some of concurrency primitives in my old blogs [1634, 1638, 1621] but following are common constructs or building blocks that are used for implementing concurrency support:

Threads

A thread defines the smallest unit of execution and multi-threading allows executing concurrent operations. There are two types of threads:

Native/OS-Thread

The native threads are tied to kernel threads and are scheduled using preemptive multi-tasking on modern operating systems. The operating system preempts a native thread upon IO operation, wait/sleep, hardware interrupts or context switching. Native threads have a high cost due to their stack size (about 256KB) and system overhead. As a result, a thread-pool of limited size is often used to share system resources for background processing.

Green/User-space Thread

The green threads use cooperative scheduling where a user-space scheduler performs context switching, so the overhead of spawning new threads is very small. The user-space schedulers use an M:N model that maps M green threads onto N native threads.

As green threads use cooperative scheduling, they generally require a yield to allow other threads to proceed. The user-space schedulers are not suitable for blocking operations such as sleep or blocking IO. For example, Java initially supported green threads but replaced them with native threads to support preemptive multi-tasking. Similarly, an earlier version of Rust used green threads with blocking IO, which resulted in slow performance, so it replaced green threads with native threads in a later version. Thus, green threads are generally used with non-blocking IO or waits that automatically preempt the thread, save its stack and resume another thread. As a general rule, native threads work better if an application is CPU bound or requires real-time priority, and green threads/coroutines provide better concurrency with IO bound applications.

Structured Concurrency

The structured concurrency was envisioned by Martin Sústrik to simplify writing concurrent applications. Following are building blocks of structured concurrency that remedy concurrency related issues discussed above:

Concurrency Scope

The structured concurrency defines a single entry and exit similar to top-down structured programming. It defines a concurrency scope or boundary where all asynchronous tasks must complete by the end of scope. The scope may optionally define the context or queue where the tasks will be run. This model simplifies semantics of asynchronous behavior because the lifetime of child tasks is tied with the parent scope. The parent scope automatically waits until all child tasks are completed.

Execution Context

The structured concurrency allows you to specify context, threads or queues where asynchronous code will run so that you can manage related asynchronous code and underlying resources easily.

Cancellation and Timeout

The structured concurrency provides first-class support for cancellation and timeout though it still requires that child tasks support cooperative cancellation as blocking operations cannot be easily interrupted.

Error Handling

The errors from child tasks are automatically propagated to the parent scope where they can be handled in a consistent way using language provided syntax.

Immutability or value semantics

The structured concurrency encourages use of immutable objects or pass by value semantics to avoid race condition or need for locks to protect the critical section.
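A small Go sketch of pass-by-value semantics follows; the Page type and the function names are hypothetical. Each go-routine works on its own copy of the struct, so no lock is needed. Note that this holds for plain fields only; slices, maps and pointers inside a struct would still be shared:

```go
package main

import (
	"fmt"
	"sync"
)

// Page is a hypothetical value type with no pointer, slice or map fields.
type Page struct {
	URL   string
	Depth int
}

// visit receives its own copy of the page, so mutating it is safe.
func visit(p Page) Page {
	p.Depth++
	return p
}

// demoValues hands the same root page to several go-routines by value.
func demoValues() (Page, []Page) {
	root := Page{URL: "https://a.com", Depth: 0}
	out := make(chan Page, 3)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			out <- visit(root) // root is copied at the call
		}()
	}
	wg.Wait()
	close(out)
	var children []Page
	for p := range out {
		children = append(children, p)
	}
	return root, children
}

func main() {
	root, children := demoValues()
	fmt.Println(root.Depth, len(children))
}
```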

Composition

The structured concurrency allows composing asynchronous code within another asynchronous function so that data and control flow can be managed easily. It allows errors to be propagated from the nested asynchronous code to calling function and it cancels all child asynchronous tasks if parent task is cancelled.

What’s missing from Structured Concurrency

Structured concurrency doesn’t prescribe a specific concurrency mechanism such as threads, coroutines, fibers, generators or actors; instead it focuses on concurrency scope, data/control flow, error handling, cancellation/timeout and composition. Thus, it won’t solve data races if your application requires mutable shared data that is accessed from multiple threads or coroutines concurrently, so you will need to rely on synchronization primitives to protect the critical section.

Toy Web Crawler

I will use a toy implementation of a simple web crawler to show support of structured concurrency in some of my preferred languages. Following is pseudocode for a sequential version of this crawler:

MAX_DEPTH = 5
class URLRequest: 
  url, status, depth, error, created_at, started_at, completed_at

class WebCrawler:
  def crawl_all(root_urls):
	# priority such as page-rank
	pq = PriorityQueue()
	# track duplicate urls
	visited = Set()
	# add root urls to queue
	for url in root_urls:
		pq.add(URLRequest(url, pending, 0))

	# crawl urls using BFS
	total = 0
	while not pq.isEmpty():
		total+=1
		request = pq.pop()
		visited.add(request.url)
		urls = crawl(request)
		for url in urls:
			if not visited.contains(url) and not is_spam(url) and request.depth+1 < MAX_DEPTH:
				pq.add(URLRequest(url, pending, request.depth+1))
	# all done
	print total

  # download, parse and index given url in the request
  def crawl(request):
	urls = []
	try:
		request.started_at = Date()
		contents = download(request.url)
		contents = jsrender(request.url, contents)
		if has_content_changed(request.url, contents):
			index(request.url, contents)
			urls = parse_urls(request.url, contents)
		request.status = completed
	except Exception as err:
		request.status = failed
		request.error = err
	# mark request completion time
	request.completed_at = Date()
	return urls

  def download(url):
	# check robots.txt and throttle policies
 	# may need timeout for slow websites and linearize 
    # requests to the same domain to prevent denial of service attack

  def jsrender(url, contents):
 	# for SPA apps that use javascript for rendering contents
	return contents

  def index(parent_url, contents):
	# apply standardize, stem, ngram, etc for indexing

  def parse_urls(parent_url, contents):
	# tokenize contents and extract href/image/script urls
	return urls

  def is_spam(url):
	# check spam or low-quality urls
	return false

  def has_content_changed(url, contents):
	# calculate hash digest and compare it with last digest
	return true

The above example defines a crawl_all method that crawls a list of root URLs by repeatedly invoking the crawl method in breadth-first order. The crawl method invokes stubbed-out methods for downloading the URL, parsing the contents and indexing the contents.
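For readers who want something runnable, here is a minimal Go sketch of the same sequential traversal. It makes several simplifying assumptions that are not in the pseudocode: a hypothetical in-memory links map stands in for download and parse_urls, a plain FIFO queue replaces the priority queue, and URLs are marked as visited when they are queued so the same URL is never queued twice:

```go
package main

import "fmt"

const maxDepth = 5

type request struct {
	url   string
	depth int
}

// links is a hypothetical site map standing in for download + parse_urls.
var links = map[string][]string{
	"a.com": {"b.com", "c.com"},
	"b.com": {"c.com", "d.com"},
	"c.com": {"a.com"},
}

// crawlAll visits every reachable URL once, in breadth-first order,
// and returns the order in which URLs were visited.
func crawlAll(roots []string) []string {
	visited := map[string]bool{}
	var queue []request
	for _, u := range roots {
		if !visited[u] {
			visited[u] = true
			queue = append(queue, request{u, 0})
		}
	}
	var order []string
	for len(queue) > 0 {
		req := queue[0]
		queue = queue[1:]
		order = append(order, req.url)
		for _, child := range links[req.url] {
			if !visited[child] && req.depth+1 < maxDepth {
				visited[child] = true // mark when queued to avoid duplicates
				queue = append(queue, request{child, req.depth + 1})
			}
		}
	}
	return order
}

func main() {
	fmt.Println(crawlAll([]string{"a.com"}))
}
```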

Typescript

Typescript/Javascript on the NodeJS platform offers a unique design for managing concurrency where only a single thread processes requests from the event queue.

The NodeJS process uses a single thread that takes the next operation to execute from the event queue and executes it in an event loop. It delegates some system calls and asynchronous code to a small thread pool and uses non-blocking APIs when performing disk or network IO operations. This architecture eliminates the need to synchronize shared data as there is only a single thread accessing application state at a time (similar to the actor model).
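The single-consumer idea can be imitated in Go to show why no locking is needed. This is only an analogy, not how NodeJS is implemented: the hypothetical EventLoop type below runs every posted closure on one go-routine, so state touched only by those closures is never accessed concurrently:

```go
package main

import (
	"fmt"
	"sync"
)

// EventLoop is a hypothetical single-consumer loop: every event is a
// closure executed by the same go-routine.
type EventLoop struct {
	events chan func()
	done   chan struct{}
}

// NewEventLoop starts the loop go-routine.
func NewEventLoop() *EventLoop {
	l := &EventLoop{events: make(chan func(), 16), done: make(chan struct{})}
	go func() {
		defer close(l.done)
		for ev := range l.events {
			ev()
		}
	}()
	return l
}

// Post queues an event; it may be called from any go-routine.
func (l *EventLoop) Post(ev func()) { l.events <- ev }

// Close stops accepting events and waits for the queued ones to finish.
func (l *EventLoop) Close() {
	close(l.events)
	<-l.done
}

// demoEventLoop posts increments from several producers and returns the total.
func demoEventLoop(producers, perProducer int) int {
	loop := NewEventLoop()
	hits := 0 // only ever modified on the loop go-routine
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				loop.Post(func() { hits++ })
			}
		}()
	}
	wg.Wait()
	loop.Close()
	return hits
}

func main() {
	fmt.Println(demoEventLoop(4, 250))
}
```

As with NodeJS, a slow event blocks everything queued behind it, which is why long-running work is delegated elsewhere.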

Using async/await in Typescript

Following is an implementation of web crawler using async-await syntax in Typescript:

import { Request, Response } from '../types/index';

const MAX_DEPTH = 4;
const MAX_URLS = 11;
const DOMAINS = [
  'ab.com',
  'bc.com',
  'cd.com',
  'yz.com',
];

export class Crawler {
  async crawl(urls: string[], timeoutMillis: number): Promise<number> {
    // Main scope of concurrency begin
    const res = await doCrawl(urls, 0, timeoutMillis);
    return res.childURLs;
    // Main scope of concurrency end    
  }
}

///////////////// PRIVATE METHODS ////////////////
const doCrawl = async (
  urls: string[],
  depth: number,
  timeoutMillis: number
): Promise<Response> => {
  const res = new Response();
  if (depth >= MAX_DEPTH) {
    res.failed('max-depth');
    return res;
  }
  const requests = urls.map((u) => new Request(u, depth, timeoutMillis));
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

  const childURLs : number = results.reduce((total: number, r: Response) => total + r.childURLs, 0);
  res.succeeded(childURLs);
  return res;
};


const handleCrawl = async (req: Request): Promise<Response> => {
  const res = new Response();
  const contents = await download(req.url);
  const newContents = await jsrender(req.url, contents);
  if (
    hasContentsChanged(req.url, newContents) &&
    !isSpam(req.url, newContents)
  ) {
    await index(req.url, newContents);
    const urls = await parseURLs(req.url, newContents);
    const childResp = await doCrawl(urls, req.depth + 1, req.timeoutMillis);
    res.succeeded(childResp.childURLs + 1);
  } else {
    res.failed("contents didn't change");
  }
  return res;
};

const download = async (url: string): Promise<string> => {
  // TODO check robots.txt and throttle policies
  // TODO add timeout for slow websites and linearize requests to the same domain to prevent denial of service attack
  return randomString(100);
};

const jsrender = async (url: string, contents: string): Promise<string> => {
  // for SPA apps that use javascript for rendering contents
  return contents;
};

const index = async (url: string, contents: string) => {
  // apply standardize, stem, ngram, etc for indexing
};

const parseURLs = (url: string, contents: string): string[] => {
  // tokenize contents and extract href/image/script urls
  const urls: string[] = [];
  for (let i = 0; i < MAX_URLS; i++) {
    urls.push(randomUrl());
  }
  return urls;
};

const hasContentsChanged = (url: string, contents: string): boolean => {
  return true;
};

const isSpam = (url: string, contents: string): boolean => {
  return false;
};

const randomUrl = (): string => {
  const i = Math.floor(Math.random() * DOMAINS.length);
  return 'https://' + DOMAINS[i] + '/' + randomString(20);
};

const randomString = (n: number): string => {
  let letters =
    'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
  let text = '';
  for (let i = 0; i < n; i++) {
    text += letters.charAt(Math.floor(Math.random() * letters.length));
  }
  return text;
};

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
	  () => {
		reject(new Error(`Timed out ${ms}`))
	  }, ms));
};

The async-await syntax in Typescript is syntactic sugar on top of promises. An async function always returns a promise and automatically wraps its return value in a Promise.

The crawl method takes a list of URLs and a timeout and invokes doCrawl, which crawls the list of URLs concurrently and then waits for the results using the await keyword. The doCrawl method recursively crawls child URLs up to the MAX_DEPTH limit. The main crawl method defines the boundary for concurrency and returns the count of child URLs.

Following is a unit test for testing the behavior of async/await based crawler:

import { Crawler } from '../lib/index';
import { expect } from 'chai';

const EXPECTED_URLS = 19032;
const ROOT_URLS = [
  'a.com',
  'b.com',
  'c.com',
  'd.com',
  'e.com',
  'f.com',
  'g.com',
  'h.com',
  'i.com',
  'j.com',
  'k.com',
  'l.com',
  'n.com',
];

describe('crawler', () => {
  it('crawling urls with nesting', async () => {
    const started = new Date().getTime();
    const timeout = 5000;
    const crawler = new Crawler();
    const res = await crawler.crawl(ROOT_URLS, timeout);
    const elapsed = new Date().getTime() - started;
    console.log(`Crawl took ${elapsed} to process ${res}`);
    expect(res).equal(EXPECTED_URLS);
  });

});

Following are some of the concurrency features in Typescript:

  • Concurrency scope – Though Typescript doesn’t support a concurrency scope as a first-class citizen or an option to specify a queue/context, you can manage the boundary using await in the main method.
  • The async methods in the above implementation show that asynchronous code can be easily composed.
  • Error handling – Async-await uses normal try/catch syntax for error checking instead of the specialized syntax of Promise or callback functions.
  • As NodeJS uses a single thread in an event loop, you don’t need to worry about shared state being updated from multiple threads.

Following are the major shortcomings of Typescript in its support for structured concurrency:

  • Typescript doesn’t support value semantics, and objects are passed by reference except for primitive types.
  • As NodeJS uses a single thread for the event loop with a small thread pool for asynchronous operations, it limits the tasks that you can run in parallel on multi-core hardware.
  • Typescript doesn’t support cancellation and timeout natively so you have to rely on cooperative cancellation. You can implement a timeout partially using Promise.race but it’s not a reliable way to handle timeouts because the promises that lose the race keep running after the timeout fires, e.g.
  const promises = requests.map((r) => handleCrawl(r));
  const results = await Promise.race([
    Promise.all(promises),
    timeout(timeoutMillis),
  ]);

const timeout = (ms: number): Promise<any> => {
  return new Promise((resolve, reject) => setTimeout(
	  () => {
		reject(new Error(`Timed out ${ms}`))
	  }, ms));
};

You can download full code for Typescript example from https://github.com/bhatti/concurency-katas/tree/main/node.

Overall, Typescript/NodeJS is suitable for IO-bound applications where little time is spent on each operation so that the event loop can switch to the next task, but it’s not suitable when the application has blocking/CPU-bound operations, requires many background tasks or requires a high level of concurrency and parallelism.

September 6, 2020

Review of “Software Engineering at Google”

Filed under: Computing,Technology — admin @ 5:09 pm

In “Software Engineering at Google”, engineers from Google share practices from software development life-cycle at Google. Here are a few lessons from the book that are applicable to most engineers while omitting Google’s unique internal practices and tools:

Software Engineering

Hyrum’s law

With a sufficient number of users of an API, it does not matter what you promise in the contract: all observable behavior of your system will be depended on by somebody.

Shifting Left

Finding problems earlier in the developer workflow usually reduces costs.

Hiding considered Harmful

Share ideas early to prevent personal missteps and vet your ideas.

Bus Factor

Disperse knowledge to reduce the bus factor.

3 Pillars of social interaction

  • Humility (lose the ego and learn to give/take criticism)
  • Respect
  • Trust (fail fast and iterate)

Blameless Post-Mortem Culture

  • Summary
  • Timeline of event
  • Primary cause
  • Impact/damage assessment
  • Actions for quick fix
  • Actions to prevent recurrence in the future
  • Lessons learned.

Knowledge sharing

  • Psychological safety
  • Respect
  • Recognition
  • Developer guides
  • static analysis
  • newsletter
  • readability certification – where each Changelist (CL) requires readability approval from readability certified engineer

Leadership

Servant leadership

  • Create an atmosphere of humility and trust
  • Help the team achieve consensus and serve your team

Antipattern

  • Hire pushovers
  • Ignore low performers
  • Ignore human issues
  • Be everyone’s friend
  • Compromise the hiring bar
  • Treat your team like children.

Positive patterns

  • Lose the ego – ownership, accountability, responsibility
  • Find people who can give constructive feedback
  • Be a zen master – leader is always on stage, maintain calmness, ask questions
  • Be a catalyst (build consensus)
  • Remove roadblocks
  • Be a teacher and mentor
  • Set clear goals – create mission statement for the team
  • Be honest (give hard feedback without using compliment sandwich)
  • Track happiness – recognition
  • Delegate but get your hands dirty
  • Seek to replace yourself
  • Know when to make waves
  • Shield your team from chaos
  • Give your team air cover – defend team from uncertainty and frivolous demands
  • Let your team know when they are doing well.

Leading

  • Always be deciding (weigh trade-offs)
  • Identify the blinder
  • Identify the key trade-offs
  • Decide/Iterate (e.g. trade-offs within web search are latency, quality and capacity – pick two)
  • Always be leaving
  • Build a self-driving team
  • Divide the problem space (delegate sub-problems to leaders)
  • Anchoring a team’s identity (rather than putting a team in charge of a specific product/solution, anchor team to the problem)
  • Always be scaling
  • Cycle of success: analysis (trade-off/consensus) -> struggle (fake it) -> traction (progress) -> Reward (solves new problem)
  • Important vs Urgent (delegate urgent things, dedicate time, tools such as GTD)
  • Learn to drop balls (split tasks between top 20%, bottom 20%, middle 60% – drop bottom 80%)
  • Protecting your energy (vacation/breaks).

Measurement

Goals:

Goal is a desired result/property without reference to metric – QUANTS (Quality, Attention, Intellectual, Tempo, Satisfaction), e.g. quality of the code, attention from engineers, intellectual complexity, tempo/velocity and satisfaction.

Signal:

Signals are the things that need to be measured, though they may not be measurable themselves, e.g. if the goal is learning from readability, a signal can be engineers reporting that they learned from the readability process.

Metrics:

A metric is a proxy for a signal, e.g. a quantitative metric such as a readability survey that asks how the readability process has improved code quality. Each metric should be traceable.

Styling guiding principles

  • Rules must pull their weight
  • Optimize for the reader
  • Be consistent (scaling, minimize ramp-up, resilience to time)
  • Setting the standard (coding conventions)
  • Avoiding error-prone/surprising constructs
  • Concede to practicalities.
  • Use tools such as error checkers, code formatters, etc

Code Review

  • Correctness & comprehension
  • Approval from OWNERS (stored as a regular file in repository)
  • Approval from readability
  • Knowledge sharing
  • Be polite & professional
  • Write small changes
  • Write good change description
  • Keeping reviewers to minimum
  • Automate where possible

Documentation

  • Know your audience (experience level, domain knowledge, purpose)
  • Documentation types:
    • Reference documentation
    • Design documents
    • Tutorials
    • Conceptual documentation
    • Landing pages
  • Documentation philosophy (WHO, WHAT, WHEN, WHERE and WHY).

Testing

Benefits

  • Less debugging
  • Increased confidence in changes
  • Improved documentation
  • Simpler reviews
  • Thoughtful design, fast/high quality releases
  • Test scope – 5% functional, 15% integration, 80% unit
  • Beyonce Rule – If you liked it, then you shoulda put a test on it
  • Pitfall of large test suites (no more mocks)
  • Test certified (project health pH tool to gather metrics such as test coverage, test latency, etc)

Unit testing

  • Prevent brittle tests
  • Strive for unchanging tests (pure refactoring, new features, bug fixes, behavioral changes)
  • Test via public APIs (to avoid brittle tests)
  • Test State, Not Interactions (avoid mock objects)
  • Writing clear tests, Make tests complete and concise (using helper methods object-factories, assertions)
  • Test behavior, Not methods (assert each behavior in separate method instead of testing each method per test, e.g. display_showName, display_showWarning)
  • Structure tests to emphasize behavior
    • comment Given/When/Then
    • use And to break it further
    • can have multiple combinations of When/Then for dependent behavior
  • Name tests after the behavior being tested
  • Don’t put logic in tests
  • Write clear failure messages
  • Test and code sharing: DAMP (descriptive and meaningful phrases), Not DRY (duplicating some construction logic of objects instead of helper methods)
  • Shared Values – use builder methods to construct objects instead of static constants
  • Shared Setup (be careful with too many dependencies in common setup)
  • Share helpers and validations/assertions
  • Define test infrastructure – sharing code across multiple test suites
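
The "test behavior, not methods" and Given/When/Then advice above can be illustrated with a minimal sketch. The Account class and its test names are hypothetical and exist only for this example; each test asserts one behavior through the public API and checks state rather than interactions.

```python
import unittest


class Account:
    """Hypothetical class under test, used only for illustration."""

    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        if amount > self.balance:
            raise ValueError("insufficient funds")
        self.balance -= amount


class AccountTest(unittest.TestCase):
    # Each test is named after one behavior instead of one method.
    def test_withdraw_reduces_balance(self):
        # Given an account with funds
        account = Account(balance=100)
        # When part of the balance is withdrawn
        account.withdraw(30)
        # Then the balance reflects the withdrawal
        self.assertEqual(account.balance, 70)

    def test_withdraw_more_than_balance_is_rejected(self):
        # Given an account with a small balance
        account = Account(balance=10)
        # When more than the balance is withdrawn
        with self.assertRaises(ValueError):
            account.withdraw(30)
        # Then the balance is unchanged
        self.assertEqual(account.balance, 10)
```

Both tests duplicate the small amount of construction logic (DAMP) so that each one can be read on its own.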

Test Doubles

  • Testable code
  • Applicability to avoid brittleness and complexity
  • Fidelity in how close the behavior mimics real implementation
  • Avoid mocking frameworks if possible
  • Seams – Use Dependency injection to make the code testable

Techniques

  • Faking – lightweight implementation but low fidelity
  • Stubbing – specify the expected behavior with Mocks
  • Interaction testing – verifying method is called properly but it can lead to complex tests so avoid it
  • Real implementation – high fidelity and give more confidence but evaluate based on execution time, determinism
  • Prefer State testing over interaction testing and use interaction testing only for state changing functions
  • Avoid over specification.
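
As a rough illustration of faking, seams and state testing, the sketch below injects a fake store into a service and asserts on the resulting state rather than on which methods were called. InMemoryUserStore and SignupService are hypothetical names invented for this example.

```python
class InMemoryUserStore:
    """Fake: a lightweight working implementation that replaces a real database."""

    def __init__(self):
        self._users = {}

    def save(self, user_id, name):
        self._users[user_id] = name

    def find(self, user_id):
        return self._users.get(user_id)


class SignupService:
    def __init__(self, store):
        # Seam: the store is injected, so tests can pass a fake
        # and production code can pass the real implementation.
        self.store = store

    def register(self, user_id, name):
        if self.store.find(user_id) is not None:
            return False  # duplicate registration is rejected
        self.store.save(user_id, name.strip())
        return True


# State testing: verify what ended up in the store, not how it was called.
store = InMemoryUserStore()
service = SignupService(store)
first = service.register(1, "  alice ")
second = service.register(1, "mallory")
```

Because the assertions only look at the store's contents, the test keeps passing if register is refactored to call the store differently.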

Large functional tests

  • Obtain a system under test, seed data, perform actions, verify behavior
  • You may use multiple tests in chains and store intermediate data so that output of one test is used as input to another
  • Each SUT is judged based on hermeticity (SUT’s isolation from usage and interactions from other components) and fidelity (SUT’s accuracy in reflecting the prod environment). For example, staging tests use staging deployment but it requires code to be deployed there. Avoid 3rd party dependencies in SUT environment and use doubles to fake it
  • You can also use record/play proxies or use consumer-driven contract that defines contract for client and provider of the service (Pact contract testing)

Test data

  • Seeded data
  • Test traffic
  • Domain data – pre-populated data in database
  • Realistic baseline/data
  • Seeding API.

Verification

  • Manual
  • Assertions

Types of larger tests

  • Functional testing
  • Browser and device testing
  • Performance
  • Load and stress testing
  • Deployment configuration testing
  • Exploratory testing (manual)
  • Bug bashes
  • A/B diff regression testing
  • UAT, Probes and canary analysis (in prod)
  • Disaster recovery and chaos engineering
  • User evaluation

Version Control and Build System

Google uses a mono-repo with a one-version rule for version control to avoid confusing choices, along with an artifact-based build system. All dependencies also follow the one-version rule to simplify deployment. Google also uses static analysis/linters to provide feedback on code before it’s committed.

Dependency management

Google uses semantic versioning for managing dependencies. You can think of dependencies as a directed graph with requirements as edges. The process of finding a mutually compatible set of dependencies is akin to SAT solving. Minimum version selection (MVS) can be used to pick the lowest version that satisfies all requirements, moving to the next higher version only when a dependency requires it, as semantic versioning is not a reliable way to trust backward compatibility.
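
A minimal sketch of minimum version selection, assuming each module version declares the minimum versions of its dependencies. The requirement graph and module names below are made up for illustration; the selection rule (keep the highest of the requested minimums for every reachable module) follows the MVS approach used by Go modules.

```python
from collections import deque

# Hypothetical requirement graph: (module, version) -> [(module, minimum version)].
# Versions are tuples so that they compare numerically.
REQUIREMENTS = {
    ("app", (1, 0, 0)): [("lib_a", (1, 2, 0)), ("lib_b", (1, 0, 0))],
    ("lib_a", (1, 2, 0)): [("lib_c", (1, 1, 0))],
    ("lib_b", (1, 0, 0)): [("lib_c", (1, 3, 0))],
    ("lib_c", (1, 1, 0)): [],
    ("lib_c", (1, 3, 0)): [],
}


def minimal_version_selection(root, requirements):
    """Walk the graph from root and keep, for every module, the highest
    of the minimum versions requested by any reachable node."""
    selected = {}
    seen = {root}
    queue = deque([root])
    while queue:
        module, version = queue.popleft()
        if module not in selected or version > selected[module]:
            selected[module] = version
        for dep in requirements.get((module, version), []):
            if dep not in seen:
                seen.add(dep)
                queue.append(dep)
    return selected


build_list = minimal_version_selection(("app", (1, 0, 0)), REQUIREMENTS)
```

Here lib_a asks for lib_c 1.1.0 and lib_b asks for lib_c 1.3.0, so the build list contains lib_c 1.3.0: the lowest version that satisfies both, with no jump to whatever the latest release happens to be.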

Continuous integration

The code goes through edit/compile -> pre-submit -> post-submit -> release-candidate -> rc-promotion -> final-rc phases over time. CI provides fast feedback using automated and continuous builds and delivery. Pre-submit uses fast unit tests and post-submit uses large tests (hermetic tests against a test environment with greater determinism and isolation) to verify changes and SLOs.

Continuous delivery

Google uses the idioms of agility (small batches), automation, isolation, reliability, data-driven decision making, and phased rollout for continuous delivery. It uses shifting left to identify problems earlier and ships only what gets used.

August 19, 2020

Review of “Building Secure and Reliable Systems”

Filed under: Computing,Technology — admin @ 5:33 pm

The “Building Secure and Reliable Systems” book shares best practices from Google’s security and SRE engineers. Here is a summary of these best practices:

The first chapter discusses the tradeoff between security and reliability, e.g. reliability protects against non-malicious failures but may expand the attack surface via redundancy, whereas security risk comes from adversarial attacks. Both reliability and security need confidentiality, integrity and availability but from different perspectives. Complex systems are difficult to reason about so you must apply “Defense in depth”, “Principle of least privilege” and “Distinct failure domains” to limit the blast radius of failure. For example, Google uses geographic regions to limit the scope of credentials.

The second chapter focuses on “security adversaries” and their “attack motives”; adversaries may be hobbyists, hacktivists, researchers, criminals, cyber-warfare actors, insiders or come from other backgrounds. You can apply CAPTCHA, automation/AI, zero trust, multi-party authorization, auditing/detection and recoverability to protect against these attacks.

The third chapter is part of the second section of the book, which focuses on designing secure and reliable systems. It introduces safe proxies in the production environment that enforce authentication, multi-party authorization (MPA), auditing, rate limiting, zero touch, access control, etc. For example, Google uses a CLI proxy to execute commands that are controlled via security policy and MPA, and that provides auditing/logs.

The chapter four examines security tradeoffs and reviews product features that may include functional and non-functional requirements (e.g. security, reliability, SLO, dev velocity). Reliability and security are also considered emergent properties of system design and encompass the entire product and its services. The chapter also gives an example of a design document template that includes sections for scalability, redundancy/reliability, dependencies, data-integrity, SLA, and security/privacy.

The chapter five discusses designing for least privilege using authentication and authorization. It also examines zero-trust networks that don’t grant any illegal access and zero-touch interfaces where all access is automated. It recommends writing small functions so that access control can be clearly defined, breaking glass in case of emergency to bypass certain authorization systems, auditing, testing for least privilege, multi-party authorization (MPA), three-factor authorization (3FA where access is approved from two platforms), business justifications, temporary access, proxies, etc. This chapter also discusses the tradeoffs of complex security against other factors such as company culture, data quality, user productivity, and development complexity.

The chapter six focuses on designing for understanding to reduce likelihood of security vulnerabilities and increase confidence in the system security. It defines system invariant, which is a property that is always true and can be used to assert security and reliability properties. It suggests using mental model to understand complex security system and explains identities, authentication, and access control concepts. When breaking a system into smaller components, the chapter recommends using trusted computing base (TCB) to create a security boundary that enforces security policies. In order to provide access from one TCB to another, you may issue end-user context ticket (EUC) that provides access temporarily. In order to standardize security policies, you may use a common framework for request dispatching, input sanitization, authentication, authorization, auditing, logging, monitoring, quota, load balancing, configuration, testing, dashboard, alerting, etc.

The chapter seven focuses on extensibility and new changes, e.g. keeping dependencies up-to-date, automating testing, releasing frequently, and using containers and microservices.

The chapter eight focuses on resilience, which describes the system’s ability to hold out against a major malfunction or disruption. It encourages designing the system with independent layers, modularization, redundancy, automation, defense in depth, controlled degradation (partial failure), load shedding, throttling and automated response. You will need to consider tradeoffs between reliability and security, e.g. failing safe vs failing secure, where reliability/safety may require the ACL to be “allow-all” but security may require the ACL to be “deny-all”. You can segment your network and compartmentalize your system to reduce the blast radius. With a micro-service architecture, you can assign distinct roles to each service and add geographic location or time as a scope of access. The chapter then defines a failure domain, which is a type of blast radius control that creates isolation by partitioning a system into multiple equivalent but completely independent copies, each with its own data. Any of the individual partitions can take over for the entire system during an outage and help protect systems from global impact. You can validate the system continuously for failures using fuzzing and other types of testing.

The chapter nine discusses recoverability from random, accidental and software failures and errors. The chapter recommends designing the emergency push system to simply be your regular push system turned up to maximum for recovering from failure. In order to prevent rollback to an older version, you can collect undesirable versions into a deny list or use an allow list of permitted versions, which is used in the release system for verification. Also, you can maintain security version numbers (SVNs) and minimum acceptable security version numbers (MASVNs) and rotate signing keys, e.g.

ComponentState[DenyList] = ComponentState[DenyList].union(self[DenyList])
ComponentState[MASVN] = max(self[MASVN], ComponentState[MASVN])

def IsUpdateAllowed(self, Release, ComponentState, KeyDatabase):
  assert Release[Version] not in ComponentState[DenyList]
  assert Release[SVN] >= ComponentState[MASVN]
  assert VerifySignature(Release, KeyDatabase)
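
The pseudocode above can be turned into a small runnable sketch. The Release and ComponentState classes are hypothetical, and signature verification is reduced to a boolean flag, which is an assumption made only to keep the example self-contained.

```python
from dataclasses import dataclass, field


@dataclass
class Release:
    version: str
    svn: int             # security version number of this release
    signature_ok: bool   # stand-in for a real signature check (assumption)


@dataclass
class ComponentState:
    deny_list: set = field(default_factory=set)
    masvn: int = 0       # minimum acceptable security version number


def apply_policy(state, policy_deny_list, policy_masvn):
    # The deny list only grows and the MASVN only increases,
    # so an old policy can never re-enable a rejected release.
    state.deny_list |= set(policy_deny_list)
    state.masvn = max(state.masvn, policy_masvn)


def is_update_allowed(release, state):
    if not release.signature_ok:
        return False
    if release.version in state.deny_list:
        return False
    return release.svn >= state.masvn


state = ComponentState()
apply_policy(state, {"1.0.1"}, policy_masvn=3)
apply_policy(state, set(), policy_masvn=2)  # an older policy cannot lower the MASVN
```

With this state, a release with SVN 2 is rejected as a rollback, version 1.0.1 is rejected by the deny list, and a signed release with SVN 3 or higher is accepted.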

The chapter ten explains how to mitigate denial-of-service (DoS) attacks where an attacker may compromise vulnerable machines or launch amplification attacks. This chapter suggests using edge routers to throttle high-bandwidth attacks and eliminate attack traffic as early as possible. For example, you can use network and application load balancers to continually monitor incoming traffic. Other mitigation techniques include using caching proxies, minimizing network requests (e.g. using spriting), minimizing egress bandwidth, CAPTCHA, rate limiting, monitoring/alerting (MTTD mean-time-to-detect, MTTR mean-time-to-repair), graceful degradation, exponential backoff, jitter, etc.
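
Exponential backoff with jitter, mentioned above as a client-side mitigation, can be sketched as follows. The function names, the base delay and the cap are illustrative assumptions rather than values from the book; the sketch uses "full jitter", where each delay is drawn uniformly between zero and the current exponential bound.

```python
import random
import time


def backoff_delays(max_attempts, base=0.1, cap=5.0):
    """Yield one delay per attempt: the bound doubles each time up to `cap`,
    and the actual delay is a random value between 0 and that bound."""
    for attempt in range(max_attempts):
        bound = min(cap, base * 2 ** attempt)
        yield random.uniform(0, bound)


def call_with_retries(operation, max_attempts=5, sleep=time.sleep):
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt, delay in enumerate(backoff_delays(max_attempts), start=1):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            sleep(delay)
```

The random component spreads retries from many clients over time, so that clients that failed together do not all retry at the same instant and overload the recovering service again.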

The chapter eleven is part of third section and focuses on maintaining trusted CA. For example, you can use secure and memory-safe languages to parse certificates or CSR requests. You may need to use third-party libraries but you can add testing for validation.

The chapter twelve focuses on writing code, e.g. using frameworks that enforce security and reliability. You can use RPC frameworks that may provide logging, authentication, authorization and rate-limiting. This chapter covers top OWASP vulnerabilities such as SQL injection, which can be prevented by using parameterized SQL queries, and XSS, which can be prevented by sanitizing user input (safeHTML) and incremental rollout. Other coding techniques include simplicity, minimizing multi-level nesting/cyclomatic complexity, eliminating YAGNI smells, paying down tech-debt and refactoring. The chapter also suggests using memory-safe and strongly/statically typed languages.
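
As a small illustration of parameterized SQL, the sketch below uses Python's built-in sqlite3 module with an in-memory database. The users table and the find_user helper are hypothetical and exist only for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, role TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "admin"), ("bob", "user")],
)


def find_user(conn, name):
    # The ? placeholder passes `name` to the database as data,
    # so it is never parsed as part of the SQL statement.
    cursor = conn.execute("SELECT name, role FROM users WHERE name = ?", (name,))
    return cursor.fetchall()


# A classic injection payload is treated as a literal (non-matching) name.
malicious = "bob' OR '1'='1"
rows_for_bob = find_user(conn, "bob")
rows_for_attack = find_user(conn, malicious)
```

Had the query been built with string concatenation, the payload would have turned the WHERE clause into a condition that matches every row.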

The chapter thirteen examines testing code using unit and integration tests. It also introduces other testing techniques such as fuzz testing, chaos engineering, static program analysis, code inspection tools (Error Prone for Java and Clang-Tidy), and formal methods.

The chapter fourteen describes the deployment phase of software development that may include pushing the code, downloading a new binary, updating configuration, migrating a database, etc. The chapter reviews the threat model to prevent bad deployments such as accidental changes, malicious changes, bad configuration, stolen integrity keys, deployment of an older version, backdoors, etc. It suggests best practices such as code reviews, automation, verifying artifacts, validating configuration, binary provenance, etc. The binary provenance verifies the inputs to the artifact and validates the transformation and the entity that performed the build. The provenance fields include authenticity (signature), output, input (source and dependencies), command, environment, input metadata, debug-info and versioning. A build is considered verifiable if the binary provenance produced by the build is trustworthy. The verifiable build architectures include trusted build services, hermetic builds, reproducible builds, and verifiable builds; however, you may need a break-glass mechanism that bypasses the policy in case of an outage. You can add post-deployment verification to validate the deployment.

The chapter fifteen shows how to investigate systems using debug flags, verifying data corruption, reviewing logs, and designing for safety.

The chapter sixteen is part of section four, which focuses on disaster planning. This chapter introduces best practices to address short-term and long-term recovery such as performing an analysis of potential disasters, establishing a response time, creating response plans/playbooks, configuring systems, testing procedures/systems, and incorporating feedback from tests and evaluation. It shows how to set up an incident response team that may include an incident commander, SREs, customer support, legal, forensic, security/privacy engineers, etc. IR teams can use a severity model to categorize incidents based on the severity of their impact on the organization and a priority model to define response time. The response plan includes incident reporting, triage, SLO, roles/responsibilities and communications. You also need to test systems and response plans and audit automated systems. Red team testing can help simulate how the system reacts to an attack.

The chapter seventeen reviews crisis management, which determines if the security incident is a crisis. This can be evaluated in triage, which determines the severity of the incident and whether the incident is a result of a system bug or a compromise that is yet to be discovered. In the context of crisis management, operational security (OpSec) refers to the practice of keeping your response activity secret. For example, common OpSec mistakes include documenting the incident in email, logging into the compromised systems, locking accounts/changing passwords and taking systems offline. The chapter instead suggests meeting in person, using key-based access (without login), etc. You can apply forensics processes to investigate the security compromise. The chapter ends with a summary of best practices that include triage, declaring an incident, communicating with executives and SecOps, creating IR and forensics teams, and preparing communication, remediation and closure.

The chapter eighteen reviews recovery and aftermath from the security incident. You can establish recovery time based on if it affected mission critical system.

The goal of your recovery effort is to mitigate an attack and return your systems to their normal routine state, however complex security events may require parallelizing incident management/response execution. In order to return your systems to normal, you need to have a complete list of the systems, networks, and data affected by the attack. You also need sufficient information about the attacker’s tactics, techniques, and procedures (TTPs) to identify any related resources that may be impacted. There are several considerations before recovery such as:

  • how will your attacker respond to your recovery effort?
  • is your recovery infrastructure or tooling compromised?
  • what variants of the attack exist?
  • will your recovery reintroduce attack vectors?
  • what are your mitigation options?

The recovery checklist includes:

  • isolating Assets (quarantine)
  • system Rebuilds and software Upgrades
  • data sanitization
  • recovery data
  • credential rotation
  • postmortems

The chapter nineteen is part of section five of the book, which covers making security a part of the organization’s culture. It suggests making security a team responsibility, providing security to users, designing for defense in depth and being transparent with the community.

The chapter twenty describes roles and responsibilities for security and reliability. For example, security experts implement security-specific technologies, SREs develop centralized infrastructure, and security specialists can devise best practices. You can embed security experts in the development teams or have them review/audit security practices. Organizations can create a red team that focuses on offensive exercises for simulating attacks and a blue team for assessing and hardening software and infrastructure.

The chapter twenty one shows how to build a culture of security and reliability. The chapter suggests an organizational culture of security and reliability by default and encourages employees to discuss these topics early in the project life-cycle. The chapter also suggests a culture of review where peer reviews ensure that code implements least privilege and other security considerations. The culture should include awareness of security aspects, sustainability, transparency, and communication.

February 14, 2020

Review of the “Database Internals”

Filed under: Design,Technology — admin @ 8:06 pm

“Database Internals” is an excellent resource for a deep dive into storage engines and distributed systems. The first chapter introduces OLTP, OLAP and HTAP databases. It reviews database architecture and components including transport, query processor, storage engine, transaction manager, lock manager, access methods, buffer manager and recovery manager. The storage may use an in-memory store or a disk store, and some in-memory databases use disk for backup, which is updated asynchronously. The chapter reviews row-oriented and column-oriented databases along with data files and index files.

The second chapter covers B-Trees, which are often used with disk-based storage engines. The chapter introduces binary search trees (BST) and balanced trees. However, because elements are added in random order, BSTs are not optimized for disk storage as parent and child nodes can be stored in different disk pages. Also, the height of a BST, which is O(log N) at best, determines the number of operations per search. The chapter reviews the architecture of storage devices such as hard drives and SSDs, and B-Tree data structures where each node can hold up to N keys and N + 1 pointers to the child nodes. The nodes are grouped into root node, leaf nodes and internal nodes where each node occupies a fixed-size page. Keys in B-Tree nodes are called index entries, separator keys or divider cells and they split the tree into subtrees holding key ranges. B-Tree lookup complexity is logarithmic with the fanout as the base: there are K times more nodes on each new level, so a lookup fetches at most log_K(M) pages (where M is the total number of items) to find a searched key. To insert a value, the target leaf is located and the key/value is appended to it. The node may need to be split if there isn’t enough room. Similarly, a deletion finds the target leaf and removes the key/value. The deletion may result in node merges if neighboring nodes hold too few entries.
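
The lookup described above can be sketched in a few lines. This is a simplified, hypothetical in-memory layout (separator keys in internal nodes and values only in leaves), not the on-disk format from the book; it only shows how separator keys route a search and how the number of pages read equals the tree height.

```python
from bisect import bisect_right


class Node:
    def __init__(self, keys, children=None, values=None):
        self.keys = keys                 # sorted keys (separators in internal nodes)
        self.children = children or []   # len(keys) + 1 children for internal nodes
        self.values = values or []       # one value per key in leaf nodes

    @property
    def is_leaf(self):
        return not self.children


def search(root, key):
    """Return (value, pages_read); value is None when the key is absent."""
    node, pages_read = root, 1
    while not node.is_leaf:
        # Index of the first separator greater than key selects the child
        # whose range is keys[i - 1] <= key < keys[i].
        node = node.children[bisect_right(node.keys, key)]
        pages_read += 1
    i = bisect_right(node.keys, key) - 1
    if i >= 0 and node.keys[i] == key:
        return node.values[i], pages_read
    return None, pages_read


leaves = [
    Node([1, 5], values=["v1", "v5"]),
    Node([10, 15], values=["v10", "v15"]),
    Node([20, 30], values=["v20", "v30"]),
]
root = Node([10, 20], children=leaves)
```

With two separator keys the root fans out to three leaves, so every lookup in this tiny tree touches exactly two nodes regardless of which key is requested.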

The chapter three covers file format for B-Trees for disk. It reviews binary encoding and primitive types, strings and general principles of file format such as header, page-data and trailer. The page format can be fixed or variable size but variable size may incur more overhead. Also, variable size page must reclaim space when records are removed and reference records in page without regard to their exact locations. The variable-size pages generally use slotted page structure that has headers, list of pointers and list of variable size cells where each cell stores flags, key/data size, page-id and byte data. Removing an item may just mark the cell as deleted and reclaim later. The insertion may use first-fit or best-fit strategy to find the free blocks. Also, headers may store version and checksum for data validation.
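
A toy model of the slotted page idea, assuming a simplified layout: the slot directory is kept as a Python list instead of being encoded in the page bytes, and cells grow from the end of the page toward the front. The class and method names are hypothetical.

```python
class SlottedPage:
    SLOT_SIZE = 4  # assumed bytes reserved per slot entry (offset + length)

    def __init__(self, size=4096):
        self.data = bytearray(size)
        self.slots = []        # slot id -> (offset, length), or None if deleted
        self.free_end = size   # cells are written backwards from the page end

    def insert(self, cell):
        offset = self.free_end - len(cell)
        if offset < (len(self.slots) + 1) * self.SLOT_SIZE:
            raise ValueError("page full")
        self.data[offset:self.free_end] = cell
        self.free_end = offset
        self.slots.append((offset, len(cell)))
        return len(self.slots) - 1  # stable slot id used by callers

    def read(self, slot_id):
        slot = self.slots[slot_id]
        if slot is None:
            return None
        offset, length = slot
        return bytes(self.data[offset:offset + length])

    def delete(self, slot_id):
        self.slots[slot_id] = None  # only mark as deleted; space is reclaimed later

    def compact(self):
        """Rewrite live cells contiguously; slot ids stay the same."""
        live = [(i, self.read(i)) for i, s in enumerate(self.slots) if s is not None]
        self.free_end = len(self.data)
        for slot_id, cell in live:
            offset = self.free_end - len(cell)
            self.data[offset:self.free_end] = cell
            self.slots[slot_id] = (offset, len(cell))
            self.free_end = offset
```

Callers refer to records by slot id, so compaction can move cells inside the page without invalidating any reference from outside the page.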

Chapter four shows how to implement B-Trees, e.g. the page header may store flags, the number of cells, a magic number, etc. Some implementations store sibling pointers (forward/backward) to locate neighboring nodes, but this adds complexity to splits and merges. B-Trees also store one more child-page pointer than the number of keys:

+--------------------------+------------+
|                          |  Separator | ---> Ks >= K3
+--------------------------+------------+
|  K1       |  K2            |    K3    |
+--------------------------+------------+
Ks < K1    K1 <= Ks < K2    K2 <= Ks < K3

Alternatively, you can store the rightmost pointer in the cell along with the high key. Each B-Tree node is designed to hold a specific number of items and resizing it would require copying data, so to avoid copying, implementations can use an extension/overflow page and link it from the original page. B-Trees keep keys in order so that they can use binary search; the insertion point is the index of the first element that is greater than the given key. Some implementations store parent pointers in nodes or use breadcrumbs to record the path to the leaf node in case they need to split/merge. B-Tree implementations may postpone splits/merges, create a new rightmost node, or use other algorithms to improve rebalancing. B-Trees may also apply compression at various levels of granularity and perform maintenance to fix fragmented data or garbage collect non-addressable data (vacuum).
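As a small illustration of the binary search step, Python’s standard `bisect_right` returns exactly "the index of the first element that is greater than the given key", which is also the index of the child pointer to follow. The function name `child_index` and the sample separator keys are assumptions made for this sketch.

```python
from bisect import bisect_right


def child_index(separator_keys, search_key):
    """Return which of the N + 1 child pointers to follow for search_key.

    separator_keys must be sorted. Child i holds keys k with
    separator_keys[i - 1] <= k < separator_keys[i].
    """
    # bisect_right returns the index of the first separator greater than search_key.
    return bisect_right(separator_keys, search_key)


separators = [10, 20, 30]           # K1, K2, K3
for key in (5, 10, 25, 30):
    print(key, "-> child", child_index(separators, key))
```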

Chapter five reviews transaction processing and introduces the concepts of ACID and page caching so that modifications can be done in memory. Pages are brought in if they are not in memory and evicted/flushed to disk when there isn’t enough memory (the O_DIRECT flag can be used to bypass the kernel page cache). After a page is modified, it is marked as dirty so that it can be flushed for durability. These modifications are coordinated with the write-ahead log (WAL) and checkpoints so that data can be recovered if the server crashes. As splits/merges may require multiple writes, a B-Tree can lock pages that have a high probability of being used; this is called pinning, and pinned pages are kept in memory. I/O operations can be buffered to reduce disk I/O. Based on available memory, a B-Tree may need to evict old pages when new pages cannot fit in memory, and there are a variety of eviction policies (page replacement algorithms) such as FIFO, LRU, CLOCK (references in a circular buffer), LFU, etc. B-Trees use the WAL to record changes to page contents before the pages themselves are written. WAL records are flushed with fsync, but under certain error conditions a later fsync may not report an error that was already cleared, which can result in loss of data. B-Tree implementations may use steal/no-steal and force/no-force policies to determine when changes are flushed to disk, and these policies impact undo/redo behavior. The steal policy allows flushing a page modified by a transaction before that transaction commits, and the force policy requires all pages modified by a transaction to be flushed before the transaction commits. The chapter explains the ARIES algorithm, a steal/no-force recovery algorithm that uses physical redo to improve performance, logical undo to improve concurrency, and WAL records to implement repeating history. ARIES uses LSNs (log sequence numbers) to identify log records and tracks modified pages in a dirty page table.
The chapter reviews concurrency controls such as optimistic concurrency control, multi-version concurrency control and pessimistic concurrency control (lock-based and non-locking). The chapter reviews transaction isolation and read/write anomalies such as dirty read (reading uncommitted updates), non-repeatable read (getting a different result when querying the same row again), phantom read (the same problem for range queries), lost update (last writer wins), dirty write (a write based on a dirty read) and write skew (e.g. double spending). The isolation levels include read-uncommitted, which allows dirty, phantom and non-repeatable reads; read-committed, which prevents dirty reads; repeatable-read, which prevents non-repeatable reads but allows phantom reads; and serializable, which executes transactions as if they ran serially and prevents phantom reads. Serializable isolation is difficult to implement, so some databases use snapshot isolation, where a transaction observes all changes committed before its start time. Snapshot isolation prevents lost updates but is still susceptible to write skew. Optimistic concurrency control validates a transaction before committing its writes and works well when conflicts (and therefore retries) are rare, but it still needs a critical section for validation and commit. Multi-version concurrency control uses monotonically incremented transaction IDs or timestamps and prevents access to uncommitted values. Pessimistic concurrency control can use locks or simple timestamps that it checks to ensure that no transaction with a higher timestamp has already been committed. The database maintains max_read_timestamp and max_write_timestamp for each value: read operations with a timestamp lower than max_write_timestamp are aborted, write operations with a timestamp lower than max_read_timestamp conflict and are aborted, but write operations with a timestamp lower than max_write_timestamp are allowed and simply ignored as outdated (Thomas Write Rule). Lock-based concurrency control uses schemes such as two-phase locking, with a growing phase in which all locks are acquired and a shrinking phase in which all locks are released. Locks can lead to deadlocks, so you need timeouts to abort long-running transactions.
The chapter describes the distinction between locks and latches: locks isolate and schedule overlapping transactions, while latches guard the physical B-Tree contents (leaf/non-leaf pages). Latches can be implemented as readers-writer locks (busy-wait/CAS), and latch crabbing minimizes the time latches are held by releasing latches on parent nodes as soon as they are no longer needed.
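The timestamp-ordering rules from the concurrency control discussion above, including the Thomas Write Rule, can be sketched as follows. This is a minimal single-value model; the class names `TimestampedValue` and `Aborted` are made up for the example, and a real database would track these timestamps per record and restart aborted transactions.

```python
class Aborted(Exception):
    """Raised when an operation arrives too late and its transaction must restart."""


class TimestampedValue:
    def __init__(self, value=None):
        self.value = value
        self.max_read_timestamp = 0
        self.max_write_timestamp = 0

    def read(self, ts):
        # A read older than the latest write would observe a value from its future.
        if ts < self.max_write_timestamp:
            raise Aborted("read is older than the latest write")
        self.max_read_timestamp = max(self.max_read_timestamp, ts)
        return self.value

    def write(self, ts, value):
        # A newer transaction has already read the value this write would replace.
        if ts < self.max_read_timestamp:
            raise Aborted("a newer transaction already read this value")
        if ts < self.max_write_timestamp:
            return False   # Thomas Write Rule: skip the outdated write, no abort
        self.value = value
        self.max_write_timestamp = ts
        return True
```

For example, after `write(5, "b")` a late `write(3, "c")` returns `False` and leaves the value untouched, while a late `read(4)` raises `Aborted`.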

Chapter six goes over different B-Tree designs and implementations. For example, some B-Trees use copy-on-write: instead of using synchronization and latches, they copy modified contents into a new shadow tree, and the pointer to the topmost page is atomically updated after the update (LMDB). To update a page on disk, the in-memory representation is updated first, using a cached version, native pointers (unmanaged languages), language-specific structures, or a wrapper object that updates the disk as soon as the B-Tree is updated. Lazy B-Trees reduce the cost of updates, e.g. WiredTiger uses different formats for in-memory and on-disk pages, and updates are first saved in an update buffer to reduce I/O. Lazy-Adaptive Trees group nodes into subtrees and attach a buffer for batch operations to each subtree. FD-Trees append all changes to a small mutable head tree backed by multiple immutable, logarithmically sized sorted runs, and use fractional cascading to maintain pointers between levels. In order to reduce write amplification, the Bw-Tree (Buzzword-Tree) batches updates using append-only storage, and it uses compare-and-swap operations instead of latches. Cache-oblivious B-Trees use cache-oblivious structures that give asymptotically optimal performance regardless of the underlying memory hierarchy. Cache-oblivious algorithms are analyzed against two levels of the hierarchy, such as page cache and disk, with the disk partitioned into blocks, and they do not rely on platform-specific parameters, so the number of transfers between page cache and disk stays within a constant factor of optimal.

Chapter seven discusses log-structured storage such as LSM Trees, which use immutable, append-only storage and merging. As B-Trees have high write amplification, LSM Trees provide an alternative by using buffering and append-only storage. LSM Trees write immutable files and merge them together over time. LSM Trees use a smaller in-memory buffer (memtable) and larger disk-resident tables. A separate write-ahead log is appended to and committed before the in-memory operation is acknowledged to the client. After a flush and merge, the old memory and disk subtrees are discarded and replaced with the result of their merge. In LSM Trees, redundant records are reconciled during reads, and tombstones are used to mark deleted records. Some implementations use predicate deletes to remove records for a range of keys. LSM Trees use compaction to optimize access, such as the leveled compaction used by RocksDB, where level-0 tables are created by flushing memtable contents and are later merged to create level-1 tables. Some LSM Trees use size-tiered compaction, which groups disk tables by size, or time-window compaction (used by Cassandra). As opposed to B-Trees, which are read-optimized, LSM Trees do not require locating the record on disk during a write, but reads are more expensive with the default configuration. The chapter then reviews sorted string tables (SSTables), which are often used to implement disk-resident tables. SSTables consist of index files and data files, where index files use B-Trees or hash tables for lookups and data files hold records in key order, which supports range queries. During compaction, data files can be read sequentially and the merge iteration is order-preserving, so the merged table can be created in a single run. The chapter introduces bloom filters, which test whether an element may be a member of a set. The chapter then reviews skip lists, which keep data sorted and use probabilistic balancing.
A skip list builds a hierarchy of linked lists at different heights, where higher levels skip over more nodes and each node can have more than one successor pointer, one for each level it participates in.
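The LSM write and read paths described above (memtable, flush to immutable sorted tables, tombstones, reconciliation on read, compaction) can be sketched in memory as follows. This is a toy model under stated assumptions: the class `TinyLSM` and its `memtable_limit` parameter are invented for the example, there is no write-ahead log or on-disk format, and lookups scan tables linearly instead of using an index or bloom filter.

```python
TOMBSTONE = object()   # marker for deleted keys


class TinyLSM:
    def __init__(self, memtable_limit=2):
        self.memtable = {}
        self.sstables = []   # newest first; each is an immutable, key-sorted tuple
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def delete(self, key):
        self.put(key, TOMBSTONE)   # deletes are writes of a tombstone

    def flush(self):
        if self.memtable:
            self.sstables.insert(0, tuple(sorted(self.memtable.items())))
            self.memtable = {}

    def get(self, key):
        # Check the newest data first so newer records shadow older ones.
        if key in self.memtable:
            value = self.memtable[key]
            return None if value is TOMBSTONE else value
        for table in self.sstables:
            for k, value in table:
                if k == key:
                    return None if value is TOMBSTONE else value
        return None

    def compact(self):
        merged = {}
        for table in reversed(self.sstables):   # oldest first, newer overwrite
            merged.update(table)
        # All tables are merged, so tombstones can be dropped safely.
        live = sorted((k, v) for k, v in merged.items() if v is not TOMBSTONE)
        self.sstables = [tuple(live)] if live else []
```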

Chapter eight begins the second half of the book, which focuses on distributed systems. It introduces the concepts of concurrency and parallelism, where concurrent executions can interleave and shared state must be protected, whereas parallel operations are executed by multiple processors at the same time. The chapter defines system reliability in terms of fault tolerance and discusses the fallacies of distributed computing (published by Peter Deutsch). In real applications, processing is not instantaneous, latency is not zero and queue capacity is not infinite, which requires back-pressure. The queue size is determined by measuring task processing time and the average time each task spends in the queue. Distributed systems also have to deal with clock differences between machines and with state consistency, e.g. read-time data repair in eventually consistent systems. Detecting failures in distributed systems is hard and requires heartbeat protocols, and network partitions can result in partial failures. The chapter explains cascading failures that can propagate from one part of the system to another. You can use an exponential backoff strategy with jitter to avoid amplifying problems. Messages can get lost, delayed or reordered in a distributed system, and the sender may retry but does not know whether the message was already delivered. The chapter describes link abstractions: a fair-loss link eventually delivers a message if the sender keeps retransmitting it; finite duplication means a message won’t be delivered infinitely many times; and no creation means a link won’t deliver a message that was never sent. Distributed systems use acknowledgments with sequence numbers to notify the sender, and the sender may retransmit in the absence of an ack (a stubborn link resends messages indefinitely). In order to prevent duplicate processing as a result of retransmission, you can use idempotent operations. Messages can also arrive out of order, so the recipient may use sequence numbers to detect an out-of-order message and keep it in a buffer until the earlier messages arrive.
A perfect link guarantees reliable delivery, no duplication and no creation (only messages that were actually sent are delivered). Exactly-once delivery is very hard in distributed systems, so most real applications use at-least-once delivery (at-most-once is not reliable). The chapter describes the Two Generals’ Problem to show that agreement cannot be reached over an asynchronous link that can fail, because participants may not be alive or connected. The problem shows that no matter how many ACKs you exchange, you can never be sure that the last message was delivered and that both parties agree. This is complemented by the FLP impossibility result, which shows that consensus cannot be guaranteed in bounded time in an asynchronous system if even one process may crash. The chapter finally discusses failure models such as crash faults, omission faults (a process skips some steps of the algorithm), arbitrary faults (Byzantine faults), etc., and you can use process groups and redundancy to mask these failures from users.
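Two of the techniques above, exponential backoff with jitter on the sender and sequence-number based deduplication and reordering on the receiver, can be sketched as follows. The names `backoff_delays` and `IdempotentReceiver`, the default base/cap values and the "full jitter" variant are assumptions chosen for illustration.

```python
import random


def backoff_delays(base=0.1, cap=5.0, attempts=5, rng=random.random):
    """Full-jitter delays: attempt n waits a random time in [0, min(cap, base * 2**n)]."""
    return [rng() * min(cap, base * 2 ** n) for n in range(attempts)]


class IdempotentReceiver:
    """Delivers each sequence number once and in order, buffering early arrivals."""

    def __init__(self):
        self.next_seq = 0
        self.buffer = {}
        self.delivered = []

    def receive(self, seq, payload):
        if seq < self.next_seq or seq in self.buffer:
            return "ack"                      # duplicate caused by a retransmission
        self.buffer[seq] = payload
        while self.next_seq in self.buffer:   # release everything that is now in order
            self.delivered.append(self.buffer.pop(self.next_seq))
            self.next_seq += 1
        return "ack"
```

The receiver always acknowledges, even for duplicates, so a sender that missed the first ack stops retransmitting without the message being processed twice.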

Chapter nine discusses failure detection, where a failure detector identifies failed or unreachable processes so that they can be excluded from the algorithm, guaranteeing liveness while preserving safety. Most distributed systems use heartbeats to detect failures, where a process periodically notifies its peers of its status. Each process maintains a list of other processes and updates it with the last response time. Some distributed systems use a deadline failure detector, which uses heartbeats and reports a process as failed if it did not respond within a fixed time interval. Alternatively, other systems use outsourced heartbeats to improve reliability using information about a process’s liveness from the perspective of its neighbors. The phi-accrual failure detector calculates the probability of a process’s crash by sampling heartbeat arrival times. Other approaches use gossip, maintaining a heartbeat counter and periodically sending it to a random neighbor. Another approach arranges active processes into groups, where a process failure is detected by group members and propagated as a group failure.
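A deadline failure detector of the kind described above can be sketched in a few lines. The class name `DeadlineFailureDetector` and the explicit `now` argument (used instead of a real clock so the behavior is easy to follow) are assumptions for this example.

```python
class DeadlineFailureDetector:
    """Suspects a process when no heartbeat arrived within `timeout` time units."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.last_seen = {}   # process id -> time of the last heartbeat

    def heartbeat(self, process_id, now):
        self.last_seen[process_id] = now

    def suspected(self, now):
        return sorted(
            process_id
            for process_id, seen in self.last_seen.items()
            if now - seen > self.timeout
        )
```

A phi-accrual detector would replace the fixed `timeout` with a suspicion level derived from the observed distribution of heartbeat arrival intervals.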

Chapter ten goes over leader election while maintaining liveness, stability and safety. It starts with the bully algorithm, which uses process rank (e.g. the biggest IP address) to identify the new leader. However, it is subject to split brain and can cause problems if the highest-ranked node is unstable. Next-in-line failover is an alternative where the leader provides a list of failover nodes and the next highest-ranked node is selected in case of leader failure. The candidate/ordinary algorithm splits nodes into candidate and ordinary groups, where one of the candidate nodes becomes the leader (picking the highest-ranked alive node). The invitation algorithm allows processes to invite other processes to join their groups, and smaller groups are merged into bigger groups. The ring algorithm uses a ring topology where each process contacts its successor, passing along a set of live nodes and skipping unresponsive nodes until one responds. The highest-ranked node from the live set is chosen as the leader. Lastly, you may use consensus algorithms to elect a leader along with a failure detection algorithm.
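The core of the bully algorithm can be sketched as a recursive function: a process that notices the leader is gone contacts all higher-ranked processes, and becomes leader only if none of them responds. The function name `bully_election` and the `ranks`/`alive` inputs are assumptions for this example, and real message passing and timeouts are replaced by a set lookup.

```python
def bully_election(initiator, ranks, alive):
    """Return the leader chosen when `initiator` starts an election.

    ranks: dict mapping process id -> rank (higher wins).
    alive: set of process ids that respond to messages.
    """
    higher = [p for p in ranks if ranks[p] > ranks[initiator] and p in alive]
    if not higher:
        return initiator   # nobody outranks us, so we announce ourselves as leader
    # Any responding higher-ranked process takes over and runs its own election.
    return bully_election(min(higher, key=ranks.get), ranks, alive)


ranks = {"a": 1, "b": 2, "c": 3, "d": 4}
print(bully_election("a", ranks, alive={"a", "b", "c"}))   # "d" is down
```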

Chapter eleven examines replication and consistency along with properties such as availability, fault tolerance and redundancy. The chapter reviews the CAP theorem, where availability requires every non-failing node to deliver results and linearizable consistency preserves the original operation order. In an asynchronous system, you cannot guarantee both consistency and availability in the presence of network partitions, so you have to choose either best-effort availability or best-effort consistency (or sacrifice latency). Also, the CAP theorem is about network partitions, where a node may serve an incorrect response, and not about node crashes, where a node doesn’t respond at all. The chapter reviews the concepts of harvest and yield in the context of the CAP conjecture, where harvest measures how complete a (possibly partial) result is and yield compares the number of requests that succeeded against the number of requests attempted. Thus, these properties focus on trade-offs as opposed to absolutes. Distributed systems may abstract message passing and represent state as shared memory, where each unit of storage is called a register. Each operation is tracked with an invocation and a completion event, and the operation is considered failed if the process crashes before completing it. Operations that overlap with other operations are called concurrent operations. Registers can be categorized as safe (dirty/non-repeatable reads), regular (repeatable reads) and atomic (linearizable). Consistency models provide different semantics and guarantees from the perspective of state and operations in distributed/concurrent systems. For example, strict consistency provides complete replication transparency, as if you held a global lock, but it’s impractical in real life. Linearizability guarantees that a write becomes visible to all readers exactly once, without exposing partial state. If two operations overlap, they may take effect in either order, but all read operations that occur after a write operation completes can observe its effect.
Linearizability provides a total order of concurrently running operations so that every read of a shared value returns the latest value written to it. The linearization point is the instant at which the effect of an operation atomically becomes visible. Linearizability is expensive to implement as it requires coordination and ordering, but you can use compare-and-swap, where you first prepare the result and then use CAS to swap pointers and publish the state. Sequential consistency is a step below linearizability: operations execute in some sequential order, and the operations of each individual process are executed in that process’s own order. In causal consistency, all processes see causally related operations in the same order. It can attach logical clocks to each message, and an operation is processed only if the preceding operations have completed. The chapter defines a vector clock as a structure for establishing a partial order between events. Each process maintains a vector of logical clocks, with one clock per process, and increments its own clock every time a new event occurs. In order to resolve conflicts, you check for values with the same key, append a new version to the version vector and establish the causal relationships. The chapter discusses session models that evaluate consistency from the perspective of the client and assume all client operations are sequential. These include the read-own-writes model and the monotonic reads model, which guarantees that you cannot read an old value once you have seen a new value. The monotonic writes model guarantees that writes from the same client are applied in order, e.g. the write of v2 follows the write of v1. The writes-follow-reads model ensures that writes are ordered after the writes that were observed by previous read operations. Eventual consistency propagates updates asynchronously, and the latest value is resolved using last-write-wins or vector clocks.
Eventually consistent systems provide parameters to tune availability and consistency, such as replication factor (N), write consistency (W) and read consistency (R), and a read is guaranteed to see the most recent written value when R + W > N. You can optimize replication by grouping nodes into copy and witness subsets, where witness replicas only store updates when copy replicas are unavailable or running behind. The chapter ends with a discussion of strong eventual consistency and CRDTs, which are specialized data structures that converge to the same state regardless of the order in which operations are applied. However, the allowed operations have to be side-effect free, commutative and causally ordered.
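The vector clock comparison and the R + W > N rule discussed above can be sketched as follows. Clocks are modeled as plain dictionaries from process ID to counter; the function names are invented for this example.

```python
def vc_increment(clock, process):
    """Return a copy of `clock` with the counter of `process` advanced by one."""
    updated = dict(clock)
    updated[process] = updated.get(process, 0) + 1
    return updated


def vc_merge(a, b):
    """Element-wise maximum, applied when a process receives another clock."""
    return {p: max(a.get(p, 0), b.get(p, 0)) for p in a.keys() | b.keys()}


def vc_compare(a, b):
    """Return 'before', 'after', 'equal' or 'concurrent' for clocks a and b."""
    processes = a.keys() | b.keys()
    a_le_b = all(a.get(p, 0) <= b.get(p, 0) for p in processes)
    b_le_a = all(b.get(p, 0) <= a.get(p, 0) for p in processes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"   # neither happened before the other: a real conflict


def quorum_overlaps(n, r, w):
    """True when every read quorum intersects every write quorum."""
    return r + w > n
```

Only the "concurrent" case needs conflict resolution; "before" and "after" mean one version causally supersedes the other.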

Chapter twelve discusses anti-entropy and dissemination of updates in the context of broadcast, peer-to-peer and cooperative broadcast. Broadcasting to all processes from a single process is expensive with a large number of nodes and unreliable. Entropy measures disorder in the system, and anti-entropy brings nodes back up to date when delivery fails. Read repair detects and eliminates inconsistencies during reads. It can be implemented as a blocking or an asynchronous operation. Blocking read repair ensures read monotonicity for quorum reads. Instead of issuing a full read request to each node, the coordinator can issue one full read and send digest requests to the other replicas, and then repair replicas in case of inconsistencies. Another alternative is hinted handoff, a write-side repair mechanism where the write coordinator stores a hint record and replays it to the target node when it comes back. Some databases use sloppy quorums along with hinted handoff, where write operations use additional healthy nodes, which update the crashed node when it comes back. Merkle trees provide a compact hash representation of the local data, and replicas compare root-level hashes to check for inconsistency. Bitmap version vectors can also be used to resolve data conflicts based on recency: logs of operations are kept on each node and compared with other nodes, and missing data is replicated to the target node. Gossip dissemination uses gossip protocols, probabilistic communication procedures modeled on how rumors or diseases spread. It uses cooperative propagation to disseminate information, where an infective node spreads the update to susceptible nodes, which in turn update randomly chosen neighbors. The message redundancy metric captures the overhead of repeated delivery, and the amount of time to reach convergence is called latency.
Push/lazy-push multicast trees make a trade-off between epidemic and tree-based broadcast primitives by creating a spanning tree overlay of nodes to actively distribute messages with the least overhead. A node sends the full message to a small subset of peers and just the message ID to the rest, and a peer can request the full message if it doesn’t have the data.
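The Merkle tree comparison mentioned above can be illustrated with a function that reduces a list of records to a single root hash. This is a simplified sketch: the name `merkle_root`, the choice of SHA-256 and the rule of duplicating the last hash on odd-sized levels are assumptions, and a real anti-entropy process would also walk down the tree to find the differing ranges.

```python
import hashlib


def merkle_root(records):
    """Return the root hash for a list of byte strings (one leaf per record)."""
    level = [hashlib.sha256(record).digest() for record in records]
    if not level:
        return hashlib.sha256(b"").digest()
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])   # duplicate the last hash on odd-sized levels
        level = [
            hashlib.sha256(level[i] + level[i + 1]).digest()
            for i in range(0, len(level), 2)
        ]
    return level[0]


replica_a = [b"k1=v1", b"k2=v2", b"k3=v3"]
replica_b = [b"k1=v1", b"k2=v2", b"k3=stale"]
print(merkle_root(replica_a) == merkle_root(replica_b))   # replicas have diverged
```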

Chapter thirteen reviews distributed transactions. In order to make operations appear atomic, you may use an atomic commitment algorithm that provides prepare, commit and rollback operations along with a transaction manager. For example, two-phase commit executes in two phases, prepare and commit/abort, where a coordinator collects votes and the rest of the nodes, called cohorts, operate over disjoint datasets. In case of a cohort failure, the coordinator uses its log to replicate the decision to the cohort when it recovers. In case of a coordinator failure, cohorts will not be able to learn the final decision. In order to make atomic commitment more robust against coordinator failure, three-phase commit adds an extra step: propose, prepare and commit/abort. The transaction is aborted in case of coordinator failure or operation time-out. Next, the chapter reviews the Calvin approach, which uses a deterministic transaction order to remove the need for coordination (as opposed to the non-deterministic ordering in most databases, which use two-phase locking or optimistic concurrency control). For example, Calvin uses a sequencer that determines the order of transactions and establishes a global transaction input sequence, and it may split time into epochs to minimize contention. The chapter discusses data partitioning and consistent hashing, which maps hashes to a ring where each node gets its own position on the ring and becomes responsible for a range of values. If serializability is not required, you may use snapshot isolation, which guarantees that all reads made within the same transaction are consistent with a snapshot of the database and that only the first committer wins when there is a write-write conflict. Lastly, the chapter discusses mechanisms to avoid coordination while preserving data integrity constraints.
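The two phases of two-phase commit can be sketched as a coordinator function over in-memory cohorts. The `Cohort` class, its `can_commit` flag and the list used as a decision log are hypothetical stand-ins; a real implementation would add durable logging, timeouts and recovery.

```python
class Cohort:
    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self):
        # Vote yes only if the local part of the transaction can be committed.
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self):
        self.state = "committed"

    def rollback(self):
        self.state = "aborted"


def two_phase_commit(cohorts, log):
    # Phase 1 (prepare): collect a vote from every cohort.
    votes = [cohort.prepare() for cohort in cohorts]
    decision = "commit" if all(votes) else "abort"
    # Record the decision before acting on it so it can be replayed after a crash.
    log.append(decision)
    # Phase 2 (commit/abort): every cohort applies the same decision.
    for cohort in cohorts:
        if decision == "commit":
            cohort.commit()
        else:
            cohort.rollback()
    return decision
```

A single "no" vote aborts the transaction everywhere, which is what makes the outcome atomic across cohorts.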

Chapter fourteen discusses consensus, which focuses on agreement, validity and termination (reaching a decision). The chapter introduces the concept of broadcast communication. However, a simple broadcast may result in an inconsistent state if the coordinator crashes in the middle of the broadcast. Atomic broadcast guarantees reliable delivery (atomicity) and total order. For example, the virtual synchrony framework organizes processes into groups, and messages are delivered to all members of a group in the same order. In Zookeeper Atomic Broadcast (ZAB), a process takes the role of leader or follower, and the protocol splits the timeline into epochs identified by monotonically increasing sequence numbers. Atomic broadcast is equivalent to consensus in asynchronous systems with crash failures. Paxos is a commonly used algorithm that defines three roles: proposers, acceptors and learners. It is split into two phases: voting (proposers compete for leadership) and replication (the proposer distributes the value to acceptors). When an acceptor receives a prepare request, it can promise to accept the proposal, respond with a previously accepted value, or notify the proposer that its local sequence number is higher. During the replication phase, the proposer starts replication by sending an Accept message to all acceptors. Paxos uses quorums so that some participants can fail and the protocol can still proceed as long as the minimum number of votes required for the operation is available. Liveness is guaranteed in the presence of f failed processes: given 2f + 1 processes, f processes can fail and the remaining f + 1 can proceed. The Multi-Paxos algorithm introduces the role of a leader, a distinguished proposer, to improve efficiency. The leader periodically contacts the participants to notify them that it’s still alive, using a lease timeout so that participants won’t select another leader until the lease expires. The Fast Paxos algorithm reduces the number of messages by letting any proposer contact acceptors directly rather than going through the leader, at the cost of requiring a total of 3f + 1 processes.
Egalitarian Paxos partitions the system into smaller segments and uses a separate leader for the commit of each specific command. Flexible Paxos only requires that the quorums used in the propose and accept phases intersect, e.g. given N participants, Q1 nodes required for the propose phase to succeed and Q2 nodes required for the accept phase to succeed, we need to ensure that Q1 + Q2 > N, so Q2 can contain N/2 acceptors as long as Q1 = N - Q2 + 1. Next, the chapter discusses the Raft algorithm, which makes the leader a first-class citizen that coordinates state machine manipulation and replication. It is similar to atomic broadcast and Multi-Paxos in that it replicates multiple values instead of just one (a single leader makes atomic decisions and establishes message order). Each participant in Raft takes the role of candidate, leader (for a term) or follower (similar to acceptor/learner). It divides time into terms/epochs to guarantee a global partial order without relying on clock synchronization. Terms are monotonically increasing, and each command is uniquely identified by its term number and log index. During leader election, candidates send a RequestVote message to other processes, including the candidate’s term and the ID of the last log entry it observed. After collecting a majority of votes, the candidate becomes the leader for the term. The Raft protocol uses periodic heartbeats to ensure the liveness of the participants, and a follower may start a new election after an election timeout. The leader repeatedly appends new values to the replicated log by sending AppendEntries messages that include the leader’s term and the index and term of the preceding log entry. A follower grants its vote only if the candidate’s term is at least as high as its own and the candidate’s log is at least as up to date. In case of a split vote, Raft uses randomized timers to reduce the probability of subsequent elections also ending in a split vote. The leader sends heartbeats to the followers to detect failures, and a new election can be initiated if the leader is down. The leader does not remove or reorder its log contents; it only appends new messages to it.
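The quorum arithmetic used by Paxos and Flexible Paxos above is easy to check with a few helper functions. The function names are invented for this example; the formulas are the ones stated in the text (2f + 1 processes tolerate f failures, and Q1 + Q2 > N).

```python
def majority(n):
    """Smallest number of votes that forms a majority of n participants."""
    return n // 2 + 1


def tolerated_crash_failures(n):
    """Largest f such that n >= 2f + 1."""
    return (n - 1) // 2


def flexible_paxos_q1(n, q2):
    """Smallest propose-phase quorum that still intersects an accept quorum of q2."""
    return n - q2 + 1


def quorums_intersect(n, q1, q2):
    """Flexible Paxos condition: any Q1 quorum shares a node with any Q2 quorum."""
    return q1 + q2 > n


n = 10
q2 = 3                          # small accept quorum for cheap replication
q1 = flexible_paxos_q1(n, q2)   # larger quorum needed only during leader change
print(q1, quorums_intersect(n, q1, q2))
```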

The chapter then reviews Byzantine consensus, for distributed systems deployed in adversarial environments that are prone to Byzantine failures caused by ill intentions, bugs, misconfiguration or data corruption. Most Byzantine consensus algorithms require N^2 messages to complete an algorithm step, where N is the size of the quorum. It discusses Practical Byzantine Fault Tolerance (PBFT), which assumes independent node failures, so that the entire system cannot be taken over at once. All communication is encrypted and replicas know one another’s public keys to verify identities. PBFT guarantees both safety and liveness as long as no more than (n - 1) / 3 replicas are faulty, i.e. to sustain f compromised nodes the system needs at least n = 3f + 1 nodes. To distinguish between cluster configurations, PBFT uses views: in each view, one of the replicas is the primary and the rest are backups. All nodes are numbered consecutively and the index of the primary node is v mod N, where v is the view ID and N is the number of nodes. The view changes when the primary fails. Clients send their operations to the primary, which broadcasts each request to the backups, which execute the request and send a response back to the client. The client waits for f + 1 replicas to respond with the same result for an operation to succeed. Replicas save accepted messages in a stable log, and each message is kept until it has been executed by at least f + 1 nodes. This log can be used for recovery in case of a network partition, but recovered messages must be verified so that the log does not become an attack vector. After every N requests, the primary makes a stable checkpoint, where it broadcasts the latest sequence number and waits for 2f + 1 replicas to respond, which constitutes a proof for this checkpoint.
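The PBFT numbers quoted above follow from a handful of formulas, sketched here as helper functions. The function names are assumptions for this example; the formulas are taken directly from the text.

```python
def max_faulty(n):
    """Largest number of Byzantine replicas that n replicas can tolerate."""
    return (n - 1) // 3


def min_replicas(f):
    """Replicas needed to tolerate f Byzantine replicas."""
    return 3 * f + 1


def primary_index(view, n):
    """Index of the primary for a given view ID."""
    return view % n


def matching_replies_needed(f):
    """Identical replies a client waits for before accepting a result."""
    return f + 1


def checkpoint_proof_size(f):
    """Replica responses that make a checkpoint stable."""
    return 2 * f + 1


n = min_replicas(1)   # smallest cluster that survives one Byzantine node
print(n, max_faulty(n), primary_index(5, n), matching_replies_needed(1), checkpoint_proof_size(1))
```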

October 6, 2019

Production Deployment Best Practices

Filed under: Computing,Project Management,Technology — admin @ 1:37 pm

The following are a few best practices I have learned over the years for deploying to the production environment, especially when running multiple versions of the software in multiple data centers:

Coding/Debugging Best Practices

Naming Threads

When using Java, you can name background threads so that logs show meaningful thread names, and mark them as daemon threads so that they are automatically shut down when the Java process terminates.

Resource Cleanup

In order to avoid resource leaks, always close resources that require explicit cleanup such as I/O streams, database connections, etc.

Backward / Forward Compatibility

When working with multiple versions of the software, it’s critical that all changes to the domain model are both backward and forward compatible so that new schema can be read by the old service and old schema can be read by new service. If possible, always deploy the changes that can read the new schema before deploying the changes that write the new schema.

Character Encoding

Always use UTF-8 for character encoding in your code and databases.

Time Zones

Always use UTC timezone for application code and databases.

Internationalization/Localization/Accessibility

Apply i18n, l10n and accessibility standards for your user interfaces and services.

Data Validation

NullPointerException and wrong formats are common causes of many production issues so always validate your model, input parameters and results for proper format and ranges.

Failure Cases

Think about all the failure and edge cases that your code may run into.

Operational Best Practices

Use multiple stages for testing

Test your changes in multiple stages such as QA, UAT, alpha, beta, gamma where you can properly bake and test your changes. The size of data and variation will increase with each stage so that you can test as close to the production data and environment as possible. The testing will include load and performance testing to detect any impact to latency and availability. Further, you can use canary testing to release changes to a subset of users or data centers so that you can compare impact of changes before rolling out to all customers. If you are using multiple regions for deployment, you can start with a smaller or low-risk region for initial production release.

Check Calendar before the release

You can check calendar for any holidays or major changes to the infrastructure that might impact the release.

Automate Automate Automate

Avoid any manual changes to the build/deployment process.

Infrastructure as code

Apply best practices and tools from infrastructure as code so that all changes are applied consistently.

Error Logging

Use appropriate level for production logs and log all exceptions including stack traces. You can log additional input parameters but ensure they don’t include any sensitive data.

Deploy domain objects and data access code together

In order to avoid schema mismatch errors, always deploy domain object and data access changes together. In addition, you can ignore unknown properties in the model to make these changes forward compatible. If domain and data access changes cannot be released together, release the domain changes first and then the data access changes.
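Ignoring unknown properties can be sketched with a small "tolerant reader". The `Order` model, its fields and the `parse_order` helper are hypothetical examples: new fields get defaults so old payloads still parse, and fields this version does not know about are dropped instead of causing an error.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class Order:
    id: str
    amount: int
    currency: str = "USD"   # newer field with a default, so old payloads still parse


def parse_order(payload):
    raw = json.loads(payload)
    known = {f.name for f in fields(Order)}
    # Drop properties this version does not know about (forward compatibility).
    return Order(**{k: v for k, v in raw.items() if k in known})


old_payload = '{"id": "o-1", "amount": 10}'
new_payload = '{"id": "o-2", "amount": 20, "currency": "EUR", "coupon": "X1"}'
print(parse_order(old_payload))
print(parse_order(new_payload))
```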

Plan for Rollback

Always test rolling back your changes, and include multiple scenarios if changes were released in staggered mode where domain changes and data access changes were published separately. You can implement automated rollbacks when releasing changes incrementally so that faulty changes are blocked before they reach the next stage.

SLA/SLO

Monitor SLA/SLO at each stage of the release so that you can fix any violation in these metrics.

Change Management

Apply best practices from change management to track any high-risk or major changes to the production environment.

Security Policies

Apply zero-trust security practices and the principle of least privilege in all test and production environments.

Health Checks / Observability

Collect metrics for health checks, failure rates, usage, etc. so that you are notified immediately when you see suspicious or peculiar activity.

Chaos Testing

Apply chaos testing and game days to simulate various failure conditions to verify the behavior of your services and your reaction to those failures.

Integrate/Deploy often with small changes

Continuously integrate and deploy small changes to reduce the risk with major changes so that you can test your changes in production environment safely.

Feature Flags and Dark Launch

Use feature flags judiciously to launch features that are not yet available to all users so that you can test those changes for a subset of users.
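One common way to expose a feature to a subset of users is a percentage rollout based on a stable hash of the user. The following is a minimal sketch under that assumption; the function name `is_enabled` and the flag and user identifiers are made up for illustration.

```python
import hashlib


def is_enabled(flag: str, user_id: str, rollout_percent: int) -> bool:
    """Return True if this user falls inside the rollout percentage for the flag."""
    # Hash flag and user together so that each flag gets its own stable user buckets.
    digest = hashlib.sha256(f"{flag}:{user_id}".encode()).digest()
    bucket = int.from_bytes(digest[:4], "big") % 100  # bucket in 0..99
    return bucket < rollout_percent


# The same user always gets the same answer for a given flag and percentage.
decision = is_enabled("new-checkout", "user-42", 10)
```

Because the bucket is derived from the hash and not from a random draw, raising the percentage only adds users; nobody who already saw the feature loses it.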

July 12, 2019

Review of “Designing Data Intensive Applications”

Filed under: Computing,Technology — admin @ 4:38 pm

Designing Data-Intensive Applications is one of the best resources for building scalable systems. The book is full of comprehensive material on data processing, and the following is a summary of essential lessons from the book:

The first four chapters cover the foundations of data systems. The first chapter starts with the basic terminology of reliability, scalability, and maintainability (operability, simplicity, and evolvability). It recommends measuring latencies using percentiles and defining SLOs/SLAs.
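To make the percentile recommendation concrete, here is a small sketch that computes latency percentiles with the nearest-rank method. The sample latencies are invented for illustration, and nearest-rank is only one of several percentile definitions.

```python
import math


def percentile(samples, p):
    """Nearest-rank percentile: smallest sample with at least p% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


# Hypothetical response times in milliseconds, including two slow outliers.
latencies_ms = [12, 15, 11, 14, 250, 13, 16, 12, 18, 900]
p50 = percentile(latencies_ms, 50)
p99 = percentile(latencies_ms, 99)
```

The median stays small here while the high percentile is dominated by the outliers, which is why tail percentiles are a better basis for an SLO than the average.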

The second chapter reviews data models such as the relational model, the document model, and the graph model. It reviews differences between query languages such as SQL, graph queries, and SPARQL, and compares the models in terms of schema evolution and performance.

The third chapter introduces basic concepts of storage for relational and NoSQL databases. It shows how you can use hash indexes for looking up key-value data, which can be enhanced with compaction. It further reviews SSTable and LSM-tree structures. The Sorted String Table (SSTable) keeps key-value pairs sorted by key, and each key occurs only once within each merged segment file. These segments are later merged using a merge-sort-style algorithm and written to disk. The Log-Structured Merge-Tree (LSM-tree) is used to merge and compact the sorted files. In order to reduce lookup time, a Bloom filter can be used to quickly tell when a key does not exist in the database. Relational databases use the B-tree data structure, which breaks the database into fixed-size pages; each page is identified by an address, and the pages are organized in a tree structure. B-trees use a write-ahead log (WAL) to persist new data before updating the B-tree pages. Another form of index is the multi-column index, which combines several fields into one key. The chapter then differentiates between OLTP and OLAP and describes the star schema (a fact table surrounded by its dimension tables) and the snowflake schema (dimensions are broken down into sub-dimensions) for analytics. Column-oriented storage can also be used for data warehouses, which leads to better compression, CPU cache usage, and vectorized processing. Some data warehouses use materialized aggregates or data cubes that provide aggregates grouped by different dimensions.
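The Bloom filter mentioned in this chapter can be sketched in a few lines. This is a simplified illustration, not how any particular storage engine implements it; the class name, the sizes, and the use of SHA-256 to derive bit positions are assumptions made for the example.

```python
import hashlib


class BloomFilter:
    """Answers "definitely absent" or "possibly present" for a key."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for clarity

    def _positions(self, key):
        # Derive several bit positions by salting the key with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        # False means the key was never added; True may be a false positive.
        return all(self.bits[pos] for pos in self._positions(key))


bloom = BloomFilter()
bloom.add("user:1")
```

A storage engine would consult such a filter before touching disk: a negative answer lets it skip the segment entirely, while a positive answer still requires the real lookup.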

The fourth chapter covers encoding and serialization schemes such as JSON, XML, and binary encodings including MessagePack and Avro. It reviews the binary protocols of Thrift and Protocol Buffers. Further, it describes how these formats support schema evolution for backward and forward compatibility. It describes data flow over the network using SOAP, REST/RPC, message brokers, and distributed actor frameworks.

The fifth chapter opens the second part of the book, which focuses on distributed data with an emphasis on scaling and shared-nothing architecture. The fifth chapter describes replication and defines leaders and followers. It starts with basic leader-based replication such as active/passive or master/slave, where writes go through the leader and reads can be served by any node. The replication can be synchronous, asynchronous, or semi-synchronous. Although synchronous replication has performance issues, research in chain replication provides good performance, and it is used by Azure Storage. The chapter reviews scenarios of leader or follower node failure, which may require election of a new leader and can lose data that wasn’t replicated from the old leader. The replication can use statement-based replication, WAL shipping, logical (row-based) replication, or trigger-based replication, where the first can have side effects due to triggers or non-deterministic behavior. A version mismatch between nodes may cause incompatibilities with WAL-based replication. Asynchronous replication leads to eventual consistency, where the latest writes may not be fully replicated when you read, so you need read-after-write (read-your-writes) consistency, e.g. by reading from the leader or remembering the timestamp of the last write. The replication lag can display old data after showing new data, and monotonic read consistency addresses that behavior. Consistent prefix reads guarantee that the order of a sequence of writes is preserved when reading those writes. Multi-leader replication across data centers can have higher replication lag and may result in write conflicts between leaders. You may associate users with a specific leader based on location or resolve these conflicts using timestamps or the higher unique ID. Other forms of replication use leaderless or quorum-based consistency.
Some implementations use asynchronous read repair to fix stale data and an anti-entropy process to add any missing data. Quorum-based consistency often uses an odd number of nodes with w = r = ceil((n+1)/2), so that w + r > n, though it can still be subject to stale values (with sloppy quorums and hinted handoff) or loss of data (last write wins). Quorum-based nodes may keep version numbers for every key, one per replica (version vectors), to preserve the order of writes.
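The quorum arithmetic can be checked with a short sketch. It assumes n replicas, w write acknowledgments, and r read responses, and that each reply carries a version number; the function names are illustrative only.

```python
def majority_quorum(n: int) -> int:
    """Smallest w (= r) such that any two quorums of that size overlap."""
    return n // 2 + 1


def is_strict_quorum(n: int, w: int, r: int) -> bool:
    """With w + r > n, every read set shares at least one node with every write set."""
    return w + r > n


def quorum_read(replies):
    """Pick the value with the highest version among (version, value) replies."""
    return max(replies, key=lambda reply: reply[0])[1]


# Three replicas: two must acknowledge a write and two must answer a read.
w = r = majority_quorum(3)
latest = quorum_read([(1, "old"), (2, "new")])
```

For odd n, `n // 2 + 1` equals ceil((n+1)/2), which matches the formula in the text.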

Chapter six covers partitioning and how it works with replication, so that each node might hold more than one partition. Some partitions may be skewed, having more data or load than others; a partition with disproportionately high load is referred to as a hot spot. You can partition based on key range or hash of the key, where hashing may use consistent hashing to distribute keys fairly. Partitioning data also requires partitioning secondary indexes, or maintaining a global index that covers all partitions and is partitioned by term. Partitioning may also require rebalancing, where you may define many more partitions than nodes (e.g. 100 times more) so that only part of the data is relocated when nodes are added. Reading data requires routing requests to the right node: the client may contact any node, which forwards to another node if needed; the client may send all requests to a routing node; or the client may be aware of the partitioning and contact the appropriate node directly.
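The idea of creating many more partitions than nodes can be sketched as follows. The partition count, node names, and the simple "take from the most loaded node" policy are assumptions for illustration; real systems use more elaborate rebalancing strategies.

```python
import hashlib
from collections import Counter

NUM_PARTITIONS = 64  # fixed up front and much larger than the node count


def partition_for(key: str) -> int:
    """Map a key to a partition; this never changes when nodes are added."""
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS


def add_node(assignment, new_node):
    """Return a new partition-to-node map that gives new_node a fair share."""
    assignment = dict(assignment)
    num_nodes = len(set(assignment.values())) + 1
    target = len(assignment) // num_nodes
    for _ in range(target):
        counts = Counter(n for n in assignment.values() if n != new_node)
        donor = max(counts, key=counts.get)  # currently most loaded existing node
        victim = next(p for p, n in assignment.items() if n == donor)
        assignment[victim] = new_node
    return assignment


nodes = ["node-a", "node-b", "node-c"]
before = {p: nodes[p % len(nodes)] for p in range(NUM_PARTITIONS)}
after = add_node(before, "node-d")
moved = [p for p in before if before[p] != after[p]]
```

Only whole partitions move to the new node, and the key-to-partition mapping stays the same, so most of the data stays where it is.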

Chapter seven describes transactions and defines the meaning of ACID; however, it cautions against failures due to asynchronous writes, caching, disk failures (despite fsync), etc. Relational databases use transaction scope to write multiple objects atomically and in isolation, and other databases use compare-and-set operations to prevent lost updates. Due to the high cost of serializable transactions, most databases provide weak isolation levels when running multiple transactions concurrently. In order to prevent concurrency bugs, databases provide transaction isolation levels including read committed, which prevents dirty reads and dirty writes and may use row-level locks and copies of old data; and snapshot isolation/repeatable read, which prevents read skew (non-repeatable reads) using multi-version concurrency control (MVCC) alongside write locks. Relational databases provide explicit locking using SELECT…FOR UPDATE to prevent lost updates, while other databases use compare-and-set to avoid lost updates. These isolation levels can still lead to write skew and phantoms, which in some cases can be prevented using SELECT FOR UPDATE, e.g. meeting room reservation, choosing a username, or preventing double spending. Serializable isolation provides the strongest guarantees, but it’s not provided in most databases and has limitations with partitioned data. Two-phase locking can be used for serializable isolation, but it suffers from performance issues and can lead to deadlocks. Another alternative is predicate locks, which lock all objects matching a search condition, but they also suffer from performance issues. Other alternatives include index-range locks and serializable snapshot isolation (SSI), which uses optimistic concurrency control.
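The compare-and-set approach to lost updates can be illustrated with a tiny in-memory store. The `VersionedStore` class and its methods are hypothetical and stand in for whatever conditional-write primitive a real database offers.

```python
import threading


class VersionedStore:
    """In-memory key-value store where writes succeed only on the expected version."""

    def __init__(self):
        self._data = {}  # key -> (version, value)
        self._lock = threading.Lock()

    def read(self, key):
        with self._lock:
            return self._data.get(key, (0, None))

    def compare_and_set(self, key, expected_version, new_value):
        with self._lock:
            version, _ = self._data.get(key, (0, None))
            if version != expected_version:
                return False  # someone else wrote first; caller must re-read and retry
            self._data[key] = (version + 1, new_value)
            return True


store = VersionedStore()
version_seen_by_a, _ = store.read("page")
version_seen_by_b, _ = store.read("page")
a_ok = store.compare_and_set("page", version_seen_by_a, "edit from A")
b_ok = store.compare_and_set("page", version_seen_by_b, "edit from B")
```

Client B's write is rejected because it was based on a stale version, so A's edit is not silently overwritten; B has to read again and retry.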

Chapter eight discusses network faults and failures in distributed systems. The chapter contrasts cloud computing with supercomputers, where nodes communicate through RDMA/shared memory. Network failures are common in most systems and can be detected by a load balancer or monitoring system. Partial failures are hard to detect, and you may use timeouts to detect them. Distributed systems may use monotonic clocks (e.g. System.nanoTime) or time-of-day clocks (synchronized with NTP); however, unreliable clocks make measuring time in distributed systems error-prone. NTP synchronization is not always reliable, and drift in time-of-day clocks may result in an incorrect order of events, so a last-write-wins strategy may overwrite data with an old value. Google’s TrueTime API reports a confidence interval with the clock time, e.g. Spanner uses the clock confidence interval for snapshot isolation to create globally monotonically increasing transaction IDs. When scheduling tasks periodically, account for process pauses due to GC, virtualization, disk I/O, etc. When using a lock or lease, fencing ensures there is only one leader. When the lock server grants a lock/lease, it returns a fencing token (a monotonically increasing number), and the client includes it with each request so that requests carrying older tokens are rejected. However, fencing tokens cannot protect against Byzantine faults (deliberate faults).
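The fencing-token mechanism can be sketched with two small classes. Both `LockService` and `Storage` are simplified stand-ins invented for this example; a real lock service would also handle lease expiry and persistence.

```python
class LockService:
    """Hands out a strictly increasing fencing token with every lease."""

    def __init__(self):
        self._token = 0

    def acquire(self) -> int:
        self._token += 1
        return self._token


class Storage:
    """Rejects writes that carry a token older than one it has already seen."""

    def __init__(self):
        self.max_token = 0
        self.value = None

    def write(self, token: int, value) -> bool:
        if token < self.max_token:
            return False  # stale lease holder, e.g. a client that paused for a long GC
        self.max_token = token
        self.value = value
        return True


locks, storage = LockService(), Storage()
old_token = locks.acquire()   # client 1 gets the lease, then pauses
new_token = locks.acquire()   # lease expires; client 2 gets a newer token
accepted_new = storage.write(new_token, "from client 2")
accepted_old = storage.write(old_token, "from client 1")
```

The check lives in the storage service, not in the client, because a paused client cannot know that its lease has expired.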

Chapter nine explains consistency and consensus. Most replicated databases provide eventual consistency, which is a weak guarantee. Linearizability is the strongest consistency model: it makes the system appear as if there is a single copy of the data, and a read cannot return old data once a previous read has returned new data. Linearizable stores may offer a compare-and-set operation to prevent data overwrites. The chapter also distinguishes linearizability from serializability: serializability is an isolation property of transactions that guarantees they behave as if executed in some serial order, whereas linearizability is a recency guarantee on reads and writes and doesn’t prevent write skew. Leader election, distributed locks, and unique constraints such as usernames may rely on linearizability to agree on a single up-to-date value. The simplest way to have a linearizable system is to keep a single copy of the data, but you need replication for fault tolerance. You can use single-leader replication, where all writes go to the leader, or a consensus algorithm, but multi-leader and leaderless (Dynamo-style) replication don’t provide linearizable guarantees. The chapter then describes the CAP theorem, where consistency (C) means linearizability: if some replicas are disconnected, you either give up availability and wait until the replicas are fixed, or a replica remains available while disconnected from other replicas, at the cost of linearizability. Also, the CAP theorem only considers one kind of fault, a network partition (nodes that are alive but disconnected from each other), so most highly available systems don’t meet the CAP definition. Causal order defines what happened before what, but it is only a partial order, unlike the total order guaranteed by linearizability (mathematical sets, for example, can only be partially ordered). Linearizability is stronger than causality, but most systems only need causal consistency, which shows which operation happened before another.
Sequence numbers or timestamps from a logical clock can identify the order of operations, and they can be generated by a single leader that increments a counter. Other systems can preallocate blocks of sequence numbers, attach timestamps, or generate local sequence numbers, but these are not consistent with causality. The chapter then reviews Lamport timestamps (logical), which enforce a total order consistent with causality (distinct from version vectors). You need a leader to sequence all operations on a single CPU to guarantee total order broadcast (atomic broadcast), but it’s not scalable. A leader per partition can maintain ordering per partition, but it doesn’t guarantee total ordering. Total order broadcast requires reliable delivery and totally ordered delivery (the same order on all nodes), and it can be used to implement serializable transactions. It can also be used for implementing a lock service that provides fencing tokens. You can implement a uniqueness check, such as claiming a username, by appending a message to the log, reading the log, and waiting for your message to be delivered back (in the same order for all nodes) to see whether your claim came first. But this only guarantees linearizable writes; to guarantee linearizable reads, you must perform the read only after a message sequenced through the log is delivered back to you.
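Lamport timestamps are easy to sketch: each node keeps a counter and pairs it with its node ID. The version below follows the classic rule of taking the maximum of the local and received counters and adding one on receipt; the class and method names are illustrative.

```python
class LamportClock:
    """Logical clock producing (counter, node_id) timestamps."""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.counter = 0

    def tick(self):
        """Timestamp a local event or an outgoing message."""
        self.counter += 1
        return (self.counter, self.node_id)

    def receive(self, remote_counter: int):
        """Merge the counter carried by an incoming message, then timestamp the receipt."""
        self.counter = max(self.counter, remote_counter) + 1
        return (self.counter, self.node_id)


a, b = LamportClock("A"), LamportClock("B")
sent = a.tick()                 # A sends a message stamped (1, "A")
received = b.receive(sent[0])   # B stamps the receipt (2, "B")
concurrent = a.tick()           # an unrelated event on A gets (2, "A")
```

Tuples compare by counter first and node ID second, so every pair of timestamps is ordered, and a message's receipt always sorts after its send. Events that are actually concurrent, like the last two here, are still ordered, just arbitrarily by node ID.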

The next part of chapter nine describes consensus, where nodes have to agree on a leader after an election (in the presence of network faults while avoiding split brain) and on atomic commits (as in ACID). Despite the FLP result, which proved that no deterministic algorithm can always reach consensus in an asynchronous system if a node can crash, distributed systems can achieve consensus in practice. The chapter reviews two-phase commit (2PC), which involves multiple nodes, as opposed to a single node that logs the data before committing it. 2PC uses a coordinator/transaction manager that creates a globally unique transaction ID and tracks two phases: prepare and commit. The coordinator must write its decision to a log before commit in case it crashes, and participants may have to wait for the coordinator indefinitely. Three-phase commit assumes bounded delays/timeouts to prevent blocking atomic commits. The chapter then distinguishes between database-internal and distributed transactions, where distributed transactions such as XA transactions can provide exactly-once processing atomically. On the downside, the coordinator in distributed transactions can be a single point of failure, and they limit scalability. Fault-tolerant consensus requires uniform agreement, integrity, validity, and termination to agree on the same value. The best-known algorithms that provide fault-tolerant consensus include VSR, Paxos, Raft, and Zab, which implement total order broadcast and require messages to be delivered exactly once in the same order to all nodes. These algorithms use epoch numbers (monotonically increasing) for each election, and a quorum is used to agree on the value. There are a few limitations of consensus such as synchronous replication, majority voting (minimum 3 nodes), static membership, and continuous re-election due to partial failures.
Some implementations include ZooKeeper and etcd, which provide linearizable atomic operations using compare-and-set to implement locks; total ordering of operations using fencing tokens (monotonically increasing); failure detection; and change notifications.
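The prepare and commit phases of two-phase commit can be sketched in memory as below. This is a toy model under strong assumptions: there are no crashes or timeouts, and a plain Python list stands in for the coordinator's durable log. All names are hypothetical.

```python
class Participant:
    """A node that votes in the prepare phase and then follows the coordinator's decision."""

    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "init"

    def prepare(self, tx_id):
        self.state = "prepared" if self.can_commit else "aborted"
        return self.can_commit

    def commit(self, tx_id):
        self.state = "committed"

    def abort(self, tx_id):
        self.state = "aborted"


def two_phase_commit(tx_id, participants, log):
    # Phase 1: ask every participant whether it can commit.
    votes = [p.prepare(tx_id) for p in participants]
    decision = "commit" if all(votes) else "abort"
    # Record the decision before telling anyone (stand-in for a durable log write).
    log.append((tx_id, decision))
    # Phase 2: broadcast the decision.
    for p in participants:
        if decision == "commit":
            p.commit(tx_id)
        else:
            p.abort(tx_id)
    return decision


log = []
outcome = two_phase_commit("tx-1", [Participant("db"), Participant("queue")], log)
```

A single "no" vote in the prepare phase aborts the transaction for everyone, which is what makes the commit atomic across participants.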

Chapter ten describes batch processing and opens the third part of the book, which covers data derived from systems of record for OLAP and reporting. Batch processing is offline processing, as opposed to real-time services or near-real-time stream processing. The chapter starts with basic Unix tools that use pipes and files for batch processing. It then reviews MapReduce implementations such as Hadoop and distributed file systems such as HDFS, GlusterFS, and QFS. The mapper extracts keys and values from input records and generates a number of key-value pairs, whereas the reducer takes the key-value pairs and collects all values belonging to the same key. The scheduler uses the principle of “putting the computation near the data” to run the mapper on machines with a replica of the input file. MapReduce often requires workflow systems such as Oozie, Azkaban, Luigi, Airflow, and Pinball to manage dependencies between jobs. These frameworks use grouping to merge or collate related data. However, you can use a skewed join when the data for a group is too large to fit on a single machine, so that the work can be parallelized across multiple reducers. Mappers can use broadcast hash joins and partitioned hash joins when working with large data. The chapter also discusses dataflow engines like Spark, Tez, and Flink that support operators, providing a more flexible way to create data pipelines. It also reviews graph processing systems such as Apache Giraph and Spark GraphX that support the bulk synchronous parallel (BSP) model of computation, which sends messages from one vertex to all connecting vertices, similar to the actor model if you think of each vertex as an actor.
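The mapper and reducer roles can be illustrated with a single-process word count. This sketch only mimics the programming model; it does not use Hadoop or any distributed runtime, and the function names are chosen for the example.

```python
from collections import defaultdict


def mapper(record):
    """Emit a (word, 1) pair for every word in an input record."""
    for word in record.lower().split():
        yield word, 1


def reducer(key, values):
    """Combine all values that were emitted for the same key."""
    return key, sum(values)


def map_reduce(records):
    # Shuffle step: group mapper output by key before calling the reducer.
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return dict(reducer(key, values) for key, values in sorted(groups.items()))


counts = map_reduce(["the quick fox", "the lazy dog", "The end"])
```

In a real cluster the grouping step is the expensive part, since it moves data between machines so that all values for a key reach the same reducer.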

Chapter eleven discusses stream processing for near-real-time processing. Stream processing transmits event streams using messaging systems and the pub/sub model. The messaging system may use UDP, brokerless libraries such as ZeroMQ, or message brokers with queue support. Message brokers support load balancing, where each message is delivered to one of the consumers so that they share the work (shared subscription). Alternatively, they use fan-out, where each message is delivered to all consumers. Message brokers use acknowledgments to remove a message once it has been processed by a consumer. The broker delivers the message again in the event of a connection failure; handling the case where the message was processed but the acknowledgment was lost requires an atomic commit protocol. Some message brokers such as Apache Kafka use append-only logs to add incoming messages, and these logs can be partitioned, where each message within a partition gets a monotonically increasing sequence number (with no ordering guarantee across partitions). The consumers read a partition sequentially by specifying an offset. There are some limitations of these log-based brokers, e.g. the number of nodes sharing the work can be at most the number of log partitions in that topic, and if a single message is slow to process, it holds up processing in that partition. Some implementations may use a circular buffer to store messages on disk, so if consumers cannot keep up with producers, old messages may be dropped. The chapter discusses change data capture, which captures the changes in the database and applies the same changes to derived systems such as a search index. You can also use log compaction so that a consumer can sync the data by starting from offset 0 and scanning over all messages. The chapter reviews event sourcing, which stores all changes to the application state as a log of change events. Event sourcing distinguishes between events and commands: after a command is validated, it becomes an event that is durable and immutable. The immutable event log is related to the command query responsibility segregation (CQRS) principle.
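The log-based broker model can be sketched with a small in-memory class. It is a simplification that borrows the ideas of partitions and offsets without using Kafka's actual API; `PartitionedLog` and its methods are hypothetical.

```python
import zlib


class PartitionedLog:
    """Append-only log split into partitions; offsets are per partition."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key: str, message):
        """Route by key hash so all messages for a key land in one partition."""
        partition = zlib.crc32(key.encode()) % len(self.partitions)
        self.partitions[partition].append(message)
        return partition, len(self.partitions[partition]) - 1

    def read(self, partition: int, offset: int, max_messages: int = 10):
        """Read sequentially from an offset; the consumer tracks its own position."""
        return self.partitions[partition][offset:offset + max_messages]


log = PartitionedLog(num_partitions=4)
partition, first_offset = log.append("user-1", "signed up")
_, second_offset = log.append("user-1", "changed email")
unread = log.read(partition, offset=second_offset)
```

Reading does not delete anything, so a consumer can rewind to an earlier offset and reprocess messages, which is not possible once a traditional broker has removed an acknowledged message.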

The last chapter reviews the future of data systems and the limits of total ordering, which may require a single leader, and that can be difficult across distributed data centers and microservices. You can send all updates to the same partition, but that is not sufficient to capture causal dependencies. The chapter covers the lambda architecture and the unbundling of databases into storage technologies such as secondary indexes, replication logs, and text search indexes. It reviews exactly-once execution of operations, duplicate suppression, and operation identifiers (using a request ID to make an operation idempotent).
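Using a request ID to make an operation idempotent can be sketched as follows. The `Account` class is a made-up example, and it keeps the set of processed request IDs in memory, whereas a real system would store them durably alongside the data.

```python
class Account:
    """Applies each deposit at most once by remembering processed request IDs."""

    def __init__(self):
        self.balance = 0
        self._results = {}  # request_id -> balance returned for that request

    def deposit(self, request_id: str, amount: int) -> int:
        if request_id in self._results:
            # Duplicate delivery or client retry: return the earlier result unchanged.
            return self._results[request_id]
        self.balance += amount
        self._results[request_id] = self.balance
        return self.balance


account = Account()
first = account.deposit("req-1", 100)
retry = account.deposit("req-1", 100)  # same request sent again after a timeout
```

The client generates the request ID once and reuses it on every retry, so a retried request is recognized as a duplicate and not applied twice.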
