Scaling the Big Peer Store
The Ditto Big Peer may appear like any other peer in a mesh, but it is different. It is a distributed database made up of a number of components, including:
- Subscription Servers that communicate with Small Peer meshes using the Ditto Replication Protocol
- API Servers that host the HTTP API
- A distributed transaction log
- A partitioned and replicated distributed database: the Big Peer Store
- A Change Data Capture (CDC) system that monitors changes and provides them to third-party systems, either through pre-defined Kafka topics or WebHooks.
This makes the Big Peer both a system of record, with data at rest, and a conduit that carries data from existing data systems into the mesh and back out to existing systems.
This article focuses on the Big Peer Store, specifically how the Store scales up and down, while continuing to provide transactional causal consistency.
The Big Peer Store
To understand how we can scale the Big Peer Store while maintaining functionality, it is first necessary to describe how it works. What follows describes the key primitives of its design and shows how we scale the system using transitions.
The Big Peer Store is inspired by the paper PaRiS: Causally Consistent Transactions with Non-blocking Reads and Partial Replication.
Partitioned
The Big Peer is a distributed database that stores Documents as CRDTs. It can store more documents than can fit on a cheap commodity disk, and so it partitions its state by storing subsets of documents on multiple Store Nodes.
To ensure a balanced partitioning mechanism, the Store uses Random Slicing, partitioning the available keyspace using the range of integers from 0 to 2^64 − 1. Each partition of the Store is responsible for multiple sub-ranges of this keyspace, called Intervals. These sub-ranges don't need to be contiguous, as random slicing uses the Cut-Shift algorithm, which might take sections of pre-existing partitions and use them to make new partitions. This ensures that the minimal amount of data is moved when creating partitions.
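To make the Interval bookkeeping concrete, here is a minimal Rust sketch of carving out a new partition. It is a simplification, not the full Cut-Shift algorithm: it assumes equally weighted partitions and cuts each existing partition's excess from the tail of its Interval list. `Interval`, `add_partition`, and `KEYSPACE` are hypothetical names. Interval bounds are stored as `u128` so that the exclusive end of the keyspace, 2^64, fits without overflow.

```rust
/// A half-open Interval [start, end) of the keyspace. Bounds are u128 so that
/// the exclusive end of the keyspace, 2^64, fits.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Interval {
    start: u128,
    end: u128,
}

impl Interval {
    fn len(&self) -> u128 {
        self.end - self.start
    }
}

/// Size of the keyspace: the integers 0 ..= 2^64 - 1.
const KEYSPACE: u128 = 1 << 64;

fn owned_len(intervals: &[Interval]) -> u128 {
    intervals.iter().map(Interval::len).sum()
}

/// Adds one equally weighted partition (a simplified cut step, not full Cut-Shift):
/// each existing partition gives up the part of its share above
/// KEYSPACE / (n + 1), cut from the tail of its Interval list.
fn add_partition(partitions: &mut Vec<Vec<Interval>>) {
    let target = KEYSPACE / (partitions.len() as u128 + 1);
    let mut new_partition = Vec::new();
    for intervals in partitions.iter_mut() {
        let mut excess = owned_len(intervals).saturating_sub(target);
        while excess > 0 {
            let last = intervals.pop().expect("a positive excess implies an owned Interval");
            if last.len() <= excess {
                // Hand over the whole Interval.
                excess -= last.len();
                new_partition.push(last);
            } else {
                // Cut the Interval: keep its head, hand over its tail.
                let cut = last.end - excess;
                intervals.push(Interval { start: last.start, end: cut });
                new_partition.push(Interval { start: cut, end: last.end });
                excess = 0;
            }
        }
    }
    partitions.push(new_partition);
}
```

Starting from one partition that owns the whole keyspace, the first call splits it in half. The second call takes a slice from each of the two halves, so the third partition ends up with two non-contiguous Intervals, and only those slices move.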
Using this strategy, each document identifier is hashed to produce a u64, which in turn is mapped to a specific Interval, stored by a particular Store Node.
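The lookup itself can be sketched as a binary search over a sorted table of Interval start points. This is an illustration under assumptions: the Big Peer's actual hash function is not described here, so `DefaultHasher` stands in for it (its output is not guaranteed to be stable across Rust releases, so a real store would need a fixed hash), and `key_of` and `IntervalTable` are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical hash of a document identifier into the u64 keyspace.
fn key_of(doc_id: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    doc_id.hash(&mut hasher);
    hasher.finish()
}

/// Interval table: (start of Interval, owning partition), sorted by start.
/// The first entry must start at 0; each Interval runs until the next start,
/// and the last one runs to u64::MAX.
struct IntervalTable {
    entries: Vec<(u64, usize)>,
}

impl IntervalTable {
    fn owner(&self, key: u64) -> usize {
        // Count the Intervals that start at or below `key`; the last of them contains it.
        let idx = self.entries.partition_point(|&(start, _)| start <= key);
        self.entries[idx - 1].1
    }

    fn owner_of_doc(&self, doc_id: &str) -> usize {
        self.owner(key_of(doc_id))
    }
}
```

A table such as `[(0, 1), (1 << 62, 2), (1 << 63, 1), (3 << 62, 2)]` gives each of two partitions two non-contiguous Intervals, which is exactly the situation Random Slicing produces.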
Replicated
The other axis of distribution for the Big Peer Store is replication. Each partition is fully replicated, so multiple copies of each document are stored. For simplicity's sake, we can say that every Big Peer Store cluster is made of P partitions, each replicated on R Store Nodes, for a total of P × R Store Nodes. For example, a cluster of 3 partitions with 2 replicas will have 6 Store Nodes.
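The P × R arithmetic can be sketched with the `PxRy` naming (partition x, replica y) that this article adopts later; `store_nodes` is a hypothetical helper.

```rust
/// Lists the Store Nodes of a cluster with `p` partitions, each replicated on
/// `r` Store Nodes, using the PxRy naming (partition x, replica y).
fn store_nodes(p: usize, r: usize) -> Vec<String> {
    (1..=p)
        .flat_map(|pi| (1..=r).map(move |ri| format!("P{pi}R{ri}")))
        .collect()
}
```

For 3 partitions and 2 replicas this yields six names, from `P1R1` to `P3R2`.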
The Log, Transactions, and Transaction Timestamps
All data enters the Big Peer Store via the Transaction Log. Each transaction may contain updates, inserts, and deletes for many documents. The log is totally ordered: each transaction is assigned a position in the log (the first transaction gets position 1, the second 2, and so on).
In the Big Peer Store these log positions become Transaction Timestamps, such that the logical
time of a transaction is represented by a single scalar value.
Each Big Peer Store Node consumes from the log, sequentially and independently. Although a transaction may affect documents that are owned by different partitions, each Store Node will only store modifications for the documents that it owns. Independently of this, each Store Node records the fact that it has seen a given transaction by storing the Transaction Timestamp in its metadata.
Periodically the Store Nodes gossip to each other the maximum Transaction Timestamp that they have committed to disk.
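A minimal sketch of a Store Node consuming the log, assuming a hypothetical `owns` predicate and a simple latest-value map in place of CRDT documents and versions. The point it illustrates is that the node advances `max_committed` for every transaction, including those whose writes it does not own, and it is this value that gets gossiped.

```rust
use std::collections::HashMap;

/// A transaction read from the log. `ts` is its log position, i.e. its
/// Transaction Timestamp; `writes` are (document key, new value) pairs.
struct Transaction {
    ts: u64,
    writes: Vec<(u64, String)>,
}

struct StoreNode {
    /// Hypothetical ownership check derived from the Cluster Configuration.
    owns: fn(u64) -> bool,
    /// Simplified document storage: latest value per owned key.
    docs: HashMap<u64, String>,
    /// Highest Transaction Timestamp committed so far; this is what gets gossiped.
    max_committed: u64,
}

impl StoreNode {
    fn consume(&mut self, txn: &Transaction) {
        // The log is consumed sequentially, one position at a time.
        assert_eq!(txn.ts, self.max_committed + 1, "transactions must be consumed in log order");
        for (key, value) in &txn.writes {
            if (self.owns)(*key) {
                self.docs.insert(*key, value.clone());
            }
        }
        // Record that this transaction was seen, even if none of its writes were owned.
        self.max_committed = txn.ts;
    }
}
```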
Universally Stable Timestamp (UST)
The Universally Stable Timestamp (UST) represents the timestamp of the latest transaction that is guaranteed to be stored (stable) at every node, meaning any read at that timestamp will be consistent. The UST is calculated by taking, across every Store Node in the system, the minimum of the latest transaction each node has stored.
NOTE: We actually use a subset of the Store Nodes to calculate a stable timestamp, but explaining how detracts from the purpose of the article.
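Ignoring the subset refinement mentioned in the note, the UST computation can be sketched as a minimum over the latest gossiped timestamps. The function name and the `HashMap` of gossip are assumptions for illustration, as is the choice to treat a node that has not been heard from yet as blocking the UST.

```rust
use std::collections::HashMap;

/// UST sketch: the minimum, over every Store Node, of the latest Transaction
/// Timestamp that node has gossiped as committed. A node that has not been
/// heard from yet (or an empty node list) yields None.
fn universally_stable_timestamp<'a>(nodes: &[&'a str], gossip: &HashMap<&'a str, u64>) -> Option<u64> {
    let mut ust: Option<u64> = None;
    for node in nodes {
        let committed = *gossip.get(node)?;
        ust = Some(ust.map_or(committed, |m| m.min(committed)));
    }
    ust
}
```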
Store Versions
Each Transaction Timestamp can be thought of as a version of the database, so a Store Node can hold multiple versions of a given document. Keeping these versions means that a read that starts when the UST is 100 can complete consistently alongside another read at a UST of 200.
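Multi-version storage of this kind can be sketched with an ordered map per document, keyed by Transaction Timestamp: a read at timestamp t sees the newest version written at or before t. `VersionedDoc` is a hypothetical type, not the Store's actual representation.

```rust
use std::collections::BTreeMap;

/// Every stored version of one document, keyed by the Transaction Timestamp
/// that wrote it.
#[derive(Default)]
struct VersionedDoc {
    versions: BTreeMap<u64, String>,
}

impl VersionedDoc {
    fn write(&mut self, ts: u64, value: &str) {
        self.versions.insert(ts, value.to_string());
    }

    /// The value visible to a read at `read_ts`: the newest version written at
    /// or before that timestamp, or None if the document did not exist yet.
    fn read_at(&self, read_ts: u64) -> Option<&str> {
        self.versions
            .range(..=read_ts)
            .next_back()
            .map(|(_, value)| value.as_str())
    }
}
```

With versions written at timestamps 90 and 150, a read at UST 100 sees the first version and a read at UST 200 sees the second, and neither disturbs the other.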
Garbage Collection Timestamp
Keeping all versions of all documents forever is prohibitively expensive. Instead, the Big Peer Store Nodes independently perform garbage collection of document versions that will never be read again.
The Garbage Collection Timestamp (GC Timestamp) is much like the UST. When a read transaction is served by a Store Node, it is assigned a Read Timestamp equal to the UST at that node. Each Store Node gossips its lowest active Read Timestamp, and in turn computes the GC Timestamp by taking the minimum of all the Read Timestamps it receives. Any version older than the newest version at or below the GC Timestamp can be discarded, as it will never be read again. For a given Store Node with no in-flight reads, its local GC Timestamp will be equal to its view of the UST.
One can imagine a sliding window of visible versions, with the lower edge being the GC Timestamp, and the upper edge the UST.
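The lower edge of that window can be sketched as follows, assuming each Store Node's lowest active Read Timestamp has already arrived by gossip. The newest version at or below the GC Timestamp must be kept, since reads at the GC Timestamp still observe it; only versions older than that one are dropped. The function names are hypothetical.

```rust
use std::collections::BTreeMap;

/// GC Timestamp: the minimum of every Store Node's lowest active Read Timestamp
/// (a node with no in-flight reads reports its view of the UST instead).
fn gc_timestamp(lowest_active_reads: &[u64]) -> Option<u64> {
    lowest_active_reads.iter().copied().min()
}

/// Drops the versions of one document that no read at or above `gc_ts` can
/// observe: everything older than the newest version at or below `gc_ts`.
fn collect_garbage(versions: &mut BTreeMap<u64, String>, gc_ts: u64) {
    let keep_from = versions.range(..=gc_ts).next_back().map(|(ts, _)| *ts);
    if let Some(keep_from) = keep_from {
        // split_off returns the versions at or after `keep_from`; the rest is garbage.
        *versions = versions.split_off(&keep_from);
    }
}
```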
Cluster Configurations
As described above, the Big Peer Store is made up of Store Nodes, each being a replica of a partition. The partitions themselves are made up of Intervals of the keyspace. All this information is stored in the Cluster Configuration, which maps Intervals to partitions, and partitions to Store Nodes.
Epochs and Configuration-aware Read Timestamps
Each Cluster Configuration has a unique monotonically increasing identifier,
starting from 0
(the empty cluster configuration). This identifier is also
called the Cluster Configuration Epoch, or Epoch for short.
Recall that we described Read Timestamps earlier as a single scalar. We now augment this concept by making read transactions Configuration-aware, such that a Read Timestamp is a pair of Epoch and Transaction Timestamp (UST). Only nodes in the current Cluster Configuration coordinate read transactions.
This pair of scalars, both rising monotonically, is used to manage cluster Transitions.
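A Configuration-aware Read Timestamp can be sketched as a struct whose derived ordering compares the Epoch first and the UST second. The lexicographic ordering is an assumption of this sketch, chosen so that any read in a newer Epoch sorts after every read in an older one; `ReadTimestamp` and `min_read_timestamp` are hypothetical names.

```rust
/// Configuration-aware Read Timestamp. Deriving Ord compares fields in
/// declaration order, so the Epoch dominates and the UST breaks ties.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ReadTimestamp {
    epoch: u64,
    ust: u64,
}

/// With pairs, the minimum Read Timestamp (and hence the GC Timestamp) is a pair too.
fn min_read_timestamp(reads: &[ReadTimestamp]) -> Option<ReadTimestamp> {
    reads.iter().copied().min()
}
```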
Transitions
Transitions are the process by which the Big Peer Store moves from the Current Cluster Configuration to the Next Cluster Configuration, allowing the Big Peer Store to scale up and down. The numbers of partitions and replicas in a Cluster Configuration describe the size and shape of the Big Peer Store.
For example, assume that the Big Peer Store is deployed with a single partition and two replicas (thus two Big Peer Store Nodes running).
To scale up the deployment, we add a new partition. This means that half the documents that were owned by partition 1 (P1) will now be owned by partition 2 (P2). The Transition must be performed in such a way that writes and reads progress, and all queries remain causally consistent.
Using this concrete example (1P*2R -> 2P*2R), we can name the Big Peer Store Nodes P1R1 and P1R2 for the two original nodes, and P2R1 and P2R2 for the two new nodes. The Current Cluster Configuration contains one partition made up of a single Interval covering the whole keyspace, while the Next Cluster Configuration contains two partitions: P1 is made up of an Interval covering the top half of the keyspace, and P2 is the Interval covering the bottom half.
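The two configurations in this example can be sketched as data. The `ClusterConfig` and `Partition` types and the Epoch values 1 and 2 are hypothetical; Interval bounds are `u128` so that the keyspace end, 2^64, is representable.

```rust
/// One partition: its Intervals ([start, end)) and the Store Nodes replicating it.
struct Partition {
    intervals: Vec<(u128, u128)>,
    replicas: Vec<&'static str>,
}

struct ClusterConfig {
    epoch: u64,
    partitions: Vec<Partition>,
}

const KEYSPACE: u128 = 1 << 64;
const HALF: u128 = 1 << 63;

impl ClusterConfig {
    /// Store Nodes that own `key` in this configuration.
    fn owners(&self, key: u64) -> Vec<&'static str> {
        let k = key as u128;
        self.partitions
            .iter()
            .filter(|p| p.intervals.iter().any(|&(s, e)| s <= k && k < e))
            .flat_map(|p| p.replicas.iter().copied())
            .collect()
    }
}

/// The 1P*2R -> 2P*2R example; the Epoch numbers are illustrative.
fn example_configs() -> (ClusterConfig, ClusterConfig) {
    let current = ClusterConfig {
        epoch: 1,
        partitions: vec![Partition { intervals: vec![(0, KEYSPACE)], replicas: vec!["P1R1", "P1R2"] }],
    };
    let next = ClusterConfig {
        epoch: 2,
        partitions: vec![
            Partition { intervals: vec![(HALF, KEYSPACE)], replicas: vec!["P1R1", "P1R2"] }, // top half
            Partition { intervals: vec![(0, HALF)], replicas: vec!["P2R1", "P2R2"] },        // bottom half
        ],
    };
    (current, next)
}
```

For a key near 0, `current.owners` returns P1R1 and P1R2, while `next.owners` returns P2R1 and P2R2: that difference is the data the new nodes must obtain.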
Backfill
The first stage of a Transition is Backfill. In our example we can start P2R1 and P2R2 as new Big Peer Store Nodes, providing them with the Current Cluster Configuration and the Next Cluster Configuration. We also let P1R1 and P1R2 know about the Next Cluster Configuration, so that they too know about the Transition.
As soon as P2R1 and P2R2 are brought online, they start consuming from the end of the transaction log. Using the Current Cluster Configuration, they can send a Backfill Query to the current owners of any data they will own in the Next Cluster Configuration. In this example, P2R1 and P2R2 send Backfill Queries to P1R1 and P1R2, asking for the data from the bottom half of the keyspace.
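Working out where to send Backfill Queries amounts to intersecting the Intervals a new node will own with the Intervals each partition owns in the Current Cluster Configuration. A minimal sketch, with a hypothetical `backfill_queries` function and a tuple-based `Partition` alias:

```rust
/// One partition of a configuration: its Intervals ([start, end)) and replicas.
type Partition = (Vec<(u128, u128)>, Vec<&'static str>);

/// For each Interval a new Store Node will own in the Next Cluster Configuration,
/// finds which pieces are held by which partition in the Current one, so that a
/// Backfill Query for each piece can be sent to that partition's replicas.
fn backfill_queries(
    wanted: &[(u128, u128)],
    current: &[Partition],
) -> Vec<((u128, u128), Vec<&'static str>)> {
    let mut queries = Vec::new();
    for &(ws, we) in wanted {
        for (intervals, replicas) in current {
            for &(s, e) in intervals {
                // Intersection of the wanted Interval with a currently owned one.
                let (start, end) = (ws.max(s), we.min(e));
                if start < end {
                    queries.push(((start, end), replicas.clone()));
                }
            }
        }
    }
    queries
}
```

In the example, P2R1 wants the bottom half, and the only overlapping current partition is P1, so a single query for the bottom half goes to P1R1 and P1R2.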
Ownership and Gossip
During the Transition, Store Nodes gossip their view of the cluster to the union of all Store Nodes from both Cluster Configurations. Store Nodes also consume and store Transaction data that they might own in either the Current or Next Cluster Configurations.
In this example, P1R1 and P1R2 continue to store transactions for the whole keyspace during the Transition.
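The two rules above can be sketched as two small functions. The ownership predicates passed in are hypothetical stand-ins for lookups against each Cluster Configuration.

```rust
use std::collections::BTreeSet;

/// During a Transition, a Store Node stores a write if it owns the key in
/// either the Current or the Next Cluster Configuration.
fn should_store(key: u64, owns_current: impl Fn(u64) -> bool, owns_next: impl Fn(u64) -> bool) -> bool {
    owns_current(key) || owns_next(key)
}

/// Gossip goes to the union of Store Nodes from both configurations.
fn gossip_targets<'a>(current: &[&'a str], next: &[&'a str]) -> BTreeSet<&'a str> {
    current.iter().chain(next).copied().collect()
}
```

For P1R1, which owns everything in the Current Cluster Configuration but only the top half in the Next, `should_store` stays true for every key until the Transition completes.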
New UST and Routing Configuration
The UST is calculated within the scope of a Cluster Configuration. The UST for the Next Cluster Configuration cannot be calculated until all Store Nodes in that Configuration are backfilled. Once Backfill is complete, the Store Nodes compare the UST in the Next Cluster Configuration with the UST in the Current Cluster Configuration. When the Next Cluster Configuration UST is equal to or greater than the Current Cluster Configuration UST, the Store Nodes in the Next Cluster Configuration have all the data they need and can serve queries. A Store Node in the Current Cluster Configuration can then route queries to the Store Nodes in the Next Cluster Configuration. Each Store Node makes these calculations independently, based on the gossip messages it receives.
In order to ensure that a Transition eventually completes, the UST in the Current Cluster Configuration stops rising once all Store Nodes in the Next Cluster Configuration have backfilled. Since the new Store Nodes begin consuming from the Transaction Log before they begin to backfill, the UST in the Next Cluster Configuration should immediately match that of the Current Cluster Configuration. However, this might not be the case when we scale the Big Peer to a much larger number of nodes, because gossip delays can slow the rate at which each node sees the UST rise.
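Each Store Node's routing decision can be sketched as a pure function of its gossip-derived view. `TransitionView`, `Route`, and `route_reads` are hypothetical names, and the sketch leaves out how the view is maintained.

```rust
/// One Store Node's local view during a Transition, built from gossip.
struct TransitionView {
    /// True once every Store Node in the Next configuration has backfilled.
    all_next_backfilled: bool,
    /// UST of the Current configuration; it stops rising once backfill is done.
    ust_current: u64,
    /// UST of the Next configuration; undefined (None) until it is backfilled.
    ust_next: Option<u64>,
}

#[derive(Debug, PartialEq)]
enum Route {
    Current,
    Next,
}

/// Route reads to the Next configuration only once it has caught up.
fn route_reads(view: &TransitionView) -> Route {
    match view.ust_next {
        Some(next) if view.all_next_backfilled && next >= view.ust_current => Route::Next,
        _ => Route::Current,
    }
}
```

Because the Current UST is frozen at this point, a Next UST that is still rising must eventually reach it, which is what guarantees the switch happens.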
Installing new Cluster Configurations
As described above, read transactions occur at a given Read Timestamp, which is a pair of scalars made up of the Cluster Configuration Epoch and the UST. The GC Timestamp is the locally calculated minimum of all Read Timestamps received by gossip. This means that each Store Node can independently determine when all Store Nodes in the Current Cluster Configuration are routing queries to the Next Cluster Configuration: it is the point at which the GC Timestamp, and therefore every gossiped Read Timestamp, is in the Next Cluster Configuration Epoch.
At this point, any of the Current Configuration Store Nodes can declare that the Transition is complete, and that the Next Cluster Configuration is installable as the Current Cluster Configuration.
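The installability check can be sketched as follows, assuming the same lexicographic (Epoch, UST) ordering of Read Timestamps. `ReadTimestamp` and `next_is_installable` are hypothetical names.

```rust
/// Configuration-aware Read Timestamp, ordered by Epoch first, then UST.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct ReadTimestamp {
    epoch: u64,
    ust: u64,
}

/// The Next Cluster Configuration is installable once the minimum gossiped
/// Read Timestamp (the GC Timestamp) carries the Next Epoch, i.e. no read is
/// still being coordinated in the Current configuration.
fn next_is_installable(gossiped_reads: &[ReadTimestamp], next_epoch: u64) -> bool {
    gossiped_reads
        .iter()
        .min()
        .map_or(false, |gc| gc.epoch >= next_epoch)
}
```

A single straggling read in the old Epoch keeps the minimum in that Epoch, so the Transition waits for it to finish rather than cutting it off.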
When scaling down, nodes that are no longer in the Current Cluster Configuration can be shut down.
Garbage Collection
Garbage Collection also checks for ownership: any data not owned by the Store Node is deleted. In the example above, that means P1R1 and P1R2 can delete all the data from the bottom half of the keyspace, which is now owned, and safely stored, by P2R1 and P2R2.
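Ownership-based collection can be sketched as a filter over a node's documents. The `owns` predicate is a hypothetical stand-in for a lookup against the newly installed configuration; for P1R1 in the example, it would accept only the top half of the keyspace.

```rust
use std::collections::BTreeMap;

/// After the Transition, drop every document this Store Node no longer owns
/// in the newly installed Current Cluster Configuration.
fn drop_unowned(docs: &mut BTreeMap<u64, String>, owns: impl Fn(u64) -> bool) {
    docs.retain(|key, _| owns(*key));
}
```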
Summary
This post described the process for transitioning a Big Peer Store cluster in order to scale it up, in terms of two primitives: Transaction Timestamps and Cluster Configuration Epochs. This pair of scalars is composed to make the UST, Read Timestamps, and the GC Timestamp. Store Nodes gossip to each other about their Read Transactions and their latest consumed transaction, which allows each Store Node to independently calculate the progress of a Transition.
For the sake of brevity, this post skips many of the implementation details that ensure the monotonicity, fault tolerance, and liveness of this process.
Acknowledgements
With thanks to my colleague, Borja de Régil, for his review, edits and suggestions.