Designing Data-Intensive Applications is one of best resource for building scalable systems. The book is full of comprehensive material on data processing and following is summary of essential lessons from the book:
The first four chapter covers foundation of data systems and the first chapter starts with basic terminology of reliability, scalability, maintainability, Operability, simplicity and evolvability. It recommends measuring latencies using percentile and defining SLO/SLA.
The second chapter reviews data model such as relational model, document/ graph model, etc. It reviews differences in query model SQL, graph queries, and SPARQL and compares them in terms of schema evolution and performance.
The third chapter introduces basic concepts of storage for relational and NoSQL databases. It shows how you can use hash indexes for looking up key-value data, which can be enhanced with compaction. It further reviews SSTable and LSM-tree structures. The Sorted-String Table or SSTable keeps key-value in sorted fashion where each key only occurs once within each merged segment file. These segments are later merged using merge sort and written to disk. The Log-Structured Merge-Tree (LSM tree) is used to merge and compact sorted files. In order to reduce lookup time, bloom filter can be used to ensure key exists in the database. The relational databases use B-Tree data structure that break the database into fixed size pages and each page is identified by address that are stored in a tree structure. The B-tree use write-ahead log (WAL) to persist new data before updating B-tree data structure. Another form of index is multi-column index that combines several fields into one key. The chapter then differentiates between OLTP and OLTP and using star (fact table surrounded by its dimension tables) and snowflakes (dimensions are broken down into sub dimensions) schema for analytics. Column-oriented storage can also be used for data warehouse that leads to better compression, CPU cache usage and vectorized processing. Some data warehouse use materialized aggregates or data cube that provides aggregates grouped by different dimensions.
The fourth chapter covers encoding and serialization schemes such as JSON, XML and binary encoding including MessagePack and Avro. It reviews binary protocols of Thrift and Protocol Buffers. Further, it provides support of schema evolution in these protocols for backward and forward compatibility. It describes data flow via network exchange using SOAP, REST/RPC message brokers and distributed actor frameworks.
The fifth chapter is part of second part of the book that focuses on distributed data with emphasis on scaling and shared-nothing architecture. The fifth chapter describes replication and defines leaders and followers. It starts with basic leader-based replication such as active/passive or master/slave where writes go through leader and reads can be served by any node. The replication can be synchronous, asynchronous or semi-synchronous. Though, synchronous replication has performance issues but research in chain replication provides good performance, which is used by Azure storage. The chapter reviews scenarios of leader or follower node failure, which may require election of new leader but can be subjected to loss of data that wasn’t replicated from the old leader. The replication can use statement-based replication, shipping WAL logs, logical (row-based) replication or trigger-based replication, where former can have side-effects due to triggers/non-deterministic behavior. The version mismatch may cause incompatibilities with WAL based replication. Asynchronous cause effects of eventual consistency where latest reads are not fully replicated so you need read-after-write consistency that can be addressed using read-your-writes consistency, e.g. reading from the leader or remembering timestamp of last write. The lag in replication can display load data after showing new data and monotonic read consistency addresses that behavior. Consistent prefix read guarantee sequence of writes order is preserved when reading those writes. When replicating with multi-leaders across data-centers can have higher lag time between replication and may result in write conflicts between leaders. You may associate users to a specific leader based on location or resolve these conflicts using timestamps or higher unique id. Other forms of replication uses leaderless or quorum based consistency. Some implementation use asynchronous read-repair to fix stale data and anti-entropy process to add any missing data. The quorum based consistency uses odd number of nodes with w = r = ceil(n+1)/2, though it can also be subjected to stale values (with sloppy quorum and hinted handoff) or loss of data (last writer wins). The quorum based nodes may use version number for every key to preserve order of write (version vectors).
The chapter six covers partitioning works with replication so that each node might have more than one partition. Some partitions may be skewed having more data than others, also referred as hot spot. You can partition based on key-range or hash of key, where hashing may use consistent hashing to distribute keys fairly. The partitioning data also requires partitioning secondary indexes or maintaining a global index that covers all partitions by term. The partitioning may also require re-balancing where you may define 100 times more partition than nodes so that data is not only partially relocated. When reading data requires routing to nodes where client may contact any node directly, which forwards to other node if needed; send all requests to a routing node; or client is aware of partitioning and contacts appropriate node.
The chapter seven describes transactions and defines meaning of ACID, however it cautions against failures due to asynchronous writes, caching, disk failures (despite fsync), etc. The relational databases use transaction scope to write multiple objects atomically and in isolation and other databases use compare-and-set operations to prevent lost updates. Due to high cost of serializable transactions, most databases provide weak isolation level when running multiple transactions concurrently. In order to prevent concurrency bugs, database provide transaction isolation including read-committed for preventing dirty reads/writes that may use row-level locks and keeping copies of old data; snapshot isolation/repeatable read that prevents read skew (non repeatable reads) using reader/write locks and MVCC. Relational databases provide explicit locking using SELECT…FOR UPDATE to prevent lost updates, other databases use compare-and-set to avoid lost update. These transaction isolation can still lead to write skew and phantoms that can be prevented using SELECT FOR UPDATE, e.g. meeting reservation, choosing username, preventing double spending. The serializable isolation provides strongest guarantees but it’s not provided in most databases and has limitations with partitioned data. Two-phase locking can be used for serializable isolation but it suffers performance issues and can lead to deadlocks. Another alternative is predicate locks that locks all objects matching criteria but they also suffer from performance issues. Other alternatives include index-range locks and serializable snapshot isolation (SSI) that uses optimistic concurrency controls.
The chapter eight discusses network faults and failures in distributed systems. The chapter covers cloud computing, supercomputer where nodes communicate through RDMA/shared memory. These failures are common in most systems and can be detected by load balancer or monitoring system. Partial failures are hard to detect and you may use timeout to detect failures. Distributed systems may use monotonic clocks (System.nanoTime) or time-of-date (NTP) clocks, however unreliable clocks can make measuring time in distributed systems error prone. The NTP synchronization is not always reliable and drift in time-of-clock may result in incorrect order of events and last-write-win strategy may overwrite data with old value. Google TrueTime API uses confidence interval with clock time, e.g. Spanner uses clock confidence interval for snapshot isolation to create global monotonic increasing transaction ID. When scheduling task periodically, check process delay due to GC, virtualization, disk I/O, etc. When using a lock or lease, fencing ensure there is only one leader. When the lock server grants a lock/lease, it returns fencing token (monotonic number) and client includes it with each request so old requests are rejected. However, fencing token cannot prevent against Byzantine faults (deliberate faults).
The chapter nine explains consistency and consensus. Most replicated database provides eventual consistency, which is weak consistency. Linearizability is strongest consistency that makes system appear as if there is a single copy of the data and reads cannot return old data if it previously returned new data. Linearizability may use compare-and-set operation to prevent data overwrite. The chapter also distinguishes Linearizability with Serializability that is isolation property of transaction that guarantees serial order of transactions, where Linearizability guarantees recent data after read/write and doesn’t prevent write skew. Leader election, distributed locks, unique constraints such as username may use Linearizability to come up with a single up-to-date value. Simplest way to have linearizable systems is to keep a single copy of the data but you need replication for fault tolerance system. You can use single-leader replication where all writes go to leader or consensus algorithm but multi-leader and leaderless replication (dynamo style) don’t provide linearizable guarantees. The chapter then describes CAP theorem, where consistency (C) relates to linearizability and you give up availability if some replicas are disconnected and wait until replicas are fixed. On the other hand, a replica can remain available even if it’s disconnected from other replicas to provide availability at the cost of linearizability. Also, CAP theorem only considers one kind of fault – network partition or nodes that are alive but disconnected from each other so most highly available systems don’t meet CAP definition. Though, causal order can define what happened before what but it’s not total order guaranteed by linearizability and sets can only be partially ordered. Linearizability is stronger than causality but most systems only need causality consistency that show what operation happened before other operation. Sequence number or timestamp (logical clock) can generate sequence number to identify order of operations, which can be incremented by the single leader. Other systems can preallocate blocks of sequence, attach timestamp or generate local sequence number but they are not consistent with causality. The chapter then reviews Lamport timestamps (logical) that enforce order (distinct from version vectors). You need a leader to sequence all operations on a single CPU to guarantee total order broadcast (atomic), but it’s not scalable. A leader per partition can maintain ordering per partition but it doesn’t guarantee total ordering. Total order broadcast requires reliable delivery and totally ordered order (same order to all nodes) and it can be used to implement serializable transaction. It can also be used for implementing lock service that provides fencing tokens. You can implement this by appending message to log, reading the log and waiting for the message delivered (same order for all nodes) back to verify the unique identifier such as username. But it only guarantees linearizable writes and you must sequence reads only after message is delivered back to you to guarantee linearizable reads.
Next part of chapter nine describes consensus, where you have to agree on leader after election (in presence of network faults while avoiding split brain) and atomic commits (as in ACID). Despite FLP that proved no algorithm can reach consensus if a node can crash, distributed systems can achieve consensus. The chapter reviews Two-phase commit (2PC) that involves multiple nodes as opposed to a single node that uses logs the data before committing it. The 2PC uses coordinator/transaction manager that creates globally unique tx-id and tracks two phases: prepare and commit. The coordinator must write transactions in logs before commit in case the coordinator crashes while nodes may have to wait for coordinator indefinitely. Three-phase commit assumes bounded delay/timeout to prevent blocking atomic commits. The chapter then distinguishes between internal database and distributed transactions, where distributed transactions guarantee exactly-once processing atomically such as XA transactions. On the downside, coordinator in distributed transactions would be a single point of failure and they limit scalability. The fault tolerant consensus requires uniform agreement, integrity, validity, and termination to agree on same value. The best algorithm that provides fault tolerant consensus include VSR, Paxos, Raft and Zab that uses total order broadcast algorithms that requires messages be delivered exactly once in the same order to all nodes. These algorithms use epoch numbers (monotonically increasing) for each election and quorum is used to agree on the value. There are a few limitations of consensus such as synchronous replication, majority voting (minimum 3 nodes), static membership, continuous re-election due to partial failures. Some of implementations include Zookeeper and etc that provides linearizable atomic operations using compare and set to implement locks; total ordering of operations using fencing token (monotonically increasing); failure detection; change modifications.
The chapter ten describes batch processing and is part of third part of the book that covers data derived from system of record for OLAP and reporting. The batch processing is offline processing as opposed to real-time services or near-real-time stream processing. The chapter starts with basic Unix tools that uses pipes and files for batch processing. It then reviews MapReduce such as Hadoop and distributed file systems such as HDFS, GlusterFS, QFS. The mapper extracts key/value from input records and generate a number of key-value pairs, whereas reducer takes key-value pairs and collects all values belonging to the same key. The scheduler uses principle of “putting the computation near the data” to run mapper on the machines with replica of input file. MapReduce often requires workflow systems to manage dependencies such as Oozie, Azkaban, Luigi, Airflow, Pinball. These frameworks use groups to merge or collate related data. However, you can use skewed join when group data is very large to fit on a single machine. where work can be parallelized on multiple reducers. Mappers can use broadcast hash joins and partitioned joins when working with large data. The chapter also discusses data flow engines like Spark, Tea, Flink that supports operators for providing more flexible way to create data pipeline. It also reviews graph processing systems such as Apache Graph, Spark GraphX that supports bulk synchronous parallel (BSP) model of computation, which sends messages from one vertex to all connecting vertices similar to actor model if you think of each vertex as actor.
The chapter eleven discusses stream processing for providing near real-time processing. The stream processing transmit event streams using messaging systems and pub/sub model. The messaging system may use UDP, brokerless libraries such as ZeroMQ or message brokers with queue support. The message brokers support load balancing when a single message is delivered to one of consumer and share the work (shared subscription). Alternatively, they use fan-out where message is delivered to all consumers. Message brokers use ack to remove the message when it’s processed by consumer. The broker delivers the message again in event of connection failure using atomic commit protocol. Some message brokers such as Apache Kafka use append-only logs to add incoming messages and these logs can be partitioned where each message uses monotonically increasing sequence number (without ordering gurantee). The consumers read files sequentially by specifying offset. There are some limitations of these log based brokers, e.g. number of nodes sharing the work can be at most the number of log partition in that topic and if a single message is slow to process, it holds up processing in that partition. Some implementations may use circular buffer to store messages on disk but if consumers cannot keep with producers, it may drop old messages. The chapter discusses change data capture for syncing data that capture the changes in the database and apply same changes to search index. You can also use compact logs for syncing the data from offset 0 and scan over all messages. The chapter reviews event sourcing that stores all changes to the application state as a log of change events. The event source distinguishes between event and commands and after command is validated, it becomes an event that is durable and immutable. The immutable data is related to command query responsibility serration principle.
The last chapter reviews future of data systems and limits of total order that may require a single leader but it can be difficult in distributed data-center and micro services. You can send all updates to the same partition but that is not sufficient to capture causal dependency. The chapter covers lambda architecture and unbundling of databases including storage technologies such as secondary indexes, replication logs, text search indexes. It reviews exactly-once execution of operation, duplicate suppression and operation identifier (using request-id to make operation idempotent).