In response to You Do It Too: Forfeiting Partition Tolerance in Distributed Systems, I’d like to remind folks of a few things around CAP.

Partition intolerance does not mean that partitions cannot happen; it means partitions are not supported.

Specifically, partition-intolerant systems must sacrifice invariants when partitions occur. Which invariants? By Gilbert & Lynch, either the system allows nonlinearizable histories, or some requests to non-failing nodes cannot complete. Related proofs tell us that systems which preserve availability during partitions also cannot provide sequential consistency, serializability, repeatable read, cursor stability, or snapshot isolation.

CP, AP describe the behavior if a partition occurs.

CP and AP are upper bounds: systems can provide C or A during a partition, but might provide neither.

This obviously leaves room for an overlap between the two categories.

This is not obvious at all. CP describes systems which are both C and P; AP describes systems which are both A and P. The existence of an overlap between CP and AP implies there exists some system which is C, A, and P. The entire point of the CAP theorem is that such systems cannot exist. The existence of such a system, even theoretically, would disprove the theorem. You’ve got a paper to publish.

Many CA systems are not CP.

Every CA system is not CP.

Many CP systems are not CA.

Every CP system is not CA. Or the theorem’s wrong, and we’ve got several proofs to overturn!

Systems that belong to these two categories are only systems that stop working during the partition, but are consistent once the partition is fixed (trivially a webserver connected to a database). I personally prefer to call these systems ‘CA’…

Gilbert & Lynch’s proof is very clear about what constitutes availability: “For a distributed system to be continuously available, every request received by a non-failing node in the system must result in a response.” Systems which stop working during a partition trivially fail the theorem’s definition of availability by failing to make progress. You can’t call them CA.

Without data agreed upon, there is no real way out from this debate.

Luckily, we do have data: network partitions happen in LANs all the time. Claiming small clusters will save you is a hand-waving argument at best: there are plenty of cases of clusters as small as two nodes, connected by redundant physical switches, encountering network partitions.

“node failures, processes crashes and network partitions are partitions so you have to be partition tolerant”. This is not only false but also dangerous: it hides the fact that each of these faults could be tackled independently with a specific priority.

I’d like to reiterate that “network partitions” don’t only happen in the network. In a formal model of a distributed system, like that used in the Gilbert & Lynch proof, we refer to everything that transmits messages between processes as “the network”, and make idealizing assumptions about the processes themselves: e.g. they are always singlethreaded, they execute in bounded time, etc. Real software is fuzzier: our processes are usually not realtime, which means the network effectively extends within the node. Garbage collection, in particular, is a notorious cause of “network” partitions, because it delays messages.

More informally, I suspect that attempting to tackle network partitions as an independent type of fault is why so many databases fail their Jepsen tests. It’s easier to choose an algorithm which is safe in the general case of asynchronous networks, than to try and impose synchronous delivery via imperfect failure detectors and special-cases.

In the previous post, we discovered the potential for data loss in RabbitMQ clusters. In this oft-requested installment of the Jepsen series, we’ll look at etcd: a new contender in the CP coordination service arena. We’ll also discuss Consul’s findings with Jepsen.

Like Zookeeper, etcd is designed to store small amounts of strongly-consistent state for coordination between services. It exposes a tree of logical nodes; each identified by a string key, containing a string value, and with a version number termed an index–plus, potentially, a set of child nodes. Everything’s exposed as JSON over an HTTP API.

Etcd is often used for service discovery, distributed locking, atomic broadcast, sequence numbers, and pointers to data in eventually consistent stores. Because etcd offers atomic compare-and-set by both value and version index, it’s a powerful primitive in building other distributed systems.
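
To make that concrete, here’s a tiny sketch of that compare-and-set primitive, using the Verschlimmbesserung client that appears later in this post. The namespace alias, node address, and key are assumptions for illustration, not taken from the test code.

(require '[verschlimmbesserung.core :as v])   ; assumed namespace for the client

(def c (v/connect "http://n1:4001"))  ; hypothetical etcd node
(v/reset! c "registry" "0")           ; initialize the key
(v/cas!   c "registry" "0" "1")       ; => true: the value was "0"
(v/cas!   c "registry" "0" "2")       ; => false: the value is now "1"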

In this post, we’ll write a Jepsen test for etcd, and see whether it lives up to its consistency claims.

Writing a client

A client, in Jepsen, applies a series of operations to one particular node in a distributed system. Our client will take invocations like {:process 2, :type :invoke, :f :cas, :value [1 2]}, try to change the value of a register from 1 to 2, and return a completion like {:process 2, :type :ok, :f :cas, :value [1 2]} if etcd acknowledges the compare-and-set. If you’re a little confused, now might be a good time to skim through the earlier discussion of strong consistency models.

So: first things first. We’ll define a new datatype, called CASClient, with two fields: a key k, and an etcd client client.

(defrecord CASClient [k client]

Jepsen has a protocol–a suite of functions–for interacting with clients. We’ll define how CASClient supports that protocol by declaring the protocol name client/Client, followed by three functions from that protocol: setup!, invoke!, and teardown!.

client/Client
(setup! [this test node]

The setup function takes three arguments: the CASClient itself (this), the test being run, and the name of the node this client should connect to. Think of a client like a stem cell: before the test runs, it lies latent, unspecialized. When the test starts, we’ll spawn a client for each node. The setup! function differentiates a latent client, returning an active client bound to one particular node. Some state, like the key k, will be inherited by the new client. Other state, like the database connection, will be set up for each new client independently.

(let [client (v/connect (str "http://" (name node) ":4001"))]
  (v/reset! client k (json/generate-string nil))
  (assoc this :client client)))

In this namespace, we’ll use my Verschlimmbesserung etcd client and call its namespace v. v/connect creates a new Verschlimmbesserung client for the given node. We call v/reset! to initialize the key k to nil, json-encoded. Then, using assoc, we return a copy of this CASClient, but with the :client field replaced by the Verschlimmbesserung client.

Next, we’ll implement the Client protocol’s invoke! function, which takes a Client, a test, and an invocation to apply.

(invoke! [this test op]

Things get a little complicated now. We often say that an unexpected exception or a timeout means an operation failed–but in verifying strong consistency, we need to be a little more precise. We must distinguish between three outcomes:

  1. :ok results, where the operation definitely occurred,
  2. :fail results, where the operation definitely did not occur, and
  3. :info results, where the operation might or might not have taken place.

Indeterminate results, like timeouts, are the bane of the model checker. We never know whether those operations might complete at some point hours or weeks later–so when a timeout occurs, we consider the process crashed and spawn a new one. That process is still concurrent with every subsequent operation in the history, which imposes a huge cost at verification time. Wherever possible, we want to declare definitively that an operation did or did not happen.

Reads are a special case: they don’t affect the state of the system, so as far as the model checker is concerned, an indeterminate read can always be interpreted as never having happened at all–e.g., a :fail state. We’re going to use this distinction in some error handling code later.

; Reads are idempotent; if they fail we can always assume they didn't
; happen in the history, and reduce the number of hung processes, which
; makes the knossos search more efficient
(let [fail (if (= :read (:f op)) :fail :info)]

Now, depending on the function :f of the invoke operation, we’ll either read, write, or compare-and-set the value at key k.

(try+
  (case (:f op)
    :read  (let [value (-> client
                           (v/get k {:consistent? true})
                           (json/parse-string true))]
             (assoc op :type :ok :value value))

    :write (do (->> (:value op)
                    json/generate-string
                    (v/reset! client k))
               (assoc op :type :ok))

    :cas   (let [[value value'] (:value op)
                 ok?            (v/cas! client k
                                        (json/generate-string value)
                                        (json/generate-string value'))]
             (assoc op :type (if ok? :ok :fail))))

For a read, we’ll take the client, get the key using an etcd consistent read, and parse the key as JSON. Then we’ll return a copy of the invocation, but with the type :ok and the :value obtained from etcd. Note that we’re using etcd’s consistent read option, which claims:

If your application wants or needs the most up-to-date version of a key then it should ensure it reads from the current leader. By using the consistent=true flag in your GET requests, etcd will make sure you are talking to the current master.

For a write, we’ll take the :value from the operation, serialize it to JSON, and call v/reset! to change the register to that new value.

For a compare-and-set (:cas), we’ll take a pair of values–old and new–and bind them to value and value'. We’ll serialize both to JSON, and call v/cas! to atomically set k to the new value iff it currently has the old value. v/cas! returns true if the CAS succeeded, and false if the CAS failed, so we return a :type of :ok or :fail depending on its return value ok?.

Finally, we’ll handle a few common error conditions, just to reduce the chatter in the logs.

  ; A few common ways etcd can fail
  (catch java.net.SocketTimeoutException e
    (assoc op :type fail :value :timed-out))

  (catch [:body "command failed to be committed due to node failure\n"] e
    (assoc op :type fail :value :node-failure))

  (catch [:status 307] e
    (assoc op :type fail :value :redirect-loop))

  (catch (and (instance? clojure.lang.ExceptionInfo %)) e
    (assoc op :type fail :value e))

  (catch (and (:errorCode %) (:message %)) e
    (assoc op :type fail :value e)))))

If we’re doing a read, these error handlers will return :fail as a performance optimization. If we’re doing a write or CAS, they’ll return :info, letting Knossos know that those operations might or might not have taken place.

One last function from the Client protocol: teardown!, which releases any resources the clients might be holding on to. These clients are just stateless HTTP wrappers, so there’s nothing to do here.

(teardown! [_ test]))

That’s it, really. We’ll write a little function to create a latent instance of this datatype, and use it in our test! We’ll call our key "jepsen", and leave the client field blank–it’ll be filled in by calls to setup!.

(defn cas-client
  "A compare and set register built around a single etcd key."
  []
  (CASClient. "jepsen" nil))

A singlethreaded model

We need a model of an etcd register to go along with this client. The model’s job is to take an operation, apply it to the current state of the model, and return a new model state–or a special inconsistent state if the given operation can’t be applied. We’ll create a datatype called CASRegister, which has a single field called value.

(defrecord CASRegister [value]
  Model
  (step [r op]
    (condp = (:f op)
      :write (CASRegister. (:value op))
      :cas   (let [[cur new] (:value op)]
               (if (= cur value)
                 (CASRegister. new)
                 (inconsistent (str "can't CAS " value " from " cur " to " new))))
      :read  (if (or (nil? (:value op))
                     (= value (:value op)))
               r
               (inconsistent (str "can't read " (:value op)
                                  " from register " value))))))

Just like our invoke! function, CASRegister chooses what to do based on the operation’s function :f. For a write, it returns a new CASRegister wrapping the given value.

For a compare-and-set, it binds the current and new values, then checks whether its own value is equal to the operation’s current value. If it is, we return a new register with the new value. If it isn’t, we construct a special inconsistent result, explaining why the CAS operation won’t work.

When a read is invoked, the client may not know what value it’s reading. We allow the read to go through–returning the same model r–if the client doesn’t provide a value to be read, or if the value the client read is equal to our current value. If the client tries to read some specific value, and it’s not the current value of the register, though–that’s an inconsistent state.

Then, a quick constructor function that starts off with the value nil. Note that this initial value corresponds to the value we wrote to the etcd key when the clients start up; both the real system and the model have to start in the same state.

(defn cas-register
  "A compare-and-set register"
  ([] (cas-register nil))
  ([value] (CASRegister. value)))
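
Stepping the model by hand at a REPL shows how it mirrors the register’s behavior. The expected results in the comments follow from the definitions above; step and inconsistent come from Knossos’ model namespace.

(-> (cas-register nil)
    (step {:f :write :value 1})       ; => a CASRegister holding 1
    (step {:f :cas   :value [1 2]})   ; => a CASRegister holding 2
    (step {:f :cas   :value [1 3]}))  ; => inconsistent: "can't CAS 2 from 1 to 3"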

With the client and model written, it’s time to combine both into a Jepsen test:

Designing a test

We’ll start with a baseline noop-test, and override it with etcd-specific fields. We’re pulling in etcd’s db to automate setup and teardown of the cluster, the cas-client we wrote earlier, and model/cas-register–our singlethreaded model of a compare-and-set register. We’ll use two checkers: an HTML timeline visualization, and the linearizable checker, powered by Knossos. A special client, the :nemesis, introduces network failures by partitioning the cluster into randomly selected halves.

(deftest register-test
  (let [test (run!
               (assoc noop-test
                      :name      "etcd"
                      :os        debian/os
                      :db        (db)
                      :client    (cas-client)
                      :model     (model/cas-register)
                      :checker   (checker/compose {:html   timeline/html
                                                   :linear checker/linearizable})
                      :nemesis   (nemesis/partition-random-halves)
                      :generator (gen/phases
                                   (->> gen/cas
                                        (gen/delay 1)
                                        (gen/nemesis
                                          (gen/seq
                                            (cycle [(gen/sleep 5)
                                                    {:type :info :f :start}
                                                    (gen/sleep 5)
                                                    {:type :info :f :stop}])))
                                        (gen/time-limit 20))
                                   (gen/nemesis
                                     (gen/once {:type :info :f :stop}))
                                   (gen/sleep 10)
                                   (gen/clients
                                     (gen/once {:type :invoke :f :read})))))]
    (is (:valid? (:results test)))
    (report/linearizability (:linear (:results test)))))

The generator defines the sequence and schedule of operations. This test proceeds in phases–all clients must complete a phase before any can move to the next. We’ll start off with gen/cas, which emits a mix of random :read, :write, and :cas invocations.

(def cas
  "Random cas/read ops for a compare-and-set register over a small field of integers."
  (reify Generator
    (op [generator test process]
      (condp < (rand)
        0.66 {:type :invoke :f :read}
        0.33 {:type :invoke :f :write :value (rand-int 5)}
        0    {:type :invoke :f :cas :value [(rand-int 5) (rand-int 5)]}))))

We wrap that generator in gen/delay, adding an extra second of latency to each operation to slow down the test a bit. Meanwhile, the nemesis cycles through an infinite sequence of sleeping, starting a network partition, sleeping, then resolving the partition. We limit the entire phase to 20 seconds–etcd convergence times are quite fast.

In the next phase, the nemesis emits a single :stop operation, resolving the network partition. We sleep for 10 seconds, then ask each client to perform a final read, just to see how the system stabilized.

Running the test

etcd-race.jpg

Etcd starts up fast. It converges in a matter of milliseconds, whereas many systems take 10 seconds or even minutes to detect failures. This is really convenient for testing–and arguably a nice property in production–but it also exposed a number of serious issues in etcd’s cluster state management: most notably, race conditions.

For instance, Issue 716 caused the primary to death-spiral almost every time I stood up a cluster, even with five or ten seconds between joining each node. The etcd team was incredibly responsive about fixing these bugs, but I’m kind of surprised to find problems like this in software that’s been released for almost a year. I’ve heard several anecdotal reports of other concurrency issues in goraft (the implementation of the underlying consensus protocol) which makes me a little nervous about trusting it, but it’s tough to turn anecdotes into reproducible failure cases, so I won’t dive into those here.

With some work, I was able to reliably stand up a cluster.

Here’s one of the shorter cases in full. First, Jepsen stands up the etcd cluster on nodes :n1, :n2, etc, and spools up five worker threads.

INFO jepsen.system.etcd - :n4 etcd ready
INFO jepsen.system.etcd - :n1 etcd ready
INFO jepsen.system.etcd - :n5 etcd ready
INFO jepsen.system.etcd - :n2 etcd ready
INFO jepsen.system.etcd - :n3 etcd ready
INFO jepsen.core - Worker 0 starting
INFO jepsen.core - Worker 3 starting
INFO jepsen.core - Worker 2 starting
INFO jepsen.core - Worker 4 starting
INFO jepsen.core - Worker 1 starting

Each worker thread, representing a process, concurrently invokes a series of random operations against a single etcd register. Each process talks to a distinct etcd node for each request, but follows redirects to whatever node that node thinks is the current leader. They happen to start off all making reads of the initial value nil. Then process 0 begins a read, process 3 begins a compare-and-set from 2 to 4, which fails since the value is nil, and so on.

INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 4 :invoke :read nil
INFO jepsen.util - 0 :invoke :read nil
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 3 :invoke :read nil
INFO jepsen.util - 0 :ok :read nil
INFO jepsen.util - 3 :ok :read nil
INFO jepsen.util - 4 :ok :read nil
INFO jepsen.util - 2 :ok :read nil
INFO jepsen.util - 1 :ok :read nil
INFO jepsen.util - 0 :invoke :read nil
INFO jepsen.util - 3 :invoke :cas [2 4]
INFO jepsen.util - 4 :invoke :cas [4 4]
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 0 :ok :read nil
INFO jepsen.util - 2 :ok :read nil
INFO jepsen.util - 1 :ok :read nil
INFO jepsen.util - 4 :fail :cas [4 4]
INFO jepsen.util - 3 :fail :cas [2 4]

The nemesis process initiates a network partition, isolating :n5 from :n4 and :n1, isolating :n2 from :n4 and :n1, etc. Notice that it takes some time for the nemesis to make those changes to the network.

INFO jepsen.util - :nemesis :info :start nil
INFO jepsen.util - 0 :invoke :write 4
INFO jepsen.util - 0 :ok :write 4
INFO jepsen.util - 3 :invoke :read nil
INFO jepsen.util - 3 :ok :read 4
INFO jepsen.util - 2 :invoke :cas [0 4]
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 1 :ok :read 4
INFO jepsen.util - 4 :invoke :write 1
INFO jepsen.util - 2 :fail :cas [0 4]
INFO jepsen.util - 4 :ok :write 1
INFO jepsen.util - 0 :invoke :read nil
INFO jepsen.util - 0 :ok :read 1
INFO jepsen.util - 3 :invoke :read nil
INFO jepsen.util - 3 :ok :read 1
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 1 :ok :read 1
INFO jepsen.util - :nemesis :info :start "Cut off {:n5 #{:n4 :n1}, :n2 #{:n4 :n1}, :n3 #{:n4 :n1}, :n1 #{:n3 :n2 :n5}, :n4 #{:n3 :n2 :n5}}"

We see a few operations time out–which we expect from a CP system.

INFO jepsen.util - 3 :info :cas :timed-out
INFO jepsen.util - 1 :invoke :write 3
INFO jepsen.util - 1 :ok :write 3
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 2 :ok :read 3
INFO jepsen.util - 4 :invoke :cas [2 3]
INFO jepsen.util - 4 :fail :cas [2 3]
INFO jepsen.util - 0 :invoke :write 0
INFO jepsen.util - 8 :invoke :cas [1 2]
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 1 :ok :read 3
INFO jepsen.util - 2 :invoke :write 2
INFO jepsen.util - 2 :ok :write 2
INFO jepsen.util - 4 :invoke :cas [2 3]
INFO jepsen.util - 0 :info :write :timed-out
INFO jepsen.util - 4 :ok :cas [2 3]
INFO jepsen.util - 8 :info :cas :timed-out

After a few cycles of isolating and reconnecting nodes, something interesting happens just as the network is cut off: a “Raft Internal Error”:

INFO jepsen.util - 4 :info :cas {:status 500, :errorCode 300, :message "Raft Internal Error", :index 41}
INFO jepsen.util - 10 :ok :write 0
INFO jepsen.util - :nemesis :info :start "Cut off {:n1 #{:n2 :n5}, :n4 #{:n2 :n5}, :n3 #{:n2 :n5}, :n5 #{:n3 :n4 :n1}, :n2 #{:n3 :n4 :n1}}"

Another failure case: two nodes each think the other is the current leader, returning HTTP redirects to the other node in an infinite loop.

INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 2 :ok :read 0
INFO jepsen.util - 1 :fail :read :redirect-loop

And the test completes. By and large, operations completed reliably with low latencies, and despite some failures, things look correct if you eyeball the test. The Jepsen I wrote a year ago would have called these results A-OK.

INFO jepsen.util - 18 :ok :read 2
INFO jepsen.core - Worker 3 done
INFO jepsen.core - Run complete, writing
INFO jepsen.core - Analyzing
INFO jepsen.core - Analysis complete

Jepsen II, however, is not quite so forgiving.

Results

The very first test I ran reported a linearizability failure. I was so surprised I spent another week double-checking Knossos and Jepsen, then writing my own etcd client, to make sure I hadn’t made a mistake. Sure enough, etcd’s registers are not linearizable.

FAIL in (register-test) (etcd_test.clj:45)
expected: (:valid? (:results test))
  actual: false

Not linearizable. Linearizable prefix was:

2 :invoke :read nil
4 :invoke :read nil
0 :invoke :read nil
1 :invoke :read nil
3 :invoke :read nil
0 :ok :read nil
3 :ok :read nil
4 :ok :read nil
2 :ok :read nil
1 :ok :read nil
0 :invoke :read nil
3 :invoke :cas [2 4]
4 :invoke :cas [4 4]
2 :invoke :read nil
1 :invoke :read nil
0 :ok :read nil
2 :ok :read nil
1 :ok :read nil
4 :fail :cas [4 4]
3 :fail :cas [2 4]
0 :invoke :read nil
0 :ok :read nil
2 :invoke :write 1
1 :invoke :cas [2 3]
2 :ok :write 1
1 :fail :cas [2 3]
4 :invoke :write 3
3 :invoke :write 1
4 :ok :write 3
3 :ok :write 1
0 :invoke :cas [4 1]
2 :invoke :write 2
1 :invoke :write 1
0 :fail :cas [4 1]
3 :invoke :read 1
4 :invoke :write 0
3 :ok :read 1
2 :ok :write 2
1 :ok :write 1
4 :ok :write 0
:nemesis :info :start nil
0 :invoke :write 4
0 :ok :write 4
3 :invoke :read 4
3 :ok :read 4
2 :invoke :cas [0 4]
1 :invoke :read 4
1 :ok :read 4
4 :invoke :write 1
2 :fail :cas [0 4]
4 :ok :write 1
0 :invoke :read 1
0 :ok :read 1
3 :invoke :read 1
3 :ok :read 1
1 :invoke :read 1
1 :ok :read 1
:nemesis :info :start "Cut off {:n5 #{:n4 :n1}, :n2 #{:n4 :n1}, :n3 #{:n4 :n1}, :n1 #{:n3 :n2 :n5}, :n4 #{:n3 :n2 :n5}}"
2 :invoke :cas [1 4]
4 :invoke :read 1
4 :ok :read 1
2 :ok :cas [1 4]

Followed by inconsistent operation:

0 :invoke :read 1

Why aren’t we allowed to read 1 from the register at this point? Knossos can provide us a litany of possible worlds, just prior to that fatal read. For instance, we might have ordered events like so: process 1 reads nil, process 3 reads nil, …

World with fixed history:
 1 :invoke :read nil
 3 :invoke :read nil
 2 :invoke :read nil
 4 :invoke :read nil
 0 :invoke :read nil
 2 :invoke :read nil
 1 :invoke :read nil
 0 :invoke :read nil
 0 :invoke :read nil
 2 :invoke :write 1
 4 :invoke :write 3
 3 :invoke :write 1
 3 :invoke :read 1
 4 :invoke :write 0
 1 :invoke :write 1
 2 :invoke :write 2
 0 :invoke :write 4
 3 :invoke :read 4
 1 :invoke :read 4
 4 :invoke :write 1
 0 :invoke :read 1
 3 :invoke :read 1
 1 :invoke :read 1
 4 :invoke :read 1
 2 :invoke :cas [1 4]
led to state:
 {:value 4}
with pending operations:

(and 12928 more worlds, elided here)

But the key problem is that all thirteen-thousand-odd interpretations of this history led to a register with the value 4.

Inconsistent state transitions: ([{:value 4} "can't read 1 from register 4"])

Once that CAS goes through, a linearizable register can’t return the previous value for a read. This violates linearizability.

What’s going on here?

etcd-cas-stale-read.jpg

Looking at the history just prior to that failure, we see that process 4 wrote 1 to the register, and several processes read that value before the partition occurred. It looks like the value was 1, a compare-and-set from 1 to 4 took place, but after that CAS completed, some process managed to read the previous value. In the consistency literature, this is called a stale read.

Stale reads are bad. They don’t just violate linearizability–they violate sequential consistency, causal consistency, read your writes, monotonic writes, monotonic reads–basically everything you’d want from a single-valued register goes out the window. This is particularly surprising because Raft, the etcd consensus algorithm, guarantees that committed log entries are linearizable.

But etcd’s “consistent” reads don’t go through the Raft log.

etcd-raft-multiprimary.jpg

Instead, they simply return the local state if the current node considers itself a leader. But Raft says nothing about guaranteeing leader exclusivity: multiple nodes can consider themselves the leader simultaneously.

So imagine two nodes, separated by a network partition, have the value 1. The node on top has just been elected leader for the most recent term, and accepts that CAS request, changing 1 to 4. It’s unable to propagate that change to the old leader because they’re separated by a network partition. The old leader goes on happily replying to reads with the old value, until it realizes it hasn’t received a heartbeat from a majority of peers in some time, and steps down.

Once the partition resolves, the old leader receives the new value 4 from the new leader, and the system continues on its way.

I want to be explicit, because some people have asserted that this behavior is “linearizable with respect to the Raft index”, even if it isn’t “linearizable in general”. It’s neither. An etcd “consistent read” can read a value from index 5, then index 4, then index 6, and so on. I think you might be able to recover sequential consistency by adding an FSM to the client that tracks the etcd index and tags all requests with a minimum-index constraint, but this is a.) optional, b.) not in any of the clients I know of, and c.) isn’t linearizable anyway.
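
For what it’s worth, here’s a rough sketch of that client-side idea. It is not from any real etcd client; get-with-index is a hypothetical helper that returns {:value v, :index i} for a consistent read, and the retry loop only approximates the minimum-index constraint described above.

(defn monotonic-read
  "Read k, retrying until the response's etcd index is at least as large as
  any index this client has already observed. seen-index is an atom holding
  that high-water mark."
  [get-with-index k seen-index]
  (loop []
    (let [{:keys [value index]} (get-with-index k)]
      (if (< index @seen-index)
        (recur)                          ; older than something we've seen: retry
        (do (swap! seen-index max index) ; advance the high-water mark
            value)))))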

A note on Consul

Just after I found this bug in etcd, Hashicorp announced a new service-discovery project called Consul. A half-dozen people asked me what I thought of its design, and to my delight, its authors had already tested their system using Jepsen’s etcd test as a template. They reported:

As part of our Consul testing, we ran it against Jepsen to determine if any consistency issues could be uncovered. In our testing, Consul gracefully recovered from partitions without introducing any consistency issues.

This is not quite the whole story.

Jepsen actually did find a consistency issue. In fact, it found the same mistake that etcd made: “consistent” reads in Consul return the local state of any node that considers itself a leader, allowing stale reads. Their solution at the time was to change the leader timeout from 1 second to 300 milliseconds, side-stepping the race condition.

- LeaderLeaseTimeout: time.Second,
+ LeaderLeaseTimeout: 300 * time.Millisecond,

Now, I’ve fought quite a few race conditions in my day, and adjusting the timeouts is a great nuclear option–but it doesn’t really guarantee correctness. High IO utilization and blocking syscalls can introduce surprising delays into processes at runtime. VMware vMotion will happily pause a process for seconds, as will garbage collection. Go’s GC is not particularly sophisticated yet, and there are production reports of ten-second garbage collection pauses. Bottom line: a seven-hundred-millisecond safety margin is not gonna cut it. The best way to solve a race condition, in general, is to remove the time dependence from the algorithm altogether.

Future iterations of Jepsen may be somewhat more challenging with respect to clock assumptions.

Good news, everyone!

I’ve corresponded with both the etcd and Consul teams about this, and the emerging consensus is to implement three types of reads, for varying performance/correctness needs:

  • Anything-goes reads, where any node can respond with its last known value. Totally available, in the CAP sense, but no guarantees of monotonicity. Etcd does this by default, and Consul terms this “stale”.
  • Mostly-consistent reads, where only leaders can respond, and stale reads are occasionally allowed. This is what etcd currently terms “consistent”, and what Consul does by default.
  • Consistent reads, which require a round-trip delay so the leader can confirm it is still authoritative before responding. Consul now terms this consistent.
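
For illustration, here’s roughly how a client might select between those modes against Consul’s HTTP KV API, using clj-http. The stale and consistent query flags are my reading of Consul’s API, and the agent address is an assumption.

(require '[clj-http.client :as http])

(defn consul-read
  "mode is :stale (any node answers), :default (the leader answers from its
  local state), or :consistent (the leader confirms leadership first)."
  [k mode]
  (http/get (str "http://localhost:8500/v1/kv/" k
                 (case mode
                   :stale      "?stale"
                   :consistent "?consistent"
                   :default    ""))))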

Consul has, I believe, already implemented these changes, and written comprehensive documentation for the tradeoffs involved. Etcd is still in progress, but I think they’ll get to it soon.

The etcd and Consul teams both take consistency seriously, and have been incredibly responsive to bug reports. I’m very thankful for their help in getting both systems running, and for their care in finding good tradeoffs between latency and consistency. I’m very excited to see a spate of strongly-consistent systems emerging in the last couple years, and I look forward to watching both etcd and Consul evolve. It’s a good time to be a software engineer!

In particular, I’d like to thank Xiang Li, Armon Dadgar, Evan Phoenix, Peter Bailis, and Kelly Sommers for their help in this analysis. A big thanks as well to Comcast, whose research grant made this round of Jepsen verification achievable. Y'all rock.

Next up: Elasticsearch.

RabbitMQ

RabbitMQ is a distributed message queue, and is probably the most popular open-source implementation of the AMQP messaging protocol. It supports a wealth of durability, routing, and fanout strategies, and combines excellent documentation with well-designed protocol extensions. I’d like to set all these wonderful properties aside for a few minutes, however, to talk about using your queue as a lock service. After that, we’ll explore RabbitMQ’s use as a distributed fault-tolerant queue.

Rabbit as a lock service

While I was working on building Knossos–Jepsen’s linearizability checker–a RabbitMQ blog post made the rounds of various news aggregators. In this post, the RabbitMQ team showed how one could turn RabbitMQ into a distributed mutex or semaphore service. I thought this was a little bit suspicious, because the RabbitMQ documentation is very clear that partitions invalidate essentially all Rabbit guarantees, but let’s go with it for a minute.

rabbit-ideal.jpg

RabbitMQ provides a feature whereby a crashed consumer–or one that declares it was unable to process a message–may return the message to the queue with a negative-ack. The message will then be redelivered to some other consumer later. We can use a queue containing a single message as a shared mutex–a lock that can only be held by a single consumer process at a time.

rabbit-ideal-limit.jpg

To acquire the lock, we attempt to consume the message from the queue. When we get the message, we hold the lock. When we wish to release the lock, we issue a negative-ack for the message, and Rabbit puts it back in the queue. It may be some time before another process comes to get the message from the queue, but even in the limiting case, where another process is waiting immediately, a linearizable queue guarantees the safety of this mutex. In order to successfully acquire the mutex, the mutex has to be free in the queue. In order to be free, the mutex had to have been released by another process, and if released, that process has already agreed to give up the lock–so at no point can two processes hold the mutex at the same time.
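
A minimal sketch of that scheme, using the RabbitMQ Java client from Clojure, where ch is a com.rabbitmq.client.Channel. The queue name and helper names are illustrative; this is not the Jepsen client used below.

(defn acquire-lock!
  "Try to take the single message off the lock queue. Returns the delivery
  tag if we now hold the lock, or nil if someone else does. autoAck is false,
  so Rabbit will redeliver the message if our connection dies."
  [ch queue]
  (when-let [response (.basicGet ch queue false)]
    (.. response getEnvelope getDeliveryTag)))

(defn release-lock!
  "Release the lock by negative-acking the message with requeue=true, which
  puts it back in the queue for the next contender."
  [ch delivery-tag]
  (.basicNack ch delivery-tag false true))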

rabbit-partition.jpg

RabbitMQ, however, is not a linearizable queue.

It can’t be linearizable, because as a queue, Rabbit needs to be tolerant of client failures. Imagine that a client process crashes. That process is never going to send Rabbit a negative-ack message. Rabbit has to infer the process has crashed because it fails to respond to heartbeat messages, or because the TCP connection drops, and, when it decides the process has crashed, it re-enqueues the message for delivery to a second process.

This is still a safe mutex, because a truly crashed process can’t do anything, so it has essentially given up the lock. RabbitMQ can safely hand the message to another process, allowing that process to recover the lock.

rabbit-partition-history.jpg

But then an ugly thought occurs: in an asynchronous network, reliable failure detectors are really darn hard. RabbitMQ can’t tell the difference between a client that’s crashed, and one that’s simply unresponsive. Perhaps the network failed, or perhaps the node is undergoing a GC pause, or the VM hiccuped, or the thread servicing RabbitMQ crashed but the thread using the mutex is still running, etc etc etc.

When this happens, Rabbit assumes the process crashed, and sensibly re-enqueues the message for another process to consume. Now our mutex is held by two processes concurrently. It is, in short, no longer mutually exclusive.

A demonstration

I’ve written a basic mutex client using this technique in Jepsen. We’ll invoke this client with a stream of alternating :acquire and :release operations, like so:

(deftest mutex-test
  (let [test (run!
               (assoc noop-test
                      :name      "rabbitmq-mutex"
                      :os        debian/os
                      :db        db
                      :client    (mutex)
                      :checker   (checker/compose {:html   timeline/html
                                                   :linear checker/linearizable})
                      :model     (model/mutex)
                      :nemesis   (nemesis/partition-random-halves)
                      :generator (gen/phases
                                   (->> (gen/seq
                                          (cycle [{:type :invoke :f :acquire}
                                                  {:type :invoke :f :release}]))
                                        gen/each
                                        (gen/delay 180)
                                        (gen/nemesis
                                          (gen/seq
                                            (cycle [(gen/sleep 5)
                                                    {:type :info :f :start}
                                                    (gen/sleep 100)
                                                    {:type :info :f :stop}])))
                                        (gen/time-limit 500)))))]
    (is (:valid? (:results test)))
    (report/linearizability (:linear (:results test)))))

We’re starting out with a basic test skeleton called noop-test, and merging in a slew of options that tell Jepsen how to set up, run, and validate the test. We’ll use the DB and client from RabbitMQ’s namespace. Then we’ll check the results using both an HTML visualization and Jepsen’s linearizability checker, powered by Knossos. Our model for this system is a simple mutex

(defrecord Mutex [locked?]
  Model
  (step [r op]
    (condp = (:f op)
      :acquire (if locked?
                 (inconsistent "already held")
                 (Mutex. true))
      :release (if locked?
                 (Mutex. false)
                 (inconsistent "not held")))))

… which can be acquired and released, but never double-acquired or double-released. To create failures we’ll use the partition-random-halves nemesis: a special Jepsen client that cuts the network into randomly selected halves. Then we give a generator: a monadic structure that determines what operations are emitted and when.

:generator (gen/phases
             (->> (gen/seq (cycle [{:type :invoke :f :acquire}
                                   {:type :invoke :f :release}]))
                  gen/each
                  (gen/delay 180)
                  (gen/nemesis
                    (gen/seq (cycle [(gen/sleep 5)
                                     {:type :info :f :start}
                                     (gen/sleep 100)
                                     {:type :info :f :stop}])))
                  (gen/time-limit 500)))))]

We start with an infinite sequence of alternating :acquire and :release operations, wrap it in a generator using gen/seq, then scope that generator to each client independently using gen/each. We make each client wait for 180 seconds before the next operation by using gen/delay, to simulate holding the lock for some time.

Meanwhile, on the nemesis client, we cycle through four operations: sleeping for five seconds, emitting a :start operation, sleeping for 100 seconds, and emitting a :stop. This creates a random network partition that lasts a hundred seconds, resolves it for five seconds, then creates a new partition.

Finally, we limit the entire test to 500 seconds. During the test, we’ll see partition messages in the RabbitMQ logs, like

=ERROR REPORT==== 10-Apr-2014::13:16:08 ===
** Node rabbit@n3 not responding **
** Removing (timedout) connection **

=INFO REPORT==== 10-Apr-2014::13:16:29 ===
rabbit on node rabbit@n5 down

=ERROR REPORT==== 10-Apr-2014::13:16:45 ===
Mnesia(rabbit@n1): ** ERROR ** mnesia_event got
    {inconsistent_database, running_partitioned_network, rabbit@n3}

When the test completes, Jepsen tells us that the mutex failed to linearize. The linearizable prefix is the part of the history where Knossos was able to find some valid linearization. The first line tells us that the :nemesis process reported an :info message with function :start (and no value). A few lines later, process 1 :invokes the :acquire function, concurrently with the other four processes. Most of those processes result in a :fail op, but process 1 successfully acquires the lock with an :ok.

Note that Knossos fills in the values for invocations with the known values for their completions, which is why some invocations–like the failed acquire attempts near the end of the history–have values “from the future”.

Not linearizable. Linearizable prefix was:

:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n4 :n5) (:n1 :n3 :n2)]"
:nemesis :info :stop nil
:nemesis :info :stop "fully connected"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n1 :n5) (:n4 :n2 :n3)]"
1 :invoke :acquire 1
3 :invoke :acquire nil
0 :invoke :acquire nil
2 :invoke :acquire nil
4 :invoke :acquire nil
3 :fail :acquire nil
2 :fail :acquire nil
1 :ok :acquire 1
:nemesis :info :stop nil
4 :info :acquire "indeterminate: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@n2' of durable queue 'jepsen.semaphore' in vhost '/' is down or inaccessible, class-id=60, method-id=70), null, \"\"}"
:nemesis :info :stop "fully connected"
0 :info :acquire "indeterminate: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@n2' of durable queue 'jepsen.semaphore' in vhost '/' is down or inaccessible, class-id=60, method-id=70), null, \"\"}"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n2 :n4) (:n1 :n5 :n3)]"
:nemesis :info :stop nil
:nemesis :info :stop "fully connected"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n3 :n2) (:n1 :n5 :n4)]"
3 :invoke :release :not-held
2 :invoke :release :not-held
3 :fail :release :not-held
2 :fail :release :not-held
1 :invoke :release nil
1 :ok :release nil
9 :invoke :acquire "clean connection shutdown; reason: Attempt to use closed channel"
9 :fail :acquire "clean connection shutdown; reason: Attempt to use closed channel"
5 :invoke :acquire "clean connection shutdown; reason: Attempt to use closed channel"
5 :fail :acquire "clean connection shutdown; reason: Attempt to use closed channel"
:nemesis :info :stop nil
:nemesis :info :stop "fully connected"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n1 :n4) (:n3 :n5 :n2)]"
3 :invoke :acquire nil
2 :invoke :acquire 1
2 :ok :acquire 1
1 :invoke :acquire 2

Next, Jepsen tells us which operation invalidated every consistent interpretation of the history.

Followed by inconsistent operation:

1 :ok :acquire 2

Process 2 successfully acquired the lock in the final lines of the linearizable prefix. It was OK for process 1 to try to acquire the lock, so long as that invocation didn’t go through until process 2 released it. However, process 1’s acquisition succeeded before the lock was released! Jepsen can show us the possible states of the system just prior to that moment:

Last consistent worlds were:
----------------
World from fixed history:
 1 :invoke :acquire 1
 1 :invoke :release nil
 2 :invoke :acquire 1
and current state #jepsen.model.Mutex{:locked true}
with pending operations:
 3 :invoke :acquire nil
 1 :invoke :acquire 2
 0 :invoke :acquire nil
 4 :invoke :acquire nil
---------------------------------------------

In this world, the lock was acquired by 1, released by 1, and acquired by 2. That led to the state Mutex{:locked true}, with four outstanding acquire ops in progress. But when process 1 successfully acquired the lock, we were forced to eliminate this possibility. Why? Because

Inconsistent state transitions: ([{:locked true} "already held"])

The state {:locked true} couldn’t be followed by a second :acquire op, because the lock was already held.

This shouldn’t be surprising: RabbitMQ is designed to ensure message delivery, and its recovery semantics require that it deliver messages more than once. This is a good property for a queue! It’s just not the right fit for a lock service.

Thinking a little deeper, FLP and Two Generals suggest that in the presence of a faulty process or network, the queue and the consumer can’t agree on whether or not to consume a given message. Acknowledge before processing the message, and a crash can cause data loss. Acknowledge after processing, and a crash can cause duplicate delivery. No distributed queue can offer exactly-once delivery–the best they can do is at-least-once or at-most-once.
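
The two choices look something like this, again sketched with the RabbitMQ Java client from Clojure; process-message stands in for whatever work the consumer does, and the helper names are illustrative.

(defn consume-at-most-once
  "Ack before processing: if we crash inside process-message, the message is
  acked but never handled, so it is lost."
  [ch queue process-message]
  (when-let [r (.basicGet ch queue false)]
    (.basicAck ch (.. r getEnvelope getDeliveryTag) false)
    (process-message (.getBody r))))

(defn consume-at-least-once
  "Ack after processing: if we crash between processing and the ack, Rabbit
  redelivers, and the message is handled twice."
  [ch queue process-message]
  (when-let [r (.basicGet ch queue false)]
    (process-message (.getBody r))
    (.basicAck ch (.. r getEnvelope getDeliveryTag) false)))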

So, the question becomes: does RabbitMQ offer at-least-once delivery in the presence of network partitions?

Rabbit as a queue

For this test, we’ll use a different kind of client: one that takes :enqueue and :dequeue operations, and applies them to a RabbitMQ queue. We’ll be using durable, triple-mirrored writes across our five-node cluster, with the publisher confirms extension enabled, so we only consider messages successful once acked by RabbitMQ. Here’s the generator for this test case:

:generator (gen/phases
             (->> (gen/queue)
                  (gen/delay 1/10)
                  (gen/nemesis
                    (gen/seq (cycle [(gen/sleep 60)
                                     {:type :info :f :start}
                                     (gen/sleep 60)
                                     {:type :info :f :stop}])))
                  (gen/time-limit 360))
             (gen/nemesis (gen/once {:type :info, :f :stop}))
             (gen/log "waiting for recovery")
             (gen/sleep 60)
             (gen/clients
               (gen/each
                 (gen/once {:type :invoke :f :drain}))))))]

This test proceeds in several distinct phases: all clients must complete a phase before moving, together, to the next. In the first phase, we generate random :enqueue and :dequeue requests for sequential integers, with a 10th of a second delay in between ops. Meanwhile, the nemesis cuts the network into random halves every sixty seconds, then repairs it for sixty seconds, and so on. This proceeds for 360 seconds in total.

In the next phase, the nemesis repairs the network. We log a message, then sleep for sixty seconds to allow the cluster to stabilize. I’ve reduced the Erlang net_tick times and timeouts to allow for faster convergence, but the same results hold with the stock configuration and longer failure durations.

Finally, each process issues a single :drain operation, which the client uses to generate a whole series of :dequeue ops, for any remaining messages in the queue. This ensures we should see every enqueued message dequeued at least once.
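
The queue client itself isn’t reproduced in this post, but the two interesting pieces, publishing with confirms and draining, might look roughly like this, using the RabbitMQ Java client from Clojure. The mirroring itself is configured server-side with an ha policy, and all names here are illustrative.

(import '[com.rabbitmq.client MessageProperties])

(defn enqueue!
  "Publish value as a persistent message and block until the broker confirms
  it. Assumes (.confirmSelect ch) was called once on this channel; only a
  true return counts as an :ok enqueue."
  [ch queue value]
  (.basicPublish ch "" queue
                 MessageProperties/PERSISTENT_TEXT_PLAIN
                 (.getBytes (str value) "UTF-8"))
  (.waitForConfirms ch))

(defn drain!
  "Repeatedly basicGet until the queue is empty, acking each message, and
  return everything we saw."
  [ch queue]
  (loop [acc []]
    (if-let [r (.basicGet ch queue false)]
      (do (.basicAck ch (.. r getEnvelope getDeliveryTag) false)
          (recur (conj acc (String. (.getBody r) "UTF-8"))))
      acc)))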

Rabbit has several distribution mechanisms, so I want to be specific: we’re testing clustering, not the Federation or Shovel systems. We’re aiming for the safest, most consistent settings, so let’s consult Rabbit’s documentation about distribution:

clustering.png

The column on the right suggests that we can choose either consistency and availability, or consistency and partition tolerance. We’ve talked about how sacrificing partition tolerance leads to terrible things happening, so let’s opt for consistency and partition tolerance. The CP link points to https://www.rabbitmq.com/partitions.html#cp-mode, which says:

In pause-minority mode RabbitMQ will automatically pause cluster nodes which determine themselves to be in a minority (i.e. fewer or equal than half the total number of nodes) after seeing other nodes go down. It therefore chooses partition tolerance over availability from the CAP theorem. This ensures that in the event of a network partition, at most the nodes in a single partition will continue to run.

CP mode sounds like the safest option, so we’ll enable pause_minority in our configuration, and expect to see failures from the minority nodes. Rabbit can’t guarantee exactly-once delivery, so it can’t really be CP, but shutting down the minority component should reduce duplicate deliveries as compared to allowing every node to respond to dequeues. Theoretically every node could accept enqueues during the partition (sacrificing the relative order of delivery), so we’ll hope to see that behavior as well.
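
Concretely, that’s a one-line setting in rabbitmq.config. This is my reading of the documentation quoted above; the file path may differ on your system.

%% /etc/rabbitmq/rabbitmq.config
[{rabbit, [{cluster_partition_handling, pause_minority}]}].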

During the partition, Jepsen shows that processes talking to the majority partition can successfully enqueue and dequeue messages, but processes talking to the minority component fail.

INFO jepsen.util - 15 :invoke :dequeue nil
INFO jepsen.util - 15 :ok :dequeue 2935
INFO jepsen.util - 2551 :invoke :enqueue 3519
INFO jepsen.util - 2551 :ok :enqueue 3519
INFO jepsen.util - 2501 :invoke :dequeue nil
WARN jepsen.core - Process 2501 indeterminate
java.util.concurrent.ExecutionException: java.io.IOException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:202)
    ...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@n1' of durable queue 'jepsen.queue' in vhost '/' is down or inaccessible, class-id=60, method-id=70), null, ""}

Process 2501 has crashed: it’s uncertain whether its operation succeeded or not. Because that operation might still complete at some later time, we log an :info operation rather than an :ok or :fail. Because processes are singlethreaded in this formalism, we abandon that client and start a new client with a unique process ID. Process 2501 will never appear again in this history–but process 2506 will replace it and carry on performing operations against that node in the cluster.

At the end of the test, Jepsen heals the cluster, waits, and drains the queue:

INFO jepsen.util - 2896 :invoke :drain nil
INFO jepsen.util - 2896 :ok :dequeue 3724
INFO jepsen.util - 9 :invoke :drain nil
INFO jepsen.util - 15 :invoke :drain nil
INFO jepsen.util - 9 :ok :dequeue 3733
INFO jepsen.util - 15 :ok :dequeue 3730
INFO jepsen.util - 273 :invoke :drain nil
INFO jepsen.util - 273 :ok :dequeue 3741
INFO jepsen.util - 2872 :invoke :drain nil
INFO jepsen.util - 2872 :ok :dequeue 3746
INFO jepsen.util - 9 :ok :drain :exhausted
INFO jepsen.util - 2896 :ok :drain :exhausted
INFO jepsen.util - 15 :ok :drain :exhausted
INFO jepsen.util - 2872 :ok :drain :exhausted
INFO jepsen.util - 273 :ok :drain :exhausted
INFO jepsen.core - Worker 1 done
INFO jepsen.core - Worker 4 done
INFO jepsen.core - Worker 0 done
INFO jepsen.core - Worker 3 done
INFO jepsen.core - Worker 2 done
INFO jepsen.core - Run complete, writing

We run two checkers on the resulting history. The first, :queue, asserts that every enqueue is balanced by exactly one dequeue. It fails quickly: 487 is dequeued twice in this history, after one node crashes. We expect this behavior from a robust queue.
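
Conceptually, that balance check looks something like this; an illustration of the property, not Jepsen’s actual :queue checker.

(defn exactly-once?
  "True iff every successfully enqueued value was dequeued exactly once."
  [history]
  (let [ok       (fn [f] (->> history
                              (filter #(and (= f (:f %)) (= :ok (:type %))))
                              (map :value)))
        dequeued (frequencies (ok :dequeue))]
    (every? #(= 1 (get dequeued % 0)) (ok :enqueue))))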

FAIL in (rabbit-test) (rabbitmq_test.clj:77)
expected: (:valid? (:results test))
  actual: false

{:valid? false,
 :queue {:valid? false, :error "can't dequeue 487"},

The :total-queue checker, though, is more forgiving for lossy systems. It tells us about four kinds of operations:

  • OK messages are enqueued and dequeued successfully.
  • Recovered messages are where we didn’t know if the enqueue was successful, but it was dequeued nonetheless.
  • Unexpected messages are those which came out of the queue despite never having been enqueued. This includes cases where the same message is dequeued more than once.
  • Lost messages were successfully enqueued but never came back out. This is the worst type of failure: the loss of acknowledged writes.
:total-queue {:valid? false, :lost #{2558 3359 3616 3173 1858 2030 2372 3135 3671 3534 3358 2855 3251 3429 2615 3248 2479 1976 2430 3478 3693 2388 3174 3484 3638 2813 3280 2282 2475 3239 2973 1984 3630 2264 2523 2565 2462 3278 3425 ... lots more lines ... 3313 3413 3443 2048 3513 2705 3392 3001 2215 3097 3364 3531 2605 2411 2220 2042 1923 2314 3592 3538 3128 2801 3636 1861 3500 3143 3276 1991 3343 3656 3233 3611 3244 3717 3314 2922 3404 3708}, :unexpected #{487 497 491 510 493 490 508 504 505 502 495 506 500 496 501 498 507 494 489 492 503 509 499 488}, :recovered #{519 521 529 510 527 518 512 517 516 515 523 531 525 528 522 1398 520 524 513 509 511}, :ok-frac 786/1249, :unexpected-frac 8/1249, :lost-frac 1312/3747, :recovered-frac 7/1249}}

The fractions all have the same denominator: attempted enqueues. 2358 of 3747 attempted enqueues were successfully dequeued–which we might expect, given the pause_minority mode shut down 2/5 nodes for a good part of the test. We also saw 24 duplicate deliveries, which, again, we expect from a distributed queue, and 21 recovered writes from indeterminate enqueues–we’ll take as many of those as we can get.

The real problem is those 1312 lost messages: RabbitMQ lost ~35% of acknowledged writes.

You guessed right, folks. When a RabbitMQ node rejoins the cluster, it wipes its local state and adopts whatever the current primary node thinks the queue should contain. Are there any constraints on which node takes precedence? As far as I can ascertain, no–you can induce arbitrary data loss by carefully manipulating the order of partitions.

This is not a theoretical problem. I know of at least two RabbitMQ deployments which have hit this in production.

Recommendations

If you use RabbitMQ clustering, I recommend you disable automatic partition handling. pause_minority reduces availability but still allows massive data loss. autoheal improves availability but still allows for massive data loss. Rabbit advises that you use ignore only if your network is “really reliable”, but I suspect it is the only mode which offers a chance of preventing the loss of acknowledged messages.

If you use the ignore partition mode, Rabbit will allow primary replicas on both sides of a partition, and those nodes will run independently. You can’t dequeue messages written to the other node, but at least neither will overwrite the other’s state. When the network partition recovers, you can isolate one of the nodes from all clients, drain all of its messages, and enqueue them into a selected primary. Finally, restart that node and it’ll pick up the primary’s state. Repeat the process for each node which was isolated, and you’ll have a single authoritative cluster again–albeit with duplicates for each copy of the message on each node.
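
In terms of the hypothetical enqueue! and drain! helpers sketched earlier, that recovery step is just a copy loop; which node you pick as the primary, and how you fence off clients during the merge, is up to you.

(defn merge-isolated-node!
  "Drain every message from the isolated node's copy of the queue and
  republish it to the chosen primary."
  [isolated-ch primary-ch queue]
  (doseq [message (drain! isolated-ch queue)]
    (enqueue! primary-ch queue message)))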

Alternatively, you can eschew RabbitMQ clustering altogether, and connect all nodes via Federation or the Shovel: an external process that ferries messages from one cluster to another. The RabbitMQ team recommends these for unreliable links, like those between datacenters, but if you’re concerned about partitions in your local network, you might choose to deploy them in lieu of clustering. I haven’t figured out how to use either of those systems that way yet. The Shovel, in particular, is a single point of failure.

To the RabbitMQ team, I suggest that the autoheal and pause_minority modes recover by taking the union of the messages extant on both sides of the partition, rather than blindly destroying all data on one replica. I know this may be a big change to make, given the way Rabbit uses mnesia–but I submit that duplicate delivery and reordering are almost certainly preferable to message loss.

To recap

We used Knossos and Jepsen to prove the obvious: RabbitMQ is not a lock service. That investigation led to a discovery hinted at by the documentation: in the presence of partitions, RabbitMQ clustering will not only deliver duplicate messages, but will also drop huge volumes of acknowledged messages on the floor. This is not a new result, but it may be surprising if you haven’t read the docs closely–especially if you interpreted the phrase “chooses Consistency and Partition Tolerance” to mean, well, either of those things.

I’d like to conclude by mentioning that RabbitMQ does documentation right. Most of the distributed systems I research say nothing about failure modes–but Rabbit’s docs have an entire section devoted to partition tolerance, reliable delivery, and truly comprehensive descriptions of the various failure modes in the API and distribution layer. I wish more distributed systems shared Rabbit’s integrity and attention to detail.

This research was made possible by a generous grant from Comcast’s open-source fund, and by the invaluable assistance of Alvaro Videla from the RabbitMQ team. I’m also indebted to Michael Klishin and Alex P for their hard work on the Langohr RabbitMQ client. Thanks for reading!

Next up: etcd and Consul.

Network partitions are going to happen. Switches, NICs, host hardware, operating systems, disks, virtualization layers, and language runtimes, not to mention program semantics themselves, all conspire to delay, drop, duplicate, or reorder our messages. In an uncertain world, we want our software to maintain some sense of intuitive correctness.

Well, obviously we want intuitive correctness. Do The Right Thing™! But what exactly is the right thing? How might we describe it? In this essay, we’ll take a tour of some “strong” consistency models, and see how they fit together.

Correctness

There are many ways to express an algorithm’s abstract behavior–but just for now, let’s say that a system is comprised of a state, and some operations that transform that state. As the system runs, it moves from state to state through some history of operations.

uniprocessor-history.jpg

For instance, our state might be a variable, and the operations on the state could be the writes to, and reads from, that variable. In this simple Ruby program, we write and read a variable several times, printing it to the screen to illustrate the reads.

x = "a"; puts x; puts x x = "b"; puts x x = "c" x = "d"; puts x

We already have an intuitive model of this program’s correctness: it should print “aabd”. Why? Because each statement happens in order. First we write the value a, then read the value a, then read the value a, then write the value b, and so on.

Once we set a variable to some value, like a, reading it should return a, until we change the value again. Reading a variable returns the most recently written value. We call this kind of system–a variable with a single value–a register.

We’ve had this model drilled into our heads from the first day we started writing programs, so it feels like second nature–but this is not the only way variables could work. A variable could return any value for a read: a, d, or the moon. If that happened, we’d say the system was incorrect, because those operations don’t align with our model of how variables are supposed to work.

This hints at a definition of correctness for a system: given some rules which relate the operations and state, the history of operations in the system should always follow those rules. We call those rules a consistency model.

We phrased our rules for registers as simple English statements, but they could be arbitrarily complicated mathematical structures. “A read returns the value from two writes ago, plus three, except when the value is four, in which case the read may return either cat or dog” is a consistency model. As is “Every read always returns zero”. We could even say “There are no rules at all; every operation is permitted”. That’s the easiest consistency model to satisfy; every system obeys it trivially.

More formally, we say that a consistency model is the set of all allowed histories of operations. If we run a program and it goes through a sequence of operations in the allowed set, that particular execution is consistent. If the program screws up occasionally and goes through a history not in the consistency model, we say the history was inconsistent. If every possible execution falls into the allowed set, the system satisfies the model. We want real systems to satisfy “intuitively correct” consistency models, so that we can write predictable programs.
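
As a toy illustration of “a consistency model is a set of allowed histories”, here is the single-threaded register model written as a predicate over histories. The [:write v] and [:read v] op format is invented for this sketch.

(defn register-history?
  "True iff every [:read v] returns the most recently written value
  (nil before the first write)."
  [ops]
  (loop [current nil, ops ops]
    (if-let [[op v] (first ops)]
      (case op
        :write (recur v (rest ops))
        :read  (if (= v current)
                 (recur current (rest ops))
                 false))
      true)))

(register-history? [[:write "a"] [:read "a"] [:read "a"] [:write "b"] [:read "b"]])
;; => true: this history is in the model
(register-history? [[:write "a"] [:read "the moon"]])
;; => false: this history is not allowed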

Concurrent histories

Now imagine a concurrent program, like one written in Node.js or Erlang. There are multiple logical threads of control, which we term “processes”. If we run a concurrent program with two processes, each of which works with the same register, our earlier register invariant could be violated.

multiprocessor-history.jpg

There are two processes at work here: call them “top” and “bottom”. The top process tries to write a, read, read. The bottom process, meanwhile, tries to read, write b, read. Because the program is concurrent, the operations from these two processes could interleave in more than one order–so long as the operations for a single process happen in the order that process specifies. In this particular case, top writes a, bottom reads a, top reads a, bottom writes b, top reads b, and bottom reads b.

In this light, the concept of concurrency takes on a different shape. We might imagine every program as concurrent by default–when executed, operations could happen in any order. A thread, a process–in the logical sense, anyway–is a constraint over the history: operations belonging to the same thread must take place in order. Logical threads impose a partial order over the allowed operations.

Even with that order, our register invariant–from the point of view of an individual process–no longer holds. The process on top wrote a, read a, then read b–which is not the value it wrote. We must relax our consistency model to usefully describe concurrency. Now, a process is allowed to read the most recently written value from any process, not just itself. The register becomes a place of coordination between two processes; they share state.

Light cones

[Image: lightcone-history.jpg]

However, this is not the full story: in almost every real-world system, processes are distant from each other. An uncached value in memory, for instance, is likely on a DIMM thirty centimeters away from the CPU. It takes light over a full nanosecond to travel that distance–and real memory accesses are much slower. A value on a computer in a different datacenter could be thousands of kilometers–hundreds of milliseconds–away. We just can’t send information there any faster; physics, thus far, forbids it.

This means our operations are no longer instantaneous. Some of them might be so fast as to be negligible, but in full generality, operations take time. We invoke a write of a variable; the write travels to memory, or another computer, or the moon; the memory changes state; a confirmation travels back; and then we know the operation took place.

[Image: concurrent-read.jpg]

The delay in sending messages from one place to another implies ambiguity in the history of operations. If messages travel faster or slower, they could take place in unexpected orders. Here, the bottom process invokes a read when the value is a. While the read is in flight, the top process writes b–and by happenstance, its write arrives before the read. The bottom process finally completes its read and finds b, not a.

This history violates our concurrent register consistency model. The bottom process did not read the current value at the time it invoked the read. We might try to use the completion time, rather than the invocation time, as the “true time” of the operation, but this fails by symmetry as well; if the read arrives before the write, the process would receive a when the current value is b.

In a distributed system–one in which it takes time for an operation to take place–we must relax our consistency model again, allowing these ambiguous orders to happen.

How far must we go? Must we allow all orderings? Or can we still impose some sanity on the world?

Linearizability

[Image: finite-concurrency-bounds.jpg]

On careful examination, there are some bounds on the order of events. We can’t send a message back in time, so the earliest a message could reach the source of truth is, well, instantly. An operation cannot take effect before its invocation.

Likewise, the message informing the process that its operation completed cannot travel back in time, which means that no operation may take effect after its completion.

If we assume that there is a single global state that each process talks to; if we assume that operations on that state take place atomically, without stepping on each other’s toes; then we can rule out a great many histories indeed. We know that each operation appears to take effect atomically at some point between its invocation and completion.

We call this consistency model linearizability; because although operations are concurrent, and take time, there is some place–or the appearance of a place–where every operation happens in a nice linear order.
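
As a sketch of what that means operationally (my own toy example, not the Knossos checker discussed at the end of this post): record each operation’s invocation and completion times, pick a candidate linearization point for each, and check both that every point falls inside its operation’s window and that replaying the operations in point order satisfies the register rules.

# An operation has an invocation time, a completion time, and a proposed
# linearization point. A proposed linearization is valid if every point lies
# within its operation's [invocation, completion] window, and replaying the
# operations in point order obeys the register model.
Op = Struct.new(:kind, :value, :invoke, :complete, :point)

def valid_linearization?(ops)
  return false unless ops.all? { |o| (o.invoke..o.complete).cover?(o.point) }

  current = nil
  ops.sort_by(&:point).all? do |o|
    case o.kind
    when :write
      current = o.value
      true
    when :read
      o.value == current
    end
  end
end

ops = [
  Op.new(:write, "a", 0, 3, 1),  # write of a takes effect at t=1
  Op.new(:read,  "a", 2, 5, 4),  # an overlapping read observes a
  Op.new(:write, "b", 4, 7, 6),
  Op.new(:read,  "b", 6, 9, 8)
]
puts valid_linearization?(ops)   # => true

A real checker has the harder job of searching for any such assignment of points; this sketch only validates one proposed order.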

[Image: linearizability-complete-visibility.jpg]

The “single global state” doesn’t have to be a single node; nor do operations actually have to be atomic. The state could be split across many machines, or take place in multiple steps–so long as the external history, from the point of view of the processes, appears equivalent to an atomic, single point of state. Often, a linearizable system is made up of smaller coordinating processes, each of which is itself linearizable; and those processes are made up of carefully coordinated smaller processes, and so on, down to linearizable operations provided by the hardware.

Linearizability has powerful consequences. Once an operation is complete, everyone must see it–or some later state. We know this to be true because each operation must take place before its completion time, and any operation invoked subsequently must take place after the invocation–and by extension, after the original operation itself. Once we successfully write b, every subsequently invoked read must see b–or some later value, if more writes occur.

We can use the atomic constraint of linearizability to mutate state safely. We can define an operation like compare-and-set, in which we set the value of a register to a new value if, and only if, the register currently has some other value. We can use compare-and-set as the basis for mutexes, semaphores, channels, counters, lists, sets, maps, trees–all kinds of shared data structures become available. Linearizability guarantees us the safe interleaving of changes.
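
Here’s a rough Ruby sketch of that idea (mine; it uses a Mutex to stand in for the atomic step a real system would get from hardware or consensus): a compare-and-set register, and a toy lock built on top of it.

# A compare-and-set register. The comparison and the assignment happen as one
# atomic step, so concurrent callers cannot interleave between them.
class CasRegister
  def initialize(value = nil)
    @value = value
    @lock  = Mutex.new   # stands in for an atomic primitive
  end

  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value == expected
      @value = new_value
      true
    end
  end
end

# A toy spinlock built from compare-and-set: acquire by swinging the register
# from :free to :held; release by swinging it back.
class SpinLock
  def initialize
    @state = CasRegister.new(:free)
  end

  def lock
    sleep 0.001 until @state.compare_and_set(:free, :held)
  end

  def unlock
    @state.compare_and_set(:held, :free)
  end
end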

Moreover, linearizability’s time bounds guarantee that those changes will be visible to other participants after the operation completes. Hence, linearizability prohibits stale reads. Each read will see some current state between invocation and completion; but not a state prior to the read. It also prohibits non-monotonic reads–in which one reads a new value, then an old one.

Because of these strong constraints, linearizable systems are easier to reason about–which is why they’re chosen as the basis for many concurrent programming constructs. All variables in Javascript are (independently) linearizable; as are volatile variables in Java, atoms in Clojure, and individual processes in Erlang. Most languages have mutexes and semaphores; these are linearizable too. Strong assumptions yield strong guarantees.

But what happens if we can’t satisfy those assumptions?

Sequential consistency

[Image: sequential-history.jpg]

If we allow processes to skew in time, such that their operations can take effect before invocation, or after completion–but retain the constraint that operations from any given process must take place in that process' order–we get a weaker flavor of consistency: sequential consistency.

Sequential consistency allows more histories than linearizability–but it’s still a useful model: one that we use every day. When a user uploads a video to YouTube, for instance, YouTube puts that video into a queue for processing, then returns a web page for the video right away. We can’t actually watch the video at that point; the video upload takes effect a few minutes later, when it’s been fully processed. Queues remove synchronous behavior while (depending on the queue) preserving order.
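
A queue makes the skew concrete. In this Ruby sketch (my own analogy, not how YouTube actually works), writes are acknowledged immediately but take effect later, on a consumer that applies them in the order they were enqueued.

# Writes return immediately; their effects show up later, but in order.
queue = Queue.new
log   = []

consumer = Thread.new do
  while (job = queue.pop)
    sleep 0.01       # processing delay: the effect lags the invocation
    log << job
  end
end

%w[upload-1 upload-2 upload-3].each { |job| queue << job }  # returns right away
queue << nil         # signal the consumer to stop
consumer.join

p log                # => ["upload-1", "upload-2", "upload-3"], in order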

Many caches also behave like sequentially consistent systems. If I write a tweet on Twitter, or post to Facebook, it takes time to percolate through layers of caching systems. Different users will see my message at different times–but each user will see my operations in order. Once seen, a post shouldn’t disappear. If I write multiple comments, they’ll become visible sequentially, not out of order.

Causal consistency

We don’t have to enforce the order of every operation from a process. Perhaps, only causally related operations must occur in order. We might say, for instance, that all comments on a blog post must appear in the same order for everyone, and insist that any reply be visible to a process only after the post it replies to is visible. If we encode those causal relationships like “I depend on operation X” as an explicit part of each operation, the database can delay making operations visible until it has all the operation’s dependencies.

This is weaker than ordering every operation from the same process–operations from the same process with independent causal chains could execute in any relative order–but prevents many unintuitive behaviors.
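
As a small sketch of the mechanism (mine; the :id, :deps, and :body fields are hypothetical), a replica can buffer each incoming operation until everything it declares as a dependency is already visible.

# Each operation carries the ids of the operations it causally depends on.
# A replica holds an operation back until all of its dependencies are visible.
class CausalReplica
  def initialize
    @visible = {}   # id => operation
    @pending = []
  end

  def receive(op)
    @pending << op
    deliver_ready
  end

  private

  def deliver_ready
    loop do
      ready = @pending.find { |op| op[:deps].all? { |d| @visible.key?(d) } }
      break unless ready
      @pending.delete(ready)
      @visible[ready[:id]] = ready
      puts "visible: #{ready[:body]}"
    end
  end
end

replica = CausalReplica.new
# The reply arrives first, but isn't shown until the post it replies to is.
replica.receive(id: 2, deps: [1], body: "Reply to post 1")
replica.receive(id: 1, deps: [],  body: "Post 1")
# => visible: Post 1
# => visible: Reply to post 1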

Serializable consistency

[Image: serializable-history.jpg]

If we say that the history of operations is equivalent to one that took place in some single atomic order–but say nothing about the invocation and completion times–we obtain a consistency model known as serializability. This model is both much stronger and much weaker than you’d expect.

Serializability is weak, in the sense that it permits many types of histories, because it places no bounds on time or order. In the diagram above, it’s as if messages could be sent arbitrarily far into the past or future, and causal lines were allowed to cross. In a serializable database, a transaction like read x is always allowed to execute at time 0, when x had not yet been initialized. Or it might be delayed infinitely far into the future! The transaction write 2 to x could execute right now, or it could be delayed until the end of time, never appearing to occur.

For instance, in a serializable system, the program

x = 1
x = x + 1
puts x

is allowed to print nil, 1, or 2; because the operations can take place in any order. This is a surprisingly weak constraint! Here, we assume that each line represents a single operation and that all operations succeed.

On the other hand, serializability is strong, in the sense that it prohibits large classes of histories, because it demands a linear order. The program

print x if x == 3
x = 1 if x.nil?
x = 2 if x == 1
x = 3 if x == 2

can only be ordered in one way if every statement is to take effect. The statements don’t take effect in the order we wrote them, but they will reliably change x from nil -> 1 -> 2 -> 3, and finally print 3.
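
We can check that claim by brute force. This Ruby sketch (mine) models each statement as a transaction against x, tries all 24 serial orders, and counts how many of them actually reach the print.

# Each statement is a transaction: given the current x, it returns the new x
# and any output it produced.
STATEMENTS = [
  ->(x) { [x, x == 3 ? x : nil] },    # print x if x == 3
  ->(x) { [x.nil? ? 1 : x, nil] },    # x = 1 if x.nil?
  ->(x) { [x == 1 ? 2 : x, nil] },    # x = 2 if x == 1
  ->(x) { [x == 2 ? 3 : x, nil] }     # x = 3 if x == 2
]

printing_orders = [0, 1, 2, 3].permutation.select do |order|
  x = nil
  printed = false
  order.each do |i|
    x, output = STATEMENTS[i].call(x)
    printed ||= !output.nil?
  end
  printed
end

puts printing_orders.length  # => 1: the writes in order 1, 2, 3, then the print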

Because serializability allows arbitrary reordering of operations (so long as the order appears atomic), it is not particularly useful in real applications. Most databases which claim to provide serializability actually provide strong serializability, which has the same time bounds as linearizability. To complicate matters further, what most SQL databases term the SERIALIZABLE consistency level actually means something weaker, like repeatable read, cursor stability, or snapshot isolation.

Consistency comes with costs

We’ve said that “weak” consistency models allow more histories than “strong” consistency models. Linearizability, for example, guarantees that operations take place between the invocation and completion times. However, imposing order requires coordination. Speaking loosely, the more histories we exclude, the more careful and communicative the participants in a system must be.

You may have heard of the CAP theorem, which states that given consistency, availability, and partition tolerance, any given system may guarantee at most two of those properties. While Eric Brewer’s CAP conjecture was phrased in these informal terms, the CAP theorem has very precise definitions:

  1. Consistency means linearizability, and in particular, a linearizable register. Registers are equivalent to other systems, including sets, lists, maps, relational databases, and so on, so the theorem can be extended to cover all kinds of linearizable systems.

  2. Availability means that every request to a non-failing node must complete successfully. Since network partitions are allowed to last arbitrarily long, this means that nodes cannot simply defer responding until after the partition heals.

  3. Partition tolerance means that partitions can happen. Providing consistency and availability when the network is reliable is easy. Providing both when the network is not reliable is provably impossible. If your network is not perfectly reliable–and it isn’t–you cannot choose CA. This means that all practical distributed systems on commodity hardware can guarantee, at maximum, either AP or CP.

[Image: family-tree.jpg]

“Hang on!” you might exclaim. “Linearizability is not the end-all-be-all of consistency models! I could work around the CAP theorem by providing sequential consistency, or serializability, or snapshot isolation!”

This is true; the CAP theorem only says that we cannot build totally available linearizable systems. The problem is that we have other proofs which tell us that you cannot build totally available systems with sequential consistency, serializability, repeatable read, snapshot isolation, or cursor stability–or any model stronger than those. In this map from Peter Bailis' HAT not CAP paper, models shaded in red cannot be fully available.

If we relax our notion of availability, such that client nodes must always talk to the same server, some types of consistency become achievable. We can provide causal consistency, PRAM, and read-your-writes consistency.

If we demand total availability, then we can provide monotonic reads, monotonic writes, read committed, monotonic atomic view, and so on. These are the consistency models provided by distributed stores like Riak and Cassandra, or ANSI SQL databases on the lower isolation settings. These consistency models don’t have linear orders like the diagrams we’ve drawn before; instead, they provide partial orders which come together in a patchwork or web. The orders are partial because they admit a broader class of histories.

A hybrid approach

[Image: weak-not-unsafe.jpg]

Some algorithms depend on linearizability for safety. If we want to build a distributed lock service, for instance, linearizability is required; without hard time boundaries, we could hold a lock from the future or from the past. On the other hand, many algorithms don’t need linearizability. Eventually consistent sets, lists, trees, and maps, for instance, can be safely expressed as CRDTs even in “weak” consistency models.

Stronger consistency models also tend to require more coordination–more messages back and forth–to ensure their operations occur in the correct order. Not only are they less available, but they can also impose higher latency constraints. This is why modern CPU memory models are not linearizable by default–unless you explicitly say so, modern CPUs will reorder memory operations relative to other cores, or worse. While more difficult to reason about, the performance benefits are phenomenal. Geographically distributed systems, with hundreds of milliseconds of latency between datacenters, often make similar tradeoffs.

So in practice, we use hybrid data storage, mixing databases with varying consistency models to achieve our redundancy, availability, performance, and safety objectives. “Weaker” consistency models wherever possible, for availability and performance. “Stronger” consistency models where necessary, because the algorithm being expressed demands a stricter ordering of operations. You can write huge volumes of data to S3, Riak or Cassandra, for instance, then write a pointer to that data, linearizably, to Postgres, Zookeeper or Etcd. Some databases admit multiple consistency models, like tunable isolation levels in relational databases, or Cassandra and Riak’s linearizable transactions, which can help cut down on the number of systems in play. Bottom line, though: anyone who says their consistency model is the only right choice is likely trying to sell something. You can’t have your cake and eat it too.
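
As a rough sketch of that last pattern (mine, with hypothetical in-memory BlobStore and PointerStore classes standing in for, say, S3 and Postgres; these are not real client APIs): write the bulk data to the weaker store first, then publish it with a single compare-and-set against the stronger one.

# Hypothetical stand-ins for two stores with different consistency models.
class BlobStore            # "weaker" store: just holds bulk data
  def initialize
    @blobs = {}
  end

  def put(key, data)
    @blobs[key] = data
  end

  def get(key)
    @blobs[key]
  end
end

class PointerStore         # "stronger" store: a linearizable compare-and-set
  def initialize
    @pointers = {}
    @lock = Mutex.new
  end

  def get(key)
    @lock.synchronize { @pointers[key] }
  end

  def compare_and_set(key, expected, new_value)
    @lock.synchronize do
      return false unless @pointers[key] == expected
      @pointers[key] = new_value
      true
    end
  end
end

blobs    = BlobStore.new
pointers = PointerStore.new

# Write the bulk data to the weaker store first...
blobs.put("report-v2", "lots of data")

# ...then publish it by atomically swinging a pointer in the stronger store,
# so a reader who follows the pointer always finds data that's already there.
old = pointers.get("current-report")
pointers.compare_and_set("current-report", old, "report-v2")

puts blobs.get(pointers.get("current-report"))  # => lots of data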

Armed with a more nuanced understanding of consistency models, I’d like to talk about how we go about verifying the correctness of a linearizable system. In the next Jepsen post, we’ll discuss the linearizability checker I’ve built for testing distributed systems: Knossos.

For a more formal definition of these models, try Dziuma, Fatourou, and Kanellou’s Survey on consistency conditions

Copyright © 2015 Kyle Kingsbury.
Non-commercial re-use with attribution encouraged; all other rights reserved.