# Call me maybe: Elasticsearch

Previously, on Jepsen, we saw RabbitMQ throw away a staggering volume of data. In this post, we’ll explore Elasticsearch’s behavior under various types of network failure.

Elasticsearch is a distributed search engine, built around Apache Lucene, a well-respected Java indexing library. Lucene handles the on-disk storage, indexing, and searching of documents, while Elasticsearch handles document updates, the API, and distribution. Documents are written to collections as free-form JSON; schemas can be overlaid onto collections to specify particular indexing strategies.

As with many distributed systems, Elasticsearch scales along two axes: sharding and replication. The document space is sharded (sliced up) into many disjoint chunks, and each chunk is allocated to different nodes. Adding more nodes allows Elasticsearch to store a document space larger than any single node could handle, and offers quasilinear increases in throughput and capacity. For fault tolerance, each shard is replicated to multiple nodes. If one node fails or becomes unavailable, another can take over. There are additional distinctions between nodes which can process writes and those which are read-only copies (termed “data nodes”), but this is primarily a performance optimization.

Because index construction is a somewhat expensive process, Elasticsearch provides a faster, more strongly consistent database backed by a write-ahead log. Document creation, reads, updates, and deletes talk directly to this strongly-consistent database, which is asynchronously indexed into Lucene. Search queries lag behind the “true” state of Elasticsearch records, but should eventually catch up. One can force a flush of the transaction log to the index, ensuring changes written before the flush are made visible.
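Here is a toy, single-process model of that split, written in Clojure. Reads and writes go straight to an authoritative document map. Searches only see what has been flushed from a pending log into a separate index. Every name here (`empty-db`, `write!`, `get-doc`, `flush!`, `search`) is hypothetical, and asynchronous indexing is simplified to an explicit flush. This is not how Elasticsearch or Lucene are implemented.

```clojure
;; Toy model: :docs is the authoritative store used for get/update/delete,
;; :pending is a log of writes not yet indexed, :index is what search sees.
(defn empty-db []
  (atom {:docs {} :pending [] :index {}}))

(defn write!
  "Stores doc under id. Visible to get-doc at once, but not to search."
  [db id doc]
  (swap! db #(-> %
                 (assoc-in [:docs id] doc)
                 (update :pending conj [id doc])))
  nil)

(defn get-doc [db id]
  (get-in @db [:docs id]))

(defn flush!
  "Applies every pending write to the index, then clears the log."
  [db]
  (swap! db (fn [s]
              (-> s
                  (update :index into (:pending s))
                  (assoc :pending []))))
  nil)

(defn search
  "Ids of indexed documents matching pred."
  [db pred]
  (set (for [[id doc] (:index @db) :when (pred doc)] id)))

(def db (empty-db))
(write! db :a {:title "hello"})
(get-doc db :a)                       ;; => {:title "hello"}
(search db #(= "hello" (:title %)))   ;; => #{}  (not yet indexed)
(flush! db)
(search db #(= "hello" (:title %)))   ;; => #{:a}
```

The gap between the `get-doc` and the first `search` is the lag the paragraph describes. A composite flush-then-search closes it, in this toy at least.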

But this is Jepsen, where nothing works the way it’s supposed to. Let’s give this system’s core assumptions a good shake and see what falls out!

## What do the docs say?

What is the Elasticsearch consistency model? When I evaluate a new database, I check the documentation first.

The docs tell us that Elasticsearch provides optimistic concurrency control: each document has an atomic version number, and updates can specify a particular version required for the write to go through. This allows atomic compare-and-set (CAS) and provides the basis for independently linearizable updates to each document; i.e. every update to a particular document appears to take place atomically between the start of a request and its completion.

Moreover, Elasticsearch requires synchronous acknowledgement of each write from a majority of nodes. This suggests that (a) nodes in the minority side of a network partition will reject writes, and (b) acknowledged writes will be durable in the event that a new primary node is elected. From the documentation:
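Here is a minimal, single-process Clojure sketch of what per-document optimistic concurrency control promises. Each hypothetical document holds a value and a version. A conditional write succeeds only when the caller's expected version matches, and it bumps the version. The names `new-doc`, `conditional-write!`, and `cas-update!` are invented for illustration. The sketch models the semantics the docs describe, not Elasticsearch's API or implementation.

```clojure
(defn new-doc
  "A document: a value plus a version number, held in an atom."
  [value]
  (atom {:value value :version 1}))

(defn conditional-write!
  "Sets the value only if the current version equals expected-version.
  Returns the new version on success, nil on a version conflict.
  swap-vals! (Clojure 1.9+) makes the check and the write one atomic step."
  [doc expected-version new-value]
  (let [[old new] (swap-vals! doc
                              (fn [{:keys [version] :as d}]
                                (if (= version expected-version)
                                  {:value new-value :version (inc version)}
                                  d)))]
    ;; A successful write always bumps the version, so old and new differ.
    (when (not= (:version old) (:version new))
      (:version new))))

(defn cas-update!
  "Read-modify-write: read value and version, apply f, write conditionally,
  and retry from a fresh read whenever another writer got there first."
  [doc f]
  (loop []
    (let [{:keys [value version]} @doc]
      (or (conditional-write! doc version (f value))
          (recur)))))

(def d (new-doc #{}))
(conditional-write! d 1 #{1})   ;; => 2
(conditional-write! d 1 #{2})   ;; => nil (version 1 is stale)
@d                              ;; => {:value #{1}, :version 2}
```

On a single node this makes concurrent `cas-update!` calls behave like a linearizable register. The open question for the rest of the post is whether that property survives replication across a partitioned network.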

> To prevent writes from taking place on the “wrong” side of a network partition, by default, index operations only succeed if a quorum (>replicas/2+1) of active shards are available.

> By default, the index operation only returns after all shards within the replication group have indexed the document (sync replication).

Synchronous replication to a majority of replicas, plus the use of version-based CAS, suggests that Elasticsearch has all the primitives required to treat documents as linearizable registers. Moreover, if we issue a flush command, Elasticsearch will apply any outstanding writes from the transaction log to Lucene; a composite flush+search operation should also be linearizable, much like Zookeeper’s sync+read.

So: the consistency docs paint an optimistic picture–but what does Elasticsearch do when a node or network fails?

A search for “partition” on the elasticsearch.org site reveals only a single reference: the “on the ‘wrong’ side of a network partition” comment regarding quorum writes. A search for “fault tolerance” returns only a single result, an overview blog post which simply describes Elasticsearch as “resilient to failing nodes”.

The results for “failure” are a bit more comprehensive, though. “Coping with failure”, part of the introductory tutorial, explains that after killing a node,

> The first thing that the new master node did was to promote the replicas of these shards on Node 2 and Node 3 to be primaries, putting us back into cluster health yellow. This promotion process was instantaneous, like the flick of a switch.

I like “instantaneous” promotion. Fast convergence times are key for testing a system in Jepsen; systems that take minutes to converge on a new state drag out the testing process, especially when testing eventual consistency.

> So why is our cluster health yellow and not green? We have all 3 primary shards, but we specified that we wanted two replicas of each primary and currently only one replica is assigned. This prevents us from reaching green, but we’re not too worried here: were we to kill Node 2 as well, our application could still keep running without data loss because Node 3 contains a copy of every shard.

So Elasticsearch claims to cope with the loss of a majority of nodes. That could be a problem from a consistency standpoint: it means that reads might, depending on the implementation, be able to read stale data. As we saw in the RabbitMQ post, stale reads rule out a number of consistency models. We’ll be careful to wait for the cluster to report green before doing a read, just in case.

So the official documentation is vague about fault tolerance. The next thing I look at in evaluating a database is reports from users: blog posts, mailing list questions, and so on. Let’s start with a search for Elasticsearch partition tolerance.

Result #1 is this mailing list discussion from 2010, where Tal Salmona asks what Elasticsearch guarantees with respect to the CAP theorem. Shay Banon, the primary author of Elasticsearch, responds:

> When it comes to CAP, in a very high level, elasticsearch gives up on partition tolerance. This is for several reasons:

2010 was a simpler time.

> I personally believe that *within the same data center*, network partitions very rarely happen, and when they do, its a small set (many times single) machine that gets “partitioned out of the network”. When a single machine gets disconnected from the network, then thats not going to affect elasticsearch.

Quantitative evidence is hard to find, but we have some data. Rare events are still worth defending against. Still, this is somewhat reassuring: single-node partitions were a design consideration, and we can expect Elasticsearch to be robust against them.

> When it comes to search engines, and inverted index, its very hard to the point of impossible to build a solution that tries to resolve consistency problems on the fly as most products do (the famous “read repair”). When you search, you search over a large amount of docs and you can’t read repair each one all the time. Key value / column based solutions have life easy

This is true! Search is a hard problem to distribute. It doesn’t tell us much about Elasticsearch’s behavior, though. Sergio Bossa asks what happens in a simple partitioned scenario, and Banon responds with some very interesting discussion of the general problem, but not many specifics. One key point:

> One simple option to solve this is to have the user define a “minimum” size of cluster that should be up, and if its not, the nodes will not allow writes. This will be in elasticsearch and its actually not that difficult to implement.

This is now built into Elasticsearch as an option called discovery.zen.minimum_master_nodes. As numerous users have noted, Elasticsearch will happily allow concurrent primary nodes when this number is less than n/2+1. Why is it even legal to set it lower? I’m not sure, but we’ll make sure to set the minimum_master_nodes to 3 for our five-node cluster.
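The arithmetic behind that advice is easy to check for clean, disjoint partitions. The Clojure sketch below has two hypothetical helpers. `quorum` computes the strict majority ⌊n/2⌋ + 1. `split-brain-possible?` asks whether some split of n nodes into two non-overlapping groups leaves both sides with at least minimum_master_nodes members. It assumes every node is master-eligible and only considers partitions whose two sides share no nodes.

```clojure
(defn quorum
  "Smallest strict majority of n nodes: floor(n/2) + 1."
  [n]
  (inc (quot n 2)))

(defn split-brain-possible?
  "True if n nodes can be cut into two disjoint, non-empty groups that each
  have at least minimum-master-nodes members, so that both sides could elect
  a master of their own."
  [n minimum-master-nodes]
  (boolean
    (some (fn [k]
            (and (>= k minimum-master-nodes)
                 (>= (- n k) minimum-master-nodes)))
          (range 1 n))))

(quorum 5)                    ;; => 3
(split-brain-possible? 5 2)   ;; => true
(split-brain-possible? 5 3)   ;; => false
```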

All this is old news, though. The next interesting result is a mailing list discussion from early 2014, in which Nic asks for a clarification of Elasticsearch’s CAP tradeoffs, given the 2010 thread.

> From my understanding it seems like Kimchy was confused here. As a distributed system ES can’t give up on the P - you can’t will network/communication failures out of existent!

> Instead, it seems like ES mostly compromises on the A (availability) part of CAP. For example, unless you are willing to suffer potential split-brain scenarios, setting min master nodes to n/2 + 1 will mean the smaller group under a network partition will become unavailable (it will not respond to read/writes). If you do allow split-brain then clearly consistency is compromised and the client service will need to have some kind of conflict resolution mechanism.

My thoughts exactly. What’s going on here?

> It would be great if there were a page on the ES site/guide which went into these issues in more detail as it is (IMO) essential information in understanding how ES works and in deciding whether it is appropriate for your use case.

Yes, yes, a thousand times yes. One of the principal reasons I put so much time into Jepsen is to prompt database vendors to clearly explain their safety and liveness invariants.

Jörg Prante, a regular on the mailing list, responds:

> ES gives up on partition tolerance, it means, if enough nodes fail, cluster state turns red and ES does not proceed to operate on that index.

That sounds like giving up availability to me. Remember, CAP A means that every request to a non-failing node completes successfully.

> ES is not giving up on availability. Every request will be responded, either true (with result) or false (error). In a system being not available, you would have to expect the property of having some requests that can no longer be answered at all (they hang forever or the responder is gone).

Errors are not successful responses; they don’t count towards availability.

> The principle design of distributed operations in ES is like this: write all ops on an index into a WAL (the translog). Send the ops to the nodes while even some nodes may work reliable, some not. Stopping a node does not harm the cluster as long as the replica level is high enough. When a stopped node rejoins, initiate a recovery, using the WAL. Let the “best” WAL result of all consistent results of the replica win for recovering the index state.

This is useful, concrete information, and appears to describe something like a CP system, but given the poster’s confusion with respect to CAP I’d like a more authoritative explanation.

Itamar Syn-Hershko responds to Prante, arguing Elasticsearch gives up consistency when configured with fewer than quorum primary-eligible nodes. This seems valid to me, but I’m curious what happens when we do use the quorum constraints correctly.

> The very fact that you can have enough replicas of your index which will make the cluster never get to a red state (e.g. the number of nodes you have) proves this. An index (an eventually, all indexes on your cluster) can survive a network split. It can also be always available, hence ES is AP.

> Elasticsearch’s compromise is on C - consistency - like most NoSQL databases. It uses Eventual Consistency to answer queries, not just because of NRT search, but also because you may be querying a replica (a slave node) which hasn’t been brought up to speed yet.

Writes may be linearizable even if reads aren’t. We might be able to get Elasticsearch to do CAS safely, and test it by waiting for the cluster to recover and performing an index flush before read.

Prante responds, asserting that Elasticsearch compare-and-set operations are linearizable.

> Consistency is not given up by ES. First, on doc level, you have “write your own read” consistency implemented as versioning - a doc read followed by a doc write is guaranteed to be consistent if both read and write versions match (MVCC).

> Replica are not interfering with consistency, they are for availability.

As we’ve discussed, replicas are the reason why consistency is hard in the first place.

> split brains can happen and ES can happily proceed reading and writing to the index in such a case, but the result is not predictable - the usual case is that two masters are going to control two divergent indices, and that is catastrophic. This is not a fault but a (nasty) feature, and must be controlled by extra safeguarding, by setting the minimum master value in the configuration …

Well, I’m a little confused by all this, but the impression that I get is that if we set minimum_master_nodes to a majority of the nodes in the cluster, Elasticsearch should be safe.

That’s really it for partition discussion, as far as I can tell. No official docs explaining the resolution algorithm, and some possibly outdated, contradictory discussion on a mailing list. There are a number of blog posts that repeat these sources, and some meetup discussions, but nothing concrete.

## Nontransitive partitions

Then somebody pointed me at this GitHub issue, reported by Saj Goonatilleke in December 2012. It’s titled “minimum_master_nodes does not prevent split-brain if splits are intersecting.” That sounds interesting.

> With this setup, I can easily split a 3-node cluster into two ‘hemispheres’ (continuing with the brain metaphor) with one node acting as a participant in both hemispheres. I believe this to be a significant problem, because now minimum_master_nodes is incapable of preventing certain split-brain scenarios.

In Saj’s test case, a split in a three-node cluster isolates two nodes from each other, but not from a common node visible to both. Because both isolated nodes can see two thirds of the cluster (themselves and the common node), they believe they are eligible for leader election even when minimum_master_nodes is at least a majority.

This points to a deeper problem in Elasticsearch’s leader election process. Most leader election algorithms divide time into monotonic epochs or terms, and allow only one node to become the leader for a given term. They often enforce this constraint by requiring that a given node can only support one candidate node per term. This property, combined with the constraint that a candidate must receive votes from a majority to become leader for a term, ensures that there will be at most one leader per term.

ZenDisco, Elasticsearch’s cluster membership system, has no such invariant. A node will happily support two leaders simultaneously.
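The Clojure sketch below compares the single-vote-per-term rule with a bare "can I see a majority?" check. It is a stripped-down, hypothetical model in the spirit of Raft-style elections, with no logs, timeouts, or messages. It is not ZenDisco or any real implementation. The functions `request-vote`, `run-election`, and `visible-majority?` are invented here. The scenario replays the three-node bridge from Saj's ticket: :a and :b cannot see each other, but both can reach :c.

```clojure
(defn request-vote
  "votes maps term -> the candidate this voter supported in that term.
  Grants the vote only if the voter has not already backed someone else
  in this term. Returns [new-votes granted?]."
  [votes term candidate]
  (let [prior (get votes term)]
    (if (or (nil? prior) (= prior candidate))
      [(assoc votes term candidate) true]
      [votes false])))

(defn run-election
  "candidate asks every node in reachable (itself included) for a vote in
  term. voters maps node -> votes. Returns [new-voters won?], where won?
  means a strict majority of all n nodes granted the vote."
  [voters n term candidate reachable]
  (let [[voters' granted]
        (reduce (fn [[vs granted] node]
                  (let [[v ok?] (request-vote (get vs node {}) term candidate)]
                    [(assoc vs node v) (if ok? (inc granted) granted)]))
                [voters 0]
                reachable)]
    [voters' (>= granted (inc (quot n 2)))]))

(defn visible-majority?
  "The weaker check: does this node merely see a majority of the cluster?"
  [n reachable]
  (>= (count reachable) (inc (quot n 2))))

;; Bridge: :a reaches [:a :c], :b reaches [:b :c].
(visible-majority? 3 [:a :c])   ;; => true
(visible-majority? 3 [:b :c])   ;; => true  (both sides qualify)

(let [[voters a-won?] (run-election {} 3 1 :a [:a :c])
      [_ b-won?]      (run-election voters 3 1 :b [:b :c])]
  [a-won? b-won?])              ;; => [true false]
```

Both isolated nodes pass the visibility check, but :c can back only one candidate in term 1, so at most one of them wins that term.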

Some users reported success in switching to the Zookeeper plugin for leader election, but that component has been, as far as I know, broken since 0.90.6.

## Building a nemesis

This sort of scenario isn’t hard to verify with Jepsen, and it’s a good opportunity to introduce a Jepsen concept that we haven’t explored yet: the nemesis. Nemeses are a special type of Jepsen client, but instead of performing operations against a node in the database, they play havoc with the entire cluster; severing network links, adjusting clocks, killing processes, causing single-bit errors on disk, and so on.

First things first: we’ll use iptables to drop traffic from a node, or a collection of nodes.

```clojure
;; Assumes c aliases jepsen.control and net aliases jepsen.control.net.
(defn snub-node!
  "Drops all packets from node."
  [node]
  (c/su (c/exec :iptables :-A :INPUT :-s (net/ip node) :-j :DROP)))

(defn snub-nodes!
  "Drops all packets from the given nodes."
  [nodes]
  (dorun (map snub-node! nodes)))
```

Next, we need a way to fully describe a given network topology. One way would be as an adjacency list, but for our purposes it’s a little clearer to describe the links that we’re going to cut.

```clojure
(defn partition!
  "Takes a *grudge*: a map of nodes to the collection of nodes they should
  reject messages from, and makes the appropriate changes. Does not heal the
  network first, so repeated calls to partition! are cumulative right now."
  [grudge]
  (->> grudge
       (map (fn [[node frenemies]]
              (future
                (c/on node
                  (snub-nodes! frenemies)))))
       doall
       (map deref)
       dorun))
```

What kind of grudge might we want to hold? Let’s call a complete grudge one in which all links are symmetric: if A can talk to B, B can talk to A, and vice-versa. We’ll be nice to our networks, and avoid unidirectional partitions for now–though they can happen in production, and Magnus Haug reported just last week that asymmetric partitions will wedge an Elasticsearch cluster in a split-brain state.

```clojure
;; set/ refers to clojure.set.
(defn complete-grudge
  "Takes a collection of components (collections of nodes), and computes a
  grudge such that no node can talk to any nodes outside its partition."
  [components]
  (let [components (map set components)
        universe   (apply set/union components)]
    (reduce (fn [grudge component]
              (reduce (fn [grudge node]
                        (assoc grudge node (set/difference universe component)))
                      grudge
                      component))
            {}
            components)))
```

We want to cut the network roughly in half, so we’ll take a list of nodes and chop it into two pieces. Then we can feed those pieces to complete-grudge to compute the links we need to cut to isolate both halves into their own network components.

```clojure
(defn bisect
  "Given a sequence, cuts it in half; smaller half first."
  [coll]
  (split-at (Math/floor (/ (count coll) 2)) coll))
```

Finally, a slight modification to the complete grudge: we’re going to allow one node to talk to everyone.

```clojure
(defn bridge
  "A grudge which cuts the network in half, but preserves a node in the middle
  which has uninterrupted bidirectional connectivity to both components."
  [nodes]
  (let [components (bisect nodes)
        bridge     (first (second components))]
    (-> components
        complete-grudge
        ; Bridge won't snub anyone
        (dissoc bridge)
        ; Nobody hates the bridge
        (->> (util/map-vals #(disj % bridge))))))
```
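To see what these functions compute, here they are applied to a hypothetical five-node cluster, :n1 through :n5. The definitions are repeated so the snippet runs without Jepsen, with a few substitutions:

- `map-vals` is a local stand-in for `util/map-vals`.
- `bisect` uses `quot`, which gives the same split point as `(Math/floor (/ n 2))` but as an integer.
- `visible` is a helper added here that lists the nodes each node can still hear from.

```clojure
(require '[clojure.set :as set])

(defn complete-grudge [components]
  (let [components (map set components)
        universe   (apply set/union components)]
    (reduce (fn [grudge component]
              (reduce (fn [grudge node]
                        (assoc grudge node (set/difference universe component)))
                      grudge
                      component))
            {}
            components)))

(defn bisect [coll]
  (split-at (quot (count coll) 2) coll))

;; Stand-in for jepsen.util/map-vals: applies f to every value of m.
(defn map-vals [f m]
  (into {} (map (fn [[k v]] [k (f v)])) m))

(defn bridge [nodes]
  (let [components (bisect nodes)
        bridge     (first (second components))]
    (-> components
        complete-grudge
        (dissoc bridge)
        (->> (map-vals #(disj % bridge))))))

(defn visible
  "Nodes that node can still receive packets from, itself included."
  [nodes grudge node]
  (set/difference (set nodes) (get grudge node #{})))

(def nodes [:n1 :n2 :n3 :n4 :n5])

;; (complete-grudge (bisect nodes)) is equal to
;; {:n1 #{:n3 :n4 :n5}, :n2 #{:n3 :n4 :n5},
;;  :n3 #{:n1 :n2}, :n4 #{:n1 :n2}, :n5 #{:n1 :n2}}

;; (bridge nodes) leaves :n3 connected to everyone; it is equal to
;; {:n1 #{:n4 :n5}, :n2 #{:n4 :n5}, :n4 #{:n1 :n2}, :n5 #{:n1 :n2}}

(into (sorted-map)
      (for [n nodes] [n (count (visible nodes (bridge nodes) n))]))
;; => {:n1 3, :n2 3, :n3 5, :n4 3, :n5 3}
```

This is a pure reachability count, not a simulation of ZenDisco. Under the bridge, :n1 (in the smaller half) and :n4 (in the larger half) can each see three of five nodes. A "see at least minimum_master_nodes = 3 nodes" check therefore passes on both sides of the cut.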

Last piece of the puzzle: we’ve got to respond to operations by initiating and healing the network partition we’ve computed. This partitioner nemesis is generic–it takes a pluggable function, like bridge, to figure out what links to cut, then calls partition! with those links.

```clojure
(defn partitioner
  "Responds to a :start operation by cutting network links as defined by
  (grudge nodes), and responds to :stop by healing the network."
  [grudge]
  (reify client/Client
    (setup! [this test _]
      (c/on-many (:nodes test) (net/heal))
      this)

    (invoke! [this test op]
      (case (:f op)
        :start (let [grudge (grudge (:nodes test))]
                 (partition! grudge)
                 (assoc op :value (str "Cut off " (pr-str grudge))))
        :stop  (do (c/on-many (:nodes test) (net/heal))
                   (assoc op :value "fully connected"))))

    (teardown! [this test]
      (c/on-many (:nodes test) (net/heal)))))
```

Let’s give it a shot. In this test we’ll implement a linearizable set which supports adding elements and reading the current set, just like we did with Etcd, using Elasticsearch’s compare-and-set primitives. Clients add a stream of integers to the set (slightly staggered in time to avoid too many CAS failures), while the nemesis cuts the network into the nontransitive partition shape reported in the ticket.

```clojure
(deftest register-test
  (let [test (run!
               (assoc noop-test
                      :name      "elasticsearch"
                      :os        debian/os
                      :db        db
                      :client    (cas-set-client)
                      :model     (model/set)
                      :checker   (checker/compose {:html timeline/html
                                                   :set  checker/set})
                      :nemesis   (nemesis/partitioner nemesis/bridge)
                      :generator (gen/phases
                                   (->> (range)
                                        (map (fn [x] {:type  :invoke
                                                      :f     :add
                                                      :value x}))
                                        gen/seq
                                        (gen/stagger 1/10)
                                        (gen/delay 1)
                                        (gen/nemesis
                                          (gen/seq (cycle
                                                     [(gen/sleep 60)
                                                      {:type :info :f :start}
                                                      (gen/sleep 300)
                                                      {:type :info :f :stop}])))
                                        (gen/time-limit 600))
                                   (gen/nemesis
                                     (gen/once {:type :info :f :stop}))
                                   (gen/clients
                                     (gen/once {:type :invoke :f :read})))))]
    (is (:valid? (:results test)))
    (pprint (:results test))))
```

## Speed bumps

The first thing I found was, well, nothing. The test crashed with a cryptic error message about exceptions–and it took a while to figure out why.

Elasticsearch supports two protocols: an HTTP protocol, which is slower, and a native binary protocol. In most cases you want to use the native protocol where available. Unfortunately, Elasticsearch uses Java serialization for exceptions–which is problematic because different JVMs serialize classes differently. In particular, InetAddress’s representation is not stable across different JVM patchlevels, which means that servers or even clients running different JVM versions will explode when trying to parse an error message from another node.

Another roadblock: “instantaneous” cluster convergence isn’t. It takes Elasticsearch ninety seconds (three rounds of 30-second timeouts) to detect a node has failed and elect a new primary. We’ll have to slow down the test schedule, allowing the network to stabilize for 300 seconds, in order to get reliable cluster transitions. There’s a configuration option to adjust those timeouts…

```yaml
# Set the time to wait for ping responses from other nodes when discovering.
# Set this option to a higher value on a slow or congested network
# to minimize discovery failures:
# discovery.zen.ping.timeout: 3s
```

But as far as I can tell, that knob doesn’t make failure detection any faster.

```
[2014-06-16 17:25:40,901][INFO ][discovery.zen ] [n5] master_left [[n1][9fvd-dZBTo2re4dukDuuRw][n1][inet[/192.168.122.11:9300]]], reason [failed to ping, tried [3] times, each with maximum [30s] timeout]
```

Elasticsearch has some really terrific tools for cluster introspection: you can request a JSON dump of the cluster status easily via curl, which makes writing tests a breeze. You can also (and this is really awesome) block until the cluster reaches some desired status. In the Jepsen client, I block until the cluster health reaches green before starting the test, and once the network is repaired, before doing a final read of the set.

The problem is that the health endpoint will lie. It’s happy to report a green cluster during split-brain scenarios. I introduced additional delays to the test schedule to give Elasticsearch even more time to recover, with mixed success. Sometimes the cluster will wedge hard, and refuse to make progress until I start bouncing nodes.

Anyway, with those caveats: results!

## Nontransitive partition results

Elasticsearch compare-and-set isn’t even close to linearizable. During this type of network partition, both primaries will happily accept writes, and the node in the middle… well, it’s caught in a three-way call with Regina George, so to speak. When the cluster comes back together, one primary blithely overwrites the other’s state.

Knossos reported a linearization failure, but the final state was missing tons of writes, and Knossos panics as soon as a single one is lost. “Welp, my job is done here: peace”.

But we’re not just interested in whether the system is linearizable. We want to quantify just how much data might be corrupted. I wrote a custom checker for Elasticsearch to try and quantify that write loss, similar to the RabbitMQ queue checker.

Here’s a result from a typical run with that checker:

```
:total 1961, :recovered-count 4, :unexpected-count 0, :lost-count 645, :ok-count 1103
```

645 out of 1961 writes acknowledged then lost. Only 1103 writes made it to the final read. Elasticsearch issued more false successes than failure results.
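The categories in that output can be derived with plain set operations. The Clojure sketch below follows the spirit of Jepsen's set checker; it is hypothetical and not the author's custom checker. `ok` is whatever the final read returned out of the attempted adds. `lost` was acknowledged but is absent. `recovered` is present even though its add was never acknowledged. `unexpected` was never attempted at all.

```clojure
(require '[clojure.set :as set])

(defn set-results
  "attempted: every element a client tried to add.
  acked: elements whose add was acknowledged as successful.
  final-read: the set returned by the last read, after the network healed."
  [attempted acked final-read]
  (let [ok         (set/intersection final-read attempted)
        lost       (set/difference acked final-read)
        recovered  (set/difference ok acked)
        unexpected (set/difference final-read attempted)]
    {:total            (count attempted)
     :ok-count         (count ok)
     :lost-count       (count lost)
     :recovered-count  (count recovered)
     :unexpected-count (count unexpected)}))

;; Ten attempted adds, 0 through 7 acknowledged; only 0, 1, 2 and 8 survive.
(set-results (set (range 10)) (set (range 8)) #{0 1 2 8})
;; => {:total 10, :ok-count 4, :lost-count 5, :recovered-count 1, :unexpected-count 0}
```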

Clearly Elasticsearch’s MVCC support is neither consistent nor available; it happily allows two concurrent primaries to accept writes, and destroys the writes on one of those primaries when the partition resolves, but the majority-write constraint prevents Elasticsearch from offering total availability.

What’s really surprising about this problem is that it’s gone unaddressed for so long. The original issue was reported in July 2012; almost two full years ago. There’s no discussion on the website, nothing in the documentation, and users going through Elasticsearch training have told me these problems weren’t mentioned in their classes. Fixing a distributed system is hard, but documenting its gotchas is easy. Look to RabbitMQ or Riak, both of which have extensive documentation around failure recovery, and the scenarios in which their respective systems could lose or corrupt data.

This is not a theoretical problem. There are dozens of users reporting data loss in this ticket’s discussion, and more on the mailing list. Paul Smith reports:

> I’m pretty sure we’re suffering from this in certain situations, and I don’t think that it’s limited to unicast discovery.

> We’ve had some bad networking, some Virtual Machine stalls (result of SAN issues, or VMWare doing weird stuff), or even heavy GC activity can cause enough pauses for aspects of the split brain to occur.

Ivan Brusic confirms:

> I have seen the issue on two different 0.20RC1 clusters. One having eight nodes, the other with four.

Trevor Reeves observes:

> We have been frequently experiencing this ‘mix brain’ issue in several of our clusters - up to 3 or 4 times a week. We have always had dedicated master eligible nodes (i.e. master=true, data=false), correctly configured minimum_master_nodes and have recently moved to 0.90.3, and seen no improvement in the situation.

> We just ran into this problem on a 41 data node and 5 master node cluster running 0.90.9

And Mark Tinsley:

> I have been having some strange occurrences using elasticsearch on aws…. For some reason, node A and B cannot talk to each other… but both can still talk to C and C can talk to A and B i.e. a ‘on the fence’ network partition as C can still see all…. As you can see B is now a new master but A has not been removed as a master, because A can still see C so has the minimum master node criteria satisfied.

Some of these problems can be ameliorated by improving Elasticsearch’s GC performance and overall responsiveness–but this will only reduce the frequency of split-brain events, not fix the underlying problem. Elasticsearch needs a real consensus algorithm to linearize updates to documents.

## Needless data loss

So let’s back off. Maybe version control is a lost cause–but there is something Elasticsearch can definitely handle safely, and that’s inserts. Inserts will never conflict with one another, and can always be merged with set union. Updates and deletes to those documents may not linearize correctly, but we can bias towards preserving data. Moreover, insertions can theoretically be 100% available: every node can take writes and queue them for later, modulo durability constraints.
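The Clojure sketch below contrasts two merge strategies, with replicas modeled as plain sets of inserted ids. Overwriting one side with the other drops every insert acknowledged only by the losing side, whereas a union keeps them all. The names `overwrite-merge`, `union-merge`, and `lost-writes` are hypothetical. This illustrates the merge strategy proposed above. It is not how Elasticsearch reconciles replicas, and real documents would also need a rule for updates and deletes.

```clojure
(require '[clojure.set :as set])

;; Each replica is modeled as the set of element ids it has accepted.
(def base   #{0 1 2})        ; replicated before the partition
(def side-a (conj base 3 5)) ; inserts acknowledged on one side
(def side-b (conj base 4 6)) ; inserts acknowledged on the other side

(defn overwrite-merge
  "One side's state replaces the other's wholesale."
  [winner _loser]
  winner)

(defn union-merge
  "Insert-only merge: keep every element either side accepted. Safe here
  because fresh inserts never conflict with one another."
  [a b]
  (set/union a b))

(defn lost-writes
  "Acknowledged elements that are missing from the merged state."
  [merged & sides]
  (set/difference (apply set/union sides) merged))

;; (lost-writes (overwrite-merge side-b side-a) side-a side-b) is equal to #{3 5}
;; (lost-writes (union-merge side-a side-b) side-a side-b) is empty
```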

I wrote a client which implements a set of integers by inserting a fresh document with an auto-generated ID for each element.

The results? Massive write loss.

```
:total 2023, :recovered-count 77, :unexpected-count 0, :lost-count 688, :ok-count 1265
```

Of two thousand attempts, six hundred eighty-eight documents were acknowledged by Elasticsearch, then thrown away.

This is something the ticket didn’t make clear: Elasticsearch could merge divergent replicas by preserving inserts on both nodes–but it doesn’t. Instead it throws all that data away. In fact, sometimes you’ll get mind-boggling errors like

```
{:body {"error": "RemoteTransportException[[Cadaver][inet[/192.168.122.12:9300]][index]]; nested: RemoteTransportException[[Death Adder][inet[/192.168.122.11:9300]][index]]; nested: DocumentAlreadyExistsException[[jepsen-index][1] [number][EpVU56YERBOfRqyVc-_hAg]: document already exists]; ", "status": 409}}
```

Elasticsearch claims–and Wireshark traces confirm–that documents inserted without an ID will receive an auto-generated UUID for their document ID. How is it possible that an insert of a fresh document with a fresh UUID can fail because that document already exists? Something is seriously wrong here.

## Random transitive partitions

Maybe you don’t believe that nontransitive–e.g. bridged–network partitions are all that common. Can Elasticsearch handle a complete, disjoint partition which isolates the cluster cleanly into two halves, without a node connecting the two sides?

Here’s a nemesis to generate this kind of partition. To determine the grudge (the links to cut), we’ll shuffle the list of nodes randomly, cut it in half, and isolate both halves from each other completely. We’ll use comp to compose several functions together.

```clojure
(defn partition-random-halves
  "Cuts the network into randomly chosen halves."
  []
  (partitioner (comp complete-grudge bisect shuffle)))
```

Because this partitioner shuffles the nodes, each time we invoke it we’ll get a different minority and majority component to the partition. This is a pattern that a CP system can handle well: because a majority is always visible, after a brief reconvergence latency spike a linearizable database could offer availability throughout this test.
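The majority argument can be checked mechanically. The Clojure sketch below uses a hypothetical helper, `majority-components`, which assumes every node counts equally toward the majority. It shows that every random bisection of five nodes leaves exactly one side with a strict majority. For contrast, an even split of four nodes leaves none.

```clojure
(defn majority-components
  "Of the disjoint components partitioning a cluster, those holding a strict
  majority (floor(n/2) + 1) of its n nodes."
  [components]
  (let [n (reduce + (map count components))]
    (filter #(>= (count %) (inc (quot n 2))) components)))

;; Any random bisection of five nodes leaves exactly one majority side ...
(every? (fn [nodes]
          (= 1 (count (majority-components
                        (split-at (quot (count nodes) 2) nodes)))))
        (repeatedly 1000 #(shuffle [:n1 :n2 :n3 :n4 :n5])))
;; => true

;; ... whereas an even split of four nodes leaves none.
(majority-components [[:n1 :n2] [:n3 :n4]])
;; => ()
```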

What does Elasticsearch do in this scenario?

Elasticsearch loses data even when the partitions are total. You don’t need a bridge node to make replicas diverge. Again, we’re not even talking CAS updates–these are all freshly inserted documents; no changes, no deletions. Every single one of these writes could have been preserved safely.

```
FAIL in (create-test) (elasticsearch_test.clj:83)
expected: (:valid? (:results test))
  actual: false
{:valid? false,
 :html {:valid? true},
 :set {:valid? false,
       :lost "#{348 350 372 392..393 396 403 427 446 453 458 467 476 504 526 547 568..569 573 578 599 603 607 648 652}",
       :recovered "#{273 281 285..286 290..292 296 301 305 311 317 323 325 328..329 334 340 345 353 356 360 365 368..369 377..378 384 395 398..399 404 406 412..413 417..419 422 425..426 430..432 435 437..438 442 445 449..450 452 454 457 462..463 470 473 475 477 582 593 611 615 630 632 653 657 671 675 690 694 708 712 727 729 744 748 1034 1038 1040 1042..1043 1045..1046 1050 1052 1055 1057..1058 1060 1062 1067..1068 1070 1072 1075 1077..1078 1080 1082..1083 1085 1087 1090 1092..1093 1095 1098 1100 1107 1112..1113 1115 1117..1118 1120 1122..1123 1125 1127 1130 1132..1133 1135 1138 1140 1142..1143 1145 1147..1148 1150 1153 1155 1157..1158 1160 1162..1163 1165 1167..1168 1170 1172..1173 1175 1177..1178}",
       :ok "#{0..269 273 278 281 285..286 290..292 296..297 301..302 305 311..313 317 323..325 328..329 334 338 340 345..347 351 353 356 358..360 365 368..369 375..378 380 383..384 389 395 398..399 401..402 404 406 409 411..413 417..419 422..426 430..432 435..438 441..443 445 447 449..450 452 454 456..457 459 461..463 465..466 468 470..473 475 477 479 481..484 486..488 490..503 505 507..508 510..525 528..530 532..546 549..551 553..567 570..572 575..577 579..598 600..602 604..606 608..647 649..651 653..1035 1038 1040 1042..1043 1045..1046 1050 1052 1055 1057..1058 1060 1062 1067..1068 1070 1072 1075 1077..1078 1080 1082..1083 1085 1087 1090 1092..1093 1095 1098 1100 1107 1112..1113 1115 1117..1118 1120 1122..1123 1125 1127 1130 1132..1133 1135 1138 1140 1142..1143 1145 1147..1148 1150 1153 1155 1157..1158 1160 1162..1163 1165 1167..1168 1170 1172..1173 1175 1177..1178}",
       :total 1180,
       :recovered-count 149,
       :unexpected-count 0,
       :unexpected "#{}",
       :lost-count 25,
       :ok-count 970}}
```

Only 25 writes thrown away this round, but that number should have been zero. You can lower the probability of loss by adding more replicas, but even with a replica on every single node, Elasticsearch still finds a way to lose data.

## Fixed transitive partitions

Maybe the problem is that the partitions are shifting over time–perhaps isolating one set of nodes, then another set, is what confuses Elasticsearch. We might be looking at a time horizon too short for Elasticsearch to stabilize between failures, causing it to interpret the disjoint components as some partially-transitive graph.

But as Shikhar Bhushan noted last week, even a nemesis which always generates the same partitioning pattern, like [n1 n2] [n3 n4 n5], causes split brain and write loss. This nemesis is just like the random partitioner, but we drop the shuffle step.

```clojure
(defn partition-halves
  "Responds to a :start operation by cutting the network into two halves--first
  nodes together and in the smaller half--and a :stop operation by repairing
  the network."
  []
  (partitioner (comp complete-grudge bisect)))
```

Running this test reveals that it’s not just shifting partitions: a constant partition pattern will cause Elasticsearch to enter split brain and throw away data.

```
{:valid? false,
 :lost "#{90 94 104 107 122..123 135 160 173 181 188 200 229 279 337 398 422}",
 :recovered "#{6 8 12 15 17 19 23 27 29 31 34 36 38 40 43..44 47 50 53 55 61 64 66 71 74 78 81 83..84 86 91..92 97 99..100 103 109..110 114 116 119..121 126 132..133 137..139 142..144 147..149 152..154 157..159 163..165 168..169 171 176..177 179 182 184..185}",
 :ok "#{0..4 6 8 12 15 17 19 23 27 29 31 34 36 38 40 43..44 47 50 53 55 61 64 66 71 74 78 81 83..84 86 91..92 97 99..100 103 109..110 114 116 119..121 126 132..133 137..139 142..144 147..149 152..154 157..159 163..165 168..171 175..179 182 184..185 187 189 191..192 194..199 201..203 205..210 212..223 225..228 230..231 233..241 243..251 253..261 263..270 272..278 280..282 284..289 291..301 303..307 309..319 321..326 328..336 338..342 344 346..360 362..364 366..378 380..382 384..397 399..401 404..420 423..425 427..439 441..443 445..458 460}",
 :total 461,
 :recovered-count 73,
 :unexpected-count 0,
 :unexpected "#{}",
 :lost-count 17,
 :ok-count 319}}
```

Seventeen writes lost here. It doesn’t take a shifting pattern of failures to break the cluster.
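The fields in these results can be read as set arithmetic over three collections:

- every document we attempted to insert,
- the inserts Elasticsearch acknowledged, and
- the documents present in the final read.

Below is a minimal sketch of that arithmetic. The set-results function is a hypothetical helper written for illustration, not Jepsen's checker, although it reuses the field names. Its validity rule (nothing lost, nothing unexpected) is our assumption.

```clojure
(require '[clojure.set :as set])

(defn set-results
  "Hypothetical sketch: classifies elements given the attempted inserts,
  the acknowledged inserts, and the final read (all sets)."
  [attempted acknowledged final-read]
  (let [ok         (set/intersection final-read attempted)   ; attempted and present at the end
        lost       (set/difference acknowledged final-read)  ; acked, yet missing at the end
        unexpected (set/difference final-read attempted)     ; present, but never attempted
        recovered  (set/difference ok acknowledged)]         ; not acked, yet present anyway
    {:valid?          (and (empty? lost) (empty? unexpected))
     :ok-count        (count ok)
     :lost            lost
     :lost-count      (count lost)
     :recovered-count (count recovered)
     :unexpected      unexpected}))

;; Toy history: 0..5 attempted; 0, 1, 2, and 4 acknowledged; 3 timed out
;; and 5 failed. The final read returns 0, 1, and 3.
(set-results (set (range 6)) #{0 1 2 4} #{0 1 3})
;; => {:valid? false, :ok-count 3, :lost #{2 4}, :lost-count 2,
;;     :recovered-count 1, :unexpected #{}}
```

This is why every "recovered" element in the results above also shows up in :ok. Those are indeterminate writes that happened to survive. Only acknowledged writes missing from the final read count as lost.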

## Single-node partitions

Maybe this is to be expected; Banon asserted on the mailing list that partitions involving multiple nodes are rare–but Elasticsearch should be able to handle a single node being isolated.

By symmetry, the most interesting node to isolate is the primary. To find the primary, we can query Elasticsearch’s cluster status endpoint on each host, to see what it thinks the current primary is.

We’ll take the nodes, make an HTTP request to each, and extract the master_node field of the result. That’s a unique instance identifier, so we dig into the nodes structure to map that identifier back to a node name. Then we turn those [node, supposed-primary] pairs into a hashmap.

```clojure
(defn primaries
  "Returns a map of nodes to the node that node thinks is the current
  primary, as a map of keywords to keywords. Assumes elasticsearch node
  names are the same as the provided node names."
  [nodes]
  (->> nodes
       (pmap (fn [node]
               (let [res     (-> (str "http://" (name node) ":9200/_cluster/state")
                                 (http/get {:as :json-string-keys})
                                 :body)
                     primary (get res "master_node")]
                 [node (keyword (get-in res ["nodes" primary "name"]))])))
       (into {})))
```

Let’s limit ourselves to nodes that think they’re the primary.

```clojure
(defn self-primaries
  "A sequence of nodes which think they are primaries."
  [nodes]
  (->> nodes
       primaries
       (filter (partial apply =))
       (map key)))
```

We’re just filtering that map to pairs where the key and value are equal, then returning the keys–nodes that think they themselves are current primaries. Next, we’ll write a special partitioner, just for Elasticsearch, that isolates those nodes.
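The HTTP request is the only side effect here, so the rest of this logic can be exercised against canned responses. Below is a minimal sketch with two assumptions:

- primaries-from-states is a hypothetical pure variant of primaries that takes already-parsed cluster-state bodies instead of making requests.
- The canned bodies contain only the two fields the code above reads: "master_node", and "nodes" → id → "name". The ids like "id-1" are made up.

```clojure
(defn primaries-from-states
  "Hypothetical pure variant of primaries: takes a map of node -> parsed
  _cluster/state body (string keys) and returns node -> believed primary."
  [states]
  (into {}
        (for [[node state] states]
          (let [primary-id (get state "master_node")]
            [node (keyword (get-in state ["nodes" primary-id "name"]))]))))

(defn self-primaries-from-states
  "Nodes whose believed primary is themselves."
  [states]
  (->> (primaries-from-states states)
       (filter (partial apply =))   ; map entries [node primary]
       (map key)))

;; :n1 and :n2 agree that n1 is primary, but :n3 has elected itself.
(def states
  {:n1 {"master_node" "id-1" "nodes" {"id-1" {"name" "n1"}}}
   :n2 {"master_node" "id-1" "nodes" {"id-1" {"name" "n1"}}}
   :n3 {"master_node" "id-3" "nodes" {"id-3" {"name" "n3"}}}})

(set (self-primaries-from-states states)) ; => #{:n1 :n3}
```

With a split brain like this one, both self-elected primaries are returned, and the nemesis below will isolate each of them.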

```clojure
(def isolate-self-primaries-nemesis
  "A nemesis which completely isolates any node that thinks it is the
  primary."
  (nemesis/partitioner
    (fn [nodes]
      (let [ps (self-primaries nodes)]
        (nemesis/complete-grudge
          ; All nodes that aren't self-primaries in one partition
          (cons (remove (set ps) nodes)
                ; Each self-primary in a different partition
                (map list ps)))))))
```

The first time we run this, a single node will be isolated. We’ll let the cluster converge for… say 200 seconds–more than long enough for pings to time out and for a new primary to be elected, before repairing the partition. Then we’ll initiate a second partition–again cutting off every node that thinks it’s a primary. There might be more than one if Elasticsearch is still in split-brain after those 200 seconds.
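The components this nemesis hands to complete-grudge are easy to check by hand. Below, the component computation from isolate-self-primaries-nemesis is pulled out into a hypothetical standalone function, self-primary-components, so it can run without a cluster. In the split-brain case each self-primary ends up alone.

```clojure
(defn self-primary-components
  "Hypothetical helper mirroring the nemesis above: all non-primaries
  share one component; each self-primary gets a component of its own."
  [nodes self-primaries]
  (cons (remove (set self-primaries) nodes)
        (map list self-primaries)))

;; One primary: a single node is cut off.
(self-primary-components [:n1 :n2 :n3 :n4 :n5] [:n1])
;; => ((:n2 :n3 :n4 :n5) (:n1))

;; Still in split brain: two nodes believe they are primary.
(self-primary-components [:n1 :n2 :n3 :n4 :n5] [:n1 :n4])
;; => ((:n2 :n3 :n5) (:n1) (:n4))
```

If no node claims to be primary, the result is a single component holding every node, so nothing gets cut off.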

Elasticsearch still loses data, even when partitions isolate only single nodes.

```
FAIL in (create-test) (elasticsearch_test.clj:86)
expected: (:valid? (:results test))
  actual: false
{:valid? false,
 :html {:valid? true},
 :set {:valid? false,
       :lost "#{619..620 624 629..631 633..634}",
       :recovered "#{7..8 10..11 15 24 26..27 30 32..35 41 43..44 46..49 51 53 55 57 59..61 67..70 72 75..77 79..81 86..87 91..94 96..99 527 544 548 550 559 563 577 579 583..584 589 591 597 600 604 612 615 618}",
       :ok "#{0..5 7..11 13 15..20 22..24 26..28 30 32..35 37 39..44 46..49 51..55 57..61 63..70 72..77 79..82 84..89 91..94 96..99 101..108 110..131 133..154 156..176 178..200 202..221 223..243 245..267 269..289 291..313 315..337 339..359 361..382 384..405 407..427 429..451 453..475 477..497 499..521 523..527 529..531 533 537..539 544..545 548 550 552..553 556 559 563 566 572 574..575 577..579 583..584 587..591 596..597 600 602 604 607 610 612..613 615 617..618 621 623 625 627..628 632 635..637}",
       :total 638,
       :recovered-count 66,
       :unexpected-count 0,
       :unexpected "#{}",
       :lost-count 8,
       :ok-count 541}}
```

This test lost eight acknowledged writes, all clustered around write number 630, just prior to the end of the second partition. It also wedges the cluster hard: even though cluster status reports green, some nodes refuse to converge. The logs are full of strange messages, including the enigmatic reason [do not exists on master, act as master failure]. In order to get any data back from the final read in these tests, I had to manually restart the stuck nodes during the end-of-test waiting period. Anecdotal reports suggest that this is not merely a theoretical problem; Elasticsearch convergence can deadlock in production environments as well.

## One single-node partition

Perhaps isolating nodes more than once is too difficult to handle. Let’s make things easier by only having one partition occur. We’ll cut off a single primary node once, wait for the cluster to converge, let it heal, wait for convergence again, and do a read. This is the simplest kind of network failure I know how to simulate.

```clojure
(gen/nemesis
  (gen/seq [(gen/sleep 30)
            {:type :info :f :start}
            (gen/sleep 200)
            {:type :info :f :stop}]))
```

This time the cluster manages to recover on its own, but it still lost inserted documents.

```
{:valid? false,
 :lost "#{687..689 692..693 700 709 711 717..718 728..730 739 742 755 757 764 766 771 777 780 783 785 797 800 816..817 820 841 852}",
 :recovered "#{140..142 147 151..153 161 163 166 168..169 172 175 178..179 182..185 187..192 197 200 203..207 209..210 212..213 215..216 218 220..221 223..225 229 231..232 235 237 239..240 242 323 368 393 484 506 530 552 577 598 623 644 714 720 727 746 758 760 762 772 774 790..791 802 806 836 851}",
 :ok "#{0..142 144..145 147..149 151..154 156..163 165..166 168..173 175 177..179 181..185 187..213 215..226 228..242 244..253 255..276 278..301 303..346 348..414 416..437 439..460 462..666 668..685 694..699 701..707 710 712..716 719..727 731..738 740..741 744..748 750..754 756 758..763 765 767..770 772..775 778..779 781..782 784 786..787 789..796 798..799 801..815 818 821..822 824..826 828..840 842..851 853..854}",
 :total 855,
 :recovered-count 79,
 :unexpected-count 0,
 :unexpected "#{}",
 :lost-count 31,
 :ok-count 792}}
```

31 acknowledged writes lost, out of 855 attempts. Or, if you’re unlucky, it might throw away almost all your data.

```
{:valid? false,
 :lost "#{0..1 4..6 8..11 14 17..18 20..23 25..26 28 30..32 35 38..42 44 46..49 51..52 54..56 58 60 62 64..66 68 70 72..73 77..78 80 84..85 87 89 91..92 94 96 98..100 105..110 112..113 118..123 125 128..129 131 134..137 139 144 147 154..158 160..162 166..167 172..174 180 182..183 186 190..192 196 200 202 207..208 221 226..228 230..233 235..237 239 244..256 258..277 279..301 303..323 325..346 348..368 370..390 392..413 415..436 438..460 462..482 484..506 508..528 530..552 554..574 576..598 600..619}",
 :recovered "#{}",
 :ok "#{2..3 7 12..13 15..16 19 24 27 29 33..34 36..37 43 45 50 53 57 59 61 63 67 69 71 74..76 79 81..83 86 88 90 93 95 97 101..104 111 114..117 124 126..127 130 132..133}",
 :total 620,
 :recovered-count 0,
 :unexpected-count 0,
 :unexpected "#{}",
 :lost-count 484,
 :ok-count 54}}
```

In this run, a single network partition isolating a primary node caused the loss of nearly 90% of acknowledged writes. Of 620 documents inserted, 538 were acknowledged as successful, but only 54 of those (10%) appeared in the final read. The other 484 were silently discarded.

Remember, this is the one kind of network failure Elasticsearch was designed to withstand. Note that the write loss pattern varies with time; we lost a mixture of writes through the first quarter of the test, and recovered nothing written after 133.

This test is a little harder to reproduce; sometimes it’ll recover all writes, and occasionally it’ll drop a ton of data. Other times it just loses a single write. I thought this might be an issue with flush not actually flushing the transaction log, so I upped the delay before read to 200 seconds–and still found lost data. There may be multiple bugs at play here.

## Recommendations for Elasticsearch

To summarize: Elasticsearch appears to lose writes–both updates and even non-conflicting inserts–during asymmetric partitions, symmetric partitions, overlapping partitions, disjoint partitions, and even partitions which only isolate a single node once. Its convergence times are slow and the cluster can repeatably deadlock, forcing an administrator to intervene before recovery.

I wish I had a better explanation for these problems, but I’ve already burned a hundred-odd hours on Elasticsearch and need to move on to other projects. I’ve got a few conjectures, though.

Elasticsearch’s cluster membership protocol is a mess, and Elasticsearch opts against using an external coordination service like Zookeeper for membership on the grounds that such a service would have less information about the cluster’s ongoing operations. Elasticsearch has started to put work into their fault-tolerance mechanisms, but that work is still incomplete.

The thing is that leader election is a well-studied problem with literally dozens of formally described algorithms, and none of that literature appears to have influenced Zendisco’s design. Leader election is not sufficient to ensure correctness, but using a peer-reviewed algorithm would be a heck of a lot better than the current morass of deadlocks and persistent split-brain.

Despite what the docs claim, I’m not convinced that Elasticsearch actually does synchronous replication correctly. It looks to me–and I haven’t verified this with a packet trace yet, so this is a bit shaky–that primary nodes can acknowledge writes made even when they’re isolated from the rest of the cluster. The patterns of write loss in the Jepsen histories are pretty darn suggestive. I’d expect to see a clear-cut pattern of timeouts and failures when a partition starts, and no successful writes until a new primary is elected. Instead, Elasticsearch continues to acknowledge writes just after a partition occurs. It might be worth investigating that write path to make sure it won’t ack a write until a majority confirm.
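Concretely, the rule being asked for is that a primary acknowledges a write only once a majority of the shard's copies, counting its own, have confirmed it. Below is a minimal sketch of that acknowledgement rule. It assumes a hypothetical map of copy → confirmed?, with unreachable copies reporting false. It is not Elasticsearch's actual write path, only the check the paragraph above argues it should enforce.

```clojure
(defn majority
  "Smallest number of copies forming a strict majority of n."
  [n]
  (inc (quot n 2)))

(defn ack-write?
  "Hypothetical rule: acknowledge only if a strict majority of all
  copies (the primary's own included) confirmed the write."
  [confirmations]
  (>= (count (filter true? (vals confirmations)))
      (majority (count confirmations))))

;; Primary :n1 is partitioned away from everyone except :n2.
(ack-write? {:n1 true, :n2 true, :n3 false, :n4 false, :n5 false}) ; => false
;; Once :n3 is reachable again, three of five copies confirm.
(ack-write? {:n1 true, :n2 true, :n3 true, :n4 false, :n5 false})  ; => true
```

Under this rule, an isolated primary would time out or fail writes rather than acknowledge them. That is the clear-cut pattern the Jepsen histories would be expected to show.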

Speaking of majority, Shikhar Bhushan notes that Elasticsearch considers a quorum for two nodes to be a single node. This definition invalidates, well, basically every proof about non-dominating coterie consensus, but the Elasticsearch team doesn’t seem keen on changing it. I don’t think this affects the tests in Jepsen, which maintain three or five replicas, but it’s worth keeping in mind.
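The point of a majority is that any two quorums share at least one node, so both sides of a partition can't proceed independently. By pigeonhole, two quorums of size q drawn from n nodes must overlap exactly when 2q > n. The sketch below compares strict majorities with a hypothetical es-quorum. That function encodes only the two-node rule described above; for other sizes it simply assumes a strict majority, and we make no claim about Elasticsearch's formula there.

```clojure
(defn majority [n] (inc (quot n 2)))

(defn es-quorum
  "Hypothetical: two nodes need only one; otherwise assume a majority."
  [n]
  (if (= n 2) 1 (majority n)))

(defn quorums-must-intersect?
  "Any two q-sized subsets of n nodes overlap iff 2q > n."
  [n q]
  (> (* 2 q) n))

(quorums-must-intersect? 2 (majority 2))  ; => true  (a quorum is both nodes)
(quorums-must-intersect? 2 (es-quorum 2)) ; => false ({n1} and {n2} are both quorums)
(quorums-must-intersect? 5 (majority 5))  ; => true
```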

None of these changes would be sufficient for linearizability, incidentally. Distributed commit requires a real consensus protocol like Paxos, ZAB, Viewstamped Replication, Raft, etc. The Elasticsearch team is investigating these options; from unofficial murmurings, they may release something soon.

In the short term, Elasticsearch could provide better safety constraints by treating a collection of documents as a CRDT: specifically, an LWW-element set where the timestamps are document versions. This isn’t linearizable, but it would allow Elasticsearch to trivially recover inserts made on both sides of a cluster, and to preserve writes made on only one side of a partition. Both of these behaviors would significantly improve Elasticsearch’s fault tolerance, and the approach is significantly simpler to implement than, say, Raft or Paxos.
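Here is a minimal sketch of an LWW-element set. Each element records the highest version at which it was added and the highest at which it was removed. Merging two replicas keeps the per-element maximum of each, which makes the merge commutative, associative, and idempotent. The names (add-el, remove-el, merge-lww, members) are hypothetical, and we assume integer versions with ties broken in favor of the add. Real document versions from two sides of a partition could collide, which a production design would have to address.

```clojure
(def empty-lww {:adds {} :removes {}})

(defn add-el [s el version]
  (update-in s [:adds el] (fnil max version) version))

(defn remove-el [s el version]
  (update-in s [:removes el] (fnil max version) version))

(defn merge-lww
  "Join of two replicas: per element, the highest add and the highest
  remove version seen on either side."
  [a b]
  {:adds    (merge-with max (:adds a) (:adds b))
   :removes (merge-with max (:removes a) (:removes b))})

(defn members
  "Elements whose latest add is at least as new as their latest remove
  (ties favor the add)."
  [s]
  (set (for [[el added] (:adds s)
             :let [removed (get-in s [:removes el])]
             :when (or (nil? removed) (>= added removed))]
         el)))

;; doc-1 existed before the partition. Side A inserts doc-2; side B
;; inserts doc-3 and deletes doc-1 at a newer version.
(def base   (add-el empty-lww :doc-1 1))
(def side-a (add-el base :doc-2 1))
(def side-b (-> base (add-el :doc-3 1) (remove-el :doc-1 2)))

(= #{:doc-2 :doc-3} (members (merge-lww side-a side-b))) ; => true
(= (merge-lww side-a side-b) (merge-lww side-b side-a))  ; => true
```

Inserts from both sides survive the merge, and the newer delete wins over the older insert.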

## Recommendations for users

If you are an Elasticsearch user (as I am): good luck.

Some people actually advocate using Elasticsearch as a primary data store; I think this is somewhat less than advisable at present. If you can, store your data in a safer database, and feed it into Elasticsearch gradually. Have processes in place that continually traverse the system of record, so you can recover from ES data loss automatically.

Folks will tell you to set minimum_master_nodes to a majority of the cluster, and this is a good idea–but it won’t, by any means, buy you safety. I’m honestly not sure what will. Raising the replica count reduces the odds of losing inserts, but means more replicas to conflict during CAS and won’t guarantee safety even when a replica is present on every single node in the cluster.

The good news is that Elasticsearch is a search engine, and you can often afford the loss of search results for a while. Consider tolerating data loss; Elasticsearch may still be the best fit for your problem.

Moreover, none of this invalidates the excellent tooling available around Elasticsearch. People love Logstash and Kibana. The ES API is surprisingly straightforward, and being able to just spew JSON at a search system is great. Elasticsearch does a great job of making distributed search user-friendly. With a little work on correctness, they can make it safe, too.

This concludes the third round of Jepsen tests. I’d like to thank Comcast for funding this research, and Shay Banon, Bob Poekert, Shikhar Bhushan, and Aaron France for their help in writing these tests and understanding Elasticsearch’s behavior. I am indebted to Camille Fournier, Cliff Moon, Coda Hale, JD Maturen, Ryan Zezeski, Jared Morrow, Kelly Sommers, Blake Mizerany, and Jeff Hodges for their feedback on both the conference talks and these posts, and to everyone who reached out with reports of production failures, paper recommendations, and so much more.

And finally, thank you, reader, for sticking with what was probably far more than you ever wanted to know about databases and consistency. I hope you can use the techniques from these posts to help verify the safety of your own distributed systems. Happy hunting.

# Call me maybe: etcd and Consul

In the previous post, we discovered the potential for data loss in RabbitMQ clusters. In this oft-requested installment of the Jepsen series, we’ll look at etcd: a new contender in the CP coordination service arena. We’ll also discuss Consul’s findings with Jepsen.

Like Zookeeper, etcd is designed to store small amounts of strongly-consistent state for coordination between services. It exposes a tree of logical nodes. Each node is identified by a string key, contains a string value, and carries a version number termed an index; it may also have a set of child nodes. Everything’s exposed as JSON over an HTTP API.

Etcd is often used for service discovery, distributed locking, atomic broadcast, sequence numbers, and pointers to data in eventually consistent stores. Because etcd offers atomic compare-and-set by both value and version index, it’s a powerful primitive in building other distributed systems.

In this post, we’ll write a Jepsen test for etcd, and see whether it lives up to its consistency claims.

## Writing a client

A client, in Jepsen, applies a series of operations to one particular node in a distributed system. Our client will take invocations like {:process 2, :type :invoke, :f :cas, :value [1 2]}, try to change the value of a register from 1 to 2, and return a completion like {:process 2, :type :ok, :f :cas, :value [1 2]} if etcd acknowledges the compare-and-set. If you’re a little confused, now might be a good time to skim through the earlier discussion of strong consistency models.

So: first things first. We’ll define a new datatype, called CASClient, with two fields: a key k, and an etcd client client.

```clojure
(defrecord CASClient [k client]
```

Jepsen has a protocol–a suite of functions–for interacting with clients. We’ll define how CASClient supports that protocol by declaring the protocol name client/Client, followed by three functions from that protocol: setup!, invoke!, and teardown!.

```clojure
  client/Client
  (setup! [this test node]
```

The setup function takes three arguments: the CASClient itself (this), the test being run, and the name of the node this client should connect to. Think of a client like a stem cell: before the test runs, it lies latent, unspecialized. When the test starts, we’ll spawn a client for each node. The setup! function differentiates a latent client, returning an active client bound to one particular node. Some state, like the key k, will be inherited by the new client. Other state, like the database connection, will be set up for each new client independently.

```clojure
    (let [client (v/connect (str "http://" (name node) ":4001"))]
      (v/reset! client k (json/generate-string nil))
      (assoc this :client client)))
```

In this namespace, we’ll use my Verschlimmbesserung etcd client and call its namespace v. v/connect creates a new Verschlimmbesserung client for the given node. We call v/reset! to initialize the key k to nil, json-encoded. Then, using assoc, we return a copy of this CASClient, but with the :client field replaced by the Verschlimmbesserung client.

Next, we’ll implement the Client protocol’s invoke! function, which takes a Client, a test, and an invocation to apply.

```clojure
  (invoke! [this test op]
```

Things get a little complicated now. We often say that an unexpected exception or a timeout means an operation failed–but in verifying strong consistency, we need to be a little more precise. We must distinguish between three outcomes:

1. :ok results, where the operation definitely occurred,
2. :fail results, where the operation definitely did not occur, and
3. :info results, where the operation might or might not have taken place.

Indeterminate results, like timeouts, are the bane of the model checker. We never know whether those operations might complete at some point hours or weeks later–so when a timeout occurs, we consider the process crashed and spawn a new one. That process is still concurrent with every subsequent operation in the history, which imposes a huge cost at verification time. Wherever possible, we want to declare definitively that an operation did or did not happen.

Reads are a special case: they don’t affect the state of the system, so as far as the model checker is concerned, an indeterminate read can always be interpreted as never having happened at all, i.e., as a :fail. We’re going to use this distinction in some error handling code later.

```clojure
    ; Reads are idempotent; if they fail we can always assume they didn't
    ; happen in the history, and reduce the number of hung processes, which
    ; makes the knossos search more efficient
    (let [fail (if (= :read (:f op))
                 :fail
                 :info)]
```

Now, depending on the function :f of the invoke operation, we’ll either read, write, or compare-and-set the value at key k.

```clojure
      (try+
        (case (:f op)
          :read  (let [value (-> client
                                 (v/get k {:consistent? true})
                                 (json/parse-string true))]
                   (assoc op :type :ok :value value))

          :write (do (->> (:value op)
                          json/generate-string
                          (v/reset! client k))
                     (assoc op :type :ok))

          :cas   (let [[value value'] (:value op)
                       ok?            (v/cas! client k
                                              (json/generate-string value)
                                              (json/generate-string value'))]
                   (assoc op :type (if ok? :ok :fail))))
```

For a read, we’ll take the client, get the key using an etcd consistent read, and parse the value as JSON. Then we’ll return a copy of the invocation, but with the type :ok and the :value obtained from etcd. Note that we’re using etcd’s consistent read option, which claims:

If your application wants or needs the most up-to-date version of a key then it should ensure it reads from the current leader. By using the consistent=true flag in your GET requests, etcd will make sure you are talking to the current master.

For a write, we’ll take the :value from the operation, serialize it to JSON, and call v/reset! to change the register to that new value.

For a compare-and-set (:cas), we’ll take a pair of values–old and new–and bind them to value and value'. We’ll serialize both to JSON, and call v/cas! to atomically set k to the new value iff it currently has the old value. v/cas! returns true if the CAS succeeded, and false if the CAS failed, so we return a :type of :ok or :fail depending on its return value ok?.
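To exercise this client logic without a cluster, it can help to have an in-memory stand-in with the same semantics as the three calls used above:

- a read,
- an unconditional write (like v/reset!), and
- a value-based compare-and-set (like v/cas!).

This is a hypothetical local register, not part of Verschlimmbesserung. It deliberately avoids clojure.core/compare-and-set!, which compares by identity rather than by value and so would not treat two equal but distinct JSON strings as a match.

```clojure
(defn local-register
  "Hypothetical in-memory stand-in for a single etcd key."
  [initial]
  (atom initial))

(defn local-get [reg] @reg)

(defn local-reset! [reg value]
  (reset! reg value)
  nil)

(defn local-cas!
  "Sets reg to value' iff its current value equals (=) value; returns
  true if the swap happened. swap-vals! needs Clojure 1.9 or later."
  [reg value value']
  (let [[old _] (swap-vals! reg (fn [cur] (if (= cur value) value' cur)))]
    (= old value)))

(def reg (local-register nil))
(local-cas! reg 2 4) ; => false, the register still holds nil
(local-reset! reg 1)
(local-cas! reg 1 4) ; => true
(local-get reg)      ; => 4
```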

Finally, we’ll handle a few common error conditions, just to reduce the chatter in the logs.

```clojure
        ; A few common ways etcd can fail
        (catch java.net.SocketTimeoutException e
          (assoc op :type fail :value :timed-out))

        (catch [:body "command failed to be committed due to node failure\n"] e
          (assoc op :type fail :value :node-failure))

        (catch [:status 307] e
          (assoc op :type fail :value :redirect-loop))

        (catch (and (instance? clojure.lang.ExceptionInfo %)) e
          (assoc op :type fail :value e))

        (catch (and (:errorCode %) (:message %)) e
          (assoc op :type fail :value e)))))
```

If we’re doing a read, these error handlers will return :fail as a performance optimization. If we’re doing a write or CAS, they’ll return :info, letting Knossos know that those operations might or might not have taken place.

One last function from the Client protocol: teardown!, which releases any resources the clients might be holding on to. These clients are just stateless HTTP wrappers, so there’s nothing to do here.

```clojure
  (teardown! [_ test]))
```

That’s it, really. We’ll write a little function to create a latent instance of this datatype, and use it in our test! We’ll call our key "jepsen", and leave the client field blank–it’ll be filled in by calls to setup!.

```clojure
(defn cas-client
  "A compare and set register built around a single etcd key."
  []
  (CASClient. "jepsen" nil))
```

We need a model of an etcd register to go along with this client. The model’s job is to take an operation, apply it to the current state of the model, and return a new model state–or a special inconsistent state if the given operation can’t be applied. We’ll create a datatype called CASRegister, which has a single field called value.

```clojure
(defrecord CASRegister [value]
  Model
  (step [r op]
    (condp = (:f op)
      :write (CASRegister. (:value op))
      :cas   (let [[cur new] (:value op)]
               (if (= cur value)
                 (CASRegister. new)
                 (inconsistent (str "can't CAS " value " from " cur
                                    " to " new))))
      :read  (if (or (nil? (:value op))
                     (= value (:value op)))
               r
               (inconsistent (str "can't read " (:value op)
                                  " from register " value))))))
```

Just like our invoke! function, CASRegister chooses what to do based on the operation’s function :f. For a write, it returns a new CASRegister wrapping the given value.

For a compare-and-set, it binds the current and new values, then checks whether its own value is equal to the operation’s current value. If it is, we return a new register with the new value. If it isn’t, we construct a special inconsistent result, explaining why the CAS operation won’t work.

When a read is invoked, the client may not know what value it’s reading. We allow the read to go through–returning the same model r–if the client doesn’t provide a value to be read, or if the value the client read is equal to our current value. If the client tries to read some specific value, and it’s not the current value of the register, though–that’s an inconsistent state.

Then, a quick constructor function that starts off with the value nil. Note that this initial value corresponds to the value we wrote to the etcd key when the clients start up; both the real system and the model have to start in the same state.

```clojure
(defn cas-register
  "A compare-and-set register"
  ([] (cas-register nil))
  ([value] (CASRegister. value)))
```
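The same transitions can be tried out without Knossos. Below is a dependency-free sketch over plain maps. Here inconsistent is a hypothetical stand-in that returns a map with an :error key (Knossos has its own representation), and model-step and run-model are illustrative names. Note that run-model applies operations in one fixed sequential order; a linearizability checker instead searches over every order the concurrent history permits.

```clojure
(defn inconsistent
  "Hypothetical stand-in: marks an impossible transition and says why."
  [msg]
  {:error msg})

(defn inconsistent? [state] (contains? state :error))

(defn model-step
  "CASRegister's transitions over plain maps of the form {:value v}."
  [{:keys [value] :as state} op]
  (case (:f op)
    :write {:value (:value op)}
    :cas   (let [[cur new-value] (:value op)]
             (if (= cur value)
               {:value new-value}
               (inconsistent (str "can't CAS " value " from " cur
                                  " to " new-value))))
    :read  (if (or (nil? (:value op)) (= value (:value op)))
             state
             (inconsistent (str "can't read " (:value op)
                                " from register " value)))))

(defn run-model
  "Applies ops in order to a register starting at nil, stopping at the
  first inconsistent state."
  [ops]
  (reduce (fn [state op]
            (let [state' (model-step state op)]
              (if (inconsistent? state') (reduced state') state')))
          {:value nil}
          ops))

(run-model [{:f :write :value 1} {:f :cas :value [1 4]} {:f :read :value 4}])
;; => {:value 4}
(run-model [{:f :write :value 1} {:f :cas :value [1 4]} {:f :read :value 1}])
;; => {:error "can't read 1 from register 4"}
```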

With the client and model written, it’s time to combine both into a Jepsen test:

## Designing a test

We’ll start with a baseline noop-test, and override it with etcd-specific fields. We’re pulling in etcd’s db to automate setup and teardown of the cluster, the cas-client we wrote earlier, and model/cas-register, our single-threaded model of a compare-and-set register. We’ll use two checkers: an HTML timeline visualization, and the linearizable checker, powered by Knossos. A special client, the :nemesis, introduces network failures by partitioning the cluster into randomly selected halves.

```clojure
(deftest register-test
  (let [test (run!
               (assoc
                 noop-test
                 :name      "etcd"
                 :os        debian/os
                 :db        (db)
                 :client    (cas-client)
                 :model     (model/cas-register)
                 :checker   (checker/compose {:html   timeline/html
                                              :linear checker/linearizable})
                 :nemesis   (nemesis/partition-random-halves)
                 :generator (gen/phases
                              (->> gen/cas
                                   (gen/delay 1)
                                   (gen/nemesis
                                     (gen/seq (cycle [(gen/sleep 5)
                                                      {:type :info :f :start}
                                                      (gen/sleep 5)
                                                      {:type :info :f :stop}])))
                                   (gen/time-limit 20))
                              (gen/nemesis
                                (gen/once {:type :info :f :stop}))
                              (gen/sleep 10)
                              (gen/clients
                                (gen/once {:type :invoke :f :read})))))]
    (is (:valid? (:results test)))
    (report/linearizability (:linear (:results test)))))
```

The generator defines the sequence and schedule of operations. This test proceeds in phases–all clients must complete a phase before any can move to the next. We’ll start off with gen/cas, which emits a mix of random :read, :write, and :cas invocations.

```clojure
(def cas
  "Random cas/read ops for a compare-and-set register over a small field
  of integers."
  (reify Generator
    (op [generator test process]
      (condp < (rand)
        0.66 {:type :invoke :f :read}
        0.33 {:type :invoke :f :write :value (rand-int 5)}
        ; Default clause for (rand) <= 0.33. A final `0` test would not
        ; match (rand) = 0.0, and condp throws when no clause matches.
        {:type :invoke :f :cas :value [(rand-int 5) (rand-int 5)]}))))
```
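condp < evaluates (< threshold r) for each clause in turn. A number above 0.66 picks a read, one above 0.33 picks a write, and everything else picks a CAS, so each type comes up roughly a third of the time. The sketch below pulls that choice out into a pure function of r, using a hypothetical op-for helper, so the boundaries are easy to check. Like the generator above, it ends in a default clause because condp throws IllegalArgumentException when no clause matches.

```clojure
(defn op-for
  "Hypothetical pure version of the gen/cas choice, same thresholds."
  [r]
  (condp < r
    0.66 :read    ; r > 0.66
    0.33 :write   ; 0.33 < r <= 0.66
    :cas))        ; default: r <= 0.33, including r = 0.0

(op-for 0.9)  ; => :read
(op-for 0.66) ; => :write
(op-for 0.33) ; => :cas

;; Over many samples, expect about 34% reads, 33% writes, 33% CAS.
(frequencies (map op-for (repeatedly 30000 rand)))
```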

We wrap that generator in gen/delay, adding an extra second of latency to each operation to slow down the test a bit. Meanwhile, the nemesis cycles through an infinite sequence of sleeping, starting a network partition, sleeping, then resolving the partition. We limit the entire phase to 20 seconds–etcd convergence times are quite fast.
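Ignoring the time the nemesis itself needs to change the network, this schedule starts a partition every ten seconds and heals it five seconds later. The sketch below computes that timeline. The nemesis-timeline helper is hypothetical and not part of Jepsen's generator API. We also assume an event landing exactly on the 20-second limit does not fire.

```clojure
(defn nemesis-timeline
  "Hypothetical: [seconds event] pairs for a nemesis that sleeps `gap`
  seconds before each event, alternating :start and :stop, keeping
  only events strictly before `limit`."
  [gap limit]
  (->> (cycle [:start :stop])
       (map-indexed (fn [i event] [(* gap (inc i)) event]))
       (take-while (fn [[t _]] (< t limit)))))

(nemesis-timeline 5 20)
;; => ([5 :start] [10 :stop] [15 :start])
```

The last event inside the limit is a :start, so the partition from second 15 would still be in place when the phase ends. The next phase's explicit :stop resolves it.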

In the next phase, the nemesis emits a single :stop operation, resolving the network partition. We sleep for 10 seconds, then ask each client to perform a final read, just to see how the system stabilized.

## Running the test

Etcd starts up fast. It converges in a matter of milliseconds, whereas many systems take 10 seconds or even minutes to detect failures. This is really convenient for testing–and arguably a nice property in production–but it also exposed a number of serious issues in etcd’s cluster state management: most notably, race conditions.

For instance, Issue 716 caused the primary to death-spiral almost every time I stood up a cluster, even with five or ten seconds between joining each node. The etcd team was incredibly responsive about fixing these bugs, but I’m kind of surprised to find problems like this in software that’s been released for almost a year. I’ve heard several anecdotal reports of other concurrency issues in goraft (the implementation of the underlying consensus protocol), which makes me a little nervous about trusting it, but it’s tough to turn anecdotes into reproducible failure cases, so I won’t dive into those here.

With some work, I was able to reliably stand up a cluster.

Here’s one of the shorter cases in full. First, Jepsen stands up the etcd cluster on nodes :n1, :n2, etc., and spools up five worker threads.

```
INFO jepsen.system.etcd - :n4 etcd ready
INFO jepsen.system.etcd - :n1 etcd ready
INFO jepsen.system.etcd - :n5 etcd ready
INFO jepsen.system.etcd - :n2 etcd ready
INFO jepsen.system.etcd - :n3 etcd ready
INFO jepsen.core - Worker 0 starting
INFO jepsen.core - Worker 3 starting
INFO jepsen.core - Worker 2 starting
INFO jepsen.core - Worker 4 starting
INFO jepsen.core - Worker 1 starting
```

Each worker thread, representing a process, concurrently invokes a series of random operations against a single etcd register. Each process talks to a distinct etcd node for each request, but follows redirects to whatever node that node thinks is the current leader. They happen to start off all making reads of the initial value nil. Then process 0 begins a read, process 3 begins a compare-and-set from 2 to 4, which fails since the value is nil, and so on.

```
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 4 :invoke :read nil
INFO jepsen.util - 0 :invoke :read nil
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 3 :invoke :read nil
INFO jepsen.util - 0 :ok :read nil
INFO jepsen.util - 3 :ok :read nil
INFO jepsen.util - 4 :ok :read nil
INFO jepsen.util - 2 :ok :read nil
INFO jepsen.util - 1 :ok :read nil
INFO jepsen.util - 0 :invoke :read nil
INFO jepsen.util - 3 :invoke :cas [2 4]
INFO jepsen.util - 4 :invoke :cas [4 4]
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 0 :ok :read nil
INFO jepsen.util - 2 :ok :read nil
INFO jepsen.util - 1 :ok :read nil
INFO jepsen.util - 4 :fail :cas [4 4]
INFO jepsen.util - 3 :fail :cas [2 4]
```

The nemesis process initiates a network partition, isolating :n5 from :n4 and :n1, isolating :n2 from :n4 and :n1, etc. Notice that it takes some time for the nemesis to make those changes to the network.

```
INFO jepsen.util - :nemesis :info :start nil
INFO jepsen.util - 0 :invoke :write 4
INFO jepsen.util - 0 :ok :write 4
INFO jepsen.util - 3 :invoke :read nil
INFO jepsen.util - 3 :ok :read 4
INFO jepsen.util - 2 :invoke :cas [0 4]
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 1 :ok :read 4
INFO jepsen.util - 4 :invoke :write 1
INFO jepsen.util - 2 :fail :cas [0 4]
INFO jepsen.util - 4 :ok :write 1
INFO jepsen.util - 0 :invoke :read nil
INFO jepsen.util - 0 :ok :read 1
INFO jepsen.util - 3 :invoke :read nil
INFO jepsen.util - 3 :ok :read 1
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 1 :ok :read 1
INFO jepsen.util - :nemesis :info :start "Cut off {:n5 #{:n4 :n1}, :n2 #{:n4 :n1}, :n3 #{:n4 :n1}, :n1 #{:n3 :n2 :n5}, :n4 #{:n3 :n2 :n5}}"
```

We see a few operations time out–which we expect from a CP system.

```
INFO jepsen.util - 3 :info :cas :timed-out
INFO jepsen.util - 1 :invoke :write 3
INFO jepsen.util - 1 :ok :write 3
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 2 :ok :read 3
INFO jepsen.util - 4 :invoke :cas [2 3]
INFO jepsen.util - 4 :fail :cas [2 3]
INFO jepsen.util - 0 :invoke :write 0
INFO jepsen.util - 8 :invoke :cas [1 2]
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 1 :ok :read 3
INFO jepsen.util - 2 :invoke :write 2
INFO jepsen.util - 2 :ok :write 2
INFO jepsen.util - 4 :invoke :cas [2 3]
INFO jepsen.util - 0 :info :write :timed-out
INFO jepsen.util - 4 :ok :cas [2 3]
INFO jepsen.util - 8 :info :cas :timed-out
```

After a few cycles of isolating and reconnecting nodes, something interesting happens just as the network is cut off: a “Raft Internal Error”:

```
INFO jepsen.util - 4 :info :cas {:status 500, :errorCode 300, :message "Raft Internal Error", :index 41}
INFO jepsen.util - 10 :ok :write 0
INFO jepsen.util - :nemesis :info :start "Cut off {:n1 #{:n2 :n5}, :n4 #{:n2 :n5}, :n3 #{:n2 :n5}, :n5 #{:n3 :n4 :n1}, :n2 #{:n3 :n4 :n1}}"
```

Another failure case: two nodes each think the other is the current leader, returning HTTP redirects to the other node in an infinite loop.

```
INFO jepsen.util - 1 :invoke :read nil
INFO jepsen.util - 2 :invoke :read nil
INFO jepsen.util - 2 :ok :read 0
INFO jepsen.util - 1 :fail :read :redirect-loop
```
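A loop like this is easy to detect on the client side: follow leader hints, remember which nodes you've visited, and give up as soon as one repeats. Below is a minimal sketch. It assumes a hypothetical map from each node to the node it claims is leader, where a node naming itself is the leader. It illustrates the failure only; it is not how Verschlimmbesserung or its HTTP layer handles redirects. In the client above, the loop surfaced as a 307 response, which the catch clause maps to :redirect-loop.

```clojure
(defn find-leader
  "Hypothetical: follows leader hints from `start`. Returns the node that
  names itself as leader, nil if a node has no hint, or :redirect-loop
  if the chain revisits a node."
  [leader-hints start]
  (loop [node start
         seen #{}]
    (let [hint (get leader-hints node)]
      (cond
        (nil? hint)           nil
        (= hint node)         node
        (contains? seen node) :redirect-loop
        :else                 (recur hint (conj seen node))))))

;; Healthy: everyone points at :n3, which points at itself.
(find-leader {:n1 :n3, :n2 :n3, :n3 :n3} :n1) ; => :n3
;; :n1 and :n2 each think the other is the leader.
(find-leader {:n1 :n2, :n2 :n1} :n1)          ; => :redirect-loop
```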

And the test completes. By and large, operations completed reliably with low latencies, and despite some failures, eyeballing the test, things look correct. The Jepsen I wrote a year ago would have called these results A-OK.

```
INFO jepsen.util - 18 :ok :read 2
INFO jepsen.core - Worker 3 done
INFO jepsen.core - Run complete, writing
INFO jepsen.core - Analyzing
INFO jepsen.core - Analysis complete
```

Jepsen II, however, is not quite so forgiving.

## Results

The very first test I ran reported a linearizability failure. I was so surprised I spent another week double-checking Knossos and Jepsen, then writing my own etcd client, to make sure I hadn’t made a mistake. Sure enough, etcd’s registers are not linearizable.

```
FAIL in (register-test) (etcd_test.clj:45)
expected: (:valid? (:results test))
  actual: false
Not linearizable. Linearizable prefix was:
2 :invoke :read nil
4 :invoke :read nil
0 :invoke :read nil
1 :invoke :read nil
3 :invoke :read nil
0 :ok :read nil
3 :ok :read nil
4 :ok :read nil
2 :ok :read nil
1 :ok :read nil
0 :invoke :read nil
3 :invoke :cas [2 4]
4 :invoke :cas [4 4]
2 :invoke :read nil
1 :invoke :read nil
0 :ok :read nil
2 :ok :read nil
1 :ok :read nil
4 :fail :cas [4 4]
3 :fail :cas [2 4]
0 :invoke :read nil
0 :ok :read nil
2 :invoke :write 1
1 :invoke :cas [2 3]
2 :ok :write 1
1 :fail :cas [2 3]
4 :invoke :write 3
3 :invoke :write 1
4 :ok :write 3
3 :ok :write 1
0 :invoke :cas [4 1]
2 :invoke :write 2
1 :invoke :write 1
0 :fail :cas [4 1]
3 :invoke :read 1
4 :invoke :write 0
3 :ok :read 1
2 :ok :write 2
1 :ok :write 1
4 :ok :write 0
:nemesis :info :start nil
0 :invoke :write 4
0 :ok :write 4
3 :invoke :read 4
3 :ok :read 4
2 :invoke :cas [0 4]
1 :invoke :read 4
1 :ok :read 4
4 :invoke :write 1
2 :fail :cas [0 4]
4 :ok :write 1
0 :invoke :read 1
0 :ok :read 1
3 :invoke :read 1
3 :ok :read 1
1 :invoke :read 1
1 :ok :read 1
:nemesis :info :start "Cut off {:n5 #{:n4 :n1}, :n2 #{:n4 :n1}, :n3 #{:n4 :n1}, :n1 #{:n3 :n2 :n5}, :n4 #{:n3 :n2 :n5}}"
2 :invoke :cas [1 4]
4 :invoke :read 1
4 :ok :read 1
2 :ok :cas [1 4]
Followed by inconsistent operation:
0 :invoke :read 1
```

Why aren’t we allowed to read 1 from the register at this point? Knossos can provide us a litany of possible worlds, just prior to that fatal read. For instance, we might have ordered events like so: process 1 reads nil, process 3 reads nil, …

```
World with fixed history:
1 :invoke :read nil
3 :invoke :read nil
2 :invoke :read nil
4 :invoke :read nil
0 :invoke :read nil
2 :invoke :read nil
1 :invoke :read nil
0 :invoke :read nil
0 :invoke :read nil
2 :invoke :write 1
4 :invoke :write 3
3 :invoke :write 1
3 :invoke :read 1
4 :invoke :write 0
1 :invoke :write 1
2 :invoke :write 2
0 :invoke :write 4
3 :invoke :read 4
1 :invoke :read 4
4 :invoke :write 1
0 :invoke :read 1
3 :invoke :read 1
1 :invoke :read 1
4 :invoke :read 1
2 :invoke :cas [1 4]
led to state: {:value 4}
with pending operations:
(and 12928 more worlds, elided here)
```

But the key problem is that in all thirteen-thousand-odd interpretations of this history, the register ends up with the value 4.

```
Inconsistent state transitions: ([{:value 4} "can't read 1 from register 4"])
```

Once that CAS goes through, a linearizable register can’t return the previous value for a read. This violates linearizability.

## What’s going on here?

Looking at the history just prior to that failure, we see that process 4 wrote 1 to the register, and several processes read that value before the partition occurred. It looks like the value was 1, a compare-and-set from 1 to 4 took place, but after that CAS completed, some process managed to read the previous value. In the consistency literature, this is called a stale read.

Stale reads are bad. They don’t just violate linearizability–they violate sequential consistency, causal consistency, read-your-writes, monotonic writes, monotonic reads–basically everything you’d want from a single-valued register goes out the window. This is particularly surprising because Raft, the etcd consensus algorithm, guarantees that committed log entries are linearizable.

But etcd’s “consistent” reads don’t go through the Raft log.

Instead, they simply return the local state if the current node considers itself a leader. But Raft says nothing about guaranteeing leader exclusivity: multiple nodes can consider themselves the leader simultaneously.

So imagine two nodes, separated by a network partition, have the value 1. The node on top has just been elected leader for the most recent term, and accepts that CAS request, changing 1 to 4. It’s unable to propagate that change to the old leader because they’re separated by a network partition. The old leader goes on happily replying to reads with the old value, until it realizes it hasn’t received a heartbeat from a majority of peers in some time, and steps down.

Once the partition resolves, the old leader receives the new value 4 from the new leader, and the system continues on its way.
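A toy Go model of this scenario may help. It is a sketch under loud assumptions, not etcd's code. `node`, `leaderLocalRead`, and `staleReadScenario` are hypothetical names. There is no Raft log or election, and the partition is represented simply by the old leader never learning about the CAS. The model shows only one thing: if any node that believes it is leader answers reads from local state, two self-declared leaders can return different values after the CAS has completed.

```go
package main

import "fmt"

// node is a hypothetical, drastically simplified replica: a local copy of
// the register plus a flag recording whether it believes it is leader.
type node struct {
	value        int
	thinksLeader bool
}

// leaderLocalRead mimics the read path described above: a node that
// believes it is leader answers from local state, with no quorum check.
func (n *node) leaderLocalRead() (int, bool) {
	if !n.thinksLeader {
		return 0, false // a follower would redirect to the leader instead
	}
	return n.value, true
}

// staleReadScenario replays the partition story and returns what each
// self-declared leader answers after the CAS has completed.
func staleReadScenario() (fromNew, fromOld int) {
	// Both nodes start with the register at 1; oldLeader leads.
	oldLeader := &node{value: 1, thinksLeader: true}
	newLeader := &node{value: 1}

	// A partition isolates oldLeader. The majority side elects newLeader,
	// which commits CAS(1 -> 4). oldLeader never hears about it and has
	// not yet noticed that it lost contact with a majority.
	newLeader.thinksLeader = true
	if newLeader.value == 1 {
		newLeader.value = 4
	}

	fromNew, _ = newLeader.leaderLocalRead()
	fromOld, _ = oldLeader.leaderLocalRead()
	return fromNew, fromOld
}

func main() {
	fromNew, fromOld := staleReadScenario()
	fmt.Println("new leader reads", fromNew, "; old leader reads", fromOld)
}
```

The old leader keeps answering 1 until its own timeout makes it step down. Nothing in this read path ties a response to the latest committed state.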

I want to be explicit, because some people have asserted that this behavior is “linearizable with respect to the Raft index”, even if it isn’t “linearizable in general”. It’s neither. An etcd “consistent read” can read a value from index 5, then index 4, then index 6, and so on. I think you might be able to recover sequential consistency by adding an FSM to the client that tracks the etcd index and tags all requests with a minimum-index constraint, but this is a.) optional, b.) not in any of the clients I know of, and c.) isn’t linearizable anyway.
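The client-side index tracking speculated about above might look roughly like this sketch. The `response` type, its `Index` field, and `indexTrackingClient` are hypothetical; they are not part of etcd's client API. What the sketch buys is narrow. A single client never sees the log move backwards, so it refuses the index-4 read in the sequence 5, 4, 6. It does nothing for other clients, and, as noted above, the result still isn't linearizable.

```go
package main

import (
	"errors"
	"fmt"
)

// response is a hypothetical read result tagged with the replicated-log
// index at which the value was observed. It is not etcd's response type.
type response struct {
	Index uint64
	Value string
}

var errStale = errors.New("response is older than an index this client already saw")

// indexTrackingClient remembers the highest index it has observed and
// refuses any response that would move it backwards in the log.
type indexTrackingClient struct {
	minIndex uint64
}

func (c *indexTrackingClient) accept(r response) (string, error) {
	if r.Index < c.minIndex {
		return "", errStale // caller would retry against another node
	}
	c.minIndex = r.Index
	return r.Value, nil
}

func main() {
	c := &indexTrackingClient{}
	// Reads arriving from index 5, then a lagging node at 4, then 6.
	for _, r := range []response{{5, "a"}, {4, "b"}, {6, "c"}} {
		v, err := c.accept(r)
		fmt.Println(r.Index, v, err)
	}
}
```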

## A note on Consul

Just after I found this bug in etcd, Hashicorp announced a new service-discovery project called Consul. A half-dozen people asked me what I thought of its design, and to my delight, its authors had already tested their system using Jepsen’s etcd test as a template. They reported:

As part of our Consul testing, we ran it against Jepsen to determine if any consistency issues could be uncovered. In our testing, Consul gracefully recovered from partitions without introducing any consistency issues.

This is not quite the whole story.

Jepsen actually did find a consistency issue. In fact, it found the same mistake that etcd made: “consistent” reads in Consul return the local state of any node that considers itself a leader, allowing stale reads. Their solution at the time was to change the leader timeout from 1 second to 300 milliseconds, side-stepping the race condition.

```diff
-	LeaderLeaseTimeout: time.Second,
+	LeaderLeaseTimeout: 300 * time.Millisecond,
```

Now, I’ve fought quite a few race conditions in my day, and adjusting the timeouts is a great nuclear option–but it doesn’t really guarantee correctness. High IO utilization and blocking syscalls can introduce surprising delays into processes at runtime. VMware vMotion will happily pause a process for seconds, as will garbage collection. Go’s GC is not particularly sophisticated yet, and there are production reports of ten-second garbage collection pauses. Bottom line: a seven-hundred-millisecond pause is not gonna cut it. The best way to solve a race condition, in general, is to remove the time dependence from the algorithm altogether.

Future iterations of Jepsen may be somewhat more challenging with respect to clock assumptions.

## Good news, everyone!

I’ve corresponded with both the etcd and Consul teams about this, and the emerging consensus is to implement three types of reads, for varying performance/correctness needs:

• Anything-goes reads, where any node can respond with its last known value. Totally available, in the CAP sense, but no guarantees of monotonicity. Etcd does this by default, and Consul terms this “stale”.
• Mostly-consistent reads, where only leaders can respond, and stale reads are occasionally allowed. This is what etcd currently terms “consistent”, and what Consul does by default.
• Consistent reads, which require a round-trip delay so the leader can confirm it is still authoritative before responding. Consul now terms this “consistent”.
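A sketch of how the three read levels differ, in Go. `ReadMode`, `replica`, and `confirmWithQuorum` are hypothetical names, not etcd or Consul APIs. `confirmWithQuorum` stands in for the extra round trip in which the leader checks that a majority still recognizes it. The example uses a deposed leader on the minority side of a partition. The first two modes happily return its stale value; the third refuses.

```go
package main

import (
	"errors"
	"fmt"
)

// ReadMode names the three levels listed above. These names are this
// sketch's own; etcd and Consul each expose the choice differently.
type ReadMode int

const (
	AnyNode      ReadMode = iota // any replica answers from local state
	LeaderLocal                  // only a self-declared leader answers, locally
	LeaderQuorum                 // the leader first confirms its authority
)

var errNotLeader = errors.New("not the acknowledged leader")

// replica is a hypothetical node. confirmWithQuorum stands in for one
// heartbeat round trip to a majority, reporting whether this node is still
// recognized as leader.
type replica struct {
	value             int
	thinksLeader      bool
	confirmWithQuorum func() bool
}

func (r *replica) read(mode ReadMode) (int, error) {
	switch mode {
	case AnyNode:
		return r.value, nil
	case LeaderLocal:
		if !r.thinksLeader {
			return 0, errNotLeader
		}
		return r.value, nil
	case LeaderQuorum:
		if !r.thinksLeader || !r.confirmWithQuorum() {
			return 0, errNotLeader
		}
		return r.value, nil
	default:
		return 0, fmt.Errorf("unknown read mode %d", mode)
	}
}

func main() {
	// A deposed leader on the minority side: it still believes it leads,
	// but a majority no longer acknowledges it.
	deposed := &replica{value: 1, thinksLeader: true,
		confirmWithQuorum: func() bool { return false }}
	for _, m := range []ReadMode{AnyNode, LeaderLocal, LeaderQuorum} {
		v, err := deposed.read(m)
		fmt.Println("mode", m, "value", v, "err", err)
	}
}
```

A real implementation must also ensure the leader has applied everything committed before it answers. The sketch captures only the authority check.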

Consul has, I believe, already implemented these changes, and written comprehensive documentation for the tradeoffs involved. Etcd’s work is still in progress, but I think they’ll get to it soon.

The etcd and Consul teams both take consistency seriously, and have been incredibly responsive to bug reports. I’m very thankful for their help in getting both systems running, and for their care in finding good tradeoffs between latency and consistency. I’m very excited to see a spate of strongly-consistent systems emerging in the last couple years, and I look forward to watching both etcd and Consul evolve. It’s a good time to be a software engineer!

In particular, I’d like to thank Xiang Li, Armon Dadgar, Evan Phoenix, Peter Bailis, and Kelly Sommers for their help in this analysis. A big thanks as well to Comcast, whose research grant made this round of Jepsen verification achievable. Y'all rock.

Next up: Elasticsearch.

# Call me maybe: RabbitMQ

RabbitMQ is a distributed message queue, and is probably the most popular open-source implementation of the AMQP messaging protocol. It supports a wealth of durability, routing, and fanout strategies, and combines excellent documentation with well-designed protocol extensions. I’d like to set all these wonderful properties aside for a few minutes, however, to talk about using your queue as a lock service. After that, we’ll explore RabbitMQ’s use as a distributed fault-tolerant queue.

## Rabbit as a lock service

While I was working on building Knossos–Jepsen’s linearizability checker–a RabbitMQ blog post made the rounds of various news aggregators. In this post, the RabbitMQ team showed how one could turn RabbitMQ into a distributed mutex or semaphore service. I thought this was a little bit suspicious, because the RabbitMQ documentation is very clear that partitions invalidate essentially all Rabbit guarantees, but let’s go with it for a minute.

RabbitMQ provides a feature whereby a crashed consumer–or one that declares it was unable to process a message–may return the message to the queue with a negative-ack. The message will then be redelivered to some other consumer later. We can use a queue containing a single message as a shared mutex–a lock that can only be held by a single consumer process at a time.

To acquire the lock, we attempt to consume the message from the queue. When we get the message, we hold the lock. When we wish to release the lock, we issue a negative-ack for the message, and Rabbit puts it back in the queue. It may be some time before another process comes to get the message from the queue, but even in the limiting case, where another process is waiting immediately, a linearizable queue guarantees the safety of this mutex. In order to successfully acquire the mutex, the mutex has to be free in the queue. In order to be free, the mutex had to have been released by another process, and if released, that process has already agreed to give up the lock–so at no point can two processes hold the mutex at the same time.
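If we assume a linearizable queue, the scheme is easy to model. In this sketch, a buffered Go channel of capacity one stands in for the single-message queue. This is an analogy, not RabbitMQ's API. Receiving the token is acquire, and sending it back is release (the negative-ack). `queueMutex` and `maxConcurrentHolders` are hypothetical names. Because the channel never holds more than one token, at most one goroutine can sit between acquire and release at a time.

```go
package main

import (
	"fmt"
	"sync"
)

// queueMutex uses a Go channel as a stand-in for a linearizable queue
// holding exactly one lock message.
type queueMutex struct {
	q chan struct{}
}

func newQueueMutex() *queueMutex {
	m := &queueMutex{q: make(chan struct{}, 1)}
	m.q <- struct{}{} // the queue starts out holding the lock message
	return m
}

func (m *queueMutex) acquire() { <-m.q }            // consume the message
func (m *queueMutex) release() { m.q <- struct{}{} } // put it back

// maxConcurrentHolders runs n goroutines that each take the lock once and
// reports the largest number of simultaneous holders observed.
func maxConcurrentHolders(n int) int {
	m := newQueueMutex()
	var mu sync.Mutex
	holders, peak := 0, 0
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.acquire()
			mu.Lock()
			holders++
			if holders > peak {
				peak = holders
			}
			mu.Unlock()

			mu.Lock()
			holders--
			mu.Unlock()
			m.release()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak simultaneous holders:", maxConcurrentHolders(50))
}
```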

RabbitMQ, however, is not a linearizable queue.

It can’t be linearizable, because as a queue, Rabbit needs to be tolerant of client failures. Imagine that a client process crashes. That process is never going to send Rabbit a negative-ack message. Rabbit has to infer the process has crashed because it fails to respond to heartbeat messages, or because the TCP connection drops, and, when it decides the process has crashed, it re-enqueues the message for delivery to a second process.

This is still a safe mutex, because a truly crashed process can’t do anything, so it has essentially given up the lock. RabbitMQ can safely hand the message to another process, allowing that process to recover the lock.

But then an ugly thought occurs: in an asynchronous network, reliable failure detectors are really darn hard. RabbitMQ can’t tell the difference between a client that’s crashed, and one that’s simply unresponsive. Perhaps the network failed, or perhaps the node is undergoing a GC pause, or the VM hiccuped, or the thread servicing RabbitMQ crashed but the thread using the mutex is still running, etc etc etc.

When this happens, Rabbit assumes the process crashed, and sensibly re-enqueues the message for another process to consume. Now our mutex is held by two processes concurrently. It is, in short, no longer mutually exclusive.
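The failure mode fits in a few lines. This is a deterministic toy, not RabbitMQ's implementation. `broker`, its tick-based `timeout`, and `simulate` are hypothetical. Consumer A takes the lock and keeps using it, but a partition swallows its heartbeats. Once the broker's timeout expires, it requeues the message, and B acquires the same lock.

```go
package main

import "fmt"

// broker is a toy single-message queue with a heartbeat-based failure
// detector. Time is measured in abstract ticks.
type broker struct {
	queued        bool   // is the lock message sitting in the queue?
	holder        string // consumer the broker believes holds it
	lastHeartbeat int    // tick at which holder was last heard from
	timeout       int    // ticks of silence before "holder crashed"
}

func (b *broker) deliver(consumer string, now int) bool {
	if !b.queued {
		return false
	}
	b.queued, b.holder, b.lastHeartbeat = false, consumer, now
	return true
}

// heartbeat records that the holder is alive; a partition means these
// calls simply never arrive.
func (b *broker) heartbeat(consumer string, now int) {
	if consumer == b.holder {
		b.lastHeartbeat = now
	}
}

// tick requeues the message if the holder has been silent too long. The
// broker cannot tell a crashed consumer from a partitioned or paused one.
func (b *broker) tick(now int) {
	if !b.queued && now-b.lastHeartbeat > b.timeout {
		b.queued, b.holder = true, ""
	}
}

// simulate returns every consumer that believes it holds the lock.
func simulate() []string {
	b := &broker{queued: true, timeout: 3}
	var holders []string
	if b.deliver("A", 0) {
		holders = append(holders, "A") // A acquires and keeps working
	}
	// A is alive, but no heartbeat reaches the broker from tick 1 to 5.
	for now := 1; now <= 5; now++ {
		b.tick(now)
	}
	if b.deliver("B", 5) {
		holders = append(holders, "B") // B acquires the "same" lock
	}
	return holders
}

func main() {
	fmt.Println("consumers holding the lock:", simulate())
}
```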

## A demonstration

I’ve written a basic mutex client using this technique in Jepsen. We’ll invoke this client with a stream of alternating :acquire and :release operations, like so:

```clojure
(deftest mutex-test
  (let [test (run!
               (assoc noop-test
                      :name      "rabbitmq-mutex"
                      :os        debian/os
                      :db        db
                      :client    (mutex)
                      :checker   (checker/compose {:html   timeline/html
                                                   :linear checker/linearizable})
                      :model     (model/mutex)
                      :nemesis   (nemesis/partition-random-halves)
                      :generator (gen/phases
                                   (->> (gen/seq
                                          (cycle [{:type :invoke :f :acquire}
                                                  {:type :invoke :f :release}]))
                                        gen/each
                                        (gen/delay 180)
                                        (gen/nemesis
                                          (gen/seq
                                            (cycle [(gen/sleep 5)
                                                    {:type :info :f :start}
                                                    (gen/sleep 100)
                                                    {:type :info :f :stop}])))
                                        (gen/time-limit 500)))))]
    (is (:valid? (:results test)))
    (report/linearizability (:linear (:results test)))))
```

We’re starting out with a basic test skeleton called noop-test, and merging in a slew of options that tell Jepsen how to set up, run, and validate the test. We’ll use the DB and client from RabbitMQ’s namespace. Then we’ll check the results using both an HTML visualization and Jepsen’s linearizability checker, powered by Knossos. Our model for this system is a simple mutex

```clojure
(defrecord Mutex [locked?]
  Model
  (step [r op]
    (condp = (:f op)
      :acquire (if locked?
                 (inconsistent "already held")
                 (Mutex. true))
      :release (if locked?
                 (Mutex. false)
                 (inconsistent "not held")))))
```

… which can be acquired and released, but never double-acquired or double-released. To create failures we’ll use the partition-random-halves nemesis: a special Jepsen client that cuts the network into randomly selected halves. Then we give a generator: a monadic structure that determines what operations are emitted and when.

```clojure
:generator (->> (gen/seq (cycle [{:type :invoke :f :acquire}
                                 {:type :invoke :f :release}]))
                gen/each
                (gen/delay 180)
                (gen/nemesis
                  (gen/seq (cycle [(gen/sleep 5)
                                   {:type :info :f :start}
                                   (gen/sleep 100)
                                   {:type :info :f :stop}])))
                (gen/time-limit 500))
```

We start with an infinite sequence of alternating :acquire and :release operations, wrap it in a generator using gen/seq, then scope that generator to each client independently using gen/each. We make each client wait for 180 seconds before the next operation by using gen/delay, to simulate holding the lock for some time.

Meanwhile, on the nemesis client, we cycle through four operations: sleeping for five seconds, emitting a :start operation, sleeping for 100 seconds, and emitting a :stop. This creates a random network partition that lasts a hundred seconds, resolves it for five seconds, then creates a new partition.
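This Go sketch replays the nemesis cycle against the 500-second limit to show what the schedule means in wall-clock terms. It assumes each sleep begins exactly when the previous step finishes, and that the time limit simply cuts off any partition in progress. Real Jepsen timing will drift a little. `window` and `partitionWindows` are hypothetical names. Under those assumptions, the partitions span roughly 5 to 105 s, 110 to 210 s, 215 to 315 s, 320 to 420 s, and 425 s to the end of the test.

```go
package main

import "fmt"

// window is one interval, in seconds from the start, during which the
// network is partitioned.
type window struct{ start, stop int }

// partitionWindows replays the cycle [sleep healed, :start,
// sleep partitioned, :stop] until the time limit is reached.
func partitionWindows(healed, partitioned, limit int) []window {
	var ws []window
	t := 0
	for {
		t += healed // (gen/sleep 5)
		if t >= limit {
			return ws
		}
		start := t       // {:type :info :f :start}
		t += partitioned // (gen/sleep 100)
		if t >= limit {
			return append(ws, window{start, limit}) // cut off by time-limit
		}
		ws = append(ws, window{start, t}) // {:type :info :f :stop}
	}
}

func main() {
	for _, w := range partitionWindows(5, 100, 500) {
		fmt.Printf("partitioned from %ds to %ds\n", w.start, w.stop)
	}
}
```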

Finally, we limit the entire test to 500 seconds. During the test, we’ll see partition messages in the RabbitMQ logs, like

```
=ERROR REPORT==== 10-Apr-2014::13:16:08 ===
** Node rabbit@n3 not responding **
** Removing (timedout) connection **

=INFO REPORT==== 10-Apr-2014::13:16:29 ===
rabbit on node rabbit@n5 down

=ERROR REPORT==== 10-Apr-2014::13:16:45 ===
Mnesia(rabbit@n1): ** ERROR ** mnesia_event got {inconsistent_database, running_partitioned_network, rabbit@n3}
```

When the test completes, Jepsen tells us that the mutex failed to linearize. The linearizable prefix is the part of the history where Knossos was able to find some valid linearization. The first line tells us that the :nemesis process reported an :info message with function :start (and no value). A few lines later, process 1 :invokes the :acquire function, concurrently with the other four processes. Processes 2 and 3 fail, processes 0 and 4 end up with indeterminate :info results, and process 1 successfully acquires the lock with an :ok.

Note that Knossos fills in the values for invocations with the known values for their completions, which is why some invocations–like the failed acquire attempts near the end of the history–have values “from the future”.

```
Not linearizable. Linearizable prefix was:
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n4 :n5) (:n1 :n3 :n2)]"
:nemesis :info :stop nil
:nemesis :info :stop "fully connected"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n1 :n5) (:n4 :n2 :n3)]"
1 :invoke :acquire 1
3 :invoke :acquire nil
0 :invoke :acquire nil
2 :invoke :acquire nil
4 :invoke :acquire nil
3 :fail :acquire nil
2 :fail :acquire nil
1 :ok :acquire 1
:nemesis :info :stop nil
4 :info :acquire "indeterminate: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@n2' of durable queue 'jepsen.semaphore' in vhost '/' is down or inaccessible, class-id=60, method-id=70), null, \"\"}"
:nemesis :info :stop "fully connected"
0 :info :acquire "indeterminate: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@n2' of durable queue 'jepsen.semaphore' in vhost '/' is down or inaccessible, class-id=60, method-id=70), null, \"\"}"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n2 :n4) (:n1 :n5 :n3)]"
:nemesis :info :stop nil
:nemesis :info :stop "fully connected"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n3 :n2) (:n1 :n5 :n4)]"
3 :invoke :release :not-held
2 :invoke :release :not-held
3 :fail :release :not-held
2 :fail :release :not-held
1 :invoke :release nil
1 :ok :release nil
9 :invoke :acquire "clean connection shutdown; reason: Attempt to use closed channel"
9 :fail :acquire "clean connection shutdown; reason: Attempt to use closed channel"
5 :invoke :acquire "clean connection shutdown; reason: Attempt to use closed channel"
5 :fail :acquire "clean connection shutdown; reason: Attempt to use closed channel"
:nemesis :info :stop nil
:nemesis :info :stop "fully connected"
:nemesis :info :start nil
:nemesis :info :start "partitioned into [(:n1 :n4) (:n3 :n5 :n2)]"
3 :invoke :acquire nil
2 :invoke :acquire 1
2 :ok :acquire 1
1 :invoke :acquire 2
```

Next, Jepsen tells us which operation invalidated every consistent interpretation of the history.

```
Followed by inconsistent operation:
1 :ok :acquire 2
```

Process 2 successfully acquired the lock in the final lines of the linearizable prefix. It was OK for process 1 to try to acquire the lock, so long as that invocation didn’t go through until process 2 released it. However, process 1’s acquisition succeeded before the lock was released! Jepsen can show us the possible states of the system just prior to that moment:

```
Last consistent worlds were:
----------------
World from fixed history:
1 :invoke :acquire 1
1 :invoke :release nil
2 :invoke :acquire 1
and current state #jepsen.model.Mutex{:locked true}
with pending operations:
3 :invoke :acquire nil
1 :invoke :acquire 2
0 :invoke :acquire nil
4 :invoke :acquire nil
---------------------------------------------
```

In this world, the lock was acquired by 1, released by 1, and acquired by 2. That led to the state Mutex{:locked true}, with four outstanding acquire ops in progress. But when process 1 successfully acquired the lock, we were forced to eliminate this possibility. Why? Because

```
Inconsistent state transitions: ([{:locked true} "already held"])
```

The state {:locked true} couldn’t be followed by a second :acquire op, because the lock was already held.

This shouldn’t be surprising: RabbitMQ is designed to ensure message delivery, and its recovery semantics require that it deliver messages more than once. This is a good property for a queue! It’s just not the right fit for a lock service.

Thinking a little deeper, FLP and the Two Generals problem suggest that in the presence of a faulty process or network, the queue and the consumer can’t agree on whether or not to consume a given message. Acknowledge before processing the message, and a crash can cause data loss. Acknowledge after processing, and a crash can cause duplicate delivery. No distributed queue can offer exactly-once delivery–the best they can do is at-least-once or at-most-once.
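The acknowledgement dilemma reduces to a tiny simulation. This Go sketch assumes two things: the first consumer crashes immediately after its first step, and the broker redelivers exactly when it never received an ack. `strategy` and `timesProcessed` are illustrative names. Acking first means the message is processed zero times (lost). Acking last means it is processed twice (duplicated).

```go
package main

import "fmt"

// strategy is when the consumer acknowledges relative to doing the work.
type strategy int

const (
	ackBeforeProcessing strategy = iota // at-most-once
	ackAfterProcessing                  // at-least-once
)

// timesProcessed simulates one message: the first consumer completes only
// its first step (ack or work, depending on strategy) and then crashes.
// The broker redelivers to a healthy second consumer iff it saw no ack.
func timesProcessed(s strategy) int {
	processed, acked := 0, false
	if s == ackBeforeProcessing {
		acked = true // ack sent, then crash before doing the work
	} else {
		processed++ // work done, then crash before the ack
	}
	if !acked {
		processed++ // redelivered; second consumer does the work and acks
	}
	return processed
}

func main() {
	fmt.Println("ack before processing:", timesProcessed(ackBeforeProcessing)) // message lost
	fmt.Println("ack after processing: ", timesProcessed(ackAfterProcessing))  // duplicate
}
```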

So, the question becomes: does RabbitMQ offer at-least-once delivery in the presence of network partitions?

## Rabbit as a queue

For this test, we’ll use a different kind of client: one that takes :enqueue and :dequeue operations, and applies them to a RabbitMQ queue. We’ll be using durable, triple-mirrored writes across our five-node cluster, with the publisher confirms extension enabled so we only consider messages successful once acked by RabbitMQ. Here’s the generator for this test case:

```clojure
:generator (gen/phases
             (->> (gen/queue)
                  (gen/delay 1/10)
                  (gen/nemesis
                    (gen/seq (cycle [(gen/sleep 60)
                                     {:type :info :f :start}
                                     (gen/sleep 60)
                                     {:type :info :f :stop}])))
                  (gen/time-limit 360))
             (gen/nemesis
               (gen/once {:type :info, :f :stop}))
             (gen/log "waiting for recovery")
             (gen/sleep 60)
             (gen/clients
               (gen/each
                 (gen/once {:type :invoke :f :drain}))))
```

This test proceeds in several distinct phases: all clients must complete a phase before moving, together, to the next. In the first phase, we generate random :enqueue and :dequeue requests for sequential integers, with a delay of a tenth of a second between ops. Meanwhile, the nemesis cuts the network into random halves every sixty seconds, then repairs it for sixty seconds, and so on. This proceeds for 360 seconds in total.

In the next phase, the nemesis repairs the network. We log a message, then sleep for sixty seconds to allow the cluster to stabilize. I’ve reduced the Erlang net_tick times and timeouts to allow for faster convergence, but the same results hold with the stock configuration and longer failure durations.

Finally, each process issues a single :drain operation, which the client uses to generate a whole series of :dequeue ops, for any remaining messages in the queue. This ensures we should see every enqueued message dequeued at least once.

Rabbit has several distribution mechanisms, so I want to be specific: we’re testing clustering, not the Federation or Shovel systems. We’re aiming for the safest, most consistent settings, so let’s consult Rabbit’s documentation about distribution:

The column on the right suggests that we can choose either consistency and availability, or consistency and partition tolerance. We’ve talked about how sacrificing partition tolerance leads to terrible things happening, so let’s opt for consistency and partition tolerance. The CP link points to https://www.rabbitmq.com/partitions.html#cp-mode, which says:

In pause-minority mode RabbitMQ will automatically pause cluster nodes which determine themselves to be in a minority (i.e. fewer or equal than half the total number of nodes) after seeing other nodes go down. It therefore chooses partition tolerance over availability from the CAP theorem. This ensures that in the event of a network partition, at most the nodes in a single partition will continue to run.

CP mode sounds like the safest option, so we’ll enable pause_minority in our configuration, and expect to see failures from the minority nodes. Rabbit can’t guarantee exactly-once delivery, so it can’t really be CP, but shutting down the minority component should reduce duplicate deliveries as compared to allowing every node to respond to dequeues. Theoretically every node could accept enqueues during the partition (sacrificing the relative order of delivery), so we’ll hope to see that behavior as well.

During the partition, Jepsen shows that processes talking to the majority partition can successfully enqueue and dequeue messages, but processes talking to the minority component fail.

```
INFO jepsen.util - 15 :invoke :dequeue nil
INFO jepsen.util - 15 :ok :dequeue 2935
INFO jepsen.util - 2551 :invoke :enqueue 3519
INFO jepsen.util - 2551 :ok :enqueue 3519
INFO jepsen.util - 2501 :invoke :dequeue nil
WARN jepsen.core - Process 2501 indeterminate
java.util.concurrent.ExecutionException: java.io.IOException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:202)
        ...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND - home node 'rabbit@n1' of durable queue 'jepsen.queue' in vhost '/' is down or inaccessible, class-id=60, method-id=70), null, ""}
```

Process 2501 has crashed: it’s uncertain whether its dequeue succeeded or not. Because the operation might take effect at some later time, we log an :info operation rather than an :ok or :fail. Because processes are single-threaded in this formalism, we abandon that client and start a new client with a unique process ID. Process 2501 will never appear again in this history–but process 2506 will replace it and carry on performing operations against that node in the cluster.

At the end of the test, Jepsen heals the cluster, waits, and drains the queue:

```
INFO jepsen.util - 2896 :invoke :drain nil
INFO jepsen.util - 2896 :ok :dequeue 3724
INFO jepsen.util - 9 :invoke :drain nil
INFO jepsen.util - 15 :invoke :drain nil
INFO jepsen.util - 9 :ok :dequeue 3733
INFO jepsen.util - 15 :ok :dequeue 3730
INFO jepsen.util - 273 :invoke :drain nil
INFO jepsen.util - 273 :ok :dequeue 3741
INFO jepsen.util - 2872 :invoke :drain nil
INFO jepsen.util - 2872 :ok :dequeue 3746
INFO jepsen.util - 9 :ok :drain :exhausted
INFO jepsen.util - 2896 :ok :drain :exhausted
INFO jepsen.util - 15 :ok :drain :exhausted
INFO jepsen.util - 2872 :ok :drain :exhausted
INFO jepsen.util - 273 :ok :drain :exhausted
INFO jepsen.core - Worker 1 done
INFO jepsen.core - Worker 4 done
INFO jepsen.core - Worker 0 done
INFO jepsen.core - Worker 3 done
INFO jepsen.core - Worker 2 done
INFO jepsen.core - Run complete, writing
```

We run two checkers on the resulting history. The first, :queue, asserts that every enqueue is balanced by exactly one dequeue. It fails quickly: 487 is dequeued twice in this history, after one node crashes. We expect this behavior from a robust queue.

```
FAIL in (rabbit-test) (rabbitmq_test.clj:77)
expected: (:valid? (:results test))
  actual: false
{:valid? false,
 :queue {:valid? false, :error "can't dequeue 487"},
```

The :total-queue checker, though, is more forgiving for lossy systems. It tells us about four kinds of operations:

• OK messages are enqueued and dequeued successfully.
• Recovered messages are those whose enqueue we couldn’t confirm, but which were dequeued nonetheless.
• Unexpected messages are those which came out of the queue despite never having been enqueued. This includes cases where the same message is dequeued more than once.
• Lost messages were successfully enqueued but never came back out. This is the worst type of failure: the loss of acknowledged writes.

```
 :total-queue {:valid? false,
               :lost #{2558 3359 3616 3173 1858 2030 2372 3135 3671 3534 3358 2855 3251 3429 2615 3248 2479 1976 2430 3478 3693 2388 3174 3484 3638 2813 3280 2282 2475 3239 2973 1984 3630 2264 2523 2565 2462 3278 3425
                       ... lots more lines ...
                       3313 3413 3443 2048 3513 2705 3392 3001 2215 3097 3364 3531 2605 2411 2220 2042 1923 2314 3592 3538 3128 2801 3636 1861 3500 3143 3276 1991 3343 3656 3233 3611 3244 3717 3314 2922 3404 3708},
               :unexpected #{487 497 491 510 493 490 508 504 505 502 495 506 500 496 501 498 507 494 489 492 503 509 499 488},
               :recovered #{519 521 529 510 527 518 512 517 516 515 523 531 525 528 522 1398 520 524 513 509 511},
               :ok-frac 786/1249,
               :unexpected-frac 8/1249,
               :lost-frac 1312/3747,
               :recovered-frac 7/1249}}
```

The fractions all have the same denominator: attempted enqueues. 2358 of 3747 attempted enqueues were successfully dequeued–which we might expect, given the pause_minority mode shut down 2/5 nodes for a good part of the test. We also saw 24 duplicate deliveries, which, again, we expect from a distributed queue, and 21 recovered writes from indeterminate enqueues–we’ll take as many of those as we can get.
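The four-way classification can be sketched as follows. This is a hypothetical Go reimplementation of the idea, not Jepsen's :total-queue code. `queueHistory` assumes the history has already been reduced to three lists: acknowledged enqueues, indeterminate enqueues, and the ordered dequeued values. As in the output above, a second dequeue of the same value counts as unexpected.

```go
package main

import "fmt"

// queueHistory is a hypothetical summary of a test run.
type queueHistory struct {
	okEnqueues   []int // enqueues acknowledged with :ok
	infoEnqueues []int // enqueues with indeterminate (:info) outcomes
	dequeues     []int // every dequeued value, in order, duplicates included
}

type totalQueueResult struct {
	ok, recovered, unexpected, lost []int
}

// totalQueue sorts messages into the four buckets described above.
func totalQueue(h queueHistory) totalQueueResult {
	acked := map[int]bool{}
	for _, v := range h.okEnqueues {
		acked[v] = true
	}
	maybe := map[int]bool{}
	for _, v := range h.infoEnqueues {
		maybe[v] = true
	}
	var r totalQueueResult
	seen := map[int]bool{}
	for _, v := range h.dequeues {
		switch {
		case seen[v] || (!acked[v] && !maybe[v]):
			r.unexpected = append(r.unexpected, v) // duplicate, or never enqueued
		case acked[v]:
			r.ok = append(r.ok, v)
		default:
			r.recovered = append(r.recovered, v) // indeterminate, yet dequeued
		}
		seen[v] = true
	}
	for _, v := range h.okEnqueues {
		if !seen[v] {
			r.lost = append(r.lost, v) // acknowledged but never dequeued
		}
	}
	return r
}

func main() {
	result := totalQueue(queueHistory{
		okEnqueues:   []int{1, 2, 3, 4},
		infoEnqueues: []int{5, 6},
		dequeues:     []int{1, 2, 2, 5, 7},
	})
	fmt.Printf("ok=%v recovered=%v unexpected=%v lost=%v\n",
		result.ok, result.recovered, result.unexpected, result.lost)
}
```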

The real problem is those 1312 lost messages: RabbitMQ lost ~35% of acknowledged writes.

You guessed right, folks. When a RabbitMQ node rejoins the cluster, it wipes its local state and adopts whatever the current primary node thinks the queue should contain. Are there any constraints on which node takes precedence? As far as I can ascertain, no–you can induce arbitrary data loss by carefully manipulating the order of partitions.

This is not a theoretical problem. I know of at least two RabbitMQ deployments which have hit this in production.

## Recommendations

If you use RabbitMQ clustering, I recommend you disable automatic partition handling. pause_minority reduces availability but still allows massive data loss. autoheal improves availability but still allows for massive data loss. Rabbit advises that you use ignore only if your network is “really reliable”, but I suspect it is the only mode which offers a chance of preventing the loss of acknowledged messages.

If you use the ignore partition mode, Rabbit will allow primary replicas on both sides of a partition, and those nodes will run independently. You can’t dequeue messages written to the other node, but at least neither will overwrite the other’s state. When the network partition recovers, you can isolate one of the nodes from all clients, drain all of its messages, and enqueue them into a selected primary. Finally, restart that node and it’ll pick up the primary’s state. Repeat the process for each node that was isolated, and you’ll have a single authoritative cluster again–albeit with duplicates for each copy of the message on each node.

Alternatively, you can eschew RabbitMQ clustering altogether, and connect all nodes via Federation or the Shovel: an external process that ferries messages from one cluster to another. The RabbitMQ team recommends these for unreliable links, like those between datacenters, but if you’re concerned about partitions in your local network, you might choose to deploy them in lieu of clustering. I haven’t yet figured out how to use either system that way, though, and the Shovel, in particular, is a single point of failure.

To the RabbitMQ team, I suggest that autoheal and pause_minority recover by taking the union of the messages extant on both nodes, rather than blindly destroying all data on one replica. I know this may be a big change to make, given the way Rabbit uses Mnesia–but I submit that duplicate delivery and reordering are almost certainly preferable to message loss.
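The difference between the current recovery and the proposed union is easy to show. In this Go sketch, replicas are plain slices of message IDs. Unique IDs are an assumption here, since AMQP does not require them; without IDs, a union would deliver duplicates instead of deduplicating. `adoptPrimary` and `unionOnHeal` are hypothetical names.

```go
package main

import "fmt"

// adoptPrimary mimics the behaviour described above: the rejoining
// replica's contents are discarded and the primary's are copied.
func adoptPrimary(primary, rejoining []string) []string {
	return append([]string(nil), primary...) // rejoining is ignored entirely
}

// unionOnHeal keeps every message either replica holds, deduplicating by
// message ID. Order may differ from the original enqueue order.
func unionOnHeal(primary, rejoining []string) []string {
	seen := make(map[string]bool)
	merged := make([]string, 0, len(primary)+len(rejoining))
	for _, side := range [][]string{primary, rejoining} {
		for _, id := range side {
			if !seen[id] {
				seen[id] = true
				merged = append(merged, id)
			}
		}
	}
	return merged
}

func main() {
	// During a partition, each side accepted a different enqueue.
	primarySide := []string{"m1", "m2", "m4"}
	minoritySide := []string{"m1", "m2", "m3"}
	fmt.Println("adopt primary:", adoptPrimary(primarySide, minoritySide)) // m3 is gone
	fmt.Println("union:        ", unionOnHeal(primarySide, minoritySide))  // nothing lost
}
```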

## To recap

We used Knossos and Jepsen to prove the obvious: RabbitMQ is not a lock service. That investigation led to a discovery hinted at by the documentation: in the presence of partitions, RabbitMQ clustering will not only deliver duplicate messages, but will also drop huge volumes of acknowledged messages on the floor. This is not a new result, but it may be surprising if you haven’t read the docs closely–especially if you interpreted the phrase “chooses Consistency and Partition Tolerance” to mean, well, either of those things.

I’d like to conclude by mentioning that RabbitMQ does documentation right. Most of the distributed systems I research say nothing about failure modes–but Rabbit’s docs have an entire section devoted to partition tolerance, reliable delivery, and truly comprehensive descriptions of the various failure modes in the API and distribution layer. I wish more distributed systems shared Rabbit’s integrity and attention to detail.

This research was made possible by a generous grant from Comcast’s open-source fund, and by the invaluable assistance of Alvaro Videla from the RabbitMQ team. I’m also indebted to Michael Klishin and Alex P for their hard work on the Langohr RabbitMQ client. Thanks for reading!

Next up: etcd and Consul.

# Computational techniques in Knossos

Earlier versions of Jepsen found glaring inconsistencies, but missed subtle ones. In particular, Jepsen was not well equipped to distinguish linearizable systems from sequentially or causally consistent ones. When people asked me to analyze systems which claimed to be linearizable, Jepsen could rule out obvious classes of behavior, like dropping writes, but couldn’t tell us much more than that. Since users and vendors are starting to rely on Jepsen as a basic check on correctness, it’s important that Jepsen be able to identify true linearization errors.

To understand why Jepsen was not a complete test of linearizability, we have to understand the structure of its original tests. Jepsen assumed, originally, that every system could be modeled as a set of integers. Each client would gradually add a sequence of integers–disjoint from all the other clients’ sets–to the database’s set, then perform a final read. If any elements which had supposedly succeeded were missing, we knew the system had dropped data.

The original Jepsen tests were designed for AP systems, like Riak, without a linear order; using a set is appropriate because its contents are fundamentally unordered, and because addition to the set is associative and idempotent. To test a linearizable system, we implement set addition by performing a compare-and-set, replacing the old set with the current value plus the number being written. If a given CAS was successful, then that element should appear in the final read.
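Here is a minimal Go sketch of that set-via-CAS test, run against an in-memory register. For simplicity, the compare step checks a version number rather than the whole set; that is this sketch's choice, not necessarily Jepsen's. `casRegister` and `missingAfterRun` are hypothetical names. Each client adds disjoint integers and retries on CAS conflicts, and the final read should contain every acknowledged element. As the text notes, this only inspects the final read, so a store that merely delayed visibility would still pass.

```go
package main

import (
	"fmt"
	"sync"
)

// casRegister is an in-memory, linearizable stand-in for the database
// under test: one register holding a set of ints, with compare-and-set.
type casRegister struct {
	mu      sync.Mutex
	version int
	set     []int
}

// read returns the current version and a copy of the set.
func (r *casRegister) read() (int, []int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.version, append([]int(nil), r.set...)
}

// cas installs next only if nobody has written since expectedVersion.
func (r *casRegister) cas(expectedVersion int, next []int) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.version != expectedVersion {
		return false
	}
	r.version++
	r.set = next
	return true
}

// add performs set addition as read, then CAS(old, old+x), retrying on conflict.
func (r *casRegister) add(x int) {
	for {
		v, cur := r.read()
		if r.cas(v, append(cur, x)) {
			return
		}
	}
}

// missingAfterRun has each client add its own disjoint integers, then
// returns acknowledged values absent from the final read.
func missingAfterRun(clients, perClient int) []int {
	r := &casRegister{}
	var wg sync.WaitGroup
	for c := 0; c < clients; c++ {
		wg.Add(1)
		go func(c int) {
			defer wg.Done()
			for i := 0; i < perClient; i++ {
				r.add(c*perClient + i) // disjoint across clients
			}
		}(c)
	}
	wg.Wait()
	_, final := r.read()
	present := map[int]bool{}
	for _, x := range final {
		present[x] = true
	}
	var missing []int
	for x := 0; x < clients*perClient; x++ {
		if !present[x] {
			missing = append(missing, x)
		}
	}
	return missing
}

func main() {
	fmt.Println("missing from final read:", missingAfterRun(5, 20))
}
```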

This does verify sequential consistency, and to varying degrees linearizability, but has limited power. The database may choose, for instance, to delay the visibility of changes, so long as they become visible before the final read. We can’t test operations other than a CAS. We can’t, for instance, test deletions. It’s also not clear how to verify systems like mutexes, queues, or semaphores.

Furthermore, if a test does fail, it’s not clear why. A missing number from the final set might be caused by a problem with that particular CAS–or a CAS executed hours later which happened to destroy the effects of a preceding write. Ideally, we’d like to know exactly why the system failed to linearize. With this in mind, I set out to design a linearizability checker suitable for analyzing both formal models and real software with no internal visibility.

## Knossos

In the introduction to Knossos, I couched Knossos as a model checker, motivated by a particular algorithm discussed on the Redis mailing list. This was slightly disingenuous: in fact, I designed Knossos as a model checker for any type of history, including those recorded from real databases. This means that Jepsen can generate a series of random operations, execute them against a database, and verify that the resulting history is valid with respect to some model.

Given a sequence of operations that a database might go through–say, two processes attempting to acquire a mutex:

```clojure
{:process 1, :type :invoke, :f :acquire, :value nil}
{:process 2, :type :invoke, :f :acquire, :value nil}
{:process 1, :type :ok,     :f :acquire, :value nil}
{:process 2, :type :fail,   :f :acquire, :value "lock failed; already held"}
```

… and a singlethreaded model of the system, like

```clojure
;; Assumes Model and inconsistent are referred from knossos.model.
(defrecord Mutex [locked?]
  Model
  (step [mutex op]
    (condp = (:f op)
      :acquire (if locked?
                 (inconsistent "already held")
                 (Mutex. true))
      :release (if locked?
                 (Mutex. false)
                 (inconsistent "not held")))))
```

… Knossos can identify if the given concurrent history linearizes–that is, whether there exists some equivalent history in which every operation appears to take place atomically, in a well-defined order, between the invocation and completion times.

Linearizability, like sequential and serializable consistency, requires that every operation take place in some specific order; that there appears to be only one “true” state for the system at any given time. Therefore we can model any linearizable system as a single state, plus a function, called step, which applies an operation to that state and returns a new state.

In Clojure, we represent this model with a simple protocol, called Model, which defines a function (step current-model-state operation), and returns the new state. In our mutex example, there are four possibilities, depending on whether the operation is :acquire or :release, and whether the state locked? is true. If we try to lock an unlocked mutex, we return a new Mutex with the state true. If we try to lock a mutex which is already locked, we return a special kind of state: an inconsistent state.

Inconsistent states allow us to verify that a singlethreaded history is valid. We simply (reduce step initial-state operations); if the result is inconsistent, we know that sequence of operations was prohibited by the model. The model formally expresses our definition of the allowable causal histories.
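The same idea in Ruby, as a sketch that mirrors the Clojure model above rather than Knossos itself: Inconsistent, MutexModel, and check_sequential are hypothetical names, and operations are plain hashes with an :f key.

```ruby
# Sentinel state for an operation the model forbids. Once a history reaches
# an inconsistent state, every later step leaves it there.
Inconsistent = Struct.new(:msg) do
  def step(_op)
    self
  end
end

# The mutex model: the only state is whether the lock is held.
MutexModel = Struct.new(:locked) do
  def step(op)
    case op[:f]
    when :acquire then locked ? Inconsistent.new("already held") : MutexModel.new(true)
    when :release then locked ? MutexModel.new(false) : Inconsistent.new("not held")
    else Inconsistent.new("unknown operation #{op[:f]}")
    end
  end
end

# The Ruby analogue of (reduce step initial-state operations).
def check_sequential(initial, ops)
  ops.reduce(initial) { |state, op| state.step(op) }
end

valid   = check_sequential(MutexModel.new(false), [{ f: :acquire }, { f: :release }, { f: :acquire }])
invalid = check_sequential(MutexModel.new(false), [{ f: :acquire }, { f: :acquire }, { f: :release }])

p valid    # => #<struct MutexModel locked=true>
p invalid  # => #<struct Inconsistent msg="already held">
```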

## The plot thickens

But we don’t have a singlethreaded history to test. We have a multithreaded history, with any number of operations in play concurrently. Each client is invoking, waiting for, and then discovering the result of its operations. Our history contains pairs of :invoke, :ok messages, when an operation succeeds, or :invoke, :fail when the operation is known to not have taken place, or :invoke, :info, when we simply don’t know what happened.

If an operation times out, or the server returns an indeterminate response, we may never find out whether the operation really took place. In the history to the right, process 5 has hung and will never recover. Its operation could take place at any time, even years into the future. In general, a hung process is concurrent with every other subsequent operation.

Given a model, we know how to test if a particular sequence of operations is valid. But in a concurrent history, the ordering is ambiguous; each operation could take place at any time between its invocation and completion. One possible interleaving might be read 1, write 1, read 2, write 2, which is obviously incorrect. On the other hand, we could evaluate write 1, read 1, write 2, read 2 instead–which is a valid history for a register. This history is linearizable–but in order to prove that fact, we have to find a particular valid order.

Imagine something like a game of hopscotch: one must land on each cell in turn, always moving from left to right, finding a path in which the model’s constraints hold. Where there are many cells at the same time, finding a path becomes especially difficult. We must consider every possible permutation of those concurrent cells, which is O(n!). That’s the kind of hopscotch that, even when played by computer, makes one re-evaluate one’s life choices.
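To make that cost concrete, here is a brute-force checker in Ruby, assuming each operation carries hypothetical invoke and complete timestamps. It tries every permutation, discards those that violate real-time order, and replays the rest through a register model. This is the naive O(n!) search, not what Knossos does.

```ruby
# An operation on a register, with its real-time invocation and completion.
Op = Struct.new(:name, :f, :value, :invoke, :complete)

# Register model: writes replace the value; a read must return the current
# value. Returns the new state, or :inconsistent.
def register_step(state, op)
  case op.f
  when :write then op.value
  when :read  then op.value == state ? state : :inconsistent
  end
end

# An order respects real time if no operation is placed ahead of one that
# completed before it was even invoked.
def respects_real_time?(order)
  order.each_with_index.all? do |a, i|
    order[(i + 1)..].none? { |b| b.complete < a.invoke }
  end
end

# Try every permutation (O(n!)) and return the first one that respects real
# time and satisfies the model, or nil if there is none.
def brute_force_linearization(history, initial = nil)
  history.permutation.find do |order|
    next false unless respects_real_time?(order)
    state = initial
    order.all? { |op| (state = register_step(state, op)) != :inconsistent }
  end
end

# Reads and writes of 1 and 2, listed in the "obviously incorrect" order.
history = [
  Op.new("read 1",  :read,  1, 1, 4),
  Op.new("write 1", :write, 1, 0, 3),
  Op.new("read 2",  :read,  2, 6, 9),
  Op.new("write 2", :write, 2, 5, 8),
]
p brute_force_linearization(history).map(&:name)
# => ["write 1", "read 1", "write 2", "read 2"]

# A stale read: write 2 completed before the read of 1 began.
stale = [
  Op.new("write 1", :write, 1, 0, 1),
  Op.new("write 2", :write, 2, 2, 3),
  Op.new("read 1",  :read,  1, 4, 5),
]
p brute_force_linearization(stale) # => nil
```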

So what do we do, presented with a huge space of possibilities?

## Exploit degeneracy

I’m a degenerate sort of person, so my first inclination is to look for symmetries in the state space. The key observation to make is that whether a given operation is valid or not depends solely on the current state of the model, not its history.

```
step(state, op) -> state'
```

It doesn’t matter how we got to the state; if you give me two registers containing the value 2, and ask me to apply the same operation to both, I only need to check one of the registers, because the results will be equivalent!

Unlike a formal model-checker or proof assistant, Knossos doesn’t know the structure of the system it’s analyzing; it can’t perform symmetry reduction based on the definition of step. What we can do, however, is look for cases where we come back to the same state with the same future series of operations, and drop all but one of those cases immediately. This turns out to be equivalent to a certain class of symmetry reduction. In particular, we can compact interchangeable orders like concurrent reads, or writes that lead to the same value. We keep a cache of visited worlds and avoid exploring any that have been seen before.
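A small Ruby sketch of that cache, assuming a world is identified only by its model state plus the operations still left to apply (count_worlds and step are hypothetical names). Three concurrent operations that leave a register at 0 can be ordered in six ways, but they only pass through eight distinct worlds:

```ruby
require 'set'

# Register model: writes replace the value; reads must see the current value.
def step(state, op)
  case op[:f]
  when :write then op[:value]
  when :read  then op[:value] == state ? state : :inconsistent
  end
end

# Explores every order of the remaining operations and returns how many
# worlds it visited. With a `seen` set, a world identified by its model state
# plus the operations still to apply is explored only once, however we got
# there. `remaining - [op]` keeps the original relative order, so equal
# subsets always compare equal.
def count_worlds(state, remaining, seen = nil)
  return 0 if state == :inconsistent
  if seen
    key = [state, remaining]
    return 0 if seen.include?(key)
    seen << key
  end
  1 + remaining.sum { |op| count_worlds(step(state, op), remaining - [op], seen) }
end

# Two concurrent reads of 0 and a concurrent write of 0, which changes nothing.
ops = [
  { id: 1, f: :read,  value: 0 },
  { id: 2, f: :read,  value: 0 },
  { id: 3, f: :write, value: 0 },
]

count_worlds(0, ops)          # => 16: every path through every order
count_worlds(0, ops, Set.new) # => 8: one world per subset of remaining ops
```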

## Laziness

Remember, we’re looking for any linearization, not all of them. If we can find a shortcut by not evaluating some highly-branching history, by not taking some expensive path, we can skip huge parts of the search. Like a lightning bolt feeling its way down the path of least resistance, we evaluate only those paths which seem easiest–coming back to the hard ones later. If the history is truly not linearizable, we’re forced to return to those expensive branches and check them, but if the history is valid, we can finish as soon as a single path is found.

Lazy evaluation is all about making control flow explicit instead of implicit. We use a data structure to describe where to explore next, instead of following the normal program flow. In Knossos, we represent the exploration of a particular order of operations as a world, which sits at some index along the multithreaded history. Each world carries with it a fixed history–the specific order of operations that occurred in that possible universe. The fixed history leads to a current model state. Finally, each world has a set of pending operations: operations that have been invoked, but have not yet taken effect.

For example, a world might have a fixed history of lock, unlock, lock, leading to a model state where locked is true, and a second lock attempt might be pending but not yet applied. An unlock operation could arrive and allow the pending lock to take place.

By representing the entire state of the computation as a data structure, we can write a single function that takes a world and explores it, returning a set of potential future worlds. We can explore those worlds in parallel.
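A single-threaded Ruby sketch of this representation, assuming a register model and histories made only of :invoke and :ok events; World, successors, and linearize are hypothetical names, not Knossos's API. Each world either lets one pending operation take effect or advances to the next event, and a completion is only reachable once its operation has been applied.

```ruby
require 'set'

# One possible universe: our position in the concurrent history, the order in
# which operations have taken effect (the fixed history), the resulting model
# state, and operations invoked but not yet applied.
World = Struct.new(:index, :fixed, :state, :pending)

# Register model. Returns [allowed?, new_state].
def apply_op(state, op)
  case op[:f]
  when :write then [true, op[:value]]
  when :read  then [op[:value] == state, state]
  else [false, state]
  end
end

# Every world reachable from `world` in a single step.
def successors(world, history)
  worlds = []

  # Branch: let any one pending operation take effect now, if the model allows.
  world.pending.each do |op|
    allowed, state = apply_op(world.state, op)
    next unless allowed
    worlds << World.new(world.index, world.fixed + [op], state, world.pending - [op])
  end

  # Branch: move on to the next event in the history.
  event = history[world.index]
  if event && event[:type] == :invoke
    worlds << World.new(world.index + 1, world.fixed, world.state, world.pending + [event[:op]])
  elsif event && !world.pending.include?(event[:op])
    # A completion is only reachable once its operation has taken effect.
    worlds << World.new(world.index + 1, world.fixed, world.state, world.pending)
  end

  worlds
end

# Depth-first search for any world that reaches the end of the history,
# skipping worlds equivalent to one already explored.
def linearize(history, initial_state = nil)
  stack = [World.new(0, [], initial_state, [])]
  seen = Set.new
  until stack.empty?
    world = stack.pop
    return world.fixed if world.index == history.size
    key = [world.index, world.state, world.pending]
    next if seen.include?(key)
    seen << key
    stack.concat(successors(world, history))
  end
  nil
end

w1 = { id: 1, f: :write, value: 1 }
r1 = { id: 2, f: :read, value: 1 }
r0 = { id: 3, f: :read, value: nil }

# The read overlaps the write and observes it.
concurrent = [
  { type: :invoke, op: w1 }, { type: :invoke, op: r1 },
  { type: :ok, op: r1 }, { type: :ok, op: w1 },
]
# The read begins after the write completed, yet sees nothing.
stale = [
  { type: :invoke, op: w1 }, { type: :ok, op: w1 },
  { type: :invoke, op: r0 }, { type: :ok, op: r0 },
]

linearize(concurrent).map { |op| op[:id] } # => [1, 2]
linearize(stale)                           # => nil
```

An operation that never completes (a hung process, say) simply stays pending in this sketch, so the search is free to apply it at any later point, or never.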

## Parallelization

Because our states are immutable representations of the computation, and the function we use to explore any given state is pure and deterministic, we can trivially parallelize the exploration process. Early versions of Knossos reduced over each operation in the history, applying that operation to every outstanding world by whacking it with a parallel map.

This parallelization strategy has a serious drawback, though: by exploring the state space one index at a time, we effectively perform a breadth-first search. We want to take shortcuts through the state space, running many searches at once. We don’t just want depth-first, either; instead, we want to explore those worlds which have the lowest branching factor, because those worlds are the cheapest to explore.

So instead of exploring the history one operation at a time, we spawn lots of threads and have each consume from a priority queue of worlds, ranked by how awful those worlds are to explore. As each explorer thread discovers new consequent worlds, it inserts them back into the pool. If any thread finds a world that encompasses every operation in the history, we’ve demonstrated the history is linearizable.

We pay some cost in synchronization: queues aren’t cheap, and java.util.concurrent.PriorityBlockingQueue has some particularly nasty contention costs for both enqueues and dequeues. Luckily, the queue will usually contain plenty of elements, so we can stripe the queue into several subqueues, each with thread affinity. Affinity for each queue reduces lock contention, which dramatically reduces the time threads spend waiting to enqueue or dequeue worlds. When a thread exhausts its local queue, it steals worlds from its neighbors.
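A rough sketch of the striping idea in Ruby, assuming each explorer thread owns one stripe: StripedPool is a hypothetical name, and sorted arrays stand in for real priority queues, which keeps the example short at the cost of an O(n log n) sort per push.

```ruby
# A striped pool of worlds: one priority-ordered stripe per explorer thread,
# each behind its own lock, with stealing when the local stripe runs dry.
# Items are [priority, world] pairs; a lower priority is explored first.
class StripedPool
  def initialize(stripes)
    @queues = Array.new(stripes) { [] }
    @locks  = Array.new(stripes) { Mutex.new }
  end

  # Threads push new worlds onto their own stripe, keeping it sorted.
  def push(stripe, item)
    @locks[stripe].synchronize do
      @queues[stripe] << item
      @queues[stripe].sort_by!(&:first)
    end
  end

  # Take the best world from our own stripe; if it is empty, steal from
  # neighbors in turn. Returns nil when every stripe is empty.
  def pop(stripe)
    n = @queues.size
    n.times do |offset|
      i = (stripe + offset) % n
      item = @locks[i].synchronize { @queues[i].shift }
      return item if item
    end
    nil
  end
end

pool = StripedPool.new(2)
pool.push(0, [3, :expensive])
pool.push(0, [1, :cheap])
pool.push(1, [2, :neighbor])

pool.pop(0) # => [1, :cheap]
pool.pop(0) # => [3, :expensive]
pool.pop(0) # => [2, :neighbor], stolen from stripe 1
pool.pop(0) # => nil
```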

This approach costs us some degree of memory locality: transferring records through the queue tends to push them out of the CPU cache. We can tune how far each explorer thread will take a particular world to reduce the locality cost: if work is too chunky, threads can starve awaiting worlds to explore–but if work is too fine-grained, synchronization and cache misses dominate.

## Memoization

Making control flow explicit (some might even say monadic) allows us to memoize computation as well. At RICON East, in 2013, Margo Seltzer gave a phenomenal talk on automatically parallelizing singlethreaded x86 programs. She pointed out that x86 can be thought of as a very large, very complicated function that transforms a bit-vector of all the registers and all of memory into some subsequent state, depending on the instruction pointer, contents of registers, etc. The state is a very large value, but if you compress it and make even some modest predictions, you can cache the results of computations that haven’t even happened yet, allowing the program to jump forward when it encounters a known state.

This works because programs usually don’t change the entire memory space at once; each step reads and writes only a small portion of memory. The loop for(i = 0; i < 100; i++) { arr[i]++ }, for instance, independently increments each number in arr. In that sense, the memory space is degenerate outside each particular element. That degeneracy gives speculative execution a chance of predicting an equivalent future state of the program: we can increment each number concurrently.

In Knossos we have a similarly degenerate state space; all fixed histories may be collapsed so long as the model and pending operations are identical. We also have a speculative and lazy execution strategy: operations are simultaneously explored at various points in the multiverse. Hence we can apply a similar memoization strategy: by caching visited worlds, we can avoid exploring equivalent paths twice.

In fact, we don’t even need to store the results of the exploration, only the fact that we have reached that world. Think of exploring a maze with several friends, all looking for a path through. When anyone reaches a dead end, they can save time for everyone by coloring in the path they took. When someone comes to a branch in the maze, they only take the paths that nobody has colored in. We simply abort any exploration of a world equivalent to one already visited. This optimization is nondeterministic but synchronization-free, allowing memoization checks to be extremely cheap. Even though cache hit rates are typically low, each hit prunes an exponential number of descendant worlds, dramatically reducing runtimes.

## Immutability and performance

When we explore a world, we’ll typically encounter many branching paths. Given two concurrent writes a and b, we need to explore [], [a], [b], [a b], and [b a], and in turn, each of those worlds will fork into hundreds, then thousands, then millions of consequent worlds. We have to make a lot of copies.
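In Ruby, the forks of a set of concurrent operations can be enumerated as every ordering of every subset (forks is a hypothetical helper). The count grows roughly like e times n!, which is why cheap copies matter:

```ruby
# Every order in which some subset of the concurrent operations could have
# taken effect: nothing yet, any single operation, or any ordering of a
# larger subset.
def forks(ops)
  (0..ops.size).flat_map { |k| ops.permutation(k).to_a }
end

p forks(%w[a b])
# => [[], ["a"], ["b"], ["a", "b"], ["b", "a"]]

# How many forks n concurrent operations produce, for n = 1..6.
growth = (1..6).map { |n| forks((1..n).to_a).size }
p growth
# => [2, 5, 16, 65, 326, 1957]
```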

At this point in the essay, Haskell enthusiasts are nodding their heads sagely and muttering things about Coyoneda diffeomorphisms and trendofunctors. Haskell offers excellent support for immutable data structures and parallel execution of pure functions, which would make it an ideal choice for building this kind of checker.

But I am, sadly, not a Haskell wizard. When you get right down to it, I’m more of a Clojure Sith Lord. And as it turns out, this is a type of problem that Clojure is also well-suited for. We express the consistency model as a pure function over immutable models, and use Clojure’s immutable maps, vectors, and sets to store the state of each world, its histories, its pending operations, and so on. Forking the world into distinct paths doesn’t require copying the entire state; rather, Clojure uses a reference to the original data structure, and stores a delta on top. We can fork millions of worlds cheaply.

Because worlds are immutable, we can share them freely between threads. Because the functions that explore a world, returning subsequent possible worlds, are pure, we can explore worlds on any thread, at any time, and take advantage of memoization. But in order to execute that search process in parallel, we need that priority queue of worlds-at-the-edge: a fundamentally mutable data structure. The memoizing cache is also mutable: it must be, to share state between threads. We also need some book-keeping state: how far has the algorithm explored; have we reached the end; how large is the cache.

So as a layer atop the immutable core, we make limited use of mutable structures: a striped java.util.concurrent.PriorityBlockingQueue for keeping track of which worlds are up next, a concurrent hashmap to memoize results, Clojure’s atoms for bookkeeping, and some java.util.concurrent.atomic references for primitive CAS. Because this code is wildly nondeterministic, it’s the most difficult portion of Knossos to reason about and debug–yet that nondeterminism is a critical degree of freedom for parallel execution. By broadening the space of allowable execution orders, we reduce the need for inter-core synchronization.

Reducing synchronization is especially important because while I was working on Knossos, Comcast offered me a research grant specifically for Jepsen. As one does when offered unlimited resources by a galactic empire, I thought big.

I used Comcast’s grant to build a 24-core (48 HT) Xeon with 128GB of ECC RAM, effectively demolishing the parallelism and heap barriers that limited earlier verification efforts. Extensive profiling with YourKit (another great supporter of open-source projects) helped reduce lock and CAS contention which limited scalability to ~4 cores; a few weeks’ work removed almost all thread stalls and improved performance by two orders of magnitude.

The result is that Knossos can check 5-process, 150–200-element histories in a matter of minutes, not days–and it can do it on 48 cores.

There are several optimizations I haven’t made yet; for instance, detecting crashed processes and optimistically inserting a world in which that crashed process' operation never takes place. However, Knossos at this stage is more than capable of detecting linearization errors in real-world histories.

Proud of this technological terror I’d constructed, I consulted the small Moff Tarkin that lives in my head on what database to test next. “You would prefer another target? An open-source target? Then name the distributed system!”

# Strong consistency models

Network partitions are going to happen. Switches, NICs, host hardware, operating systems, disks, virtualization layers, and language runtimes, not to mention program semantics themselves, all conspire to delay, drop, duplicate, or reorder our messages. In an uncertain world, we want our software to maintain some sense of intuitive correctness.

Well, obviously we want intuitive correctness. Do The Right Thing™! But what exactly is the right thing? How might we describe it? In this essay, we’ll take a tour of some “strong” consistency models, and see how they fit together.

## Correctness

There are many ways to express an algorithm’s abstract behavior–but just for now, let’s say that a system consists of a state, and some operations that transform that state. As the system runs, it moves from state to state through some history of operations.

For instance, our state might be a variable, and the operations on the state could be the writes to, and reads from, that variable. In this simple Ruby program, we write and read a variable several times, printing it to the screen to illustrate the reads.

```ruby
x = "a"; puts x; puts x
x = "b"; puts x
x = "c"
x = "d"; puts x
```

We already have an intuitive model of this program’s correctness: it should print “aabd”. Why? Because each of the statements happens in order. First we write the value a, then read the value a, then read the value a, then write the value b, and so on.

Once we set a variable to some value, like a, reading it should return a, until we change the value again. Reading a variable returns the most recently written value. We call this kind of system–a variable with a single value–a register.

We’ve had this model drilled into our heads from the first day we started writing programs, so it feels like second nature–but this is not the only way variables could work. A variable could return any value for a read: a, d, or the moon. If that happened, we’d say the system was incorrect, because those operations don’t align with our model of how variables are supposed to work.

This hints at a definition of correctness for a system: given some rules which relate the operations and state, the history of operations in the system should always follow those rules. We call those rules a consistency model.

We phrased our rules for registers as simple English statements, but they could be arbitrarily complicated mathematical structures. “A read returns the value from two writes ago, plus three, except when the value is four, in which case the read may return either cat or dog” is a consistency model. As is “Every read always returns zero”. We could even say “There are no rules at all; every operation is permitted”. That’s the easiest consistency model to satisfy; every system obeys it trivially.

More formally, we say that a consistency model is the set of all allowed histories of operations. If we run a program and it goes through a sequence of operations in the allowed set, that particular execution is consistent. If the program screws up occasionally and goes through a history not in the consistency model, we say the history was inconsistent. If every possible execution falls into the allowed set, the system satisfies the model. We want real systems to satisfy “intuitively correct” consistency models, so that we can write predictable programs.
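Treating a consistency model as a predicate over histories, a Ruby sketch of the register rules might look like this (register_history? and the [:write, value] / [:read, value] encoding are assumptions for illustration). The history below is the one produced by the Ruby program earlier in this section:

```ruby
# A register consistency model as a predicate: is this singlethreaded
# history one of the allowed ones? Operations are [:write, v] or [:read, v].
def register_history?(history, initial = nil)
  current = initial
  history.all? do |f, value|
    case f
    when :write
      current = value
      true
    when :read
      value == current # a read must return the most recent write
    else
      false
    end
  end
end

# The history of the earlier Ruby program: write a, read a, read a, ...
program = [
  [:write, "a"], [:read, "a"], [:read, "a"],
  [:write, "b"], [:read, "b"],
  [:write, "c"], [:write, "d"], [:read, "d"],
]

register_history?(program)                            # => true
register_history?(program[0..-2] + [[:read, "moon"]]) # => false

# The model that permits every history is trivial to satisfy.
anything_goes = ->(_history) { true }
anything_goes.call(program)                           # => true
```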

## Concurrent histories

Now imagine a concurrent program, like one written in Node.js or Erlang. There are multiple logical threads of control, which we term “processes”. If we run a concurrent program with two processes, each of which works with the same register, our earlier register invariant could be violated.

There are two processes at work here: call them “top” and “bottom”. The top process tries to write a, read, read. The bottom process, meanwhile, tries to read, write b, read. Because the program is concurrent, the operations from these two processes could interleave in more than one order–so long as the operations for a single process happen in the order that process specifies. In this particular case, top writes a, bottom reads a, top reads a, bottom writes b, top reads b, and bottom reads b.

In this light, the concept of concurrency takes on a different shape. We might imagine every program as concurrent by default–when executed, operations could happen in any order. A thread, a process–in the logical sense, anyway–is a constraint over the history: operations belonging to the same thread must take place in order. Logical threads impose a partial order over the allowed operations.
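Those per-process constraints can be enumerated directly. A Ruby sketch (interleavings is a hypothetical helper) that generates every merge of the two processes' operations while keeping each process's own order:

```ruby
# All interleavings of two processes' operations that preserve each
# process's own order: the partial order that logical threads impose.
def interleavings(a, b)
  return [b] if a.empty?
  return [a] if b.empty?
  interleavings(a[1..], b).map { |rest| [a.first] + rest } +
    interleavings(a, b[1..]).map { |rest| [b.first] + rest }
end

top    = ["top: write a", "top: read #1", "top: read #2"]
bottom = ["bottom: read #1", "bottom: write b", "bottom: read #2"]
orders = interleavings(top, bottom)

orders.size # => 20

# The interleaving described above is one of them.
observed = ["top: write a", "bottom: read #1", "top: read #1",
            "bottom: write b", "top: read #2", "bottom: read #2"]
orders.include?(observed) # => true
```

Only 20 of the 720 possible orderings of six operations survive the per-process constraint.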

Even with that order, our register invariant–from the point of view of an individual process–no longer holds. The process on top wrote a, read a, then read b–which is not the value it wrote. We must relax our consistency model to usefully describe concurrency. Now, a process is allowed to read the most recently written value from any process, not just itself. The register becomes a place of coordination between two processes; they share state.

## Light cones

However, this is not the full story: in almost every real-world system, processes are distant from each other. An uncached value in memory, for instance, is likely on a DIMM thirty centimeters away from the CPU. It takes light over a full nanosecond to travel that distance–and real memory accesses are much slower. A value on a computer in a different datacenter could be thousands of kilometers–hundreds of milliseconds–away. We just can’t send information there any faster; physics, thus far, forbids it.

This means our operations are no longer instantaneous. Some of them might be so fast as to be negligible, but in full generality, operations take time. We invoke a write of a variable; the write travels to memory, or another computer, or the moon; the memory changes state; a confirmation travels back; and then we know the operation took place.

The delay in sending messages from one place to another implies ambiguity in the history of operations. If messages travel faster or slower, they could take place in unexpected orders. Here, the bottom process invokes a read when the value is a. While the read is in flight, the top process writes b–and by happenstance, its write arrives before the read. The bottom process finally completes its read and finds b, not a.

This history violates our concurrent register consistency model. The bottom process did not read the current value at the time it invoked the read. We might try to use the completion time, rather than the invocation time, as the “true time” of the operation, but this fails by symmetry as well; if the read arrives before the write, the process would receive a when the current value is b.

In a distributed system–one in which it takes time for an operation to take place–we must relax our consistency model again; allowing these ambiguous orders to happen.

How far must we go? Must we allow all orderings? Or can we still impose some sanity on the world?

## Linearizability

On careful examination, there are some bounds on the order of events. We can’t send a message back in time, so the earliest a message could reach the source of truth is, well, instantly. An operation cannot take effect before its invocation.

Likewise, the message informing the process that its operation completed cannot travel back in time, which means that no operation may take effect after its completion.

If we assume that there is a single global state that each process talks to; if we assume that operations on that state take place atomically, without stepping on each other’s toes; then we can rule out a great many histories indeed. We know that each operation appears to take effect atomically at some point between its invocation and completion.
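A short Ruby sketch of the real-time constraint, assuming each operation is an interval with hypothetical invoke and complete times: two operations are forced into an order only when one completes before the other begins; otherwise they overlap and either order is allowed.

```ruby
# An operation occupies the interval [invoke, complete] in real time.
Op = Struct.new(:name, :invoke, :complete)

# Linearizability forces a before b only when a completed before b was invoked.
def must_precede?(a, b)
  a.complete < b.invoke
end

# Otherwise the intervals overlap, and either order is allowed.
def concurrent?(a, b)
  !must_precede?(a, b) && !must_precede?(b, a)
end

first_read = Op.new("bottom: read", 1, 8)       # invoked while the value is a
write_b    = Op.new("top: write b", 2, 6)       # lands while the read is in flight
later      = Op.new("bottom: read again", 9, 10)

concurrent?(first_read, write_b) # => true: the read may return a or b
must_precede?(write_b, later)    # => true: this read must see b, or a later value
```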

We call this consistency model linearizability; because although operations are concurrent, and take time, there is some place–or the appearance of a place–where every operation happens in a nice linear order.

The “single global state” doesn’t have to be a single node; nor do operations actually have to be atomic. The state could be split across many machines, or take place in multiple steps–so long as the external history, from the point of view of the processes, appears equivalent to an atomic, single point of state. Often, a linearizable system is made up of smaller coordinating processes, each of which is itself linearizable; and those processes are made up of carefully coordinated smaller processes, and so on, down to linearizable operations provided by the hardware.

Linearizability has powerful consequences. Once an operation is complete, everyone must see it–or some later state. We know this to be true because each operation must take place before its completion time, and any operation invoked subsequently must take place after the invocation–and by extension, after the original operation itself. Once we successfully write b, every subsequently invoked read must see b–or some later value, if more writes occur.

We can use the atomic constraint of linearizability to mutate state safely. We can define an operation like compare-and-set, in which we set the value of a register to a new value if, and only if, the register currently holds some expected value. We can use compare-and-set as the basis for mutexes, semaphores, channels, counters, lists, sets, maps, trees–all kinds of shared data structures become available. Linearizability guarantees us the safe interleaving of changes.
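As an illustration, a Ruby sketch of a lock built from compare-and-set. CasRegister and SpinLock are hypothetical names, and the register's own Mutex stands in for whatever makes compare-and-set atomic in a real system (hardware instructions or a linearizable database, for example).

```ruby
# A linearizable register with compare-and-set; the Mutex makes it atomic.
class CasRegister
  def initialize(value)
    @value = value
    @guard = Mutex.new
  end

  # Sets the register to new_value if, and only if, it currently holds expected.
  def compare_and_set(expected, new_value)
    @guard.synchronize do
      return false unless @value == expected
      @value = new_value
      true
    end
  end
end

# A spin lock built on CAS: acquire by flipping false -> true, release by
# flipping true -> false.
class SpinLock
  def initialize
    @held = CasRegister.new(false)
  end

  def synchronize
    Thread.pass until @held.compare_and_set(false, true)
    begin
      yield
    ensure
      @held.compare_and_set(true, false)
    end
  end
end

lock = SpinLock.new
counter = 0
threads = 8.times.map do
  Thread.new { 1000.times { lock.synchronize { counter += 1 } } }
end
threads.each(&:join)
counter # => 8000
```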

Moreover, linearizability’s time bounds guarantee that those changes will be visible to other participants after the operation completes. Hence, linearizability prohibits stale reads. Each read will see some current state between invocation and completion; but not a state prior to the read. It also prohibits non-monotonic reads–in which one reads a new value, then an old one.

Because of these strong constraints, linearizable systems are easier to reason about–which is why they’re chosen as the basis for many concurrent programming constructs. All variables in JavaScript are (independently) linearizable; as are volatile variables in Java, atoms in Clojure, or individual processes in Erlang. Most languages have mutexes and semaphores; these are linearizable too. Strong assumptions yield strong guarantees.

But what happens if we can’t satisfy those assumptions?

## Sequential consistency

If we allow processes to skew in time, such that their operations can take effect before invocation, or after completion–but retain the constraint that operations from any given process must take place in that process' order–we get a weaker flavor of consistency: sequential consistency.

Sequential consistency allows more histories than linearizability–but it’s still a useful model: one that we use every day. When a user uploads a video to YouTube, for instance, YouTube puts that video into a queue for processing, then returns a web page for the video right away. We can’t actually watch the video at that point; the video upload takes effect a few minutes later, when it’s been fully processed. Queues remove synchronous behavior while (depending on the queue) preserving order.

Many caches also behave like sequentially consistent systems. If I write a tweet on Twitter, or post to Facebook, it takes time to percolate through layers of caching systems. Different users will see my message at different times–but each user will see my operations in order. Once seen, a post shouldn’t disappear. If I write multiple comments, they’ll become visible sequentially, not out of order.
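A Ruby sketch of the per-user guarantee described here, under the assumption that we can record successive snapshots of what each user sees (the data and consistent_view? are hypothetical): every snapshot must be a prefix of my posts in order, and no snapshot may lose a post an earlier one showed.

```ruby
# My posts, in the order I wrote them.
posts = ["first", "second", "third"]

# Hypothetical snapshots of what each user saw on successive page loads.
views = {
  "alice" => [[], ["first"], ["first", "second", "third"]],
  "bob"   => [["first"], ["first", "second"]],
  "carol" => [["first", "second"], ["first"]], # a post disappeared
  "dave"  => [["second"]],                     # seen out of order
}

# Each snapshot must be a prefix of my order (posts appear in order), and
# each snapshot must extend the one before it (once seen, a post stays).
def consistent_view?(snapshots, posts)
  in_order = snapshots.all? { |s| posts.take(s.size) == s }
  monotonic = snapshots.each_cons(2).all? { |before, after| after.take(before.size) == before }
  in_order && monotonic
end

results = views.transform_values { |snapshots| consistent_view?(snapshots, posts) }
# alice and bob see my posts consistently; carol and dave do not.
```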

## Causal consistency

We don’t have to enforce the order of every operation from a process. Perhaps, only causally related operations must occur in order. We might say, for instance, that all comments on a blog post must appear in the same order for everyone, and insist that any reply be visible to a process only after the post it replies to is visible. If we encode those causal relationships like “I depend on operation X” as an explicit part of each operation, the database can delay making operations visible until it has all the operation’s dependencies.
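A minimal Ruby sketch of that dependency-tracking idea, assuming each operation lists the ids it depends on (CausalReplica is a hypothetical class, not any particular database's design): operations are buffered until their dependencies are visible.

```ruby
# Each operation names the operations it depends on. A replica buffers an
# operation until all of its dependencies are visible, then applies it.
class CausalReplica
  attr_reader :visible

  def initialize
    @visible = [] # operation ids, in the order they became visible
    @buffer = []  # operations still waiting on dependencies
  end

  def receive(op)
    @buffer << op
    # Applying one operation may unblock others, so loop until no progress.
    loop do
      ready = @buffer.find { |o| o[:deps].all? { |d| @visible.include?(d) } }
      break unless ready
      @buffer.delete(ready)
      @visible << ready[:id]
    end
  end
end

post  = { id: :post,  deps: [] }
reply = { id: :reply, deps: [:post] }
other = { id: :other, deps: [] }

replica = CausalReplica.new
replica.receive(reply) # arrives first over the network; buffered
replica.receive(other) # independent; visible immediately
replica.receive(post)  # unblocks the reply
replica.visible        # => [:other, :post, :reply]
```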

This is weaker than ordering every operation from the same process–operations from the same process with independent causal chains could execute in any relative order–but prevents many unintuitive behaviors.

## Serializable consistency

If we say that the history of operations is equivalent to one that took place in some single atomic order–but say nothing about the invocation and completion times–we obtain a consistency model known as serializability. This model is both much stronger and much weaker than you’d expect.

Serializability is weak, in the sense that it permits many types of histories, because it places no bounds on time or order. In the diagram to the right, it’s as if messages could be sent arbitrarily far into the past or future, and causal lines are allowed to cross. In a serializable database, a transaction like read x is always allowed to execute at time 0, when x had not yet been initialized. Or it might be delayed infinitely far into the future! The transaction write 2 to x could execute right now, or it could be delayed until the end of time, never appearing to occur.

For instance, in a serializable system, the program

```ruby
x = 1
x = x + 1
puts x
```

is allowed to print nil, 1, or 2; because the operations can take place in any order. This is a surprisingly weak constraint! Here, we assume that each line represents a single operation and that all operations succeed.
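Enumerating the possibilities makes this concrete. A Ruby sketch that runs the three operations in every serial order; following the assumption that all operations succeed, it treats incrementing nil as incrementing 0 (the lambdas are illustrative, not part of any real system):

```ruby
# The three operations from the program above, as lambdas over [x, printed].
# Assumption: incrementing nil is treated as incrementing 0.
ops = {
  "x = 1"     => ->(_x, out) { [1, out] },
  "x = x + 1" => ->(x, out) { [(x || 0) + 1, out] },
  "puts x"    => ->(x, out) { [x, out + [x]] },
}

# A serializable system may run them in any single order; try all six.
printed = ops.keys.permutation.map do |order|
  x, out = nil, []
  order.each { |name| x, out = ops[name].call(x, out) }
  out.first
end

# nil appears twice, 1 three times, and 2 once.
p printed.tally
```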

On the other hand, serializability is strong, in the sense that it prohibits large classes of histories, because it demands a linear order. The program

```ruby
print x if x == 3
x = 1 if x == nil
x = 2 if x == 1
x = 3 if x == 2
```

can only be ordered in one way. It doesn’t happen in the same order we wrote, but it will reliably change x from nil -> 1 -> 2 -> 3, and finally print 3.
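Checking this by brute force in Ruby, under the assumption that each guard must hold when its operation executes (the lambdas and run are hypothetical): of the 24 possible orders, only one lets every operation take effect.

```ruby
# The four guarded operations from the program above. Each lambda takes
# [x, printed] and returns the new pair, or nil if its guard does not hold.
ops = [
  ["print x if x == 3", ->(x, out) { x == 3 ? [x, out + [x]] : nil }],
  ["x = 1 if x == nil", ->(x, out) { x.nil? ? [1, out] : nil }],
  ["x = 2 if x == 1",   ->(x, out) { x == 1 ? [2, out] : nil }],
  ["x = 3 if x == 2",   ->(x, out) { x == 2 ? [3, out] : nil }],
]

# Runs one serial order from x = nil; returns what was printed, or nil if
# some operation's guard failed.
def run(order)
  x, out = nil, []
  order.each do |_label, op|
    result = op.call(x, out)
    return nil unless result
    x, out = result
  end
  out
end

valid = ops.permutation.select { |order| run(order) }
valid.size               # => 1
valid.first.map(&:first) # x = 1, then x = 2, then x = 3, then print
run(valid.first)         # => [3]
```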

Because serializability allows arbitrary reordering of operations (so long as the order appears atomic), it is not particularly useful in real applications. Most databases which claim to provide serializability actually provide strong serializability, which has the same time bounds as linearizability. To complicate matters further, what most SQL databases term the SERIALIZABLE consistency level actually means something weaker, like repeatable read, cursor stability, or snapshot isolation.

## Consistency comes with costs

We’ve said that “weak” consistency models allow more histories than “strong” consistency models. Linearizability, for example, guarantees that operations take place between the invocation and completion times. However, imposing order requires coordination. Speaking loosely, the more histories we exclude, the more careful and communicative the participants in a system must be.

You may have heard of the CAP theorem, which states that given consistency, availability, and partition tolerance, any given system may guarantee at most two of those properties. While Eric Brewer’s CAP conjecture was phrased in these informal terms, the CAP theorem has very precise definitions:

1. Consistency means linearizability, and in particular, a linearizable register. Registers are equivalent to other systems, including sets, lists, maps, relational databases, and so on, so the theorem can be extended to cover all kinds of linearizable systems.

2. Availability means that every request to a non-failing node must complete successfully. Since network partitions are allowed to last arbitrarily long, this means that nodes cannot simply defer responding until after the partition heals.

3. Partition tolerance means that partitions can happen. Providing consistency and availability when the network is reliable is easy. Providing both when the network is not reliable is provably impossible. If your network is not perfectly reliable–and it isn’t–you cannot choose CA. This means that all practical distributed systems on commodity hardware can guarantee, at maximum, either AP or CP.

“Hang on!” you might exclaim. “Linearizability is not the end-all-be-all of consistency models! I could work around the CAP theorem by providing sequential consistency, or serializability, or snapshot isolation!”

This is true; the CAP theorem only says that we cannot build totally available linearizable systems. The problem is that we have other proofs which tell us that you cannot build totally available systems with sequential consistency, serializability, repeatable read, snapshot isolation, or cursor stability–or any models stronger than those. In this map from Peter Bailis' HAT not CAP paper, models shaded in red cannot be fully available.

If we relax our notion of availability, such that client nodes must always talk to the same server, some types of consistency become achievable. We can provide causal consistency, PRAM, and read-your-writes consistency.

If we demand total availability, then we can provide monotonic reads, monotonic writes, read committed, monotonic atomic view, and so on. These are the consistency models provided by distributed stores like Riak and Cassandra, or ANSI SQL databases on the lower isolation settings. These consistency models don’t have linear orders like the diagrams we’ve drawn before; instead, they provide partial orders which come together in a patchwork or web. The orders are partial because they admit a broader class of histories.

## A hybrid approach

Some algorithms depend on linearizability for safety. If we want to build a distributed lock service, for instance, linearizability is required; without hard time boundaries, we could hold a lock from the future or from the past. On the other hand, many algorithms don’t need linearizability. Eventually consistent sets, lists, trees, and maps, for instance, can be safely expressed as CRDTs even in “weak” consistency models.

Stronger consistency models also tend to require more coordination–more messages back and forth–to ensure their operations occur in the correct order. Not only are they less available, but they can also impose higher latency constraints. This is why modern CPU memory models are not linearizable by default–unless you explicitly say so, modern CPUs will reorder memory operations relative to other cores, or worse. While more difficult to reason about, the performance benefits are phenomenal. Geographically distributed systems, with hundreds of milliseconds of latency between datacenters, often make similar tradeoffs.

So in practice, we use hybrid data storage, mixing databases with varying consistency models to achieve our redundancy, availability, performance, and safety objectives. “Weaker” consistency models wherever possible, for availability and performance. “Stronger” consistency models where necessary, because the algorithm being expressed demands a stricter ordering of operations. You can write huge volumes of data to S3, Riak or Cassandra, for instance, then write a pointer to that data, linearizably, to Postgres, Zookeeper or Etcd. Some databases admit multiple consistency models, like tunable isolation levels in relational databases, or Cassandra and Riak’s linearizable transactions, which can help cut down on the number of systems in play. Bottom line, though: anyone who says their consistency model is the only right choice is likely trying to sell something. You can’t have your cake and eat it too.

Armed with a more nuanced understanding of consistency models, I’d like to talk about how we go about verifying the correctness of a linearizable system. In the next Jepsen post, we’ll discuss the linearizability checker I’ve built for testing distributed systems: Knossos.

For a more formal definition of these models, try Dziuma, Fatourou, and Kanellou’s Survey on consistency conditions.

# Call me maybe: Redis redux

In a recent blog post, antirez detailed a new operation in Redis: WAIT. WAIT is proposed as an enhancement to Redis' replication protocol to reduce the window of data loss in replicated Redis systems; clients can block awaiting acknowledgement of a write to a given number of nodes (or time out if the given threshold is not met). The theory here is that positive acknowledgement of a write to a majority of nodes guarantees that write will be visible in all future states of the system.

As I explained earlier, any asynchronously replicated system with primary-secondary failover allows data loss. Optional synchronous replication, antirez proposes, should make it possible for Redis to provide strong consistency for those operations.

WAIT means that if you run three nodes A, B, C where every node contains a Sentinel instance and a Redis instance, and you “WAIT 1” after every operation to reach the majority of slaves, you get a consistent system.

WAIT can be also used, by improving the failover procedure, in order to have a strong consistent system (no writes to the older master from the point the failure detection is positive, to the end of the failover when the configuration is updated, or alternative, disconnect the majority of slaves you can reach during the failure detection so that every write will fail during this time).

Antirez later qualified these claims:

I understand this not the “C” consistency of “CAP” but, before: the partition with clients and the (old) master partitioned away would receive writes that gets lost. after: under certain system models the system is consistent, like if you assume that crashed instances never start again.

Of course, the existence of synchronous replication does not prove that the system is linearizable; only some types of failover preserve the ordering of writes.

As I showed in Call me maybe: Redis, Redis Sentinel will enter split-brain during network partitions, causing significant windows of data loss. Exactly how much data loss depends on the sentinel configuration and the failure topology. Antirez finally suggested that if we replace Redis Sentinel with a strongly consistent coordination service for failover, Redis WAIT could provide full linearizability.

## The failover proposal

In a five-node cluster, assume every write is followed by WAIT 2 to ensure that a majority of nodes have received the write. In the event of a failure, a strong external coordinator goes through the following election process:

1. Totally partition the old primary P1.
2. Of all reachable nodes, identify the node with the highest replication offset. Let that node be P2.
3. Promote P2.
4. Inform all reachable nodes that they are to follow P2.
5. Have all reachable clients switch to the new primary.

There are several serious problems with this design. I hinted at these issues in the mailing list with limited success. Kelly Sommers pointed out repeatedly that this design has the same issues as Cassandra’s CL.ALL. Replication alone does not ensure linearizability; we have to be able to roll back operations which should not have happened in the first place. If those failed operations can make it into our consistent timeline in an unsafe way, perhaps corrupting our successful operations, we can lose data.

… surprisingly I think that transactional rollbacks are totally irrelevant.

Ultimately I was hoping that antirez and other contributors might realize why their proposal for a custom replication protocol was unsafe nine months ago, and abandon it in favor of an established algorithm with a formal model and a peer-reviewed proof, but that hasn’t happened yet. Redis continues to accrete homegrown consensus and replication algorithms without even a cursory nod to formal analysis.

OK, fine. Let’s talk about the failover coordinator.

## The coordinator

Redis Sentinel is not linearizable; nor are its proposed improvements. Whatever failover system you’re planning to use here is going to need something stronger. In fact, we can’t even guarantee safety using a strong coordination service like ZooKeeper to serialize the failover operations, because ZooKeeper cannot guarantee the mutual exclusion of two services in the presence of message delays and clock skews. Let’s paper over that issue by introducing large delays and carefully ordering our timeouts.

It gets worse. Even if we did have a perfect mutex around the coordinator, two coordinators could issue messages to the same Redis nodes which arrive out of order. TCP does not guarantee ordering between two distinct TCP streams, which means we might see coordinator A initiate a failover process then time out halfway; followed by coordinator B which begins the failover process, only to be interrupted on some nodes by messages en-route through the network from coordinator A. Don’t believe me? TCP message delays have been reported in excess of ninety seconds. That one took out Github.

It gets even worse. If the original primary P1 is isolated from the coordinator, the coordinator will not be able to force P1 to step down. Indeed, P1 could remain a primary for the entire duration of a failover, accepting writes, making state changes, and attempting to replicate those changes to other nodes. This is dangerous because we cannot atomically guarantee that the new majority of nodes will reject those writes.

1. A client writes to P1, which replicates to secondaries S2, S3, S4, and S5.
2. The coordinator attempts to elect a new primary, and sees S2, S3, S4, and S5.
3. Without loss of generality, assume S2 has the highest replication offset. The coordinator promotes S2 to P2.
4. P1 receives acks from S3, S4, and S5, and, having reached a majority, returns success to the client.
5. The coordinator reparents S3, S4, and S5 to P2, destroying the write.
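The interleaving above can be replayed as a toy, single-threaded Python sketch. Everything here is hypothetical: `Node`, `replay_race`, and the list-shaped log are stand-ins, not Redis APIs. The sketch assumes, as the scenario implies, that S2 is promoted before P1’s in-flight write reaches it, and that a reparented secondary discards its data and resyncs from its new primary.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    """Toy replica: an ordered log of writes and the primary it follows."""
    name: str
    log: list = field(default_factory=list)
    follows: Optional[str] = None  # None means the node acts as a primary

    @property
    def offset(self) -> int:
        return len(self.log)


def replay_race() -> dict:
    p1 = Node("P1", log=["a"])
    secondaries = {n: Node(n, log=["a"], follows="P1") for n in ("S2", "S3", "S4", "S5")}

    # Step 1: a client writes x to P1; replication to S2..S5 is in flight.
    p1.log.append("x")

    # Steps 2-3: the coordinator sees S2..S5 (not P1). All offsets are tied,
    # so max() returns the first candidate, S2, which stops following P1.
    p2 = max(secondaries.values(), key=lambda n: n.offset)
    p2.follows = None

    # Step 4: x reaches the nodes still following P1, and they ack it.
    acks = 0
    for node in secondaries.values():
        if node.follows == "P1":
            node.log.append("x")
            acks += 1
    acknowledged = acks >= 2  # WAIT 2 in a five-node cluster

    # Step 5: reparent S3..S5 to P2. Assumption: a reparented secondary
    # discards its data and fully resyncs from its new primary.
    for node in secondaries.values():
        if node is not p2:
            node.follows = p2.name
            node.log = list(p2.log)

    return {
        "acknowledged": acknowledged,
        "promoted": p2.name,
        "nodes_with_x": [n.name for n in secondaries.values() if "x" in n.log],
        "p1_has_x": "x" in p1.log,
    }


print(replay_race())
# {'acknowledged': True, 'promoted': 'S2', 'nodes_with_x': [], 'p1_has_x': True}
```

The client was told its write succeeded, yet none of the nodes that now form the majority hold `x`; only the isolated P1 does.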

You might try to solve this by forcing S2–S5 into a read-only, non-replicating mode before attempting to promote a new primary, but that gets into a whole other morass of issues around multiple state transitions and partial failures. Suffice it to say: it’s difficult to solve this by simply pausing nodes first. Maybe impossible? I’m not sure.

Typically, replication protocols solve this problem by guaranteeing that writes from P1 cannot be accepted after S2–S5 acknowledge to the coordinator that they will participate in a new cohort. This often takes the form of a ballot (Paxos), epoch (ZAB, Viewstamped Replication), or term (Raft). Redis has no such construct, and antirez seems to eschew it as unnecessary:

In this model, it is possible to reach linearizability? I believe, yes, because we removed all the hard part, for which the strong protocols like Raft use epochs.
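For contrast, here is a minimal Python sketch of the one invariant that ballots, epochs, and terms share. It is not Raft, ZAB, or Paxos, and `Replica`, `join_term`, `replicate`, and `primary_write` are hypothetical names: once a replica has promised to join a newer term, it refuses to acknowledge writes from a leader still operating in an older one, so P1 can no longer collect acks after S2–S5 switch cohorts.

```python
class Replica:
    """Toy replica that fences out leaders from older terms."""

    def __init__(self, name: str):
        self.name = name
        self.term = 0   # highest term this replica has promised to follow
        self.log = []   # (term, value) entries it has accepted

    def join_term(self, term: int) -> bool:
        """Promise to participate in a newer cohort; terms never go backwards."""
        if term <= self.term:
            return False
        self.term = term
        return True

    def replicate(self, leader_term: int, value) -> bool:
        """Accept (and ack) a write only from a leader whose term is current."""
        if leader_term < self.term:
            return False  # stale leader: refuse instead of acking
        self.term = leader_term
        self.log.append((leader_term, value))
        return True


def primary_write(replicas, term: int, value, needed: int) -> bool:
    """A primary in `term` needs `needed` acks (2 here, as with WAIT 2)."""
    acks = sum(1 for r in replicas if r.replicate(term, value))
    return acks >= needed


replicas = [Replica(n) for n in ("S2", "S3", "S4", "S5")]
assert primary_write(replicas, term=1, value="a", needed=2)  # P1, term 1

# The coordinator moves S2..S5 into term 2 before promoting anyone.
assert all(r.join_term(2) for r in replicas)

# Isolated P1 still believes it leads term 1; its write can no longer be acked.
print(primary_write(replicas, term=1, value="x", needed=2))  # False
```

A real coordinator would still need to collect a majority of `join_term` promises before promoting anyone; the sketch only shows why those promises fence out the old primary.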

This brings us to a different, but related series of problems.

## The servers

By using the offset in the replication log as the determining factor in which nodes are promotable, the proposed failover design opens the door for significant data loss.

Imagine the following sequence:

1. The primary P1, with log offset O1, becomes isolated from S3, S4, and S5.
2. Clients writing to P1 see their operations using WAIT 2 fail.
3. S3 is promoted to P3, with offset O1=O3. Clients writing to P3 see their writes succeed, replicated to S4 and S5.
4. More operations occur on P1 than on P3. O1 becomes greater than O3.
5. The partition heals; the coordinator can see both P1 and P3.
6. The coordinator sees that O1 is higher than O3, and chooses P1 as the new primary.
7. P3 is demoted, and all its acknowledged writes are destroyed.
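The arithmetic behind steps 4 through 7 fits in a few lines of Python. The write counts below are made up (the shared starting offset of 1570 is borrowed from the test run described later), and `elect` is a hypothetical helper mirroring the proposal’s “highest offset wins” rule. The only inputs that matter are that P1 keeps applying its failed writes locally, which advances its offset, while P3’s writes are the acknowledged ones.

```python
def elect(offsets: dict) -> str:
    """The proposal's rule: promote whichever reachable node has the highest offset."""
    return max(offsets, key=offsets.get)


def diverge_and_heal(p1_writes: int, p3_writes: int, base: int = 1570) -> dict:
    # Step 3: S3 is promoted to P3 while both primaries share offset `base`.
    # Step 4: P1's writes fail WAIT 2 but are still applied, advancing O1;
    #         P3's writes reach S4 and S5, so every one of them is acknowledged.
    offsets = {"P1": base + p1_writes, "P3": base + p3_writes}
    acknowledged = {"P1": 0, "P3": p3_writes}

    # Steps 5-7: after healing, the coordinator keeps the highest offset,
    # and everything accepted only by the other primary is discarded.
    winner = elect(offsets)
    lost = sum(n for name, n in acknowledged.items() if name != winner)
    return {"winner": winner, "acknowledged_writes_lost": lost}


print(diverge_and_heal(p1_writes=500, p3_writes=300))
# {'winner': 'P1', 'acknowledged_writes_lost': 300}
```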

Don’t believe me? Here, let’s try it. Here’s a function which implements (more or less) the proposed coordinator algorithm. Note that we’re not demoting the original primary because it may not be reachable.

```clojure
(defn elect!
  "Forces an election among the given nodes. Picks the node with the highest
  replication offset, promotes it, and re-parents the secondaries."
  [nodes]
  (let [highest (highest-node nodes)]
    (log "Promoting" highest)
    (with-node highest
      (redis/slaveof "no" "one"))
    (doseq [node (remove #{highest} nodes)]
      (log "Reparenting" node "to" highest)
      (with-node node
        (redis/slaveof highest 6379)))))
```

And in the test, we’ll use WAIT to ensure that only writes which are successfully replicated to 2 or more replicas are considered successful:

```clojure
(add [app element]
  (try
    (redis/with-conn pool spec
      (redis/sadd key element))
    ; Block for 2 secondaries (3 total) to ack.
    (let [acks (redis/with-conn pool spec
                 (taoensso.carmine.protocol/send-request! "WAIT" 2 1000))]
      (if (< acks 2)
        (do (log "not enough copies: " acks)
            error)
        ok))
    (catch Exception e
      (if (->> e .getMessage (re-find #"^READONLY"))
        error
        (throw e)))))
```

I’m gonna punt on informing clients which node is the current primary; we’ll just issue set-add requests to each node independently. Jepsen only cares about whether successful writes are lost, so we’ll let those writes fail and log ‘em as unsuccessful.

Initially, the offset for all 5 nodes is 15. Writes complete successfully on P1 and fail on S2–S5.

We cut off P1 and S2 from S3, S4, and S5. S3, S4, and S5 all have equal offsets (1570), so we promote S3 to P3. As soon as the partition takes effect, writes to P1 begin to fail: we see `not enough copies: 1`, and an `:error` status for writes 110, 115, and so on. Latencies on P1 jump to 1 second, since that’s how long we’re blocking for using WAIT.

Writes complete successfully on P3, since it can see a majority of nodes: itself, S4, and S5. We heal the partition and initiate a second election. Since P1’s offset (8010) is higher than P3’s (6487), we preserve P1 as a primary and demote all other nodes to follow it. All P3’s writes accepted during the partition are silently destroyed.

Note that there’s actually a window here where writes can successfully take place on either of P1 or P3 in a mixed sequence, depending on the order in which the secondaries are reparented. Both 560 and 562 complete successfully, even though 562 was written to S3, which was demoted at that point in time. Some weird opportunity for timing anomalies there.

These results are catastrophic. In a partition which lasted for roughly 45% of the test, 45% of acknowledged writes were thrown away. To add insult to injury, Redis preserved all the failed writes in place of the successful ones.

Two bugs amplify this problem. Note that this is the unstable branch, so this isn’t a huge deal right now:

First, Redis secondaries return -1 for their offset when they detect the primary is down. Returning a special status code makes sense… but not if you’re using the offset to determine which nodes become the primary. This could cause the highest nodes to appear the lowest, and vice versa. If a fresh node has offset 0, and all other nodes return offset -1, this could cause a cluster to erase all data ever written to it.

Second, Redis resets the replication offset to zero every time a node is promoted. Again, a reasonable choice in isolation, but it actually maximizes the chances that this particular failure mode will occur. The current design is biased towards data loss.

Even if these bugs were corrected, the problem could still occur. All that’s required is for more operations to happen on P1 than P3 after the two diverge.

## Going forward

Distributed systems design is really hard, but engineers continue to assume otherwise:

However I think that distributed systems are not super hard, like kernel programming is not super hard, like C system programming is not super hard. Everything new or that you don’t do in a daily basis seems super hard, but it is actually different concepts that are definitely things everybody here in this list can master.

For sure a few months of exposure will not make you able to provide work like Raft or Paxos, but the basics can be used in order to try to design practical systems, that can be improved over time.

I assert just the opposite: we need formal theory, written proofs, computer verification, and experimental demonstration that our systems make the tradeoffs we think they make. Throughout the Redis criticism thread and discussion on Twitter, I see engineers assuming that they understand the tradeoffs despite the presence of gaping holes in the system’s safety net.

This behavior endangers users.

These list threads and blog posts are the sources that users come to, years later, to understand the safety properties of our systems. They’ll read our conjectures and idle thoughts and tease out some gestalt, and use that to build their systems on top of ours. They’ll miss subtle differences in phrasing and they won’t read every reply. Most won’t do any reading at all; they’re not even aware that these problems could exist.

Engineers routinely characterize Redis’s reliability as “rock solid”.

This is part of why I engage in these discussions so vocally. As systems engineers, we continually struggle to erase the assumption of safety before that assumption causes data loss or downtime. We need to clearly document system behaviors so that users can make the right choices.

We must understand our systems in order to explain them–and distributed systems are hard to understand. That’s why it’s so important that we rely on formal models, on proofs, instead of inventing our own consensus protocols–because much of the hard work of understanding has been done already. We can build on that work. Implementing a peer-reviewed paper is vastly simpler than trying to design and verify an algorithm from scratch–or worse, evolving one piecemeal, composed of systems which encode subtly different assumptions about their responsibilities to the world. Those designs lead to small gaps which, viewed from the right angle, become big enough to drive a truck through.

I wholeheartedly encourage antirez, myself, and every other distributed systems engineer: keep writing code, building features, solving problems–but please, please, use existing algorithms, or learn how to write a proof.

# Call me maybe: Strangeloop Hangout

Since the Strangeloop talks won’t be available for a few months, I recorded a new version of the talk as a Google Hangout.

# Call me maybe: Cassandra

Previously on Jepsen, we learned about Kafka’s proposed replication design.

Cassandra is a Dynamo system; like Riak, it divides a hash ring into several chunks, and keeps N replicas of each chunk on different nodes. It uses tunable quorums, hinted handoff, and active anti-entropy to keep replicas up to date. Unlike the Dynamo paper and some of its peers, Cassandra eschews vector clocks in favor of a pure last-write-wins approach.
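The ring-and-replicas layout can be sketched in Python under simplifying assumptions: one token per node and MD5 hashing, whereas Cassandra itself uses configurable partitioners and virtual nodes. `Ring` and `ring_position` are hypothetical names. A key hashes to a point on the ring, and its N replicas are the next N nodes clockwise from that point.

```python
import bisect
import hashlib


def ring_position(key: str) -> int:
    """Map a string to a point on a 2**128 ring using MD5."""
    return int.from_bytes(hashlib.md5(key.encode()).digest(), "big")


class Ring:
    def __init__(self, nodes, n_replicas: int):
        self.n = n_replicas
        # One token per node, sorted by position on the ring.
        self.tokens = sorted((ring_position(node), node) for node in nodes)
        self.positions = [pos for pos, _ in self.tokens]

    def replicas(self, key: str) -> list:
        """The N nodes responsible for `key`: walk clockwise from its hash."""
        start = bisect.bisect_right(self.positions, ring_position(key))
        count = min(self.n, len(self.tokens))
        return [self.tokens[(start + i) % len(self.tokens)][1] for i in range(count)]


ring = Ring(["n1", "n2", "n3", "n4", "n5"], n_replicas=3)
print(ring.replicas("user:42"))  # three distinct, ring-adjacent nodes
```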

## Some Write Loses

If you read the Riak article, you might be freaking out at this point. In Riak, last-write-wins resulted in dropping 30-70% of writes, even with the strongest consistency settings (R=W=PR=PW=ALL), even with a perfect lock service ensuring writes did not occur simultaneously. To understand why, I’d like to briefly review the problem with last-write-wins in asynchronous networks.

In this causality diagram, two clients (far left and far right) add the elements “a”, “b”, and “c” to a set stored in an LWW register (middle line). The left client adds a, which is read by both clients. One client adds b, constructing the set [a b]. The other adds c, constructing the set [a c]. Both write their values back. Because the register is last-write-wins, it preserves whichever arrives with the highest timestamp. In this case, it’s as if the write from the client on the left never even happened. However, it could just as easily have discarded the write from the right-hand client. Without a strong external coordinator, there’s just no way to tell whose data will be preserved, and whose will be thrown away.
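The diagram’s lost update can be reproduced with a toy last-write-wins register in Python. `LWWRegister` is a hypothetical stand-in, and the timestamps are chosen by hand so that the write adding c happens to win; swap them and b survives instead, which is exactly the nondeterminism described above.

```python
from dataclasses import dataclass


@dataclass
class LWWRegister:
    """Keeps whichever write carries the highest timestamp."""
    value: frozenset = frozenset()
    timestamp: int = 0

    def write(self, value: frozenset, timestamp: int) -> None:
        if timestamp > self.timestamp:
            self.value, self.timestamp = value, timestamp


reg = LWWRegister()
reg.write(frozenset({"a"}), timestamp=1)  # one client adds a

# Both clients read {a}, add an element locally, and write the whole set back.
with_b = reg.value | {"b"}
with_c = reg.value | {"c"}
reg.write(with_b, timestamp=2)
reg.write(with_c, timestamp=3)  # this write happens to carry the later timestamp

print(sorted(reg.value))  # ['a', 'c']: the addition of b is silently gone
```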

Again: in an LWW register, the only conditions under which you can guarantee your write will not be silently ignored are when the register’s value is immutable. If you never change the value, it doesn’t matter which copy you preserve.

Vector clocks avoid this problem by identifying conflicting writes, and allowing you to merge them together.
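A minimal Python sketch of how vector clocks tell “newer” apart from “conflicting”. Clocks are plain dicts from a hypothetical actor id to a counter, and `compare` and `merge_clocks` are illustrative helpers, not any database’s API. The two writes from the diagram both descend from the version both clients read, but neither descends from the other, so they come back as concurrent siblings instead of one silently replacing the other.

```python
def compare(a: dict, b: dict) -> str:
    """Compare two vector clocks (maps from actor id to counter)."""
    actors = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in actors)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in actors)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # b descends from a, so b may safely replace a
    if b_le_a:
        return "after"       # a descends from b
    return "concurrent"      # siblings: keep both and merge them


def merge_clocks(a: dict, b: dict) -> dict:
    """Pointwise maximum: the clock of a value that descends from both."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in set(a) | set(b)}


read = {"x": 1}                  # the version holding {a} that both clients read
left = {"x": 1, "left": 1}       # one client's write adding b
right = {"x": 1, "right": 1}     # the other client's write adding c

print(compare(read, left))       # before
print(compare(left, right))      # concurrent
```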

Because there’s no well-defined order for potential conflicts, the merge function needs to be associative, commutative, and idempotent. If it satisfies those three properties (in essence, if you can merge any values in any order and get the same result), the system forms a semilattice known as a CRDT, and you recover a type of order-free consistency known as lattice consistency. Last-write-wins is a particular type of CRDT–albeit, not a particularly good one, because it destroys information nondeterministically.
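The three merge properties can be spot-checked in Python. The sketch uses a grow-only set (G-set), whose merge is set union, and contrasts it with a hypothetical “last arrival wins” merge that simply keeps whichever value arrived second. The checker only tests the sample values it is given, so it can refute a merge function but cannot prove one correct.

```python
from functools import reduce
from itertools import permutations


def gset_merge(a: frozenset, b: frozenset) -> frozenset:
    """Grow-only set: merge is set union."""
    return a | b


def keep_latest_arrival(a, b):
    """Hypothetical order-dependent merge: whichever value arrives second wins."""
    return b


def is_semilattice_merge(merge, samples) -> bool:
    """Spot-check associativity, commutativity, and idempotence on samples."""
    for x in samples:
        if merge(x, x) != x:
            return False
        for y in samples:
            if merge(x, y) != merge(y, x):
                return False
            for z in samples:
                if merge(merge(x, y), z) != merge(x, merge(y, z)):
                    return False
    return True


replicas = [frozenset({"a"}), frozenset({"a", "b"}), frozenset({"a", "c"})]
print(is_semilattice_merge(gset_merge, replicas))           # True
print(is_semilattice_merge(keep_latest_arrival, replicas))  # False

# With a lattice merge, every delivery order converges on the same value.
outcomes = {reduce(gset_merge, order) for order in permutations(replicas)}
print(len(outcomes), sorted(next(iter(outcomes))))  # 1 ['a', 'b', 'c']
```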

Early in Cassandra’s history, Cassandra chose not to implement vector clocks for performance reasons. Vclocks (typically) require a read before each write. By using last-write-wins in all cases, and ignoring the causality graph, Cassandra can cut the number of round trips required for a write from 2 to 1, and obtain a significant speedup. The downside is that there is no safe way to modify a Cassandra cell.

Some people claim you can serialize updates to a cell by perfectly synchronizing your clocks, using ConsistencyLevel.QUORUM or ALL, and using an external lock service to prevent simultaneous operations. Heck, the official Cassandra documentation even claims this:

As we’ll see throughout this post, the Cassandra documentation can be less than accurate. Here’s a Jepsen test which mutates the same cell repeatedly, using perfectly synchronized clocks, QUORUM consistency, and a perfect lock service:

```
lein run lock cassandra
...
Writes completed in 200.036 seconds

2000 total
1009 acknowledged
724 survivors
285 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
1 3 6 8 11 13 ... 1986 1988 1991 1993 1996 1998
0.5045 ack rate
0.2824579 loss rate
0.0 unacknowledged but successful rate
```

Losing 28% of your supposedly committed data is not serializable by any definition. Next question.

## CQL and CRDTs

Without vector clocks, Cassandra can’t safely change a cell–but writing immutable data is safe. Consequently, Cassandra has evolved around those constraints, allowing you to efficiently journal thousands of cells to a single row, and to retrieve them in sorted order. Instead of modifying a cell, you write each distinct change to its own UUID-keyed cell. Then, at read time, you read all the cells back and apply a merge function to obtain a result.

Cassandra’s query language, CQL, provides some collection-oriented data structures around this model: sets, lists, maps, and so forth. They’re CRDTs, though the semantics don’t align with what you’ll find in the INRIA paper–no G-sets, 2P-sets, OR-sets, etc. However, some operations are safe–for instance, adding elements to a CQL set:

```
0 unrecoverable timeouts
Collecting results.
Writes completed in 200.036 seconds

2000 total
2000 acknowledged
2000 survivors
All 2000 writes succeeded. :-D
```

That’s terrific! This is the same behavior we saw with G-sets in Riak. However, not all CQL collection operations are intuitively correct. In particular, I’d be wary of the index-based operations for lists, updating elements in a map, and any type of deletions. Deletes are implemented by writing special tombstone cells, which declare a range of other cells to be ignored. Because Cassandra doesn’t use techniques like OR-sets, you can potentially delete records that haven’t been seen yet–even delete writes from the future. Cassandra users jokingly refer to this behavior as “doomstones”.
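A toy Python model of why deletes deserve that wariness. It assumes a simplified rule in which a tombstone hides every cell whose timestamp is not strictly greater than the tombstone’s, and models a whole-set delete as a single tombstone timestamp; `visible` and the cell layout are hypothetical. A client whose clock runs a few seconds fast issues the delete, and a later add from a client with an accurate clock is hidden by it.

```python
def visible(cells: dict, tombstone_ts) -> dict:
    """Cells that survive a whole-set delete (toy rule: strictly newer wins)."""
    if tombstone_ts is None:
        return dict(cells)
    return {key: cell for key, cell in cells.items() if cell[0] > tombstone_ts}


# UUID-keyed set cells: key -> (timestamp in ms, element). Keys are made up.
cells = {"u1": (1_000, "a"), "u2": (1_010, "b")}

# A client whose clock runs about four seconds fast deletes the whole set...
skewed_delete = 5_000
# ...and then a client with an accurate clock adds a new element.
cells["u3"] = (2_000, "c")

print(visible(cells, skewed_delete))  # {}: the later add is hidden too
```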
The important thing to remember is that because there are no ordering constraints on writes, one’s merge function must still be associative and commutative. Just as we saw with Riak, AP systems require you to reason about order-free data structures. In fact, Cassandra and Riak are (almost) formally equivalent in their consistency semantics–the primary differences are in the granularity of updates, in garbage collection/history compaction, and in performance.

Bottom line: CQL collections are a great idea, and you should use them! Read the specs carefully to figure out whether CQL operations meet your needs, and if they don’t, you can always write your own CRDTs on top of wide rows yourself.

## Counters

If you’re familiar with CRDTs, you might be wondering whether Cassandra’s counter type is a PN-counter–a commutative, monotonic data structure which can be incremented and decremented in an eventually consistent way. The answer is no: Cassandra (via Twitter, politics, etc.) wound up with a less safe type of data structure. Consequently, Cassandra counters will over- or under-count by a wide range during a network partition.

If partitioned for about half of the test run, I found counters could drift by up to 50% of the expected value. Here’s a relatively well-behaved run, drifting by less than a percent.

```
10000 total
9700 acknowledged
9921 survivors
```

## Isolation

In Coming up in Cassandra 1.1: Row Level Isolation, and Atomic batches in Cassandra 1.2, DataStax asserts that a write which updates multiple keys in the same row will be atomic and isolated.

Cassandra 1.1 guarantees that if you update both the login and the password in the same update (for the same row key) then no concurrent read may see only a partial update.

Full row-level isolation is now in place so that writes to a row are isolated to the client performing the write and are not visible to any other user until they are complete.
From a transactional ACID (atomic, consistent, isolated, durable) standpoint, this enhancement now gives Cassandra transactional AID support.

We know what “atomic” means: either all of the changes in the transaction complete, or none of them do. But what does “isolated” mean? Isolated in the sense of ACID? Let’s ask Hacker News what they think Cassandra’s isolation provides:

Peter Bailis pointed me at two really excellent papers on isolation and consistency, including Berenson et al.’s “A Critique of ANSI SQL Isolation Levels”–I really recommend digging into them if you’re curious about this problem. Isolation comes in many flavors, or strengths, depending on what sorts of causal histories are allowed. Serializability is one of the strongest: all transactions appear to occur in a single well-defined non-interleaved order. Cursor Stability (CS) and Snapshot Isolation (SI) are somewhat weaker.

ANSI SQL defines four levels of isolation, which really have more to do with the historical behavior of various database systems than with behavior that any sane person would consider distinguishable, so I’m not going to get into the details–but suffice it to say that there are a range of phenomena which are prohibited by those isolation levels. In order from least to most awful:

- P4: Lost Update
- P3: Phantom
- P2: Fuzzy read
- P1: Dirty read
- P0: Dirty write

ANSI SQL’s SERIALIZABLE level prohibits P3–P0; REPEATABLE READ prohibits P2 and below, READ COMMITTED prohibits P1 and below, and READ UNCOMMITTED only prohibits P0.

P0, or “dirty write”, is especially important because all isolation levels must prohibit it. In P0, one transaction modifies some data; then a second transaction also modifies that data, before the first transaction commits. We never want writes from two different transactions to be mixed together, because it might violate integrity relationships which each transaction held independently.
For instance, we might write [x=1, y=1] in one transaction, and [x=2, y=2] in a different transaction, assuming that x will always be equal to y. P0 allows those transactions to result in [x=1, y=2], or [x=2, y=1].

Cassandra allows P0.

The key thing to remember here is that in Cassandra, the order of writes is completely irrelevant. Any write made to the cluster could eventually wind up winning, if it has a higher timestamp. But–what happens if Cassandra sees two copies of a cell with the same timestamp?

It picks the lexicographically bigger value.

That means that if the values written to two distinct cells don’t have the same sort order (which is likely), Cassandra could pick final cell values from different transactions. For instance, we might write [1 -1] and [2 -2]. 2 is greater than 1, so the first cell will be 2. But -1 is bigger than -2, so -1 wins in the second cell. The result? [2 -1].

“But,” you might protest, “In order for that to happen, you’d need two timestamps to collide. It’s really unlikely that two writes will get the same microsecond-resolution timestamp, right? I’ve never seen it happen in my cluster.”

Well, it depends. If we assume N writes per second by Poisson processes to the same row, the probability of any given read seeing a conflicting value grows as the writes come closer together.

| Rate (writes/sec) | Probability of conflict per read |
|---|---|
| 1 | 1.31E-7 |
| 10 | 5.74E-6 |
| 100 | 5.30E-5 |
| 1000 | 5.09E-4 |
| 10000 | 0.00504 |
| 100000 | 0.0492 |
| 1000000 | 0.417 |

So if you do 100,000 writes/sec, on any given read you’ve got a 5% chance of seeing corrupt data. If you do 10 writes/sec and 1 read/sec, you’ve got about a 1/3 chance of seeing corrupt data in any given day.

What if you write many rows over time–maybe 2 writes to each row, separated by a mean delta of 100 milliseconds? Then the theoretical probability of any given row being corrupt is about 5 × 10⁻⁶.
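Both halves of this argument can be sketched in Python. `reconcile` is a toy version of the per-cell rule described above (higher timestamp wins; on a tie, the bigger value wins), using Python integer comparison in place of Cassandra’s byte comparison; the two agree for the values in the example. `same_tick_probability` is a hypothetical helper estimating how often two writes to one row share a timestamp, under assumptions chosen for this sketch and not necessarily the model behind the table: the gap between the writes is exponentially distributed with a given mean, and the first write lands uniformly within its clock tick.

```python
import math


def reconcile(a: tuple, b: tuple) -> tuple:
    """Pick the winning (timestamp, value) copy of one cell (toy rule)."""
    (ts_a, val_a), (ts_b, val_b) = a, b
    if ts_a != ts_b:
        return a if ts_a > ts_b else b
    return a if val_a >= val_b else b  # timestamp tie: the bigger value wins


def reconcile_row(row_a: list, row_b: list) -> list:
    """Each cell is reconciled independently of the others in its row."""
    return [reconcile(cell_a, cell_b)[1] for cell_a, cell_b in zip(row_a, row_b)]


def same_tick_probability(resolution_s: float, mean_gap_s: float) -> float:
    """P(two writes share a timestamp tick), under the assumptions above."""
    x = resolution_s / mean_gap_s
    # 1 - (1 - e^-x) / x, using expm1 for accuracy when x is tiny.
    return 1.0 - (-math.expm1(-x)) / x


ts = 1_380_000_000_000_000      # both writes received the same timestamp
txn_1 = [(ts, 1), (ts, -1)]     # intended invariant: second cell == -first
txn_2 = [(ts, 2), (ts, -2)]
print(reconcile_row(txn_1, txn_2))  # [2, -1]: one cell from each transaction

print(same_tick_probability(1e-6, 0.100))  # microsecond ticks, 100 ms mean gap
```

With microsecond ticks and a 100 ms mean gap, the estimate comes out near 5 × 10⁻⁶; changing `resolution_s` to `1e-3` shows how much coarser clocks raise the odds.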
That’s a pretty small probability–and remember, most applications can tolerate some small degree of corrupt data. Let’s confirm it with an experiment:

```
10000 total
9899 acknowledged
9942 survivors
58 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
127 253 277 339 423 434 ... 8112 8297 8650 8973 9096 9504
101 unacknowledged writes found! ヽ(´ー｀)ノ
1059 1102 1139 1142 1143 1158 ... 2701 2720 2721 2800 2815 2860
0.9899 ack rate
0.0058591776 loss rate
0.01020305 unacknowledged but successful rate
```

Note that “writes lost” here means corrupted rows: entirely missing rows are treated as successes. Roughly 1 in 200 rows were corrupt! That’s way worse than 10⁻⁶! What gives?

It turns out that somewhere in this maze of software, either Cassandra, the DataStax Java driver, or Cassaforte is taking the current time in milliseconds and tacking on three zeroes to the end, calling it good. The probability of millisecond conflicts is significantly higher than microsecond conflicts, which is why we saw so much corrupt data.

Long story short, Cassandra row isolation is probabilistic at best; and remember, the only reason you actually want isolation is because you plan on doing two operations at the same time. If you rely on isolation, in any sense of the word, in Cassandra, you need to consider your tolerance for data corruption, and verify that you’re actually generating timestamps with the expected distribution. A strong external coordinator which guarantees unique timestamps might be of use.

## Lightweight Transactions

In Cassandra 2.0.0, Lightweight Transactions offer linearizable consistency for compare-and-set operations. The implementation is based on naive Paxos–requiring four round trips for each write–but the performance can be improved with time. The important thing is that Cassandra is first to have a distributed linearizable data store, or something.

That said, sometimes you really do need linearizable operations.
That’s why we added lightweight transactions in Cassandra 2.0.

This is a sign of Cassandra maturing — Cassandra 1.0 (released October 2011) was the fulfilment of its designers original vision; Cassandra 2.0 takes it in new directions to make it even more powerful.

Open source has had the reputation of producing good imitations, but not innovation. Perhaps Cassandra’s origins as a hybrid of Dynamo and Bigtable did not disprove this, but Apache Cassandra’s development of lightweight transactions and CQL are true industry firsts.

The first thing you’ll notice if you try to test the new transaction system is that the Java driver doesn’t support it. It’ll throw some weird exceptions like “unknown consistency level SERIAL”, because it doesn’t support the v2 native Cassandra protocol yet. So you’ll need to use the Python Thrift client, or, in my case, get a patched client from DataStax.

The second thing you’ll notice is deadlocks. In my Jepsen tests, the cluster would go unresponsive after the first 10 or so transactions–and it would never recover. Any further attempts to modify a cell via transaction would spin endlessly in failed transactions, until I manually truncated the system.paxos table.

You can’t make this shit up.

So you confer with DataStax for a while, and they manage to reproduce and fix the bugs: #6029 (Lightweight transactions race render primary key useless), and #5985 (Paxos replay of in progress update is incorrect). You start building patched versions of Cassandra.

```
git checkout paxos-fixed-hopefully
```

Let’s give it a whirl. In this transaction test, we perform repeated compare-and-set operations against a single cell, retrying failed attempts for up to 10 seconds. The first thing you’ll notice is that those four round-trips aren’t exactly lightweight, which means that at 50 transactions/sec, the majority of transaction attempts time out:

But we’re less concerned with performance or availability than safety.
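The shape of that test client can be sketched in Python: read the cell, compute a new value, compare-and-set, and retry on conflict until a 10-second deadline passes. In CQL the compare-and-set step would be a conditional update such as `UPDATE cells SET value = ? WHERE id = ? IF value = ?` (the table `cells` is hypothetical); here it is an in-memory `Register` so the sketch runs on its own, and `cas_with_retry` is an illustrative name, not the Jepsen client.

```python
import time
from typing import Callable


class Register:
    """In-memory stand-in for one cell that supports compare-and-set."""

    def __init__(self, value: int = 0):
        self.value = value

    def cas(self, expected: int, new: int) -> bool:
        if self.value != expected:
            return False
        self.value = new
        return True


def cas_with_retry(read: Callable[[], int],
                   cas: Callable[[int, int], bool],
                   update: Callable[[int], int],
                   timeout_s: float = 10.0) -> bool:
    """Read, compute, compare-and-set; retry on conflict until the deadline."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        current = read()
        if cas(current, update(current)):
            return True   # acknowledged
        time.sleep(0.01)  # lost the race to another writer; back off briefly
    return False          # gave up: the test records this attempt as failed


reg = Register(0)
interfered = {"done": False}


def racy_read() -> int:
    """Reads the cell, but lets a competing client write once right after."""
    value = reg.value
    if not interfered["done"]:
        interfered["done"] = True
        reg.cas(value, value + 100)
    return value


print(cas_with_retry(racy_read, reg.cas, lambda v: v + 1), reg.value)  # True 101
```

Against a real cluster, a timed-out CAS is indeterminate rather than failed: it may still take effect later, so a checker has to allow for it.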
Let’s slow down the test to 5 transactions/sec to reduce contention, and check: are lightweight transactions actually linearizable?

```
2000 total
829 acknowledged
827 survivors
3 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
(102 1628 1988)
1 unacknowledged writes found! ヽ(´ー｀)ノ
(283)
0.4145 ack rate
0.0036188178 loss rate
0.0012062726 unacknowledged but successful rate
```

No. Cassandra lightweight transactions are not even close to correct. Depending on throughput, they may drop anywhere from 1-5% of acknowledged writes–and this doesn’t even require a network partition to demonstrate. It’s just a broken implementation of Paxos. In addition to the deadlock bug, these Jepsen tests revealed #6012 (Cassandra may accept multiple proposals for a single Paxos round) and #6013 (unnecessarily high false negative probabilities).

Paxos is notoriously difficult to implement correctly. The Chubby authors note:

> Our tests start in safety mode and inject random failures into the system. After running for a predetermined period of time, we stop injecting failures and give the system time to fully recover. Then we switch the test to liveness mode. The purpose for the liveness test is to verify that the system does not deadlock after a sequence of failures.
>
> This test proved useful in finding various subtle protocol errors, including errors in our group membership implementation, and our modifications to deal with corrupted disks…. We found additional bugs, some of which took weeks of simulated execution time (at extremely high failure rates) to find.
>
> Our hooks can be used to crash a replica, disconnect it from other replicas for a period of time or force a replica to pretend that it is no longer the master. This test found five subtle bugs in Chubby related to master failover in its first two weeks.

And in particular, I want to emphasize:

> By their very nature, fault-tolerant systems try to mask problems. Thus they can mask bugs or configuration problems while insidiously lowering their own fault-tolerance.

The bugs I found were low-hanging fruit: anyone who ran a few hundred simple transactions could reproduce them, even without causing a single node or network failure. Why didn’t DataStax catch this in the release process? Why publish glowing blog posts and smug retrospectives if the most fundamental safety properties of the application haven’t been trivially verified? And if I hadn’t reported these bugs, how many users do you suppose would have been subject to silent data loss or corruption in prod?

I can’t say this strongly enough: one way or another, software is always tested, either by the maintainers, by users, or by applications in production. One of my goals in this series is to push database vendors to test their software prior to release, so that we can all enjoy safer, faster systems. If you’re writing a database, please try to verify its correctness experimentally. You don’t need to do a perfect job–testing is tough!–but a little effort can catch 90% of the bugs.

## Final thoughts

DataStax and the open-source community around Cassandra have been working hard on the AP storage problem for several years, and it shows. Cassandra runs on thousand-node clusters and accepts phenomenal write volume. It’s extraordinarily suited for high-throughput capture of immutable or otherwise log-oriented data, and its AAE and tunable durability features work well. It is, in short, a capable AP datastore, and though I haven’t deployed it personally, many engineers I respect recommend it from their production experience wholeheartedly.

Jonathan Ellis, Aleksey Yeschenko, and Patrick McFadin were all very helpful in explaining Cassandra’s model to me, and I hope that I have depicted it accurately here. Any errors are mine alone.
I’m especially thankful that they volunteered so much of their time on nights and weekends to help someone tear apart their hard work, and that they’ve fixed the bugs I’ve found so quickly. Reproducing and fixing distributed systems bugs is an especially challenging task, and it speaks to the skill of the entire Cassandra team.

DataStax has adapted some of these Jepsen tests for use in their internal testing process, and, like Basho, may use Jepsen directly to help test future releases. I’m optimistic that they’ll notify users that the transactional features are unsafe in the current release, and clarify their documentation and marketing. Again, there’s nothing technically wrong with many of the behaviors I’ve discussed above–they’re simply subtle, and deserve clear exposition so that users can interpret them correctly.

I’m looking forward to watching a good database improve.

# Call me maybe: Kafka

In the last Jepsen post, we learned about NuoDB. Now it’s time to switch gears and discuss Kafka. Up next: Cassandra.

Kafka is a messaging system which provides an immutable, linearizable, sharded log of messages. Throughput and storage capacity scale linearly with nodes, and thanks to some impressive engineering tricks, Kafka can push astonishingly high volume through each node; often saturating disk, network, or both. Consumers use Zookeeper to coordinate their reads over the message log, providing efficient at-least-once delivery–and some other nice properties, like replayability.

In the upcoming 0.8 release, Kafka is introducing a new feature: replication. Replication enhances the durability and availability of Kafka by duplicating each shard’s data across multiple nodes. In this post, we’ll explore how Kafka’s proposed replication system works, and see a new type of failure.

Here’s a slide from Jun Rao’s overview of the replication architecture.
In the context of the CAP theorem, Kafka claims to provide both serializability and availability by sacrificing partition tolerance. Kafka can do this because LinkedIn’s brokers run in a datacenter, where partitions are rare.

Note that the claimed behavior isn’t impossible: Kafka could be a CP system, providing “bytewise identical replicas” and remaining available whenever, say, a majority of nodes are connected. It just can’t be fully available if a partition occurs. On the other hand, we saw that NuoDB, in purporting to refute the CAP theorem, actually sacrificed availability. What happens to Kafka during a network partition?

## Design

Kafka’s replication design uses leaders, elected via Zookeeper. Each shard has a single leader. The leader maintains a set of in-sync replicas: all the nodes which are up-to-date with the leader’s log, and actively acknowledging new writes. Every write goes through the leader and is propagated to every node in the In Sync Replica set, or ISR. Once all nodes in the ISR have acknowledged the request, the leader considers it committed, and can ack to the client. When a node fails, the leader detects that writes have timed out, and removes that node from the ISR in Zookeeper. Remaining writes only have to be acknowledged by the healthy nodes still in the ISR, so we can tolerate a few failing or inaccessible nodes safely.

So far, so good; this is about what you’d expect from a synchronous replication design. But then there’s this claim from the replication blog posts and wiki: “with f nodes, Kafka can tolerate f-1 failures”. This is of note because most CP systems only claim tolerance to n/2-1 failures; that is, a majority of nodes must be connected and healthy in order to continue. LinkedIn says that majority quorums are not reliable enough, in their operational experience, and that tolerating the loss of all but one node is an important aspect of the design.
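
Here’s a toy Python model of the ISR rule described above, under simplifying assumptions: `IsrLeader` and `lost_on_election` are hypothetical names, a follower is dropped from the ISR the moment it misses a single write (rather than after a timeout), and Zookeeper is left out entirely. It shows that once the ISR has shrunk to the leader alone, writes are still acknowledged, and which entries a stale follower would lack if it were promoted.

```python
from dataclasses import dataclass, field


@dataclass
class IsrLeader:
    """Toy leader for one shard: replicates to its ISR and acks once the ISR has the entry."""

    name: str
    isr: set
    logs: dict = field(default_factory=dict)  # node name -> list of entries it holds

    def write(self, entry, responders: set) -> tuple:
        """Replicate `entry`; return the nodes holding it when the leader acknowledges."""
        self.logs.setdefault(self.name, []).append(entry)
        for node in self.isr - {self.name}:
            if node in responders:
                self.logs.setdefault(node, []).append(entry)
        # Followers that did not respond are dropped, so the write is always
        # acknowledged by whatever ISR remains, even if that is only the leader.
        self.isr = {self.name} | (self.isr & responders)
        return tuple(sorted(self.isr))


def lost_on_election(old_leader: IsrLeader, new_leader: str) -> list:
    """Entries the old leader acknowledged that a promoted node never received."""
    kept = old_leader.logs.get(new_leader, [])
    return [e for e in old_leader.logs.get(old_leader.name, []) if e not in kept]
```

For example, starting with an ISR of `{"n1", "n2", "n3"}`, a write acknowledged by both followers lands on all three nodes. After the followers stop responding, later writes are acknowledged with only `n1` holding them, and promoting `n2` would leave those later writes behind.
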
Kafka attains this goal by allowing the ISR to shrink to just one node: the leader itself. In this state, the leader is acknowledging writes which have only been persisted locally. What happens if the leader then loses its Zookeeper claim? The system cannot safely continue–but the show must go on. In this case, Kafka holds a new election and promotes any remaining node–which could be arbitrarily far behind the original leader. That node begins accepting requests and replicating them to the new ISR.

When the original leader comes back online, we have a conflict. The old leader is identical with the new up until some point, after which they diverge. Two possibilities come to mind: we could preserve both writes, perhaps appending the old leader’s writes to the new–but this would violate the linear ordering property Kafka aims to preserve. Another option is to drop the old leader’s conflicting writes altogether. This means destroying committed data.

In order to see this failure mode, two things have to happen:

1. The ISR must shrink such that some node (the new leader) is no longer in the ISR.
2. All nodes in the ISR must lose their Zookeeper connection.

For instance, a lossy NIC which drops some packets but not others might isolate a leader from its Kafka followers, but break the Zookeeper connection slightly later. Or the leader could be partitioned from the other Kafka nodes by a network failure, and then crash, lose power, or be restarted by an administrator. Or there could be correlated failures across multiple nodes, though this is less likely.

In short, two well-timed failures (or, depending on how you look at it, one complex failure) on a single node can cause the loss of arbitrary writes in the proposed replication system.

I want to rephrase this, because it’s a bit tricky to understand. In the causality diagram to the right, the three vertical lines represent three distinct nodes, and time flows downwards.
Initially, the Leader (L) can replicate requests to its followers in the ISR. Then a partition occurs, and writes time out. The leader detects the failure and removes nodes 2 and 3 from the ISR, then acknowledges some log entries written only to itself.

When the leader loses its Zookeeper connection, the middle node becomes the new leader. What data does it have? We can trace its line upwards in time to see that it only knows about the very first write made. All other writes on the original leader are causally disconnected from the new leader. This is the reason data is lost: the causal invariant between leaders is violated by electing a new node once the ISR is empty.

I suspected this problem existed from reading the JIRA ticket, but after talking it through with Jay Kreps I wasn’t convinced I understood the system correctly. Time for an experiment!

## Results

First, I should mention that Kafka has some parameters that control write consistency. The default behaves like MongoDB: writes are not replicated prior to acknowledgement, which allows for higher throughput at the cost of safety. In this test, we’ll be running in synchronous mode:

```clojure
(producer/producer
  {"metadata.broker.list"     (str (:host opts) ":9092")
   "request.required.acks"    "-1" ; all in-sync brokers
   "producer.type"            "sync"
   "message.send.max_retries" "1"
   "connect.timeout.ms"       "1000"
   "retry.backoff.ms"         "1000"
   "serializer.class"         "kafka.serializer.DefaultEncoder"
   "partitioner.class"        "kafka.producer.DefaultPartitioner"})
```

With that out of the way, our writes should be fully acknowledged by the ISR once the client returns from a write operation successfully.

We’ll enqueue a series of integers into the Kafka cluster, then use iptables to isolate a leader from the other Kafka nodes. Latencies spike initially, while the leader waits for the missing nodes to respond. A few requests may fail, but the ISR shrinks in a few seconds and writes begin to succeed again.
We’ll allow that leader to acknowledge writes independently, for a time. While these writes look fine, they’re actually only durable on a single node–and could be lost if a leader election occurs.

Then we totally partition the leader. ZK detects the leader’s disconnection and the remaining nodes will promote a new leader, causing data loss. Again, a brief latency spike:

At the end of the run, Kafka typically acknowledges 98–100% of writes. However, half of those writes (all those made during the partition) are lost.

```
Writes completed in 100.023 seconds

1000 total
987 acknowledged
468 survivors
520 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
130 131 132 133 134 135 ... 644 645 646 647 648 649
1 unacknowledged writes found! ヽ(´ー｀)ノ
(126)
0.987 ack rate
0.52684903 loss rate
0.0010131713 unacknowledged but successful rate
```

## Discussion

Kafka’s replication claimed to be CA, but in the presence of a partition, threw away an arbitrarily large volume of committed writes. It claimed tolerance to f-1 failures, but a single node could cause catastrophe. How could we improve the algorithm?

All redundant systems have a breaking point. If you lose all N nodes in a system which writes to N nodes synchronously, it’ll lose data. If you lose 1 node in a system which writes to 1 node synchronously, that’ll lose data too. There’s a tradeoff to be made between how many nodes are required for a write, and the number of faults which cause data loss. That’s why many systems offer per-request settings for durability. But what choice is optimal, in general? If we wanted to preserve the all-nodes-in-the-ISR model, could we constrain the ISR in a way which is most highly available?

It turns out there is a maximally available number. From Peleg and Wool’s overview paper on quorum consensus: “It is shown that in a complete network the optimal availability quorum system is the majority (Maj) coterie if p < ½.”
In particular, given uniformly distributed element failure probabilities smaller than ½ (which realistically describes most homogeneous clusters), the worst quorum systems are the Single coterie (one failure causes unavailability), and the best quorum system is the simple Majority (provided the cohort size is small). Because Kafka keeps only a small number of replicas (on the order of 1-10), Majority quorums are provably optimal in their availability characteristics.

You can reason about this from extreme cases: if we allow the ISR to shrink to 1 node, the probability of a single additional failure causing data loss is high. If we require the ISR to include all nodes, any node failure will make the system unavailable for writes. At either extreme, a single failure is enough to cause trouble, with probability on the order of p. If we assume failures are roughly independent, the probability of two failures goes like p², which is much smaller than p. This is why bounding the ISR size in the middle has the lowest probability of failure.

I made two recommendations to the Kafka team:

1. Ensure that the ISR never goes below N/2 nodes. This reduces the probability of a single node failure causing the loss of committed writes.
2. In the event that the ISR becomes empty, block and sound an alarm instead of silently dropping data. It’s OK to make this configurable, but as an administrator, you probably want to be aware when a datastore is about to violate one of its constraints–and make the decision yourself. It might be better to wait until an old leader can be recovered. Or perhaps the administrator would like a dump of the to-be-dropped writes which could be merged back into the new state of the cluster.

Finally, remember that this is pre-release software; we’re discussing a candidate design, not a finished product.
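
The availability argument can be checked with a little arithmetic. This Python sketch assumes each of n replicas is down independently with the same probability p, as in the Peleg and Wool setting; `availability` and `compare` are hypothetical helper names. It computes the probability that the required replicas are up under three policies: a single designated node, any majority, and all n nodes.

```python
from math import comb


def availability(n: int, k: int, p: float) -> float:
    """Probability that at least k of n replicas are up, each down independently with probability p."""
    return sum(comb(n, i) * (1 - p) ** i * p ** (n - i) for i in range(k, n + 1))


def compare(n: int, p: float) -> dict:
    """Availability of three hypothetical write policies over n replicas."""
    return {
        "single": availability(1, 1, p),             # one designated node must be up
        "majority": availability(n, n // 2 + 1, p),  # any majority must be up
        "all": availability(n, n, p),                # every replica must be up
    }
```

With n = 5 and p = 0.1, this gives 0.9 for a single node, about 0.991 for a majority, and about 0.590 for all five. Try p = 0.6 and the single node (0.4) beats the majority (about 0.317), which is why the quoted result is conditioned on p < ½.
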
Jay Kreps and I discussed the possibility of a “stronger safety” mode which does bound the ISR and halts when it becomes empty–if that mode makes it into the next release, and strong safety is important for your use case, check that it is enabled.

Remember, Jun Rao, Jay Kreps, Neha Narkhede, and the rest of the Kafka team are seasoned distributed systems experts–they’re much better at this sort of thing than I am. They’re also contending with nontrivial performance and fault-tolerance constraints at LinkedIn–and those constraints shape the design space of Kafka in ways I can’t fully understand. I trust that they’ve thought about this problem extensively, and will make the right tradeoffs for their (and hopefully everyone’s) use case. Kafka is still a phenomenal persistent messaging system, and I expect it will only get better.

The next post in the Jepsen series explores Cassandra, an AP datastore based on the Dynamo model.

# Call me maybe: NuoDB

Previously on Jepsen, we explored Zookeeper. Next up: Kafka.

NuoDB came to my attention through an amazing mailing list thread by the famous database engineer Jim Starkey, in which he argues that he has disproved the CAP theorem:

> The CAP conjecture, I am convinced, is false and can be proven false. The CAP conjecture has been a theoretical millstone around the neck of all ACID systems. Good riddance. This is the first wooden stake for the heart of the noSQL movement. There are more coming.

I, and every database user on the planet, not to mention a good part of the distributed systems research community, would love to find a counterexample which disproves the CAP theorem. For that matter, I’m tremendously excited about the possibilities of causal and lattice consistency, which we know are achievable in asynchronous networks. So I was curious: what was NimbusDB (now named NuoDB) up to? How does their consistency model work?

I usually try to understand a new system by reading the documentation, scanning for words like “safety”, “order”, “serializability”, “linearizability”, “consistency”, “conflict”, and “replica”. I keep notes as I go. Here are a few excerpts from my first six hours trying to figure out NuoDB’s consistency invariants:

In particular, I want to draw attention to this excerpt:

> If the CAP theorem means that all surviving nodes must be able to continue processing without communication after a network failure, than NUODB is not partition resistant.

This is kind of an odd statement to make, because Gilbert and Lynch’s proof defines “availability” as “every request received by a non-failing node in the system must result in a response.” That would seem to imply that NuoDB does not satisfy CAP availability.

> If partition resistance includes the possibility for a surviving subset of the chorus to sing on, then NUODB refutes the CAP theorem.

We know systems exist in which a surviving subset of nodes continue processing during a partition. They are consistent with the CAP theorem because in those systems (e.g. Zookeeper) some requests to non-failing nodes do not succeed. Claiming this “refutes the CAP theorem” is incoherent.

This isn’t getting us anywhere. To figure out how NuoDB actually behaves, we’ll need to set up a cluster and test it ourselves.

## Operational notes

Setting up a NuoDB cluster turned out to be more difficult than I anticipated. For starters, there are race conditions in the cluster join process. Each node has a seed node to join to, which determines the cluster it will become a part of. If that seed is inaccessible at startup, the node will quietly become a part of a new, independent cluster–and will not, as far as I can tell, join the original cluster even if the seed becomes accessible later.
Consequently, performing a cold start is likely to result in several independent clusters, up to and including every node considering itself the sole node in its own cluster. This is a catastrophic outcome: if any clients manage to connect to one of these isolated clusters, their operations will almost certainly disagree with the other clusters. You’ll see conflicting row values, broken primary keys, invalid foreign key relationships, and so on. I have no idea how you go about repairing that kind of damage without simply dropping all the writes on one side of the split-brain.

You can join a node to itself. This is easy to do accidentally if you, say, deploy the same seed node to every node’s configuration file. The consequences are… interesting.

There are also race conditions in database creation. For instance, if you create and delete the same simple table a few times in succession, you can back yourself into this corner, where you can neither use, delete, nor recreate a table, short of nuking the entire cluster:

I’ve talked with the NuoDB team about these bugs, and they’re working on fixing them. Hopefully they won’t be present in future releases.

Finally, be aware that restarting a crashed NuoDB node does not restore its transaction managers or storage managers; if you do a naive rolling restart, all the data vanishes. In my conversations with NuoDB’s engineering staff, it looks like this is actually intended behavior for their customers’ use cases. The cluster also doesn’t set up failover replicas when nodes become unavailable, so it’s easy to accidentally lose all the storage nodes if your membership shifts. NuoDB plans to improve that behavior in future releases.

## What happens during partition?

In this NuoDB test, we check the consistency of compare-and-set updates to a single cell, by having transactions compete at the SERIAL consistency level to read, update, and write a vector of numbers.
Note that this test does not check multi-key linearizability, or, for that matter, exclude behaviors like P4 or P3.

During a partition, with the Java driver, you could see a variety of failure modes:

• “Duplicate value in unique index SEQUENCES..PRIMARY_KEY”
• End of stream reached
• Broken pipe
• Connection reset
• Indefinite latency

And I do mean indefinite. I haven’t actually found an upper limit to how long NuoDB will block for. As far as I can tell, when a node is inaccessible, operations will queue up for as long as the partition lasts. Moreover, they block globally: no subset of the cluster responded during the partition, even though a fully connected majority component existed.

Perhaps because all operations are queued without timeout, it takes a long time for NuoDB latencies to recover after the partition resolves. In my tests, latencies continued to spike well into the 30-60 second range for as many as 1500 seconds after the partition ended. I haven’t found an upper limit for this behavior, but eventually, something somewhere must run out of RAM.

## Results

NuoDB typically acknowledged 55% of writes in my tests–most, but not all, writes made during the partition failed due to CaS conflict and were not retried after Jepsen’s internal timeout. The good news is that all acknowledged writes made at the SERIAL consistency level were present in the final dataset: no dropped writes. There was also a trivial fraction of false negatives, which is typical for most CP systems. This indicates that NuoDB is capable of preserving some sort of linear order over CaS operations to a single cell, even in the presence of a partition.

Note that NuoDB isn’t fully CP, because it does not enforce serializability for all write operations–just “local transaction order”. I’m not exactly sure how the local orders interact, and whether there are practical scenarios which would violate serializability but be allowed by NuoDB’s local transaction invariants.
So far I haven’t been able to construct a test to demonstrate the difference.

Does NuoDB refute the CAP theorem? Of course it doesn’t. By deferring all operations until the partition resolves, NuoDB is not even close to available. In fact, it’s a good deal less available than more consistent systems: Zookeeper, for example, remains available on all nodes connected to a majority component. NuoDB is another example of the adage that systems which purport to be CA or CAP usually sacrifice availability or consistency when a partition does occur–and often in spectacular ways.

Blocking all writes during partition is, according to the NuoDB team, intended behavior. However, there is experimental liveness detection code in the most recent release, which will hopefully allow NuoDB to begin timing out requests to inaccessible nodes. I haven’t been able to test that code path yet, but future releases may enable it by default.

If you are considering using NuoDB, be advised that the project’s marketing and documentation may exceed its present capabilities. Try to enable the liveness detection code, and set up your own client timeouts to avoid propagating high latencies to other systems. Try to build backpressure hints into your clients to reduce the requests against NuoDB during failure; the latency storm which persists after the network recovers is proportional to the backlog of requests. Also be aware of the operational caveats mentioned earlier: monitor your nodes carefully, restart their storage and transaction managers as appropriate, and verify that newly started nodes have indeed joined the cluster before exposing them to clients.

Finally, I want to note (as always) that the presence of bugs does not mean that the NuoDB engineers are incompetent–in fact, I want to assert the opposite.
In my discussions with the NuoDB team I’ve found them to be friendly, capable, aware of the product’s limitations, and doing their best to solve a difficult problem within constraints of time, budget, and complexity. Given time, I’m sure they’ll get past these initial hurdles. From one employee:

> I only hope you’ll footnote that crazy CAP rambling with the disclaimer that no one at NuoDB today actually agrees with Jim’s comments in that thread.

In the next post, we’ll learn about Kafka 0.8’s proposed replication model.

# Call me maybe: Zookeeper

In this Jepsen post, we’ll explore Zookeeper. Up next: NuoDB.

Zookeeper, or ZK for short, is a distributed CP datastore based on a consensus protocol called ZAB. ZAB is similar to Paxos in that it offers linearizable writes and is available whenever a majority quorum can complete a round, but unlike the Paxos papers, places a stronger emphasis on the role of a single leader in ensuring the consistency of commits.

Because Zookeeper uses majority quorums, in an ensemble of five nodes, any two can fail or be partitioned away without causing the system to halt. Any clients connected to a majority component of the cluster can continue to make progress safely. In addition, the linearizability property means that all clients will see all updates in the same order–although clients may drift behind the primary by an arbitrary duration.

This safety property comes at a cost: writes must be durably written to a disk log on a majority of nodes before they are acknowledged. In addition, the entire dataset must fit in memory. This means that Zookeeper is best deployed for small pieces of state where linearizability and high availability are critical. Often, ZK is used to track consistent pointers to larger, immutable data stored in a different (perhaps AP) system, combining the safety and scalability advantages of both.
At the same time, this strategy reduces the availability for writes, since there are two systems to fail, and one of them (ZK) requires majority quorums.

## ZNode linearizability

In this test, five clients use a Curator DistributedAtom to update a list of numbers. The list is stored as a single serialized znode, and updates are applied via a CaS loop: atomically reading, decoding, appending the appropriate number, encoding, and writing back iff the value has not changed.

```clojure
(let [curator (framework (str (:host opts) ":2181") "jepsen")
      path    "/set-app"
      ;; Curator-backed atom holding the list of numbers, initially empty
      state   (distributed-atom curator path [])]
  (reify SetApp
    (setup [app]
      (reset!! state []))

    (add [app element]
      (try
        ;; CaS loop: read, conj the element, write back iff unchanged
        (swap!! state conj element)
        ok
        ;; Connection loss: report failure, though the write may have been applied
        (catch org.apache.zookeeper.KeeperException$ConnectionLossException e
          error)))

    (results [app]
      @state)

    (teardown [app]
      (delete! curator path))))
```
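
For readers who haven’t used Curator, here’s roughly what the `swap!!` call above has to do, sketched in Python. This is not Curator’s API. `VersionedNode` is a hypothetical in-memory stand-in for a znode whose conditional write fails if the node’s version has changed since it was read (Zookeeper’s `setData` takes an expected version in the same spirit), and JSON stands in for whatever serialization the atom actually uses.

```python
import json
import threading
from typing import Any, Callable


class BadVersion(Exception):
    """Raised when a conditional write finds that the node changed since it was read."""


class VersionedNode:
    """Hypothetical in-memory stand-in for a single znode with a version counter."""

    def __init__(self, data: bytes):
        self._data, self._version = data, 0
        self._lock = threading.Lock()

    def get_data(self) -> tuple:
        with self._lock:
            return self._data, self._version

    def set_data(self, data: bytes, expected_version: int) -> None:
        with self._lock:
            if self._version != expected_version:
                raise BadVersion(f"expected {expected_version}, found {self._version}")
            self._data, self._version = data, self._version + 1


def swap(node: VersionedNode, f: Callable[..., Any], *args: Any) -> Any:
    """Read, decode, apply f, encode, and write back iff the version is unchanged."""
    while True:
        raw, version = node.get_data()
        new_value = f(json.loads(raw), *args)
        try:
            node.set_data(json.dumps(new_value).encode(), version)
            return new_value
        except BadVersion:
            continue  # another client won the race: re-read and retry
```

Because every successful write bumps the version, fifty concurrent appends should leave all fifty numbers in the list and the version at 50. A lost update would show up as a missing number.
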

Initially, the ZK leader is n1. During the test, we partition [n1 n2] away from [n3 n4 n5], which means the leader cannot commit to a majority of nodes–and consequently, writes immediately block:

After 15 seconds or so, a new leader is elected in the majority component, and writes may proceed again. However, only the clients which can see one of [n3 n4 n5] can write: clients connected to [n1 n2] time out while waiting to make contact with the leader:

When the partition is resolved, writes on [n1 n2] begin to succeed right away; the leader election protocol is stable, so there is no need for a second transition during recovery.

Consequently, in a short test (~200 seconds, ~70 second partition, evenly distributed constant write load across all nodes) ZK might offer 78% availability, asymptotically converging on 60% (3/5 nodes) availability as the duration of the partition lengthens. ZK has never dropped an acknowledged write in any Jepsen test. It also typically yields 0-2 false positives: likely due to writes proxied through n1 and n2 just prior to the partition, such that the write committed, but the acknowledgement was not received by the proxying node.

As with any experiment, we can only disconfirm hypotheses. This test demonstrates that in the presence of a partition and leader election, Zookeeper is able to maintain the linearizability invariant. However, there could be other failure modes or write patterns which would not preserve linearizability–I just haven’t been able to find them so far. Nonetheless, this is a positive result: one that all CP datastores should aim for.

## Recommendations

Use Zookeeper. It’s mature, well-designed, and battle-tested. Because the consequences of its connection model and linearizability properties are subtle, you should, wherever possible, take advantage of tested recipes and client libraries like Curator, which do their best to correctly handle the complex state transitions associated with session and connection loss.

Also keep in mind that linearizable state in Zookeeper (such as leader election) does not guarantee the linearizability of a system which uses ZK. For instance, a cluster which uses ZK for leader election might allow multiple nodes to be the leader simultaneously. Even if there are no simultaneous leaders at the same wall-clock time, message delays can result in logical inconsistencies. Designing CP systems, even with a strong coordinator, requires carefully coupling the operations in the system to the underlying coordinator state.
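
One common technique for this kind of coupling is a fencing token, which this post doesn’t itself describe. The following hypothetical Python sketch assumes each new leader receives a strictly increasing epoch from the coordinator. The storage layer rejects any write carrying an older epoch than one it has already accepted, so a delayed message from a deposed leader can’t clobber newer data. `FencedStore` and `StaleEpoch` are illustrative names.

```python
from typing import Any, Dict, Hashable


class StaleEpoch(Exception):
    """Raised when a write arrives from a leader whose epoch has been superseded."""


class FencedStore:
    """Hypothetical storage node that rejects writes from leaders older than one it has seen."""

    def __init__(self) -> None:
        self.highest_epoch = -1
        self.data: Dict[Hashable, Any] = {}

    def write(self, epoch: int, key: Hashable, value: Any) -> None:
        # `epoch` is assumed to come from the coordinator and to increase with every new leader.
        if epoch < self.highest_epoch:
            raise StaleEpoch(f"epoch {epoch} is older than {self.highest_epoch}")
        self.highest_epoch = epoch
        self.data[key] = value
```

This only protects writes that pass through a store performing the check. It doesn’t make the rest of the system linearizable on its own.
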

Next up: NuoDB.

# Automating Jepsen

If you, as a database vendor, implement a few features in your API, I can probably offer repeatable automated tests of your DB’s partition tolerance through Jepsen.

The outcome of these tests would be a set of normalized metrics for each DB like “supports linearizability”, “available for writes when a majority partition exists”, “available for writes when no majority available”, “fraction of writes successful”, “fraction of writes denied”, “fraction of writes acked then lost”, “95th latency during condition X”, and so forth. I’m thinking this would be a single-page web site–a spreadsheet, really–making it easy to compare and contrast DBs and find one that fits your safety needs.
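
As a rough illustration, here’s a Python sketch that computes some of these normalized metrics from a single run. `Summary`, `summarize`, and `percentile` are hypothetical names. The rate definitions are inferred from the result listings in these posts, where the loss rate and the unacknowledged-but-successful rate appear to be divided by the number of acknowledged writes (in the Kafka run, 520 / 987 ≈ 0.5268 and 1 / 987 ≈ 0.0010). The 95th percentile uses the nearest-rank method, one of several common definitions.

```python
import math
from dataclasses import dataclass
from typing import Hashable, Iterable, List


@dataclass
class Summary:
    total: int
    acknowledged: int
    survivors: int
    lost: List[Hashable]           # acknowledged, but missing from the final read
    unacked_found: List[Hashable]  # never acknowledged, but present in the final read
    ack_rate: float
    loss_rate: float
    unacked_success_rate: float
    p95_latency: float


def percentile(values: Iterable[float], q: float) -> float:
    """Nearest-rank percentile for 0 < q <= 100."""
    ordered = sorted(values)
    rank = max(1, math.ceil(len(ordered) * q / 100))
    return ordered[rank - 1]


def summarize(attempted, acknowledged, survivors, latencies) -> Summary:
    acked, found = set(acknowledged), set(survivors)
    lost = sorted(acked - found)
    unacked_found = sorted(found - acked)
    n_ack = len(acked)
    return Summary(
        total=len(attempted),
        acknowledged=n_ack,
        survivors=len(found),
        lost=lost,
        unacked_found=unacked_found,
        ack_rate=n_ack / len(attempted),
        loss_rate=len(lost) / n_ack if n_ack else 0.0,
        unacked_success_rate=len(unacked_found) / n_ack if n_ack else 0.0,
        p95_latency=percentile(latencies, 95),
    )
```

Given sets with the same counts as the Kafka run (1000 attempted, 987 acknowledged, 520 acknowledged writes lost, 1 unacknowledged write found), this reproduces that run’s rates: 0.987, about 0.527, and about 0.001.
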

At a minimum, I need to know:

• After initial startup, when is the database stable and ready to accept writes?
• For a given key, which node (if any) is the primary replica?
• For a given key, which nodes (if any) are secondary replicas?
• After partitions end, when has the database fully recovered? (e.g. has it completed handoff, replayed oplogs, etc.)
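
These minimum requirements could be expressed as a small vendor-facing interface. This Python sketch is purely hypothetical: the `JepsenTarget` protocol, its method names, and the `wait_until` polling helper are illustrative and don’t correspond to any existing Jepsen or database API.

```python
import time
from typing import Callable, Optional, Protocol, Sequence


class JepsenTarget(Protocol):
    """Hypothetical hooks a database would expose so partition tests can be automated."""

    def is_ready(self) -> bool:
        """After initial startup: is the cluster stable and accepting writes?"""
        ...

    def primary(self, key: str) -> Optional[str]:
        """Node currently acting as primary replica for `key`, if any."""
        ...

    def secondaries(self, key: str) -> Sequence[str]:
        """Nodes currently acting as secondary replicas for `key`."""
        ...

    def is_recovered(self) -> bool:
        """After a partition heals: has handoff, log replay, etc. finished?"""
        ...


def wait_until(condition: Callable[[], bool], timeout_s: float, poll_s: float = 0.5) -> bool:
    """Poll `condition` until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_s)
    return condition()
```

A test harness would then call something like `wait_until(db.is_ready, timeout_s=300)` before starting writes, and `wait_until(db.is_recovered, ...)` after healing a partition and before the final read.
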

I also need totally automated, reliable scripting of DB installation and provisioning. Many DBs make it really tough to join nodes from the shell.

This is gonna take several months of my time and a nontrivial amount of money for hardware. I’m looking at a few options, from physical hardware in my garage to renting EC2 compute nodes. EC2 means anybody could, in theory, run these benchmarks themselves–but there are a ton of moving pieces involved, it takes a lot more work to set up, and VM performance is really variable. Ideally, someone out there has five or six identical boxes they don’t need any more–maybe leftover desktops, 1Us from a decommissioned colo, whatever. They don’t have to be all that fast, but I’m hitting the limits of what I can do on virtualized infrastructure.

If you want to make this happen, and can help make the necessary API improvements, write automation scripts for Jepsen, provide hardware or hosting, etc., please email aphyr@aphyr.com.

# The network is reliable

I’ve been discussing Jepsen and partition tolerance with Peter Bailis over the past few weeks, and I’m honored to present this post as a collaboration between the two of us. We’d also like to extend our sincere appreciation to everyone who contributed their research and experience to this piece.

Network partitions are a contentious subject. Some claim that modern networks are reliable and that we are too concerned with designing for theoretical failure modes. They often accept that single-node failures are common but argue that we can reliably detect and handle them. Conversely, others subscribe to Peter Deutsch’s Fallacies of Distributed Computing and disagree. They attest that partitions do occur in their systems, and that, as James Hamilton of Amazon Web Services neatly summarizes, “network partitions should be rare but net gear continues to cause more issues than it should.” The answer to this debate radically affects the design of distributed databases, queues, and applications. So who’s right?

A key challenge in this dispute is the lack of evidence. We have few normalized bases for comparing network and application reliability–and even less data. We can track link availability and estimate packet loss, but understanding the end-to-end effect on applications is more difficult. The scant evidence we have is difficult to generalize: it is often deployment-specific and closely tied to particular vendors, topologies, and application designs. Worse, even when an organization has a clear picture of their network’s behavior, they rarely share specifics. Finally, distributed systems are designed to resist failure, which means noticeable outages often depend on complex interactions of failure modes. Many applications silently degrade when the network fails, and resulting problems may not be understood for some time–if they are understood at all.

As a result, much of what we know about the failure modes of real-world distributed systems is founded on guesswork and rumor. Sysadmins and developers will swap stories over beers, but detailed, public postmortems and comprehensive surveys of network availability are few and far between. In this post, we’d like to bring a few of these stories together. We believe this is a first step towards a more open and honest discussion of real-world partition behavior, and, ultimately, more robust distributed systems design.

## Rumblings from large deployments

To start off, let’s consider evidence from big players in distributed systems: companies running globally distributed infrastructure with hundreds of thousands of nodes. Of all of the data we have collected, these reports best summarize operation in the large, distilling the experience of operating what are likely the biggest distributed systems ever deployed. Their publications (unlike many of the case studies we will examine later) often capture aggregate system behavior and large-scale statistical trends, and indicate (often obliquely) that partitions are a significant concern in their deployments.

### The Microsoft Datacenter Study

A team from the University of Toronto and Microsoft Research studied the behavior of network failures in several of Microsoft’s datacenters. They found an average failure rate of 5.2 devices per day and 40.8 links per day with a median time to repair of approximately five minutes (and up to one week). While the researchers note that correlating link failures and communication partitions is challenging, they estimate a median packet loss of 59,000 packets per failure. Perhaps more concerning is their finding that network redundancy improves median traffic by only 43%; that is, network redundancy does not eliminate many common causes of network failure.

### HP Enterprise Managed Networks

A joint study between researchers at the University of California, San Diego and HP Labs examined the causes and severity of network failures in HP’s managed networks by analyzing support ticket data. “Connectivity”-related tickets accounted for 11.4% of support tickets (14% of which were of the highest priority level), with a median incident duration of 2 hours and 45 minutes for the highest priority tickets and a median duration of 4 hours 18 minutes for all priorities.

### Google Chubby

Google’s paper describing the design and operation of Chubby, their distributed lock manager, outlines the root causes of 61 outages over 700 days of operation across several clusters. Of the nine outages that lasted greater than 30 seconds, four were caused by network maintenance and two were caused by “suspected network connectivity problems.”

### Google’s Design Lessons from Distributed Systems

In Design Lessons and Advice from Building Large Scale Distributed Systems, Jeff Dean suggests that a typical first year for a new Google cluster involves:

• 5 racks going wonky (40-80 machines seeing 50% packet loss)
• 8 network maintenances (4 might cause ~30-minute random connectivity losses)
• 3 router failures (have to immediately pull traffic for an hour)

While Google doesn’t tell us much about the application-level consequences of their network partitions, “Lessons From Distributed Systems” suggests they are a significant concern, citing the challenge of “[e]asy-to-use abstractions for resolving conflicting updates to multiple versions of a piece of state” as useful in “reconciling replicated state in different data centers after repairing a network partition.”

### Amazon Dynamo

Amazon’s Dynamo paper frequently cites the incidence of partitions as a driving design consideration. Specifically, the authors note that they rejected designs from “traditional replicated relational database systems” because they “are not capable of handling network partitions.”

### Yahoo! PNUTS/Sherpa

Yahoo! PNUTS/Sherpa was designed as a distributed database operating out of multiple, geographically distinct sites. Originally, PNUTS supported a strongly consistent “timeline consistency” operation, with one master per data item. However, the developers noted that, in the event of “network partitioning or server failures,” this design decision was too restrictive for many applications:

The first deployment of Sherpa supported the timeline-consistency model — namely, all replicas of a record apply all updates in the same order — and has API-level features to enable applications to cope with asynchronous replication. Strict adherence leads to difficult situations under network partitioning or server failures. These can be partially addressed with override procedures and local data replication, but in many circumstances, applications need a relaxed approach.

## Application-level failures

Not all partitions originate in the physical network. Sometimes dropped or delayed messages are a consequence of crashes, race conditions, OS scheduler latency, or overloaded processes. The following studies highlight the fact that partitions–wherein the system delays or drops messages–can occur at any layer of the software stack.

### CPU use and service contention

Bonsai.io discovered high CPU and memory use on an ElasticSearch node combined with difficulty connecting to various cluster components, likely a consequence of an “excessively high number of expensive requests being allowed through to the cluster.”

They restarted the cluster, but on restart, the cluster partitioned itself into two independent components. A subsequent cluster restart resolved the partition, but customers complained they were unable to delete or create indices. The logs revealed that servers were repeatedly trying to recover unassigned indices, which “poisoned the cluster’s attempt to service normal traffic which changes the cluster state.” The failure led to 20 minutes of unavailability and six hours of degraded service.

Bonsai concludes by noting that large-scale ElasticSearch clusters should use dedicated nodes which handle routing and leader election without serving normal requests for data, to prevent partitions under heavy load. They also emphasize the importance of request throttling and setting proper quorum values.

### Long GC pauses and IO

Stop-the-world garbage collection and blocking for disk IO can cause runtime latencies on the order of seconds to minutes. As Searchbox IO and dozens of other production users have found, GC pressure in an ElasticSearch cluster can cause secondary nodes to declare a primary dead and to attempt a new election. Because their configuration used a low value of zen.minimum_master_nodes, ElasticSearch was able to elect two simultaneous primaries, leading to inconsistency and downtime. Even with minimum_master_nodes larger than a majority, ElasticSearch does not prevent nodes from taking part in multiple network components; GC pauses and high IO_WAIT times can cause split brain, write loss, and index corruption.
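The arithmetic behind minimum_master_nodes is simple enough to sketch. The Python below is a simplified model, not ElasticSearch’s election code, and the function names are hypothetical. It assumes a partition splits the master-eligible nodes into two disjoint components, and that a component may elect a master only if it sees at least minimum_master_nodes of them. Under that assumption, for any cluster of two or more nodes, a setting below a strict majority admits two simultaneous masters. The caveat above is exactly where the model breaks down: if one node can take part in both components, the components are no longer disjoint, and a majority setting alone does not rule out split brain.

```python
def majority(n: int) -> int:
    """Smallest strict majority of n master-eligible nodes."""
    return n // 2 + 1


def can_elect(visible_nodes: int, minimum_master_nodes: int) -> bool:
    """A component may elect a master only if it can see at least
    minimum_master_nodes master-eligible nodes, counting itself."""
    return visible_nodes >= minimum_master_nodes


def split_brain_possible(n: int, minimum_master_nodes: int) -> bool:
    """True if some split of n nodes into two *disjoint* components lets
    both sides elect a master at once. Disjointness is the key assumption."""
    return any(
        can_elect(k, minimum_master_nodes)
        and can_elect(n - k, minimum_master_nodes)
        for k in range(1, n)  # k nodes on one side, n - k on the other
    )


# A five-node cluster under three settings: 1, 2, and a strict majority (3).
for setting in (1, 2, majority(5)):
    print(5, setting, split_brain_possible(5, setting))
# 5 1 True
# 5 2 True
# 5 3 False
```

With two nodes, a strict majority is both of them, so neither side of a partition can safely proceed on its own: the model trades split brain for unavailability.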

### MySQL overload and a Pacemaker segfault

Github relies heavily on Pacemaker and Heartbeat: programs which coordinate cluster resources between nodes. They use Percona Replication Manager, a resource agent for Pacemaker, to replicate their MySQL database between three nodes.

On September 10th, 2012, a routine database migration caused unexpectedly high load on the MySQL primary. Percona Replication Manager, unable to perform health checks against the busy MySQL instance, decided the primary was down and promoted a secondary. The secondary had a cold cache and performed poorly. Normal query load on the node caused it to slow down, and Percona failed back to the original primary. The operations team put Pacemaker into maintenance-mode, temporarily halting automatic failover. The site appeared to recover.

The next morning, the operations team discovered that the standby MySQL node was no longer replicating changes from the primary. Operations decided to disable Pacemaker’s maintenance mode to allow the replication manager to fix the problem.

Upon attempting to disable maintenance-mode, a Pacemaker segfault occurred that resulted in a cluster state partition. After this update, two nodes (I’ll call them ‘a’ and ‘b’) rejected most messages from the third node (‘c’), while the third node rejected most messages from the other two. Despite having configured the cluster to require a majority of machines to agree on the state of the cluster before taking action, two simultaneous master election decisions were attempted without proper coordination. In the first cluster, master election was interrupted by messages from the second cluster and MySQL was stopped.

In the second, single-node cluster, node ‘c’ was elected at 8:19 AM, and any subsequent messages from the other two-node cluster were discarded. As luck would have it, the ‘c’ node was the node that our operations team previously determined to be out of date. We detected this fact and powered off this out-of-date node at 8:26 AM to end the partition and prevent further data drift, taking down all production database access and thus all access to github.com.

The partition caused inconsistency in the MySQL database–both between the secondary and primary, and between MySQL and other data stores like Redis. Because foreign key relationships were not consistent, Github showed private repositories to the wrong users' dashboards and incorrectly routed some newly created repos.

Github thought carefully about their infrastructure design, and were still surprised by a complex interaction of partial failures and software bugs. As they note in the postmortem:

… if any member of our operations team had been asked if the failover should have been performed, the answer would have been a resounding no.

Distributed systems are hard.

## NICs and drivers

### BCM5709 and friends

Unreliable NIC hardware or drivers are implicated in a broad array of partitions. Marc Donges and Michael Chan bring us a thrilling report of the popular Broadcom BCM5709 chipset abruptly dropping inbound but not outbound packets to a machine. Because the NIC dropped inbound packets, the node was unable to service requests. However, because it could still send heartbeats to its hot spare via keepalived, the spare considered the primary alive and refused to take over. The service was unavailable for five hours and did not recover without a reboot.

Sven Ulland followed up, reporting the same symptoms with the BCM5709S chipset on Linux 2.6.32-41squeeze2. Despite pulling commits from mainline which supposedly fixed a similar set of issues with the bnx2 driver, they were unable to resolve the issue until version 2.6.38.
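The BCM5709 failure is easy to miss because the heartbeat only exercises one direction of the link. This toy Python model assumes the primary pushes heartbeats to the spare and that the spare fails over only when those heartbeats stop, which matches the behavior Donges and Chan describe. The `Nic` class and helper functions are hypothetical illustrations, not keepalived’s API or configuration.

```python
from dataclasses import dataclass


@dataclass
class Nic:
    """Hypothetical model of a NIC whose two directions fail independently."""
    inbound_ok: bool = True
    outbound_ok: bool = True


def heartbeat_reaches_spare(primary: Nic) -> bool:
    # A push-style heartbeat only exercises the primary's outbound path.
    return primary.outbound_ok


def spare_takes_over(primary: Nic) -> bool:
    # The spare fails over only when heartbeats stop arriving.
    return not heartbeat_reaches_spare(primary)


def client_request_served(primary: Nic) -> bool:
    # Serving a request needs the request in *and* the response out.
    return primary.inbound_ok and primary.outbound_ok


broken = Nic(inbound_ok=False)  # drops inbound packets only
print(client_request_served(broken), spare_takes_over(broken))  # False False
```

Both values are False: clients are locked out, yet nothing triggers failover. A liveness check that depends on a round trip through the primary, rather than on traffic the primary merely emits, would observe the same condition clients do.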

Since Dell shipped a large number of servers with the BCM5709, the impact of these firmware bugs was widely observed. For instance, the 5709 and some other chips had a bug in their 802.3x flow control code causing them to spew PAUSE frames when the chipset crashed or its buffer filled up. This problem was magnified by the BCM56314 and BCM56820 switch-on-a-chip devices (a component in a number of Dell’s top-of-rack switches), which, by default, spewed PAUSE frames at every interface trying to communicate with the offending 5709 NIC. This led to cascading failures on entire switches or networks.

The bnx2 driver could also cause transient or flapping network failures, as described in this ElasticSearch split brain report. Meanwhile, the Broadcom 57711 was notorious for causing extremely high latencies under load with jumbo frames, a particularly thorny issue for ESX users with iSCSI-backed storage.

### Intel 82574 Packet of Death

A motherboard manufacturer failed to flash the EEPROM correctly for their Intel 82574 based system. The result was a very hard-to-diagnose error where an inbound SIP packet of a particular structure would disable the NIC. Only a cold restart would bring the system back to normal.

### A GlusterFS partition caused by a driver bug

After a scheduled upgrade, CityCloud noticed unexpected network failures in two distinct GlusterFS pairs, followed by a third. Suspecting link aggregation, CityCloud disabled the feature on their switches and allowed self-healing operations to proceed.

Roughly 12 hours later, the network failures returned on one node. CityCloud identified the cause as a driver issue and updated the downed node, returning service. However, the outage resulted in data inconsistency between GlusterFS pairs:

As the servers lost storage abruptly there were certain types of Gluster issues where files did not match each other on the two nodes in each storage pair. There were also some cases of data corruption in the VMs filesystems due to VMs going down in an uncontrolled way.

## Datacenter network failures

Individual network interfaces can fail, but they typically appear as single-node outages. Failures located in the physical network are often more nefarious. Switches are subject to power failure, misconfiguration, firmware bugs, topology changes, cable damage, and malicious traffic. Their failure modes are accordingly diverse:

### Power failure on both redundant switches

As Microsoft’s SIGCOMM paper suggests, redundancy doesn’t always prevent link failure. When a power distribution unit failed and took down one of two redundant top-of-rack switches, Fog Creek lost service for a subset of customers on that rack but remained consistent and available for most users. However, the other switch in that rack also lost power for undetermined reasons. That failure isolated the two neighboring racks from one another, taking down all On Demand services.

### Switch split-brain caused by BPDU flood

During a planned network reconfiguration to improve reliability, Fog Creek suddenly lost access to their network.

A network loop had formed between several switches.

The gateways controlling access to the switch management network were isolated from each other, generating a split-brain scenario. Neither were accessible due to a sudden traffic flood.

The flood was the result of a multi-switch BPDU (bridge protocol data unit) flood, indicating a spanning-tree flap. This is most likely what was changing the loop domain.

According to the BPDU standard, the flood shouldn’t have happened. But it did, and this deviation from the system’s assumptions resulted in two hours of total service unavailability.

### Bridge loops, misconfiguration, broken MAC caches

In an effort to address high latencies caused by a daisy-chained network topology, Github installed a set of aggregation switches in their datacenter. Despite a redundant network, the installation process resulted in bridge loops, and switches disabled links to prevent failure. This problem was quickly resolved, but later investigation revealed that many interfaces were still pegged at 100% capacity.

While investigating that problem, a misconfigured switch triggered aberrant automatic fault detection behavior: when one link was disabled, the fault detector disabled all links. This caused 18 minutes of hard downtime. The problem was later traced to a firmware bug preventing switches from updating their MAC address caches correctly, which forced them to broadcast most packets to every interface.

### Mystery RabbitMQ partitions

Sometimes, nobody knows why a system partitions. This RabbitMQ failure seems like one of those cases: few retransmits, no large gaps between messages, and no clear loss of connectivity between nodes. Upping the partition detection timeout to 2 minutes reduced the frequency of partitions but didn’t prevent them altogether.

### DRBD split-brain

When a two-node cluster partitions, there are no cases in which a node can reliably declare itself to be the primary. When this happens to a DRBD filesystem, as one user reported, both nodes can remain online and accept writes, leading to divergent filesystem-level changes. The only realistic option for resolving these kinds of conflicts is to discard all writes not made to a selected component of the cluster.

### A NetWare split-brain

Short-lived failures can lead to long outages. In this Usenet post to novell.support.cluster-services, an admin reports their two-node failover cluster running Novell NetWare experienced transient network outages. The secondary node eventually killed itself, and the primary (though still running) was no longer reachable by other hosts on the network. The post goes on to detail a series of network partition events correlated with backup jobs!

### MLAG, Spanning Tree, and STONITH

Github writes great postmortems, and this one is no exception. On December 22nd, 2012, a planned software update on an aggregation switch caused some mild instability during the maintenance window. In order to collect diagnostic information about the instability, the network vendor killed a particular software agent running on one of the aggregation switches.

Github’s aggregation switches are clustered in pairs using a feature called MLAG, which presents two physical switches as a single layer 2 device. The MLAG failure detection protocol relies on both ethernet link state and a logical heartbeat message exchanged between nodes. When the switch agent was killed, it was unable to shut down the ethernet link. Unlucky timing confused the MLAG takeover, preventing the still-healthy agg switch from handling link aggregation, spanning-tree, and other L2 protocols as normal. This forced a spanning-tree leader election and reconvergence for all links, blocking all traffic between access switches for 90 seconds.

The 90-second network partition caused fileservers using Pacemaker and DRBD for HA failover to declare each other dead, and to issue STONITH (Shoot The Other Node In The Head) messages to one another. The network partition delayed delivery of those messages, causing some fileserver pairs to believe they were both active. When the network recovered, both nodes shot each other at the same time. With both nodes dead, files belonging to the pair were unavailable.

To prevent filesystem corruption, DRBD requires that administrators ensure the original primary node is still the primary node before resuming replication. For pairs where both nodes were primary, the ops team had to examine log files or bring the node online in isolation to determine its state. Recovering those downed fileserver pairs took five hours, during which Github service was significantly degraded.

## Hosting providers

Running your own datacenter can be cheaper and more reliable than using public cloud infrastructure, but it also means you have to be a network and server administrator. What about hosting providers, which rent dedicated or virtualized hardware to users and often take care of the network and hardware setup for you?

### An undetected GlusterFS split-brain

Freistil IT hosts their servers with a colocation/managed-hosting provider. Their monitoring system alerted Freistil to 50–100% packet loss localized to a specific datacenter. The network failure, caused by a router firmware bug, returned the next day. Elevated packet loss caused the GlusterFS distributed filesystem to enter split-brain undetected:

Unfortunately, the malfunctioning network had caused additional problems which we became aware of in the afternoon when a customer called our support hotline because their website failed to deliver certain image files. We found that this was caused by a split-brain situation on the storage cluster “stor02” where changes made on node “stor02b” weren’t reflected on “stor02a” and the self-heal algorithm built into the Gluster filesystem was not able to resolve this inconsistency between the two data sets.

Repairing that inconsistency led to a “brief overload of the web nodes because of a short surge in network traffic.”

### An anonymous hosting provider

From what we can gather informally, all the major managed hosting providers experience regular network failures. One company running 100-200 nodes on a major hosting provider reported that in a 90-day period the provider’s network went through five distinct periods of partitions. Some partitions disabled connectivity between the provider’s cloud network and the public internet, and others separated the cloud network from the provider’s internal managed-hosting network. The failures caused unavailability, but because this company wasn’t running any significant distributed systems across those partitioned networks, there was no observed inconsistency or data loss.

### Pacemaker/Heartbeat split-brain

A post to Linux-HA details a long-running partition between a Heartbeat pair, in which two Linode VMs each declared the other dead and claimed a shared IP for themselves. Successive posts suggest further network problems: emails failed to dispatch due to DNS resolution failure, and nodes reported “network unreachable.” In this case, the impact appears to have been minimal–in part because the partitioned application was just a proxy.

## Cloud networks

Large-scale virtualized environments are notorious for transient latency, dropped packets, and full-blown network partitions, often affecting a particular software version or availability zone. Sometimes the failures occur between specific subsections of the provider’s datacenter, revealing planes of cleavage in the underlying hardware topology.

### An isolated MongoDB primary on EC2

In a comment on Call me maybe: MongoDB, Scott Bessler observed exactly the same failure mode Kyle demonstrated in the Jepsen post:

“Prescient. The w=safe scenario you show (including extra fails during rollback/re-election) happened to us today when EC2 West region had network issues that caused a network partition that separated PRIMARY from its 2 SECONDARIES in a 3 node replset. 2 hours later the old primary rejoined and rolled back everything on the new primary. Our bad for not using w=majority.”

This partition caused two hours of write loss. From our conversations with large-scale MongoDB users, we gather that network events causing failover on EC2 are common. Simultaneous primaries accepting writes for multiple days are not unknown.
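Why would w=majority have mattered here? A deliberately simplified Python model of the acknowledgement rule shows the difference. It assumes a three-node replica set in which the isolated primary can reach only itself, and that when the partition heals, the majority side’s history wins and every write made on the minority side is rolled back. Treating w=safe as “acknowledged by the primary alone” is a simplification, and the function names are hypothetical. The model ignores implementation bugs, so it illustrates why majority acknowledgement avoids this particular failure mode, not that it guarantees no loss.

```python
def acknowledged(replicas_holding_write: int, replica_set_size: int, w: str) -> bool:
    """Would the client see an acknowledgement for this write?

    w="1":        the primary alone suffices (roughly the w=safe case above)
    w="majority": a strict majority of the replica set must hold the write
    """
    if w == "1":
        return replicas_holding_write >= 1
    if w == "majority":
        return replicas_holding_write >= replica_set_size // 2 + 1
    raise ValueError(f"unsupported write concern: {w!r}")


def acked_then_lost(writes: int, replica_set_size: int, minority_size: int, w: str) -> int:
    """Writes acknowledged on the minority side and later rolled back.

    Assumes every write reaches only the minority side (the isolated
    primary and anything still connected to it) and that, on healing,
    the majority side's history wins, discarding all minority writes.
    """
    if acknowledged(minority_size, replica_set_size, w):
        return writes  # each acknowledgement is undone by the rollback
    return 0           # clients saw failures instead of silent loss


print(acked_then_lost(100, 3, 1, "1"))         # 100
print(acked_then_lost(100, 3, 1, "majority"))  # 0
```

Under w=majority the same 100 writes are not lost silently: the isolated primary cannot gather enough acknowledgements, so clients see errors or timeouts and know to retry.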

### Mnesia split-brain on EC2

EC2 outages can leave two nodes connected to the internet but unable to see each other. This type of partition is especially dangerous, as writes to both sides of a partitioned cluster can cause inconsistency and lost data. That’s exactly what happened to this Mnesia cluster, which diverged overnight. Their state wasn’t critical, so the operations team simply nuked one side of the cluster. They conclude: “the experience has convinced us that we need to prioritize up our network partition recovery strategy”.

### EC2 instability causing MongoDB and ElasticSearch unavailability

Network disruptions in EC2 can affect only certain groups of nodes. For instance, this report of a total partition between the frontend and backend stacks states that the web servers lose their connections to all backend instances for a few seconds, several times a month. Even though the disruptions were short, cluster convergence resulted in 30-45 minute outages and a corrupted index for ElasticSearch. As problems escalated, the outages occurred “2 to 4 times a day.”

### VoltDB split-brain on EC2

One VoltDB user reports regular network failures causing replica divergence but also indicates that their network logs included no dropped packets. Because this cluster had not enabled split-brain detection, both nodes ran as isolated primaries, causing significant data loss.

### ElasticSearch discovery failure on EC2

Another EC2 split-brain: a two-node cluster failed to converge on “roughly 1 out of 10 startups” when discovery messages took longer than three seconds to exchange. As a result, both nodes would start as primaries with the same cluster name. Since ElasticSearch doesn’t demote primaries automatically, split-brain persisted until administrators intervened. Upping the discovery timeout to 15 seconds resolved the issue.
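A two-node toy model in Python shows why a fixed discovery timeout produces intermittent split brain. It assumes each node waits for a discovery reply from its peer during the timeout window and elects itself if none arrives in time. The function name, node names, and lowest-name tie-break are hypothetical; this is not ElasticSearch’s discovery algorithm. Note that raising the timeout from 3 to 15 seconds narrows the race rather than closing it: any discovery round slower than the timeout still yields two masters.

```python
def elected_masters(discovery_latency_s: float, timeout_s: float,
                    nodes: tuple[str, ...] = ("a", "b")) -> set[str]:
    """Which nodes consider themselves master after startup?

    Toy model: each node waits timeout_s for a discovery reply. If the
    reply is slower than that, each node assumes it is alone and elects
    itself. Otherwise the nodes agree on a single master (here,
    hypothetically, the lowest node name).
    """
    if discovery_latency_s > timeout_s:
        return set(nodes)  # neither saw the other: split brain
    return {min(nodes)}    # discovery succeeded: one master


print(len(elected_masters(4.0, timeout_s=3.0)))    # 2
print(len(elected_masters(4.0, timeout_s=15.0)))   # 1
print(len(elected_masters(20.0, timeout_s=15.0)))  # 2
```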

### RabbitMQ and ElasticSearch on Windows Azure

There are a few scattered reports of Windows Azure partitions, such as this account of a RabbitMQ cluster which entered split-brain on a weekly basis. There’s also this report of an ElasticSearch split-brain, but since Azure is a relative newcomer compared to EC2, descriptions of its network reliability are limited.

### AWS EBS outage

On April 21st, 2011, Amazon Web Services went down for over 12 hours, causing hundreds of high-profile web sites to go offline. As a part of normal AWS scaling activities, Amazon engineers shifted traffic away from a router in the Elastic Block Store (EBS) network in a single US-East Availability Zone (AZ).

The traffic shift was executed incorrectly and rather than routing the traffic to the other router on the primary network, the traffic was routed onto the lower capacity redundant EBS network. For a portion of the EBS cluster in the affected Availability Zone, this meant that they did not have a functioning primary or secondary network because traffic was purposely shifted away from the primary network and the secondary network couldn’t handle the traffic level it was receiving. As a result, many EBS nodes in the affected Availability Zone were completely isolated from other EBS nodes in its cluster. Unlike a normal network interruption, this change disconnected both the primary and secondary network simultaneously, leaving the affected nodes completely isolated from one another.

The partition coupled with aggressive failure-recovery code caused a mirroring storm, which led to network congestion and triggered a previously unknown race condition in EBS. EC2 was unavailable for roughly 12 hours, and EBS was unavailable or degraded for over 80 hours.

The EBS failure also caused an outage in Amazon’s Relational Database Service. When one AZ fails, RDS is designed to fail over to a different AZ. However, 2.5% of multi-AZ databases in US-East failed to fail over due to “stuck” IO.

The primary cause was that the rapid succession of network interruption (which partitioned the primary from the secondary) and “stuck” I/O on the primary replica triggered a previously un-encountered bug. This bug left the primary replica in an isolated state where it was not safe for our monitoring agent to automatically fail over to the secondary replica without risking data loss, and manual intervention was required.

This correlated failure caused widespread outages for clients relying on AWS. For example, Heroku reported between 16 and 60 hours of unavailability for their users' databases.

### Isolated Redis primary on EC2

On July 18, 2013, Twilio’s billing system, which stores account credits in Redis, failed. A network partition isolated the Redis primary from all billing secondaries. Because Twilio did not promote a new secondary, writes to the primary remained consistent. However, when the primary became visible to the secondaries again, all secondaries initiated a full resynchronization with the primary simultaneously. This overloaded the primary, causing services which relied on the Redis primary to fail.

The ops team restarted the Redis primary to address the high load; but on restarting, the Redis primary reloaded an incorrect configuration file which caused it to become a slave of itself. The primary entered read-only mode, which stopped all billing system writes. With all account balances at zero, and read-only, every Twilio call caused the billing system to automatically re-charge customer credit cards. 1.1% of customers were overbilled, for roughly 40 minutes. Appointment Reminder, for example, reported that every SMS message and phone call they issued resulted in a $500 charge to their credit card, which stopped accepting charges after $3500.

Twilio recovered the billing state from an independent billing system–a relational datastore–and after some hiccups, restored proper service, including credits to affected users.

## WAN failures

While we have largely focused on failures over local area networks (or near-local networks), wide area network (WAN) failures are also common–if less frequently documented. These failures are particularly interesting because there are often fewer redundant WAN routes and because systems guaranteeing high availability (and disaster recovery) often require distribution across multiple datacenters. Accordingly, graceful degradation under partitions or increased latency is especially important for geographically widespread services.

### PagerDuty

PagerDuty designed their system to remain available in the face of node, datacenter, or even provider failure; their services are replicated between two EC2 regions and a datacenter hosted by Linode. On April 13, 2013, an AWS peering point in northern California degraded, causing connectivity issues for one of PagerDuty’s EC2 nodes. As latencies between AWS availability zones rose, the notification dispatch system lost quorum and stopped dispatching messages entirely.

Even though PagerDuty’s infrastructure was designed with partition tolerance in mind, correlated failures due to a shared peering point between two datacenters caused 18 minutes of unavailability, dropping inbound API requests and delaying queued pages until quorum was re-established.

### CENIC Study

Researchers at the University of California, San Diego quantitatively analyzed five years of operation in the CENIC wide-area network, which contains over two hundred routers across California. By cross-correlating link failures and additional external BGP and traceroute data, they discovered over 508 “isolating network partitions” that caused connectivity problems between hosts. Average partition duration ranged from 6 minutes for software-related failures to over 8.2 hours for hardware-related failures (median 2.7 and 32 minutes; 95th percentile of 19.9 minutes and 3.7 days, respectively).

## Global routing failures

Despite the high level of redundancy in internet systems, some network failures take place on a globally distributed scale.

### Cloudflare

CloudFlare runs 23 datacenters with redundant network paths and anycast failover. In response to a DDoS attack against one of their customers, their operations team deployed a new firewall rule to drop packets of a specific size. Juniper’s FlowSpec protocol propagated that rule to all CloudFlare edge routers–but then:

What should have happened is that no packet should have matched that rule because no packet was actually that large. What happened instead is that the routers encountered the rule and then proceeded to consume all their RAM until they crashed.

Recovering from the failure was complicated by routers which failed to reboot automatically, and inaccessible management ports.

> Even though some data centers came back online initially, they fell back over again because all the traffic across our entire network hit them and overloaded their resources.

CloudFlare monitors their network carefully and the ops team had immediate visibility into the failure. However, coordinating globally distributed systems is complex, and calling on-site engineers to find and reboot routers by hand takes time. Recovery began after 30 minutes, and was complete after an hour of unavailability.

### Juniper routing bug

A firmware bug introduced as a part of an upgrade in Juniper Networks’s routers caused outages in Level 3 Communications’s networking backbone. This subsequently knocked services like Time Warner Cable, RIM BlackBerry, and several UK internet service providers offline.

### Global BGP outages

There have been several global Internet outages related to BGP misconfiguration. Notably, in 2008, Pakistan Telecom, responding to a government edict to block YouTube.com, incorrectly advertised its (blocked) route to other providers, which hijacked traffic from the site and briefly rendered it unreachable. In 2010, a group of Duke University researchers achieved a similar effect by testing an experimental flag in the BGP protocol. Similar incidents occurred in 2006 (knocking sites like Martha Stewart Living and The New York Times offline), in 2005 (where a misconfiguration in Turkey attempted to redirect traffic for the entire internet), and in 1997.

## Where do we go from here?

This post is meant as a reference point–to illustrate that, according to a wide range of accounts, partitions occur in many real-world environments. Processes, servers, NICs, switches, local and wide area networks can all fail, and the resulting economic consequences are real. Network outages can suddenly arise in systems that are stable for months at a time, during routine upgrades, or as a result of emergency maintenance. The consequences of these outages range from increased latency and temporary unavailability to inconsistency, corruption, and data loss. Split-brain is not an academic concern: it happens to all kinds of systems–sometimes for days on end. Partitions deserve serious consideration.

On the other hand, some networks really are reliable. Engineers at major financial firms report that despite putting serious effort into designing systems that gracefully tolerate partitions, their networks rarely, if ever, exhibit partition behavior. Cautious engineering (and lots of money) can prevent outages.

However, not all organizations can afford the cost or operational complexity of highly reliable networks. From Google and Amazon (who operate commodity and/or low-cost hardware due to sheer scale) to one-man startups built on shoestring budgets, communication-isolating network failures are a real risk.

It’s important to consider this risk before a partition occurs–because it’s much easier to make decisions about partition tolerance on a whiteboard than to redesign, re-engineer, and upgrade a complex system in a production environment–especially when it’s throwing errors at your users. For some applications, failure is an option–but you should characterize and explicitly account for it as a part of your design.

We invite you to contribute your own experiences with or without network partitions. Open a pull request on https://github.com/aphyr/partitions-post, leave a comment, write a blog post, or release a post-mortem. Data will inform this conversation, future designs, and, ultimately, the availability of the systems we all depend on.

# Asynchronous replication with failover

In response to my earlier post on Redis inconsistency, Antirez was kind enough to help clarify some points about Redis Sentinel's design.

First, I'd like to reiterate my respect for Redis. I've used Redis extensively in the past with good results. It's delightfully fast, simple to operate, and offers some of the best documentation in the field. Redis is operationally predictable. Data structures and their performance behave just how you'd expect. I hear nothing but good things about the clarity and quality of Antirez' C code. This guy knows his programming.

I think Antirez and I agree with each other, and we're both saying the same sorts of things. I'd just like to expand on some of these ideas a bit, and generalize to a broader class of systems.

First, the distributed system comprised of Redis and Redis Sentinel cannot be characterized as consistent. Nor can MongoDB with anything less than WriteConcern.MAJORITY, or MySQL with asynchronous replication, for that matter. Antirez writes:

> What I'm saying here is that just the goal of the system is:
>
> 1) To promote a slave into a master if the master fails.
> 2) To do so in a reliable way.

Redis Sentinel does reliably promote secondaries into primaries. It is so good at this that it can promote two, three, or all of your secondaries into primaries concurrently, and keep them in that state indefinitely. As we've seen, having causally unconnected primaries in this kind of distributed system allows for conflicts–and since Redis Sentinel will destroy the state on an old primary when it becomes visible to a quorum of Sentinels, this can lead to arbitrary loss of acknowledged writes to the system.

> Ok I just made clear enough that there is no such goal in Sentinel to turn N Redis instances into a distributed store,

If you use any kind of failover, your Redis system is a distributed store. Heck, reading from secondaries makes Redis a distributed store.

> So you can say, ok, Sentinel has a limited scope, but could you add a feature so that when the master feels in the minority it no longer accept writes? I don't think it's a good idea. What it means to be in the minority for a Redis master monitored by Sentinels (especially given that Redis and Sentinel are completely separated systems)?
>
> Do you want your Redis master stopping to accept writes when it is no longer able to replicate to its slaves?

Yes. This is required for a CP system with failover. If you don't do it, your system can and will lose data. You cannot achieve consistency in the face of a partition without sacrificing availability. If you want Redis to be AP, then don't destroy the data on the old primaries by demoting them. Preserve conflicts and surface them to the clients for merging.

You could do this as an application developer by setting every Redis node to be a primary, and writing a proxy layer which uses, say, consistent hashing and active anti-entropy to replicate writes between nodes. Take a look at Antirez's own experiments in this direction. If you want a CP system, you could follow Datomic's model and use immutable shared-structure values in Redis, combined with, say, Zookeeper for mutable state.

## Why topology matters

Antirez recommends a different approach to placing Sentinels than I used in my Redis experiments:

> … place your Sentinels and set your quorum so that you are defensive enough against partitions. This way the system will activate only when the issue is really the master node down, not a network problem. Fear data loss and partitions? Have 10 Linux boxes? Put a Sentinel in every box and set quorum to 8.

I… can't parse this statement in a way that makes sense. Adding more boxes to a distributed system doesn't reduce the probability of partitions–and more to the point, trying to determine the state of a distributed system from outside the system itself is fundamentally flawed.

I mentioned that having the nodes which determine the cluster state (the Sentinels) be separate from the nodes which actually perform the replication (the Redis servers) can lead to worse kinds of partitions. I'd like to explain a little more, because I'm concerned that people might actually be doing this in production.

In this image, S stands for Sentinel, R stands for a Redis server, and C stands for Client. A box around an R indicates that node is a primary, and where it is able to replicate data to a secondary Redis server, an arrow is shown on that path. Lines show open network connections, and the jagged border shows a network partition.

Let's say we place our sentinels on 3 nodes to observe a three-node cluster. In the left-hand scenario, the majority of Sentinels are isolated, with two servers, from the clients. They promote node 2 to be a new primary, and it begins replicating to node 3. Node 1, however, is still a primary. Clients will continue writing to node 1, even though a.) its durability guarantees are greatly diminished–if it dies, all writes will be lost, and b.) the node doesn't have a quorum, so it cannot safely accept writes. When the partition resolves, the Sentinels will demote node 1 to a secondary and replace its data with the copy from N2, effectively destroying all writes during the partition.

On the right-hand side, a fully connected group of Sentinels can only see one Redis node. It's not safe to promote that node, because it doesn't have a majority and servers won't demote themselves when isolated, but the sentinels do it anyway. This scenario could be safely available to clients because a majority is present, but Redis Sentinel happily creates a split-brain and obliterates the data on the first node at some later time.

If you take Antirez' advice and colocate the sentinels with your clients, we can still get into awful states. On the left, an uneven partition between clients and servers means we elect a minority Redis server as the primary, even though it can't replicate to any other nodes. The majority component of the servers can still accept writes, but they're doomed: when the clients are able to see those nodes again, they'll wipe out all the writes that took place on those 2 nodes.

On the right, we've got the same partition topology I demonstrated in the Redis post. Same deal: split brain means conflicting writes and throwing away data.

If you encounter intermittent or rolling partitions (which can happen in the event of congestion and network failover), shifting quorums coupled with the inability of servers to reason about their own cluster state could yield horrifying consequences, like every node being a primary at the same time. You might be able to destroy not only writes that took place during the partition, but all data ever written–not sure if the replication protocol allows this or if every node just shuts down.

Bottom line: if you're building a distributed system, you must measure connectivity in the distributed system itself, not by what you can see from the outside. Like we saw with MongoDB and Riak, it's not the wall-clock state that matters–it's the logical messages in the system. The further you get from those messages, the wider your windows for data loss.

## It's not just Sentinel

I assert that any system which uses asynchronous primary-secondary replication, and can change which node is the primary, is inconsistent. Why? If you write an operation to the primary, and then failover occurs before the operation is replicated to the node which is about to become the new primary, the new primary won't have that operation. If your replication strategy is to make secondaries look like the current primary, the system isn't just inconsistent, but can actually destroy acknowledged operations.

Here's a formal model of a simple system which maintains a log of operations. At any stage, one of three things can happen: we can write an operation to the primary, replicate the log of the primary to the secondary, or fail over:

```tla
------------------------------ MODULE failover ------------------------------
EXTENDS Naturals, Sequences, TLC

CONSTANT Ops

\* N1 and N2 are the list of writes made against each node
VARIABLES n1, n2

\* The list of writes acknowledged to the client
VARIABLE acks

\* The current primary node
VARIABLE primary

\* The types we allow variables to take on
TypeInvariant ==
  /\ primary \in {1, 2}
  /\ n1 \in Seq(Ops)
  /\ n2 \in Seq(Ops)
  /\ acks \in Seq(Ops)

\* An operation is acknowledged if it has an index somewhere in acks.
IsAcked(op) == \E i \in DOMAIN acks : acks[i] = op

\* The system is *consistent* if every acknowledged operation appears,
\* in order, in the current primary's oplog:
Consistency == acks = SelectSeq((IF primary = 1 THEN n1 ELSE n2), IsAcked)

\* We'll say the system is *potentially consistent* if at least one node
\* has a superset of our acknowledged writes in order.
PotentialConsistency ==
  \/ acks = SelectSeq(n1, IsAcked)
  \/ acks = SelectSeq(n2, IsAcked)

\* To start out, all oplogs are empty, and the primary is n1.
Init ==
  /\ primary = 1
  /\ n1 = <<>>
  /\ n2 = <<>>
  /\ acks = <<>>

\* A client can send an operation to the primary. The write is immediately
\* stored on the primary and acknowledged to the client.
Write(op) ==
  IF primary = 1
  THEN /\ n1' = Append(n1, op)
       /\ acks' = Append(acks, op)
       /\ UNCHANGED <<n2, primary>>
  ELSE /\ n2' = Append(n2, op)
       /\ acks' = Append(acks, op)
       /\ UNCHANGED <<n1, primary>>

\* For clarity, we'll have the client issue unique writes
WriteSomething == \E op \in Ops : ~IsAcked(op) /\ Write(op)

\* The primary can *replicate* its state by forcing another node
\* into conformance with its oplog
Replicate ==
  IF primary = 1
  THEN /\ n2' = n1
       /\ UNCHANGED <<n1, acks, primary>>
  ELSE /\ n1' = n2
       /\ UNCHANGED <<n2, acks, primary>>

\* Or we can failover to a new primary.
Failover ==
  /\ IF primary = 1 THEN primary' = 2 ELSE primary' = 1
  /\ UNCHANGED <<n1, n2, acks>>

\* At each step, we allow the system to either write, replicate, or fail over
Next == \/ WriteSomething \/ Replicate \/ Failover
```

This is written in the TLA+ language for describing algorithms, which encodes a good subset of ZF axiomatic set theory with first-order logic and the Temporal Logic of Actions. We can explore this specification with the TLC model checker, which takes our initial state and evolves it by executing every possible state transition until it hits an error:

This protocol is inconsistent. The fields in red show the state changes during each transition: in the third step, the primary is n2, but n2's oplog is empty, instead of containing the list <<2>>. In fact, this model fails the PotentialConsistency invariant shortly thereafter, if replication or a write occurs. We can also test for the total loss of writes; it fails that invariant too.

That doesn't mean primary-secondary failover systems must be inconsistent. You just have to ensure that writes are replicated before they're acknowledged:

```tla
\* We can recover consistency by making the write protocol synchronous
SyncWrite(op) ==
  /\ n1' = Append(n1, op)
  /\ n2' = Append(n2, op)
  /\ acks' = Append(acks, op)
  /\ UNCHANGED primary

\* This new state transition satisfies both consistency constraints
SyncNext ==
  \/ \E op \in Ops : SyncWrite(op)
  \/ Replicate
  \/ Failover
```

And in fact, we don't have to replicate to all nodes before ack to achieve consistency–we can get away with only writing to a quorum, if we're willing to use a more complex protocol like Paxos.
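As a rough cross-check of the two TLA+ models, here is a tiny explicit-state search in Python. It is a sketch, not TLC: it assumes just two operations, checks only the `Consistency` invariant, and (like `WriteSomething`) only issues writes that haven't been acknowledged yet, which keeps the state space finite. States are `(primary, n1, n2, acks)` tuples, and breadth-first search returns the shortest trace that breaks the invariant, or `None` if every reachable state is consistent.

```python
from collections import deque

OPS = (1, 2)  # assumption: two distinct operations are enough to expose the problem


def is_consistent(state):
    """Mirror of the TLA+ Consistency invariant."""
    primary, n1, n2, acks = state
    oplog = n1 if primary == 1 else n2
    acked = set(acks)
    return tuple(op for op in oplog if op in acked) == acks


def successors(state, sync):
    """All states reachable in one step: write, replicate, or fail over."""
    primary, n1, n2, acks = state
    for op in OPS:
        if op in acks:
            continue  # like WriteSomething: only issue writes not yet acknowledged
        if sync:
            # SyncWrite: append to both oplogs before acknowledging
            yield (primary, n1 + (op,), n2 + (op,), acks + (op,))
        elif primary == 1:
            # Write: store on the primary only, then acknowledge
            yield (primary, n1 + (op,), n2, acks + (op,))
        else:
            yield (primary, n1, n2 + (op,), acks + (op,))
    # Replicate: force the secondary's oplog to match the primary's
    if primary == 1:
        yield (primary, n1, n1, acks)
    else:
        yield (primary, n2, n2, acks)
    # Failover: the other node becomes primary
    yield (2 if primary == 1 else 1, n1, n2, acks)


def find_violation(sync=False):
    """Breadth-first search; returns the shortest trace to an inconsistent state, or None."""
    init = (1, (), (), ())
    parent = {init: None}
    queue = deque([init])
    while queue:
        state = queue.popleft()
        if not is_consistent(state):
            trace = []
            while state is not None:
                trace.append(state)
                state = parent[state]
            return trace[::-1]
        for nxt in successors(state, sync):
            if nxt not in parent:
                parent[nxt] = state
                queue.append(nxt)
    return None
```

With asynchronous writes, `find_violation(sync=False)` returns `[(1, (), (), ()), (1, (1,), (), (1,)), (2, (1,), (), (1,))]`: write 1 is acknowledged on n1, then the system fails over to n2, whose oplog is still empty. That is the same shape as the TLC trace. With synchronous writes, `find_violation(sync=True)` returns `None`.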

## The important bit

So you skimmed the proof; big deal, right? The important thing is that it doesn't matter how you actually decide to do the failover: Sentinel, Mongo's gossip protocol, Heartbeat, Corosync, Byzantine Paxos, or a human being flipping the switch. Redis Sentinel happens to be more complicated than it needs to be, and it leaves much larger windows for write loss than it has to, but even if it were perfect the underlying Redis replication model is fundamentally inconsistent. We saw the same problem in MongoDB when we wrote with less than WriteConcern.MAJORITY. This affects asynchronous replication in MySQL and Postgres. It affects DRBD (yeaaaahhh, this can happen to your filesystem). If you use any of this software, you are building an asynchronous distributed system, and there are eventualities that have to be acknowledged.

Look guys, there's nothing new here. This is an old proof and many mature software projects (for instance, DRBD or RabbitMQ) explain the inconsistency and data-loss consequences of a partition in their documentation. However, not everyone knows. In fact, a good number of people seem shocked.

Why is this? I think it might be because software engineering is a really permeable field. You can start out learning Rails, and in two years wind up running four distributed databases by accident. Not everyone chose or could afford formal education, or was lucky enough to have a curmudgeonly mentor, or happened to read the right academic papers or find the right blogs. Now they might be using Redis as a lock server, or storing financial information in MongoDB. Is this dangerous? I honestly don't know. Depends on how they're using the system.

I don't view this so much as an engineering problem as a cultural one. Knives still come with sharp ends. Instruments are still hard for beginners to play. Not everything can or should be perfectly safe–or accessible. But I think we should warn people about what can happen, up front.

Tangentially: like many cultures, much of our collective understanding about what is desirable or achievable in distributed systems is driven by advertising. Yeah, MongoDB. That means you. ;-)

## Bottom line

I don't mean to be a downer about all this. Inconsistency and even full-out data loss aren't the end of the world. Asynchronous replication is a good deal faster, both in bulk throughput and client latencies. I just think we lose sight, occasionally, of what that means for our production systems. My goal in writing Jepsen has been to push folks to consider their consistency properties carefully, and to explain them clearly to others. I think that'll help us all build safer systems. :)

# Call me maybe: final thoughts

Previously in Jepsen, we discussed Riak. Now we'll review and integrate our findings.

We started this series with an open problem.

Notorious computer expert Joe Damato explains: “Literally no one knows.”

We've pushed the boundaries of our knowledge a little, though. By building a simple application which models a sequence of causally dependent writes, recording a log of that app's view of the world, and comparing that log to the final state of the database, we were able to verify–and challenge–our assumptions about the behavior of various distributed systems. In this talk we discussed one particular type of failure mode: a stable network partition which isolated one or more primary nodes–and explored its consequences in depth.

In each case, the system did something… odd. Maybe we hadn't fully thought through the consequences of the system, even if they were documented. Maybe the marketing or documentation were misleading, or flat-out lies. We saw design flaws, like the Redis Sentinel protocol. Some involved bugs, like MongoDB's WriteConcern.MAJORITY treating network errors as successful acknowledgements. Other times we uncovered operational caveats, like Riak's high latencies before setting up fallback vnodes. In each case, the unexpected behavior led to surprising new information about the challenge of building correct distributed systems.

In this series, we chose a simple network failure which we know happens to real production systems. The test encoded specific assumptions about concurrency, throughput, latency, timeout, error handling, and conflict resolution. The results demonstrate one point in a high-dimensional parameter space. The fraction of dropped writes in these Jepsen demos can vary wildly for all these reasons, which means we can't make general assertions about how bad the possibility of write loss really is. Mongo could lose almost all your writes, or none at all. It completely depends on the nature of your network, application, server topology, hardware, load, and the failure itself.

To apply these findings to your systems–especially in fuzzy, probabilistic ways–you'll need to measure your assumptions about how your system behaves. Write an app that hits your API and records responses. Cause some failures and see whether the app's log of what happened lines up with the final state of the system. The results may be surprising.
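A minimal sketch of that bookkeeping, assuming the app has already collected three lists: every value it attempted to write, the values the database acknowledged, and the values found by a final read. The function name `analyze` and its result fields are hypothetical; they mirror the kind of summary Jepsen prints (total, acknowledged, survivors, lost, ack rate, loss rate), but this is not Jepsen's code.

```python
def analyze(attempted, acknowledged, final_state):
    """Compare a client's log of writes against the database's final state.

    attempted:    every value the client tried to write
    acknowledged: values the database confirmed
    final_state:  values found by a final read after the test
    """
    acked = set(acknowledged)
    survivors = set(final_state)
    # Acknowledged writes missing from the final state were lost.
    lost = sorted(acked - survivors)
    # Writes never acknowledged (e.g. timed out) that landed anyway.
    unacked_survivors = sorted(survivors - acked)
    return {
        "total": len(attempted),
        "acknowledged": len(acked),
        "survivors": len(survivors),
        "lost": lost,
        "unacked_survivors": unacked_survivors,
        "ack_rate": len(acked) / len(attempted) if attempted else 0.0,
        "loss_rate": len(lost) / len(acked) if acked else 0.0,
    }
```

Tracking unacknowledged survivors matters too: a write that timed out may still have been applied. As a point of reference, 1,434 lost writes out of 2,000 acknowledged gives a loss rate of 0.717.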

Measurement isn't something you do just once. Ideally, your production systems should be instrumented continuously for performance and correctness. Some of these failure modes leave traces you can detect.

Some people claim that partitions don't happen to them. If you run in EC2 or other virtualized environments, noisy neighbors and network congestion/failures are a well-known problem. Running your own hardware doesn't make you immune either: Amazon, with some of the best datacenter engineers on the planet, considers partitions such a major problem that they were willing to design and build Dynamo. You are probably not Amazon.

Even if your network is reliable, logical failures can be partitions, too. Nodes which become so busy they fail to respond to heartbeats are a common cause of failover. Virtual machines can do all kinds of weird things to your network and clocks. Restoring from a backup can look like a partition resolving. These failures are hard to detect, so many people don't know they even occurred. You just… get slow for a while, or run across data corruption, weeks or years later, and wonder what happened.

## Aiming for correctness

We've learned a bunch of practical lessons from these examples, and I'd like to condense them briefly:

Network errors mean “I don't know,” not “It failed.” Make the difference between success, failure, and indeterminacy explicit in your code and APIs. Consider extending consistency algorithms through the boundaries of your systems. Hand TCP clients ETags or vector clocks. Extend CRDTs to the browser itself.
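One way to make that distinction explicit in client code is a three-valued result. This sketch assumes a hypothetical client whose calls return normally on success, raise a hypothetical `RejectedError` when the server definitively refuses the operation, and raise Python's built-in `TimeoutError` or `ConnectionError` when contact is lost mid-request. Only the last case maps to `UNKNOWN`.

```python
import enum
import socket


class Outcome(enum.Enum):
    OK = "ok"            # the server confirmed the operation took effect
    FAILED = "failed"    # the server confirmed the operation did NOT take effect
    UNKNOWN = "unknown"  # we lost contact: it may or may not have taken effect


class RejectedError(Exception):
    """Hypothetical error raised when the server definitively refuses a write."""


def classify(call):
    """Run a zero-argument client call and map what happened to an Outcome."""
    try:
        call()
    except RejectedError:
        return Outcome.FAILED
    except (TimeoutError, ConnectionError, socket.timeout):
        # The request may have reached the server and been applied before
        # the connection dropped, so this is indeterminate, not a failure.
        return Outcome.UNKNOWN
    return Outcome.OK
```

Callers have to treat `UNKNOWN` as "maybe applied": retrying is only safe if the operation is idempotent, and a later read may well find the write present.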

Even well-known, widely deployed algorithms like two-phase commit have some caveats, like false negatives. SQL transactional consistency comes in several levels. You're probably not using the stronger ones, and if you are, your code needs to handle conflicts. It's not usually a big deal, but keep it on your mental checklist.

Certain problems are hard to solve well, like maintaining a single authoritative record of data with primary failover. Consistency is a property of your data, not of your nodes. Avoid systems which assume node consensus implies data consistency.

Wall clocks are only useful for ensuring responsiveness in the face of deadlock, and even then they're not a positive guarantee of correctness. Our clocks were completely synchronized in this demo and we still lost data. Even worse things can happen if a clock gets out of sync, or a node pauses for a while. Use logical clocks on your data. Distrust systems which rely on the system time, unless you're running GPS or atomic clocks on your nodes. Measure your clock skew anyway.

Avoid home-grown distributed algorithms. Where correctness matters, rely on techniques with a formal proof and review in the literature. There's a huge gulf between theoretically correct algorithm and living breathing software–especially with respect to latency–but a buggy implementation of a correct algorithm is typically better than a correct implementation of a terrible algorithm. Bugs you can fix. Designs are much harder to re-evaluate.

Choose the right design for your problem space. Some parts of your architecture demand consistency, and there is software for that. Other parts can sacrifice linearizability while remaining correct, like CRDTs. Sometimes you can afford to lose data entirely. There is often a tradeoff between performance and correctness: think, experiment, and find out.

Restricting your system with particular rules can make it easier to attain safety. Immutability is an incredibly useful property, and can be combined with a mutable CP data store for powerful hybrid systems. Use idempotent operations as much as possible: it enables all sorts of queuing and retry semantics. Go one step further, if practical, and use full CRDTs.
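A minimal sketch of idempotence through client-assigned operation IDs: applying the same operation twice has the same effect as applying it once, so a retry after an indeterminate response can't double-apply it. `IdempotentStore` is a hypothetical single-node toy. In a real distributed store the set of applied IDs would itself have to be replicated, or the operation would need to be naturally idempotent (like adding an element to a set).

```python
class IdempotentStore:
    """Toy store where every write carries a client-chosen operation ID."""

    def __init__(self):
        self.items = []
        self.applied_ids = set()

    def add(self, op_id, value):
        """Apply the write once; later deliveries of the same op_id are no-ops."""
        if op_id in self.applied_ids:
            return False  # a retry or duplicate delivery: already applied
        self.applied_ids.add(op_id)
        self.items.append(value)
        return True
```

A client that timed out on `add("op-17", x)` can simply send `add("op-17", x)` again; whether or not the first attempt landed, `x` ends up stored exactly once.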

Preventing write loss in some weakly consistent databases, like Mongo, requires a significant latency tradeoff. It might be faster to just use Postgres. Sometimes buying ridiculously reliable network and power infrastructure is cheaper than scaling out. Sometimes not.

Replication between availability zones or between data centers is much more likely to fail than a rack or agg switch in your DC. Microsoft estimates their WAN links offer 99.5% availability, IIRC, and their LANs at 99.95%. Design your system accordingly.
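To put those percentages in more tangible units, here is the arithmetic for expected downtime per year, taking the quoted figures at face value and assuming a 365-day year (8,760 hours).

```python
HOURS_PER_YEAR = 365 * 24  # 8,760 hours; ignores leap years


def downtime_hours_per_year(availability):
    """Expected hours per year a link is unavailable, given availability in [0, 1]."""
    return (1 - availability) * HOURS_PER_YEAR


for label, availability in [("WAN", 0.995), ("LAN", 0.9995)]:
    hours = downtime_hours_per_year(availability)
    print(f"{label}: {availability:.2%} available, about {hours:.1f} hours down per year")
```

That works out to roughly 43.8 hours a year at 99.5% versus about 4.4 hours a year at 99.95%: an order of magnitude apart.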

## Embracing failure

All this analysis, measuring, and designing takes hard work. You may not have the money, experience, hardware, motivation, or time. Every system entails risk, and not quantifying that risk is a strategy in itself.

With that in mind, consider allowing your system to drop data. Spew data everywhere and repair it gradually with bulk processes. Garbage-collect structures instead of ensuring their correctness every time. Not everyone needs correct behavior right now. Some people don't ever need correct behavior. Look at the Facebook feed, or Twitter's DM light.

Code you can reason about is better than code you can't. Rely on libraries written and tested by other smart people to reduce the insane quantity of stuff you have to understand. If you don't get how to test that your merge function is associative, commutative, and idempotent, maybe you shouldn't be writing your own CRDTs just yet. Implementing two-phase commit on top of your database may be a warning sign.
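For a concrete picture of what that test looks like, here is a grow-only counter (G-Counter), one of the simplest state-based CRDTs: each node increments only its own entry, and merging takes the per-node maximum. The `check_merge_laws` helper is an illustrative randomized check, not a proof; a property-based testing library such as Hypothesis would explore the input space more systematically.

```python
import random


def increment(counter, node, amount=1):
    """Return a new G-Counter state with `node`'s entry bumped by `amount` (>= 0)."""
    updated = dict(counter)
    updated[node] = updated.get(node, 0) + amount
    return updated


def merge(a, b):
    """Merge two G-Counter states (dicts of node -> count) by per-node maximum."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}


def value(counter):
    """The counter's value is the sum of every node's entry."""
    return sum(counter.values())


def random_state(rng, nodes=("n1", "n2", "n3")):
    """A random G-Counter state in which some nodes may have no entry yet."""
    return {node: rng.randint(0, 5) for node in nodes if rng.random() < 0.7}


def check_merge_laws(trials=1000, seed=0):
    """Randomized check that merge is associative, commutative, and idempotent."""
    rng = random.Random(seed)
    for _ in range(trials):
        a, b, c = random_state(rng), random_state(rng), random_state(rng)
        assert merge(a, merge(b, c)) == merge(merge(a, b), c), (a, b, c)
        assert merge(a, b) == merge(b, a), (a, b)
        assert merge(a, a) == a, a
    return True
```

Because merge satisfies all three laws, replicas can exchange state in any order, any number of times, and still converge on the same value.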

Consistent, highly available systems are usually slow. There are proofs about the minimum number of network hops required to commit an operation in a CP system. You may want to trade correctness for performance for cost reasons, or to deliver a more responsive user experience.

I hope this work inspires you to test and improve your own distributed systems. The only reason I can talk about these mistakes is because I keep making them, over and over again. We're all in this together. Good luck. :)

http://github.com/aphyr/jepsen

## Thanks

Jepsen has consumed almost every hour of my life outside work for the last three months. I'm several hundred hours into the project now–and I couldn't have done it without the help and encouragement of friends and strangers.

My sincerest thanks to my fellow Boundary alumni Dietrich Featherston and Joe Damato for the conversations which sparked this whole endeavor. Salvatore Sanfilippo, Jordan West, Evan Vigil-McClanahan, Jared Rosoff, and Joseph Blomstedt were instrumental in helping me understand how these databases actually work. Stephen Strowes and someone whose name I've sadly forgotten helped me demonstrate partitions on a local cluster in the first place. My deepest appreciation to the Postgres team, the Redis project, 10Gen and Basho for their support, and for making cool databases available to everyone for free.

Sean Cribbs and Reid Draper clued me in to CRDTs and the problems of LWW. Tom Santero and Mark Phillips invited me to give this talk at RICON East. Jepsen wouldn't have existed without their encouragement, and I am continuously indebted to the pair. Zach Tellman, John Muellerleile, Josh O'Brien, Jared Morrow, and Ryan Zezeski helped refine my arguments and slides.

Hope I didn't forget anyone–if so, please drop me a line. Thanks for reading.

# Call me maybe: Riak

Previously in Jepsen, we discussed MongoDB. Today, we’ll see how last-write-wins in Riak can lead to unbounded data loss.

So far we’ve examined systems which aimed for the CP side of the CAP theorem, both with and without failover. We learned that primary-secondary failover is difficult to implement safely (though it can be done; see, for example, ZAB or Raft). Now I’d like to talk about a very different kind of database–one derived from Amazon’s Dynamo model.

Amazon designed Dynamo with the explicit goals of availability and partition tolerance–and partition-tolerant systems automatically handle node failure. It’s just a special kind of partition. In Dynamo, all nodes are equal participants in the cluster. A given object is identified by a key, which is consistently hashed into N slots (called “partitions”; not to be confused with a network partition) on a ring. Those N slots are claimed by N (hopefully distinct) nodes in the cluster, which means the system can, once data is replicated, tolerate up to N-1 node failures without losing data.
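A simplified sketch of how a key maps to its N slots, assuming a ring of equal-sized partitions addressed by SHA-1 and claimed round-robin by nodes. This only illustrates the idea; Riak's actual claim algorithm and vnode handling are more involved, and `Ring` and `preference_list` are hypothetical names.

```python
import hashlib

RING_SIZE = 2 ** 160  # SHA-1 produces 160-bit digests


def ring_position(key):
    """Hash a key onto the ring as an integer in [0, 2**160)."""
    return int(hashlib.sha1(key.encode("utf-8")).hexdigest(), 16)


class Ring:
    """Hypothetical ring of equal-sized partitions, claimed round-robin by nodes."""

    def __init__(self, nodes, partitions=64):
        self.partitions = partitions
        self.owners = [nodes[i % len(nodes)] for i in range(partitions)]

    def partition_for(self, key):
        """Index of the partition whose range contains the key's hash."""
        return ring_position(key) * self.partitions // RING_SIZE

    def preference_list(self, key, n):
        """Owners of the key's partition and the next n - 1 partitions clockwise."""
        first = self.partition_for(key)
        return [self.owners[(first + i) % self.partitions] for i in range(n)]
```

With four nodes and 64 partitions, any three consecutive partitions belong to three different nodes. With three nodes, 64 is not a multiple of 3, so the last partition and the first are both claimed by the same node; a key hashing into one of the last partitions gets that node twice, which is one reason the N nodes are only "hopefully" distinct.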

When a client reads from a Dynamo system, it specifies an R value: the number of nodes required to respond for a read to be successful. When it writes, it can specify W: the number of nodes which have to acknowledge the write. There’s also DW for “durable write”, and others. Riak has sometimes referred to these as “tunable CAP controls”: if you choose R=W=1, your system will be available even if all but one node fail–but you may not read the latest copy of data. If R + W is greater than N, you’re “guaranteed to read acknowledged writes”, with caveats. The defaults tend to be R=W=quorum, where quorum is N/2+1.
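The overlap guarantee is a pigeonhole argument: a set of R replicas and a set of W replicas drawn from the same N must share at least one node exactly when R + W > N. A brute-force check over small clusters makes this easy to verify. It only considers strict quorums over one fixed set of N replicas, so it says nothing about sloppy quorums, where fallback vnodes can stand in for the primary replicas.

```python
from itertools import combinations


def quorums_always_overlap(n, r, w):
    """True if every possible R-replica read set shares a node with every W-replica write set."""
    replicas = range(n)
    return all(
        set(read) & set(write)
        for read in combinations(replicas, r)
        for write in combinations(replicas, w)
    )


if __name__ == "__main__":
    for n in range(1, 6):
        for r in range(1, n + 1):
            for w in range(1, n + 1):
                assert quorums_always_overlap(n, r, w) == (r + w > n)
    print("R + W > N matches brute force for N = 1..5")
```

Note that R + W > N/2 would not be enough: with N = 3 and R = W = 1, a read can contact a different replica than the one that took the write.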

Dynamo handles partitions by healing the ring. Each connected set of machines establishes a set of fallback vnodes, to handle the portions of the ring which are no longer accessible. Once failover is complete, a Dynamo cluster split into two disjoint components will have two complete hash rings, and (eventually, as repair completes) 2 * N copies of the data (N in each component). When the partition heals, the fallback vnodes engage in hinted handoff, giving their data back to the original “primary” vnodes.

Since any node can accept writes for its portion of the keyspace, a Dynamo system can theoretically achieve 100% availability, even when the network fails entirely. This comes with two drawbacks. First, if no copy of a given object is available in an isolated set of nodes, that part of the cluster can accept writes for that object, but the first reads will return 404. If you’re adding items to a shopping cart and a partition occurs, your cart might appear to be empty. You could add an item to that empty cart, and it’d be stored, but depending on which side of the partition you talk to, you might see 20 items or just one.

When the partition heals, we have a new problem: it’s not clear which version of an object is authoritative. Dynamo employs a causality-tracing algorithm called vector clocks, which means it knows which copies of an object have been overwritten by updates, and which copies are actually conflicts–causally unconnected–due to concurrent writes.

Concurrent. We were talking about partitions, right? Two writes are concurrent if they happen in different components and can’t see each other’s changes, because the network didn’t let them communicate.
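A minimal sketch of how vector clocks tell an overwrite apart from a conflict, assuming each clock is a plain dict mapping a node (or actor) ID to a counter. Riak's real implementation differs in detail; this only shows the comparison.

```python
def increment(clock, node):
    """Return a copy of `clock` recording one more event at `node`."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def descends(a, b):
    """True if clock `a` has seen every event that clock `b` has."""
    return all(a.get(node, 0) >= count for node, count in b.items())


def compare(a, b):
    """Classify the causal relationship between two versions' clocks."""
    a_ge, b_ge = descends(a, b), descends(b, a)
    if a_ge and b_ge:
        return "equal"
    if a_ge:
        return "after"       # a overwrote b: safe to discard b
    if b_ge:
        return "before"      # b overwrote a: safe to discard a
    return "concurrent"      # neither saw the other: a genuine conflict


if __name__ == "__main__":
    base = increment({}, "x")     # written via node x
    left = increment(base, "x")   # updated again via x
    right = increment(base, "y")  # concurrently updated via y, e.g. across a partition
    print(compare(left, base), compare(left, right))
```

This prints `after concurrent`: `left` supersedes `base`, but `left` and `right` both descend from `base` without seeing each other, which is exactly the case where Dynamo has to keep both versions or merge them.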

Well that’s interesting, because we’re also used to concurrency being a property of normal database systems. If two people read an object, then write it back with changes, those writes will also conflict. In a very real sense, partitions are just really big windows of concurrency. We often handle concurrent writes in relational databases with multi-version concurrency control or locks, but we can’t use locks here because the time horizons could be minutes or hours, and there’s no safe way to distribute a lock algorithm over a partition. We need a different approach. We need to be able to merge arbitrary conflicting objects for Dynamo to work. From the paper:

> For instance, the application that maintains customer shopping carts can choose to “merge” the conflicting versions and return a single unified shopping cart. Despite this flexibility, some application developers may not want to write their own conflict resolution mechanisms and choose to push it down to the data store, which in turn chooses a simple policy such as “last write wins”.

Last write wins. That sounds like a timestamp. Didn’t we learn that Clocks Are Not To Be Trusted? Let’s try it and find out!

## Riak with last-write-wins

Riak is an excellent open-source adaptation of the Dynamo model. It includes a default conflict resolution mode of last-write-wins, which means that every write includes a timestamp, and when conflicts arise, it picks the one with the higher timestamp. If our clocks are perfectly synchronized, this ensures we pick the most recent value.
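Here is a minimal sketch of that resolution rule. It assumes each sibling carries a single wall-clock timestamp. `Sibling` and `last_write_wins` are hypothetical names and do not describe Riak's internals. The example shows that once one clock is skewed, "highest timestamp" and "most recent" no longer mean the same thing.

```python
from dataclasses import dataclass


@dataclass
class Sibling:
    value: list
    timestamp: float  # wall-clock reading of the node that coordinated the write


def last_write_wins(siblings):
    """Keep the sibling with the highest timestamp and discard the others."""
    return max(siblings, key=lambda s: s.timestamp)


# Real time: n1 writes at t=100, and n2 writes later, at t=101.
# n1's clock runs 2 seconds fast, so its earlier write carries the larger stamp.
from_n1 = Sibling(value=["apple"], timestamp=100.0 + 2.0)
from_n2 = Sibling(value=["apple", "pear"], timestamp=101.0)

winner = last_write_wins([from_n1, from_n2])
print(winner.value)  # ['apple']: the later write is silently dropped
```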

To be clear: there are actually two settings in Riak which affect conflict resolution: lww=true, which turns off vector clock analysis entirely, and allow-mult=false, which uses vector clocks but picks the sibling with the highest timestamp. Allow-mult=false is safer, and that’s the setting I’m referring to by “last write wins.” All cases of data loss in this post apply to both settings, though.

First, let’s install Riak, join the nodes together, and tell the cluster to commit the change:

    salticid riak.setup
    salticid riak.join
    salticid riak.commit

You can watch the logs with salticid riak.tail. Watch salticid riak.transfers until there are no handoffs remaining. The cluster is now in a stable state.

For this particular application we’ll be adding numbers to a list stored in a single Riak object. This is a typical use case for Dynamo systems–the atomic units in the system are keys, not rows or columns. Let’s run the app with last-write-wins consistency:

    lein run riak-lww-sloppy-quorum
    Writes completed in 5.119 seconds
    2000 total
    2000 acknowledged
    566 survivors
    1434 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
    1 2 3 4 6 8 ... 1990 1991 1992 1995 1996 1997
    1.0 ack rate
    0.717 loss rate

Riak lost 71% of acknowledged writes on a fully-connected, healthy cluster. No partitions. Why?

Remember how partitions and concurrency are essentially the same problem? Simultaneous writes are causally disconnected. If two clients write values which descend from the same object, Riak just picks the write with the higher timestamp, and throws away the other write. This is a classic data race, and we know how to fix those: just add a mutex. We’ll wrap all operations against Riak in a perfectly consistent, available distributed lock.
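Here is a single-process sketch of that race. It assumes perfectly synchronized clocks, with a shared counter standing in for them. `LWWRegister` is a hypothetical stand-in for one Riak key. Both clients read the same version and both writes are acknowledged, yet one of them is lost anyway.

```python
import itertools


class LWWRegister:
    """One key; a write replaces the stored value if its timestamp is higher."""

    def __init__(self):
        self.value, self.timestamp = [], 0

    def read(self):
        return list(self.value)

    def write(self, value, timestamp):
        if timestamp > self.timestamp:
            self.value, self.timestamp = value, timestamp
        return "ok"  # the client is told the write succeeded either way


clock = itertools.count(1)  # perfectly synchronized clocks
register = LWWRegister()

seen_by_a = register.read()  # client A reads []
seen_by_b = register.read()  # client B reads [] before A writes
register.write(seen_by_a + [1], next(clock))  # A appends 1: acknowledged
register.write(seen_by_b + [2], next(clock))  # B appends 2: acknowledged
print(register.read())  # [2]: the acknowledged write of 1 is gone
```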

“But you can’t do that! That violates the CAP theorem!”

Clever girl. Jepsen lets us pretend, though:

    lein run lock riak-lww-sloppy-quorum
    Writes completed in 21.475 seconds
    2000 total
    2000 acknowledged
    2000 survivors
    All 2000 writes succeeded. :-D

Problem solved! No more write conflicts. Now let’s see how it behaves under a partition by running salticid jepsen.partition during a run:

    237 :ok
    242 :ok
    247 :ok
    252 :ok
    257 :ok
    262 timeout
    85 :ok
    204 timeout
    203 timeout
    106 :ok
    209 timeout
    267 timeout
    90 :ok

The first thing you’ll notice is that our writes start to lag hard. Some clients are waiting to replicate a write to a majority of nodes, but one side of the partition doesn’t have a majority available. Even though Riak is an AP design, it can functionally become unavailable while nodes are timing out.

Those requests time out until Riak determines those nodes are inaccessible, and sets up fallback vnodes. Once the fallback vnodes are in place, writes proceed on both sides of the cluster, because both sides have a majority of vnodes available. This is by design in Dynamo. Allowing both components to see a majority is called a sloppy quorum, and it allows both components to continue writing data with full multi-node durability guarantees. If we didn’t set up fallback vnodes, a single node failure could destroy our data.

Before collecting results, let’s heal the cluster: salticid jepsen.heal. Remember to wait for Riak to recover, by waiting until salticid riak.transfers says there’s no data left to hand off.

    Writes completed in 92.773 seconds
    2000 total
    1985 acknowledged
    176 survivors
    1815 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
    85 90 95 100 105 106 ... 1994 1995 1996 1997 1998 1999
    6 unacknowledged writes found! ヽ(´ー｀)ノ
    (203 204 218 234 262 277)
    0.9925 ack rate
    0.91435766 loss rate
    0.00302267 unacknowledged but successful rate

91% data lost. This is fucking catastrophic, ladies.

What happened? When the partition healed, Riak had essentially two versions of the list: one from each side of the partition (plus some minorly divergent copies on each side). Last-write-wins means we pick the one with the higher timestamp. No matter what you do, all the writes from one side or the other will be discarded.

If your Riak cluster partitions, and you write to a node which can’t reach any of the original copies of the data, that write of a fresh object can overwrite the original record–destroying all the original data.

## Strict quorum

The problem is that we allowed writes to proceed on both sides of the partition. Riak has two more settings for reads and writes: PR and PW, for primary read and write, respectively. PR means you have to read a value from at least that many of the original owners of a key: fallback vnodes don’t count. PW imposes the same requirement on writes. If we set both PR and PW to a quorum (a majority of the key’s original owners), operations against a given key will only be able to proceed on one component of a partitioned cluster. That’s a CP system, right?
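Here is a simplified model of the difference between a sloppy and a strict quorum. It assumes one key with N = 3, owned by n1 through n3, and assumes a fallback vnode can start on any node in a component. The node names and the preference list are hypothetical. The two components share no primaries, so at most one of them can hold a majority of the key's owners.

```python
PRIMARIES = {"n1", "n2", "n3"}    # hypothetical preference list for one key (N = 3)
QUORUM = len(PRIMARIES) // 2 + 1  # 2


def sloppy_write_ok(component, w=QUORUM):
    """Sloppy quorum: fallback vnodes on any reachable node count toward W."""
    return len(component) >= w


def strict_write_ok(component, pw=QUORUM):
    """PW: only the key's original owners count."""
    return len(component & PRIMARIES) >= pw


left, right = {"n1", "n2"}, {"n3", "n4", "n5"}
print(sloppy_write_ok(left), sloppy_write_ok(right))  # True True
print(strict_write_ok(left), strict_write_ok(right))  # True False
```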

    lein run lock riak-lww-quorum
    274 :ok
    1250 :ok
    279 com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pw_val_unsatisfied,2,1}
    1381 :ok
    277 com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pr_val_unsatisfied,2,1}

Here we see the cluster denying a write and a read, respectively, to clients which can’t see a majority of the primary nodes for a key. Note that because the quorums are spread around the nodes, a Dynamo system will be partially available in this mode. In any given component, you’ll be able to read and write some fraction of the keys, but not others.

    2000 total
    1971 acknowledged
    170 survivors
    1807 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
    86 91 95 96 100 101 ... 1994 1995 1996 1997 1998 1999
    6 unacknowledged writes found! ヽ(´ー｀)ノ
    (193 208 219 237 249 252)
    0.9855 ack rate
    0.9167935 loss rate
    0.00304414 unacknowledged but successful rate

PR=PW=R=W=quorum still allowed 92% write loss. We reported failure for more writes than before, so that’s a start–but what gives? Shouldn’t this have been CP?

The problem is that failed writes may still be partially successful. Dynamo is designed to preserve writes as much as possible. Even though a node might return “PW val unsatisfied” when it can’t replicate to the primary vnodes for a key, it may have been able to write to one primary vnode–or any number of fallback vnodes. Those values will still be exchanged during read-repair, considered as conflicts, and the timestamp used to discard the older value–meaning all writes from one side of the cluster.

This means the minority component’s failing writes can destroy all of the majority component’s successful writes. Repeat after me: Clocks. Are. Evil.
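Here is a toy model of that failure, using hypothetical values and timestamps. A write that the client saw fail still wins read repair because its timestamp is newer.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    items: frozenset
    timestamp: float


def resolve(versions):
    """allow-mult=false: concurrent versions are settled by timestamp alone."""
    return max(versions, key=lambda v: v.timestamp)


# Majority side: 99 writes, all acknowledged.
majority = Version(items=frozenset(range(1, 100)), timestamp=50.0)
# Minority side: started from a 404, wrote 100, got pw_val_unsatisfied,
# but the value still landed on one vnode.
minority = Version(items=frozenset({100}), timestamp=51.0)

survivor = resolve([majority, minority])
print(sorted(survivor.items))  # [100]
```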

Is there no hope? Is there anything we can do to preserve our writes in Riak?

Yes. We can use CRDTs.

If we enable allow-mult in Riak, the vector clock algorithms will present both versions to the client. We can combine those objects together using a merge function. If the merge function is associative, commutative, and idempotent over that type of object, we can guarantee that it always converges to the same value regardless of the order of writes. If the merge function doesn’t discard data (like last-write-wins does), then it will preserve writes from both sides.
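Here is a minimal sketch that uses set union (a grow-only set) as the merge function. It assumes each sibling is the set of integers clients have written. This is application-side logic, not a Riak API. Folding the siblings together in every possible order produces a single result, which is exactly what associativity, commutativity, and idempotence guarantee.

```python
from functools import reduce
from itertools import permutations


def merge(a: frozenset, b: frozenset) -> frozenset:
    """Grow-only set merge: union never discards an element."""
    return a | b


siblings = [frozenset({1, 2}), frozenset({2, 3}), frozenset({4})]

# Fold the siblings together in every possible order.
results = {reduce(merge, order) for order in permutations(siblings)}
print(len(results), sorted(next(iter(results))))  # 1 [1, 2, 3, 4]
print(merge(siblings[0], siblings[0]) == siblings[0])  # True: idempotent
```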

In this case, we’re accumulating a set of numbers. We can use set union as our merge function, or 2P sets, or OR sets, if we need to remove numbers.
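When numbers also need to be removed, a two-phase set (2P-set) is the simplest of these options. It pairs a grow-only set of additions with a grow-only set of tombstones. Here is a minimal sketch; the `TwoPhaseSet` class is illustrative. Unlike an OR-set, it can never re-add an element after removing it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TwoPhaseSet:
    adds: frozenset = frozenset()     # every element ever added
    removes: frozenset = frozenset()  # tombstones: removed elements never return

    def add(self, x):
        return TwoPhaseSet(self.adds | {x}, self.removes)

    def remove(self, x):
        if x not in self.adds:
            return self  # only elements that were added can be removed
        return TwoPhaseSet(self.adds, self.removes | {x})

    def merge(self, other):
        return TwoPhaseSet(self.adds | other.adds, self.removes | other.removes)

    def value(self):
        return self.adds - self.removes


base = TwoPhaseSet().add(1).add(2)
left = base.remove(1)  # one side of a partition removes 1
right = base.add(3)    # the other side adds 3
merged = left.merge(right)
print(sorted(merged.value()))         # [2, 3]
print(merged == right.merge(left))    # True: merge order doesn't matter
print(sorted(merged.add(1).value()))  # [2, 3]: 1 cannot come back
```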

    lein run riak-crdt
    Writes completed in 80.918 seconds
    2000 total
    1948 acknowledged
    2000 survivors
    All 2000 writes succeeded. :-D

CRDTs preserve 100% of our writes. We still have false negatives in this demo, because the client timed out on a few writes which Riak was still propagating, when the partition first began. False negatives are OK, though, because state-based CRDTs are idempotent. We can repeat our writes arbitrarily many times, in any order, without duplicating data.

Moreover, CRDTs are an AP design: we can write safely and consistently even when the cluster is totally partitioned–for example, when no majority exists. They’re also eventually consistent (in a safe, data-preserving sense) when components are partitioned away from all copies of a given object and are forced to start from scratch.

## Strategies for working with Riak

Enable allow-mult. Use CRDTs.

Seriously. LWW never should have been the standard behavior for a Dynamo system, but Basho made it the default after customers complained that they didn’t like the complexity of reasoning about siblings. Customers are the only reason Riak exists, and this behavior is gonna seem OK until you start experiencing partitions (and remember, fault tolerance is the reason you chose Riak in the first place), so we’re stuck with a default config which promotes simple-yet-dangerous behavior.

As a consequence of that decision, community resources which people rely on to learn how to use Riak are often aimed towards last-write-wins. Software isn’t just an artifact, but a culture around its use. I don’t really know what we can learn from this, besides the fact that engineering and culture are tough problems.

CRDTs may be too large, too complex, or too difficult to garbage-collect for your use case. However, even if you can’t structure your data as a full CRDT, writing a hacked-together merge function which just takes care of a couple important fields (say, set union over your friend list and logical OR over the other fields) can go a long way towards preventing catastrophic data loss.
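Here is a rough sketch of such a partial merge. It assumes a hypothetical profile document with a `friends` list and two boolean flags; the field names are illustrative.

```python
def merge_profiles(a: dict, b: dict) -> dict:
    """Ad hoc merge for two sibling versions of a hypothetical profile."""
    return {
        # Set union: a friend added on either side survives.
        "friends": sorted(set(a["friends"]) | set(b["friends"])),
        # Logical OR: a flag set on either side stays set.
        "verified": a["verified"] or b["verified"],
        "opted_in": a["opted_in"] or b["opted_in"],
    }


left = {"friends": ["ann", "bob"], "verified": True, "opted_in": False}
right = {"friends": ["bob", "cat"], "verified": False, "opted_in": False}
print(merge_profiles(left, right))
# {'friends': ['ann', 'bob', 'cat'], 'verified': True, 'opted_in': False}
```

The tradeoff is that a friend removed on one side reappears after the merge, because a plain union has no tombstones. For many applications, that beats silently losing the other side's additions.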

There are cases where last-write-wins is a safe strategy. If your data is immutable, then it doesn’t matter which copy you choose. If your writes mean “I know the full correct state of this object at this time”, it’s safe. Many caches and backup systems look like this. If, however, your writes mean “I am changing something I read earlier,” then LWW is unsafe.

Finally, you can decide to accept dropped data. All databases will fail, in different ways, and with varying probabilities. Riak’s probability distribution might be OK for you.

Introducing locks is a bad idea. Even if they did prevent data loss–and as we saw, they don’t–you’ll impose a big latency cost. Moreover, locks restrict your system to being CP, so there’s little advantage to having an AP database. However, some really smart folks at Basho are working on adding Paxos rounds for writes which need to be CP. Having a real consensus protocol will allow Riak’s distributed writes to be truly atomic.

So: we’ve seen that Riak’s last-write-wins is fundamentally unsafe in the presence of network partitions. You can lose not only writes made during the partition, but all writes made at any time prior. Riak is an AP system, and its tunable CAP controls only allow you to detect some forms of write loss–not prevent it. You can’t add consistency to a database by tacking on a lock service because wall clock time doesn’t matter: consistency is a causal property of the relationships between the writes themselves. AP systems involve fundamentally different kinds of data structures, with their own unique tradeoffs.

In the next post, we’ll review what we’ve learned from these four distributed systems, and where we go from here.

# Call me maybe: MongoDB

Previously in Jepsen, we discussed Redis. In this post, we'll see MongoDB drop a phenomenal amount of data.

MongoDB is a document-oriented database with a similar distribution design to Redis. In a replica set, there exists a single writable primary node which accepts writes, and asynchronously replicates those writes as an oplog to N secondaries. However, there are a few key differences.

First, Mongo builds in its leader election and replicated state machine. There's no separate system which tries to observe a replica set in order to make decisions about what it should do. The replica set decides among itself which node should be primary, when to step down, how to replicate, etc. This is operationally simpler and eliminates whole classes of topology problems.

Second, Mongo allows you to ask that the primary confirm successful replication of a write by its disk log, or by secondary nodes. At the cost of latency, we can get stronger guarantees about whether or not a write was successful.

What happens when a primary becomes inaccessible?

The remaining secondaries will gradually detect the failed connection and attempt to come to a consensus about what to do. If they have a majority (and remember, there can be only one majority in a cluster, so this suggests we're heading towards a CP system), they'll select the node with the highest optime (a monotonic clock maintained by each node) and promote it to be a new primary. Simultaneously, the minority nodes will detect that they no longer have a quorum, and demote the primary to a secondary so it can't accept writes.
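Here is a deliberately simplified sketch of that rule. It assumes optimes are (seconds, increment) pairs; the values below are copied from the first rs_stat snapshot later in this post. The `elect` helper is hypothetical. Real MongoDB elections also involve member votes and priorities, which this ignores.

```python
def elect(members, reachable, optimes):
    """Return the node a connected component would promote, or None."""
    if len(reachable) <= len(members) // 2:
        return None  # no majority: nodes here must not act as primary
    # Prefer the most up-to-date node; optimes compare as (seconds, increment).
    return max(reachable, key=lambda node: optimes[node])


members = ["n1", "n2", "n3", "n4", "n5"]
optimes = {
    "n1": (1368940104, 56), "n2": (1368940103, 458),
    "n3": (1368940104, 89), "n4": (1368940104, 89), "n5": (1368940104, 102),
}
print(elect(members, ["n1", "n2"], optimes))        # None
print(elect(members, ["n3", "n4", "n5"], optimes))  # n5
```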

If our primary is on n1, and we cut off n1 and n2 from the rest of the cluster, we expect either n3, n4, or n5 to become the new primary. Because this architecture demotes the original primary on n1, we won't find ourselves in the same split-brain problem we saw with Redis.

## Consistency

So is MongoDB CP? There's a popular notion that MongoDB is a CP system, including exchanges like this, where all kinds of nuanced technical assertions about strong consistency are thrown around. At the same time, Mongo's documentation for replica sets explains carefully that Mongo may “revert operations”:

> In some failover situations primaries will have accepted write operations that have not replicated to the secondaries after a failover occurs. This case is rare and typically occurs as a result of a network partition with replication lag. When this member (the former primary) rejoins the replica set and attempts to continue replication as a secondary the former primary must revert these operations or “roll back” these operations to maintain database consistency across the replica set.

“Revert” certainly doesn't sound like linearizability to me, but that bit about “maintain[ing] database consistency” doesn't sound so bad. What actually happens? Let's find out!

For this example, we'll be adding integers to a list in a MongoDB document by using the update command in a CaS loop–just like you'd use with any transactionally isolated database. Yes, we could use $addToSet, but I'm using this app as an example of atomic updates in general, and they have different oplog dynamics.

## Unacknowledged

Up until recently, clients for MongoDB didn't bother to check whether or not their writes succeeded, by default: they just sent them and assumed everything went fine. This goes about as well as you'd expect.

    lein run mongo-unsafe -n 6000
    salticid jepsen.partition

For a while, writes continue to complete against n1. Then we see errors as the replica set fails over, like

    3186 No replica set members available in [
      { address:'n3/10.10.3.101:27017', ok:true, ping:0.8954104, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
      { address:'n4/10.10.3.95:27017', ok:true, ping:0.681164, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
      { address:'n5/10.10.3.32:27017', ok:true, ping:0.6231328, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
      { address:'n2/10.10.3.52:27017', ok:true, ping:0.51316977, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
      { address:'n1/10.10.3.242:27017', ok:true, ping:0.37008655, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, }
    ] for { "mode" : "primary"}

During this time, the majority nodes (n3, n4, n5) are still secondaries, but they've agreed that the old primary is inaccessible. They compare optimes and race to elect a leader:

    $ salticid mongo.rs_stat
    22:09:08 Starting...
    22:09:08 MongoDB shell version: 2.4.1
    22:09:08 connecting to: test
    22:09:08 n1:27017 (not reachable/healthy) 1368940104/56
    22:09:08 n2:27017 (not reachable/healthy) 1368940103/458
    22:09:08 n3:27017 SECONDARY 1368940104/89
    22:09:08 n4:27017 SECONDARY 1368940104/89
    22:09:08 n5:27017 SECONDARY 1368940104/102
    22:09:08 true
    22:09:08 Finished

    22:09:23 n1:27017 (not reachable/healthy) 1368941926/66
    22:09:23 n2:27017 (not reachable/healthy) 1368941961/70
    22:09:23 n3:27017 SECONDARY 1368941962/9
    22:09:23 n4:27017 SECONDARY 1368941961/45
    22:09:23 n5:27017 PRIMARY 1368941963/11

N5 wins the race, and proceeds to accept writes. If we heal the partition with salticid jepsen.heal, and wait a few seconds, the nodes will detect the fully connected cluster and the new primary will step down, to allow n1 to resume its place. Now that the cluster has stabilized, we hit enter to check how many of our writes survived:

    Hit enter when ready to collect results.
    Writes completed in 93.608 seconds
    6000 total
    5700 acknowledged
    3319 survivors
    2381 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
    469 474 479 484 489 494 ... 3166 3168 3171 3173 3178 3183
    0.95 ack rate
    0.4177193 loss rate
    0.0 unacknowledged but successful rate

42% write loss. Well, to some extent, this shouldn't be surprising, because we weren't checking to see whether the server was successful in applying our writes. Those 300 errors only came about when we tried to write to a secondary. But we never actually crashed a node, and we didn't see any signs of a split-brain condition with two simultaneous primaries–so why did Mongo drop data?

Remember those writes that completed on n1 just after the partition started? Those writes are still on n1, but never made it to n5. N5 proceeded without them. Now n1 and n5 are comparing notes, and n1 realizes that n5's optime is higher. N1 figures out the last point where the two agreed on the oplog, and rolls back to that point.
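Here is a simplified sketch of that rollback step. It assumes each oplog is a list of (optime, operation) pairs that agree up to some point; the names and entries are hypothetical. The server's own procedure, visible as the FindCommonPoint and fixup phases in the log below, is more involved.

```python
def find_common_point(local, remote):
    """Length of the longest shared prefix of two oplogs."""
    n = 0
    for mine, theirs in zip(local, remote):
        if mine != theirs:
            break
        n += 1
    return n


def roll_back(local, remote):
    """Split local into entries to keep and entries to discard."""
    n = find_common_point(local, remote)
    return local[:n], local[n:]


shared = [((100, 1), "push 1"), ((100, 2), "push 2")]
n1_oplog = shared + [((101, 1), "push 3"), ((101, 2), "push 4")]  # old primary
n5_oplog = shared + [((105, 1), "push 5")]                         # new primary

kept, discarded = roll_back(n1_oplog, n5_oplog)
resynced = kept + n5_oplog[len(kept):]  # then replay the new primary's entries
print([op for _, op in discarded])  # ['push 3', 'push 4']
print([op for _, op in resynced])   # ['push 1', 'push 2', 'push 5']
```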

    22:09:33 Sun May 19 05:09:33.032 [rsHealthPoll] replSet member n5:27017 is now in state PRIMARY
    22:09:33 Sun May 19 05:09:33.207 [initandlisten] connection accepted from 10.10.3.95:37718 #6154 (23 connections now open)
    22:09:33 Sun May 19 05:09:33.417 [rsBackgroundSync] replSet syncing to: n5:27017
    22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet our last op time fetched: May 19 05:08:37:2
    22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replset source's GTE: May 19 05:09:26:1
    22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet rollback 0
    22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet ROLLBACK
    22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet rollback 1
    22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet rollback 2 FindCommonPoint
    22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback our last optime: May 19 05:08:37:2
    22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback their last optime: May 19 05:09:33:32
    22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback diff in end of log times: -56 seconds
    22:09:35 Sun May 19 05:09:33.621 [initandlisten] connection accepted from 10.10.3.32:59066 #6155 (24 connections now open)
    22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet rollback found matching events at May 19 05:08:24:66
    22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet rollback findcommonpoint scanned : 3798
    22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet replSet rollback 3 fixup
    22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 3.5
    22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 4 n:1
    22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet minvalid=May 19 05:09:35 51985e8f:19
    22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 4.6
    22:09:35 Sun May 19 05:09:35.223 [rsBackgroundSync] replSet rollback 4.7
    22:09:35 Sun May 19 05:09:35.223 [rsBackgroundSync] replSet rollback 5 d:0 u:1
    22:09:35 Sun May 19 05:09:35.224 [rsBackgroundSync] replSet rollback 6
    22:09:35 Sun May 19 05:09:35.236 [rsBackgroundSync] replSet rollback 7
    22:09:35 Sun May 19 05:09:35.238 [rsBackgroundSync] replSet rollback done
    22:09:35 Sun May 19 05:09:35.238 [rsBackgroundSync] replSet RECOVERING

During a rollback, all the writes the old primary accepted after the common point in the oplog are removed from the database and written to a BSON file in Mongo's rollbacks directory. If you're a sysadmin, you could go look at the rollback files to try and reconstruct the writes that the database dropped.

Well, theoretically. In my tests, it only does this in 1 out of 5 runs or so. Mostly, it just throws those writes away entirely: no rollback files, no nothing. I don't really know why.

This leads to an important discovery: it doesn't matter whether or not there were two primaries at the same time. We can still get conflicting writes if the old primary's state is causally unconnected from the new primary. A primary/secondary system, by itself, is not sufficient. We have to actually track causality on the writes themselves in order to be CP. Otherwise, newly elected primaries could diverge from the old one.

## Safe

Aha! But that was with the old “unsafe” write concern! We should use the Safe write concern!

    lein run mongo-safe -n 6000
    ...
    6000 total
    5900 acknowledged
    3692 survivors
    2208 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
    458 463 468 473 478 483 ... 3075 3080 3085 3090 3095 3100
    0.98333335 ack rate
    0.3742373 loss rate
    0.0 unacknowledged but successful rate

## Replicas-safe

WriteConcern.SAFE only verifies that the write was accepted by the primary. We need to make sure that the replicas have received our write before considering it a success.

    lein run mongo-replicas-safe -n 6000
    ...
    6000 total
    5695 acknowledged
    3768 survivors
    1927 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
    712 717 722 727 732 737 ... 2794 2799 2804 2809 2814 2819
    0.94916666 ack rate
    0.338367 loss rate
    0.0 unacknowledged but successful rate

Mongo still rolled back our writes. Why? Because REPLICAS_SAFE only checks to see if the write took place against two replicas. Our cluster has five nodes, so it's possible for writes to exist only on n1 and n2. A new primary can be elected without having seen our write. We need to wait until our write has been acknowledged by a majority of nodes.
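Here is a small exhaustive check of that argument for this five-node cluster; `survives_every_majority` is a hypothetical helper. A write held by only two nodes can miss an entire majority, while a write held by three always overlaps every majority. Overlap is necessary but not sufficient, because the election must also prefer a node that actually has the write.

```python
from itertools import combinations

NODES = ["n1", "n2", "n3", "n4", "n5"]
MAJORITY = len(NODES) // 2 + 1  # 3 of 5


def survives_every_majority(has_write):
    """True if every possible majority contains at least one node with the write.

    Checking groups of exactly MAJORITY nodes suffices: any larger majority
    contains one of them.
    """
    return all(set(group) & has_write for group in combinations(NODES, MAJORITY))


print(survives_every_majority({"n1", "n2"}))        # False: n3, n4, n5 never saw it
print(survives_every_majority({"n1", "n2", "n3"}))  # True
```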

## Majority

    lein run mongo -n 6000

Using WriteConcern.MAJORITY, we notice an improvement! When we cause the partition, writes pause immediately. The clients are blocked, waiting for the primary to confirm acknowledgement on nodes which will never respond. Eventually they time out. This is a hallmark of a CP system: we shouldn't be able to make progress without talking to a majority of nodes.

    Writes completed in 157.425 seconds
    6000 total
    5700 acknowledged
    5701 survivors
    2 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
    (596 598)
    3 unacknowledged writes found! ヽ(´ー｀)ノ
    (562 653 3818)
    0.95 ack rate
    1.754386E-4 loss rate
    5.2631577E-4 unacknowledged but successful rate

So 3 writes which supposedly failed actually succeeded. That's not so bad. On the other hand, Mongo still dropped two “successful” writes. Writes which were supposedly acknowledged by a majority of nodes.

I've been talking with 10gen, and they think this is a bug. When the network partitions, the server just checks off the “OK” field for the client's WriteConcern request, and sends it back. The client sees the “OK” message and… sensibly presumes the write was OK. This should be fixed in master, but is still present in 2.4.3, the most recent release.

Even if this bug is fixed, Mongo still isn't consistent. Those three writes which “failed” but showed up in the result set? Those are writes which were replicated to a majority of nodes just prior to the partition, but which the primary never had the chance to acknowledge. Single writes are not atomic without a proper consensus protocol: those failed writes could materialize never, now, or some time in the future, potentially overwriting valid data.

## Strategies for working with Mongo

On the one hand, Mongo advocates usually tell me “but network partitions are exceedingly rare in practice.” Then I talk to Mongo users who report their cluster fails over on a weekly basis. One thing to keep in mind is that heavy load–like seasonal writes, recovering from a crash, or performing a rollback–can slow a node down to the point where other nodes declare it dead. This is a partition. I've seen my test cluster perform dozens of rollbacks as nodes go unavailable attempting to elect a new primary. You should probably instrument your cluster to watch for these events in production.

As we've discussed before, one option is simply to accept data loss. Not all applications need consistency.

At the same time, you should watch those rollback files. Sometimes they don't appear even though they're supposed to, and not all data types will actually be rolled back. Conflicts in capped collections, for example, appear to simply discard all data in the collection past the conflict point by design.

People use capped collections for distributed queues. Think about that for a minute.

Moreover, a rollback file doesn't give you enough information to actually reconstruct the correct state of the system–at least in general. It's just a snapshot of “some state” the database had to discard. Because there's no well-defined ordering for these writes, you'll have to decide what that means for your particular data structures. If you can structure your documents as CRDTs and write a merge function, you'll be able to safely merge. If there's no conflicting copy of the document in the database, and you never delete those kinds of documents, you can restore it automatically. Immutable records can always be recovered, too.
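Here is a sketch of those triage rules. It assumes the rollback file has already been parsed into a dict keyed by `_id`. `recover` and `union_items` are hypothetical helpers, and reading BSON rollback files is out of scope.

```python
def recover(rolled_back, current, merge=None):
    """Sort rolled-back documents by how safely they can be restored.

    rolled_back and current map document _id -> document. Assumes these
    documents are never deleted on purpose; otherwise a "missing" document
    may have been removed intentionally and should stay gone.
    """
    restored, needs_review = {}, {}
    for doc_id, doc in rolled_back.items():
        if doc_id not in current:
            restored[doc_id] = doc  # no competing copy: restore as-is
        elif merge is not None:
            restored[doc_id] = merge(current[doc_id], doc)  # CRDT-style merge
        else:
            needs_review[doc_id] = doc  # no safe automatic answer
    return restored, needs_review


def union_items(a, b):
    return {"items": sorted(set(a["items"]) | set(b["items"]))}


rolled_back = {"a": {"items": [1, 2]}, "b": {"items": [3]}}
current = {"a": {"items": [2, 4]}}
restored, review = recover(rolled_back, current, merge=union_items)
print(restored)  # {'a': {'items': [1, 2, 4]}, 'b': {'items': [3]}}
```

Without a merge function, `recover(rolled_back, current)` restores only `b` and leaves `a` for a human to decide.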

Finally, you can drastically reduce the probability of write loss by using WriteConcern.MAJORITY. This is gonna impose a big performance hit. That's another hallmark of more-available CP systems.

To recap: MongoDB is neither AP nor CP. The defaults can cause significant loss of acknowledged writes. The strongest consistency offered has bugs which cause false acknowledgements, and even if they're fixed, doesn't prevent false failures.

In the next post, we'll talk about a database which emphasizes availability and partition tolerance: Riak.

# Call me maybe: Redis

Previously on Jepsen, we explored two-phase commit in Postgres. In this post, we demonstrate Redis losing 56% of writes during a partition.

Redis is a fantastic data structure server, typically deployed as a shared heap. It provides fast access to strings, lists, sets, maps, and other structures with a simple text protocol. Since it runs on a single server, and that server is single-threaded, it offers linearizable consistency by default: all operations happen in a single, well-defined order. There’s also support for basic transactions, which are atomic and isolated from one another.

Because of this easy-to-understand consistency model, many users treat Redis as a message queue, lock service, session store, or even their primary database. Redis running on a single server is a CP system, so it is consistent for these purposes.

Redis offers asynchronous primary->secondary replication. A single server is chosen as the primary, which can accept writes. It relays its state changes to secondary servers, which follow along. Asynchronous means that you don’t have to wait for a write to be replicated before the primary returns a response to the client. Writes will eventually arrive on the secondaries, if we wait long enough. In our application, all 5 clients will read from the primary on n1, and n2–n5 will be secondaries.

This is still a CP system, so long as we never read from the secondaries. If you do read from the secondaries, it’s possible to read stale data. That’s just fine for something like a cache! However, if you read data from a secondary, then write it to the primary, you could inadvertently destroy writes which completed but weren’t yet replicated to the secondaries.
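Here is a toy, single-process model of that hazard. It assumes one primary, one secondary, and a replication backlog that drains only when `replicate()` runs. The names are hypothetical, and this is not Redis's replication protocol.

```python
primary, secondary = {}, {}
backlog = []  # writes acknowledged by the primary but not yet on the secondary


def write(key, value):
    primary[key] = value          # the primary acknowledges immediately
    backlog.append((key, value))  # replication happens later


def replicate():
    while backlog:
        key, value = backlog.pop(0)
        secondary[key] = value


write("cart", ["apple"])
replicate()
write("cart", ["apple", "pear"])  # acknowledged, not yet replicated
stale = secondary["cart"]         # ['apple']
write("cart", stale + ["plum"])   # read from a secondary, write to the primary
replicate()
print(primary["cart"])  # ['apple', 'plum']: the acknowledged pear is gone
```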

What happens if the primary fails? We need to promote one of the secondary servers to a new primary. One option is to use Heartbeat or a STONITH system which keeps a link open between two servers, but if the network partitions we don’t have any way to tell whether the other side is alive or not. If we don’t promote a secondary, there could be no active servers. If we do, there could be two active servers. We need more nodes.

If one connected component of the network contains a majority (more than N/2) of nodes, we call it a quorum. We’re guaranteed that at most one quorum exists at any point in time–so if a majority of nodes can see each other, they know that they’re the only component in that state. That group of nodes (also termed a “component”) has the authority to promote a new primary.
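The "at most one quorum" guarantee follows from a counting argument. Let $A$ and $B$ be two disjoint components of an $N$-node cluster:

```latex
\text{Suppose } |A| > \tfrac{N}{2} \text{ and } |B| > \tfrac{N}{2}.
\text{ Then } |A| + |B| > N, \\
\text{but } A \cap B = \emptyset \implies |A| + |B| = |A \cup B| \le N.
\quad \text{Contradiction.}
```

For the five nodes used here, a quorum needs at least 3 nodes, and two disjoint groups of 3 would require 6 nodes.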

Redis has a system called Sentinel, which, when configured correctly, will try to establish a quorum between Sentinel nodes, agree on which Redis servers are alive, and promote any which appear to have failed. If we colocate the Sentinel nodes with the Redis nodes, this should allow us to promote a new primary in the majority component (should one exist).

What are the consistency and availability properties of Sentinel? Antirez, the author of Redis, says:

> Redis Cluster for instance is a system biased towards consistency rather than availability. Redis Sentinel itself is an HA solution with the dogma of consistency and master slave setups.

So we expect this system to be CP. Nodes in the minority component will become unavailable during the partition, and the majority component will elect a new primary. The Sentinels will then order clients to abandon the old primary and reconnect to the new one.

Before we begin, it’s important to recognize that Redis does not guarantee durability. Since writes to disk and replication to secondaries are asynchronous, we can lose up to N seconds of the most recent writes. We should not, however, see gaps in the write log. If write n is present, so are writes 0, 1, … n-2, n-1.
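Here is a small sketch of checking that invariant. It assumes the writes are the integers 0, 1, 2, and so on, and that `survivors` is the set read back at the end; `gaps` is a hypothetical helper. Losing a tail of recent writes fits within the stated durability caveat, but a hole below a surviving write does not.

```python
def gaps(survivors):
    """Writes missing below the highest surviving write.

    Assumes writes are the integers 0, 1, 2, ... and survivors is the set
    read back after the test.
    """
    if not survivors:
        return []
    return [n for n in range(max(survivors) + 1) if n not in survivors]


print(gaps({0, 1, 2, 3}))  # []: at most a tail was lost
print(gaps({0, 1, 3, 5}))  # [2, 4]: holes violate the expectation
```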

## Partitioning the cluster

Here’s a simple application which writes a list of numbers to a Redis set. At this time Carmine, the Clojure Redis client, doesn’t yet support failover using Sentinel. I’ve implemented a stricter version of the Sentinel client algorithm here: asking the server for a new primary before every write. Sentinel actually states that clients should only select new primaries when their connection is closed, which leaves a wider window for clients to disagree about which primary to use–leading to the possibility of more conflicting writes.

Let’s give it a shot. First, set up Redis:

    salticid redis.setup

Then, in two terminals, start up Redis and Redis Sentinel:

    salticid redis.start
    salticid redis.sentinel

You should see messages go by as the sentinels discover one another and ensure all the nodes are properly configured. You can check the replication status with salticid redis.replication. salticid redis.stop will shut down the Redis servers and sentinels alike.

Now let’s run our application with lein run redis, then partition nodes n1 and n2 away from n3, n4, and n5 by running salticid jepsen.partition.

    376 :ok
    378 :ok
    382 :ok
    384 :ok
    380 :ok
    381 :ok
    383 :ok
    389 :ok
    385 :ok

The first thing you’ll notice is that even though n1 can’t possibly be replicating its writes to n3, n4, and n5, writes against it are still completing successfully. N1 still thinks it’s the primary, and since replication is asynchronous, it’s acknowledging writes before they’re sent to others in the cluster. The sentinels notice the failure, and n3, n4, and n5’s sentinels promote a new primary:

    19 May 00:37:36.314 # +sdown master mymaster 10.10.3.242 6379
    19 May 00:37:36.616 # +sdown slave 10.10.3.52:6379 10.10.3.52 6379 @ mymaster 10.10.3.242 6379
    19 May 00:37:36.816 # +sdown sentinel 10.10.3.52:26379 10.10.3.52 26379 @ mymaster 10.10.3.242 6379
    19 May 00:37:36.816 # +sdown sentinel 10.10.3.242:26379 10.10.3.242 26379 @ mymaster 10.10.3.242 6379
    19 May 00:37:37.521 # +odown master mymaster 10.10.3.242 6379 #quorum 3/3
    19 May 00:37:48.041 # +failover-detected master mymaster 10.10.3.242 6379
    19 May 00:37:48.142 * +slave-reconf-inprog slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
    19 May 00:37:48.143 * +slave-reconf-inprog slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
    19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
    19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
    19 May 00:37:49.243 # +failover-end master mymaster 10.10.3.242 6379

Now n5 is a new primary–but n1 is still a primary too! Run salticid redis.replication to see the replication status of all nodes. We have two primary nodes, one in each component of the system. During this time both primaries are accepting writes independently. This is a classic split-brain scenario–and it violates the C in CP. Writes (and reads) in this state are not linearizable, because clients will see different results based on which node they’re talking to.

## Healing the partition

What happens when the network comes back online? salticid jepsen.heal repairs the partition, and the Sentinel nodes will discover each other again.

Redis Sentinel used to leave both primaries running indefinitely, which meant that any scenario like a partition or crash leading to failover would result in permanent split-brain. That’s fixed in version 2.6.13, which came out last week. Now, Sentinel demotes the old primary on n1 when it comes back into contact with the majority component. The client sees:

1687 :ok
1686 READONLY You can't write against a read only slave.
1690 READONLY You can't write against a read only slave.
1693 :ok

… since n1 stepped down just after a Sentinel told us it was a primary. Clients are a part of the distributed system too. If a system’s correctness depends on clients choosing specific nodes at specific times, the clients are now engaged in a distributed consensus problem–not to mention a clock synchronization problem. This is damn hard to do correctly.

## Results

1991 :ok
1995 :ok
1996 :ok
Hit enter when ready to collect results.
Writes completed in 42.002 seconds

2000 total
1998 acknowledged
872 survivors
1126 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
50 51 52 53 54 55 ... 1671 1675 1676 1680 1681 1685
0.999 ack rate
0.5635636 loss rate
0.0 unacknowledged but successful rate

Out of 2000 writes, Redis claimed that 1998 of them completed successfully. However, only 872 of those integers were present in the final set. Redis threw away 56% of the writes it told us succeeded.

There are two problems at work here. First, notice that all the clients lost writes at the beginning of the partition: (50, 51, 52, 53, …). That’s because they were all writing to n1 when the network dropped–and since n1 was demoted later, any writes made during that window were destroyed.

The second problem was caused by split-brain: both n1 and n5 were primaries up until the partition healed. Depending on which node they were talking to, some clients might have their writes survive, and others have their writes lost. The last few numbers in the set, mod 5, are all 0 and 1: the clients which kept using n1 as a primary, in the minority partition.
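The following toy model in Python makes the split-brain loss concrete. It is a sketch under several stated assumptions and does not model Redis itself:

- each node is a plain list;
- a primary acknowledges a write as soon as it applies it locally;
- nothing crosses the partition;
- on heal, the demoted node discards its own state and copies the new primary.

The function name `simulate_split_brain` and the write schedule are hypothetical.

```python
def simulate_split_brain(before, to_n1, to_n5):
    """Toy model of asynchronous primary/secondary replication under a partition.

    Hypothetical: nodes are plain lists, acknowledgement happens on local apply,
    and the demoted primary is overwritten by the new primary's state on heal.
    """
    n1, n5 = [], []
    acknowledged = []

    # Healthy cluster: n1 is primary and replication keeps n5 in sync.
    for v in before:
        n1.append(v)
        n5.append(v)
        acknowledged.append(v)

    # Partition: n1 still believes it is primary, and n5 has been promoted.
    # Each acknowledges writes locally; nothing replicates across the gap.
    for v in to_n1:
        n1.append(v)
        acknowledged.append(v)
    for v in to_n5:
        n5.append(v)
        acknowledged.append(v)

    # Heal: n1 is demoted and resynchronises from n5, discarding its own writes.
    n1 = list(n5)

    survivors = set(n1)
    lost = [v for v in acknowledged if v not in survivors]
    return acknowledged, survivors, lost


# Clients 0 and 1 (values 0 and 1 mod 5) keep talking to n1 during the partition.
acknowledged, survivors, lost = simulate_split_brain(
    before=[0, 1, 2, 3, 4],
    to_n1=[5, 6, 10, 11],
    to_n5=[7, 8, 9, 12, 13, 14],
)
print(len(acknowledged), "acknowledged,", len(lost), "lost:", lost)
```

Every lost value is 0 or 1 mod 5, the same signature the real run left behind. The model does not cover the first loss window, where every client was still writing to n1 when the network dropped.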

Note that both of these failure modes violate the durability guarantees we claimed earlier for Redis, because there are gaps in the write log.

## Redis strategies

So you’re running a distributed Redis install, and have realized that the design of Redis Sentinel (or, for that matter, any other failover system on top of an asynchronously replicated primary-secondary design) means you can lose a lot of data when a partition occurs. What can you do?

From an operations perspective, I recommend you try to understand the Sentinel consensus algorithm. I don’t, and I’ve read it a dozen times.

I tried to write a formal verification of the algorithm in TLA+, and failed. There are dozens of interacting rules which can lead to phenomenally complex edge cases. The whole thing relies on clocks–and a special mode, TILT, which tries to detect sudden clock skew. You can specify a quorum which is smaller than the number of sentinels, allowing multiple quorums to operate simultaneously. Since the system auto-discovers peers, you’ve got to make sure nobody lets a new sentinel talk to your cluster, or you might find yourself with a quorum smaller than N/2. Client, sentinel, and Redis server topologies are all different things, which (I think) means…

• Sentinels could promote a node no clients can see
• Sentinels could demote the only node clients can actually reach
• Sentinels could assume a totally connected group of servers is unavailable
• Sentinels could promote an isolated node in a minority component, then destroy data on the majority by demoting their primary later
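The point about quorums smaller than a majority is just counting. Two disjoint groups can each gather `quorum` sentinels only if the cluster has at least twice that many. The hypothetical helper below checks that arithmetic and nothing more; it says nothing about Sentinel's actual election rules.

```python
def disjoint_quorums_possible(n_sentinels: int, quorum: int) -> bool:
    """True if two non-overlapping groups could each reach `quorum`.

    Pure counting: two disjoint groups of size `quorum` need 2 * quorum members.
    This does not model Sentinel's real failover algorithm.
    """
    return 2 * quorum <= n_sentinels


for n, q in [(5, 3), (5, 2), (6, 3)]:
    print(f"{n} sentinels, quorum {q}: split possible = {disjoint_quorums_possible(n, q)}")
```

The last case is the auto-discovery hazard. If one unexpected sentinel joins a five-sentinel cluster configured with quorum 3, two sides of a partition can each reach quorum.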

I (tentatively) recommend running exactly one sentinel on each server node, to force server and sentinel network topologies to align. Unless the partition doesn’t happen in the network, but somewhere upwards of layer 3. Let’s not talk about that possibility.

As an application developer working with Redis, one option is simply to estimate and accept your data loss. Not all applications have to be consistent. Microsoft estimates their WAN links have about 99.5% availability, and their datacenter networks are about 10x more reliable, going down for about 4 hours per year. Not all network failures result in this kind of partition. If you’re running good network hardware in redundant configurations in real datacenters (that is, not EC2), you cut your probability of partition down pretty far. Plenty of important applications can tolerate data loss for a few hours a year.
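The availability figures convert to hours per year with one multiplication. This sketch makes two assumptions, neither of which is Microsoft's methodology:

- a 365-day year;
- "10x more reliable" means one tenth of the WAN's unavailability.

```python
HOURS_PER_YEAR = 365 * 24  # 8760; ignores leap years


def downtime_hours_per_year(availability: float) -> float:
    """Expected hours per year that a link with the given availability is down."""
    return (1.0 - availability) * HOURS_PER_YEAR


wan = 0.995
# Assumption: "10x more reliable" means one tenth of the WAN's unavailability.
datacenter = 1.0 - (1.0 - wan) / 10

print(f"WAN:        {downtime_hours_per_year(wan):.1f} hours/year")
print(f"Datacenter: {downtime_hours_per_year(datacenter):.1f} hours/year")
```

That gives roughly 44 hours a year for the WAN and about 4.4 hours for the datacenter network. The second figure is consistent with the "about 4 hours per year" quoted above.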

If you can’t tolerate data loss, Redis Sentinel (and by extension Redis Cluster) is not safe for use as:

• A lock service
• A queue
• A database

If you use Redis as a lock service, this type of partition means you can take out the same lock twice–or up to N times for N nodes! Or maybe multiple times concurrently, against the same node, if you want to get weird about it. Write loss means locks can be resurrected from the dead, or vanish even when supposedly held. Bottom line: distributed lock services must be CP. Use a CP consensus system, like ZooKeeper.
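Here is a toy version of the double grant, in Python. Each primary is modelled as a dict, and acquiring a lock means setting a key only if it is absent. That is in the spirit of Redis's `SET ... NX`, though nothing here talks to Redis. The names `try_acquire` and `job-42` are hypothetical.

```python
def try_acquire(primary: dict, lock: str, owner: str) -> bool:
    """Set-if-absent lock on a single node (a stand-in, not a Redis client)."""
    if lock in primary:
        return False
    primary[lock] = owner
    return True


# Two primaries on opposite sides of a partition; neither sees the other's keys.
n1, n5 = {}, {}
a_granted = try_acquire(n1, "job-42", "client-a")
b_granted = try_acquire(n5, "job-42", "client-b")
print("both granted:", a_granted and b_granted)

# Heal: n1 is demoted and copies n5, so client-a's lock silently vanishes.
n1 = dict(n5)
print("lock now owned by:", n1["job-42"])
```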

If you use Redis as a queue, it can drop enqueued items. However, it can also re-enqueue items which were removed. An item might be delivered zero, one, two, or more times. Most distributed queue services can provide reliable at-most-once or at-least-once delivery. CP queue systems can provide reliable exactly-once delivery with higher latency costs. Use them if message delivery is important.
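The same failover pattern explains the queue anomalies. In this toy model, plain Python deques stand in for Redis lists (a hypothetical setup). During the partition, a consumer pops an item from the old primary and a producer pushes a new item there. Neither change replicates, and the old primary is overwritten when the partition heals.

```python
from collections import Counter, deque

# Before the partition both nodes hold the same, fully replicated queue.
n1 = deque(["a", "b"])  # old primary, ends up in the minority
n5 = deque(["a", "b"])  # promoted in the majority
delivered = []

# Partition: a consumer on n1 takes "a"; a producer on n1 enqueues "c".
delivered.append(n1.popleft())
n1.append("c")

# Heal: n1 is demoted and copies n5's state; consumers then drain the queue.
n1 = deque(n5)
while n1:
    delivered.append(n1.popleft())

counts = Counter(delivered)
print({item: counts[item] for item in ["a", "b", "c"]})
```

Item `a` is delivered twice and `c` never. Which anomaly a given item suffers depends on which node each client happened to reach.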

If you use Redis as a database, be prepared for clients to disagree about the state of the system. Batch operations will still be atomic (I think), but you’ll have no inter-write linearizability, which almost all applications implicitly rely on. If you successfully write A, then B, you expect that any client which can see B can also see A. This is not the case. Be prepared for massive write loss during a partition, depending on client, server, and sentinel topology.

Because Redis does not have a consensus protocol for writes, it can’t be CP. Because it relies on quorums to promote secondaries, it can’t be AP. What it can be is fast, and that’s an excellent property for a weakly consistent best-effort service, like a cache. Redis Sentinel can do a great job of keeping your caches warm even in the face of network and node failure, and helping clients to gradually discover the correct nodes to interact with. Use Redis Sentinel for caching, sampling, statistics, and messaging where getting the wrong answer doesn’t hurt much. Occasional windows of 50% write loss may be just fine for your user feeds, hit counters, or upvotes.

In the next post, we’ll learn about a database with a related replication architecture: MongoDB.

# Call me maybe: Postgres

Previously on Jepsen, we introduced the problem of network partitions. Here, we demonstrate that a few transactions which “fail” during the start of a partition may have actually succeeded.

PostgreSQL is a terrific open-source relational database. It offers a variety of consistency guarantees, from read uncommitted to serializable. Because Postgres only accepts writes on a single primary node, we think of it as a CP system in the sense of the CAP theorem. If a partition occurs and you can’t talk to the server, the system is unavailable. Because transactions are ACID, we’re always consistent.

Right?

Well… almost. Even though the Postgres server is always consistent, the distributed system composed of the server and client together may not be consistent. It’s possible for the client and server to disagree about whether or not a transaction took place.

Like those of most relational databases, Postgres' commit protocol is a special case of two-phase commit, or 2PC. In the first phase, the client votes to commit (or abort) the current transaction, and sends that message to the server. The server checks to see whether its consistency constraints allow the transaction to proceed, and if so, it votes to commit. It writes the transaction to storage and informs the client that the commit has taken place (or failed, as the case may be). Now both the client and server agree on the outcome of the transaction.

What happens if the message acknowledging the commit is dropped before the client receives it? Then the client doesn’t know whether the commit succeeded or not! The 2PC protocol says that we must wait for the acknowledgement message to arrive in order to decide the outcome. If it doesn’t arrive, 2PC deadlocks. It’s not a partition-tolerant protocol. Waiting forever isn’t realistic for real systems, so at some point the client will time out and declare an error occurred. The commit protocol is now in an indeterminate state.
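A minimal Python model of the final commit round trip shows how the client ends up in an indeterminate state. It is a sketch, not Postgres' wire protocol. The server "log" is a list, and two flags decide whether the commit request or the acknowledgement is lost. The function and flag names are hypothetical.

```python
from enum import Enum


class Outcome(Enum):
    COMMITTED = "committed"
    ABORTED = "aborted"
    UNKNOWN = "unknown"  # the client timed out and cannot tell what happened


def commit(server_log, txn, constraints_ok=True, drop_request=False, drop_ack=False):
    """Return the outcome of one commit as the *client* sees it."""
    if drop_request:
        # The commit never reached the server, so nothing was written,
        # but all the client observes is a timeout.
        return Outcome.UNKNOWN
    if constraints_ok:
        server_log.append(txn)  # the server votes to commit and writes it
    if drop_ack:
        # The server has decided; the client never hears which way.
        return Outcome.UNKNOWN
    return Outcome.COMMITTED if constraints_ok else Outcome.ABORTED


log = []
print(commit(log, 217, drop_request=True))  # Outcome.UNKNOWN, nothing written
print(commit(log, 218, drop_ack=True))      # Outcome.UNKNOWN, but 218 committed
print(log)                                  # [218]
```

Both calls look identical from the client's side. Only the server's log can tell them apart.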

To demonstrate this, we’ll need an install of Postgres to work with.

salticid postgres.setup

This installs Postgres from apt, uploads some config files from jepsen/salticid/postgres, and creates a database for Jepsen. Then we’ll run a simple application which writes a single row for each number, inside a transaction.

cd salticid
lein run pg -n 100

If all goes well, you’ll see something like

...
85 :ok
91 :ok
90 :ok
95 :ok
96 :ok
Hit enter when ready to collect results.
Writes completed in 0.317 seconds

100 total
100 acknowledged
100 survivors
All 100 writes succeeded. :-D

Each line shows the number being written, followed by whether it was OK or not. In this example, all five nodes talk to a single postgres server on n1. Out of 100 writes, the clients reported that all 100 succeeded–and at the end of the test, all 100 numbers were present in the result set.

Now let’s cause a partition. Since this failure mode only arises when the connection drops after the server decides to acknowledge, but before the client receives it, there’s only a short window in which to begin the partition. We can widen that window by slowing down the network:

salticid jepsen.slow

Now, we start the test:

lein run pg

And while it’s running, cut off all postgres traffic to and from n1:

salticid jepsen.drop_pg

If we’re lucky, we’ll manage to catch one of those acknowledgement packets in flight, and the client will log an error like:

217 An I/O error occurred while sending to the backend.
Failure to execute query with SQL:
INSERT INTO "set_app" ("element") VALUES (?) :: [219]
PSQLException:
Message: An I/O error occured while sending to the backend.
SQLState: 08006
Error Code: 0
218 An I/O error occured while sending to the backend.

After that, new transactions will just time out; the client will correctly log these as failures:

220 Connection attempt timed out.
222 Connection attempt timed out.

We can resolve the partition with salticid jepsen.heal, and wait for the test to complete.

1000 total
950 acknowledged
952 survivors
2 unacknowledged writes found! ヽ(´ー｀)ノ
(215 218)
0.95 ack rate
0.0 loss rate
0.002105263 unacknowledged but successful rate

So out of 1000 attempted writes, 950 were successfully acknowledged, and all 950 of those writes were present in the result set. However, two writes (215 and 218) succeeded, even though they threw an exception claiming that a failure occurred! Note that this exception doesn’t guarantee that the write succeeded or failed: 217 also threw an I/O error while sending, but because the connection dropped before the client’s commit message arrived at the server, the transaction never took place.

There is no way to distinguish these cases from the client. A network partition–and indeed, most network errors–doesn’t mean a failure. It means the absence of information. Without a partition-tolerant commit protocol, like extended three-phase commit, we cannot assert the state of the system for these writes.
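In practice, this means the client should record three outcomes rather than two. A hypothetical classifier for this test could treat a failed connection attempt as a definite failure, since the INSERT was never sent, and any other I/O error as unknown. Matching on message text is fragile and is used here only as an illustrative assumption; the strings are the ones from the log above.

```python
from typing import Optional


def classify(error: Optional[str]) -> str:
    """Map a client-side result to "ok", "fail", or "unknown"."""
    if error is None:
        return "ok"
    if "Connection attempt timed out" in error:
        # We never got a connection, so the write cannot have happened.
        return "fail"
    # The connection broke mid-request: the commit may or may not have landed.
    return "unknown"


print(classify("Connection attempt timed out."))
print(classify("An I/O error occured while sending to the backend."))
```

With this classification, a checker can require that every "ok" write survives and every "fail" write is absent. It makes no claim either way about "unknown" writes like 215, 217, and 218.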

## 2PC strategies

Two-phase commit protocols aren’t just for relational databases. They crop up in all sorts of consensus problems. MongoDB’s documents essentially comprise an asynchronous network, and many users implement 2PC on top of their Mongo objects to obtain multi-key transactions.

If you’re working with two-phase commit, there are a few things you can do. One is to accept false negatives. In most relational databases, the probability of this failure occurring is low–and it can only affect writes which were in-flight at the time the partition began. It may be perfectly acceptable to return failures to clients even if there’s a small chance the transaction succeeded.

Alternatively, you can use consistency guarantees or other data structures to allow for idempotent operations. When you encounter a network error, just retry them blindly. A highly available queue with at-least-once delivery is a great place to put repeatable writes which need to be retried later.
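Here is one way to make blind retries safe, sketched in Python under stated assumptions. Each write carries a client-chosen request ID, and the hypothetical store keys rows by that ID, so applying the same write twice still leaves one row. The store loses acknowledgements at random to mimic a flaky network. `FlakyStore` and `put_with_retries` are illustrative names, not a real library.

```python
import random


class FlakyStore:
    """Toy store that applies every write but sometimes loses the acknowledgement."""

    def __init__(self, seed: int, p_lost_ack: float = 0.5):
        self.rows = {}  # request id -> value
        self.rng = random.Random(seed)
        self.p_lost_ack = p_lost_ack

    def put(self, request_id: str, value: int) -> None:
        self.rows[request_id] = value  # idempotent: same id, same single row
        if self.rng.random() < self.p_lost_ack:
            raise TimeoutError("acknowledgement lost")  # the write happened anyway


def put_with_retries(store: FlakyStore, request_id: str, value: int, attempts: int = 20) -> bool:
    for _ in range(attempts):
        try:
            store.put(request_id, value)
            return True
        except TimeoutError:
            continue  # outcome unknown; retrying is safe because put is idempotent
    return False


store = FlakyStore(seed=1)
for i in range(10):
    put_with_retries(store, f"req-{i}", i)
print(len(store.rows))  # 10: retries never create duplicates
```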

Finally, within some databases you can obtain strong consistency by taking note of the current transaction ID, and writing that ID to the database during the transaction. When the partition is resolved, the client can either retry or cancel the transaction at a later time, by checking whether or not that transaction ID was written. Again, this relies on having some sort of storage suitable for the timescales of the partition: perhaps a local log on disk, or an at-least-once queue.

In the next post, we look at a very different kind of consistency model: Redis Sentinel.

# Call me maybe: Carly Rae Jepsen and the perils of network partitions

Carly Rae Jepsen may be singing about the cute guy next door, but she's also telling a story about the struggle to communicate with someone who doesn't even know you're alive. The suspense of observation: did he see me? Did he see me see him? The risks of speaking your mind and being shot down–or worse, ignored. The fundamental unknowability of The Other, as Lacan would have it. In short, this is a song about distributed systems.

Modern software systems are composed of dozens of components which communicate over an asynchronous, unreliable network. Understanding the reliability of a distributed system's dynamics requires careful analysis of the network itself. Like most hard problems in computer science, this one comes down to shared state. A set of nodes separated by the network must exchange information: “Did I like that post?” “Was my write successful?” “Will you thumbnail my image?” “How much is in my account?”

At the end of one of these requests, you might guarantee that the requested operation…

• will be visible to everyone from now on
• will be visible to your connection now, and others later
• may not yet be visible, but is causally connected to some future state of the system
• is visible now, but might not be later
• may or may not be visible: ERRNO_YOLO

These are some examples of the complex interplay between consistency and durability in distributed systems. For instance, if you're writing CRDTs to one of two geographically replicated Riak clusters with W=2 and DW=1, you can guarantee that write…

• is causally connected to some future state of the system
• will survive the total failure of one node
• will survive a power failure (assuming fsync works) of all nodes
• will survive the destruction of an entire datacenter, given a few minutes to replicate

If you're writing to ZooKeeper, you might have a stronger set of guarantees: for instance, that the write is visible now to all participants, and that it will survive the total failure of any minority of the n nodes (up to (n - 1)/2 of them, rounded down). If you write to Postgres, depending on your transaction's consistency level, you might be able to guarantee that the write will be visible to everyone, just to yourself, or “eventually”.

These guarantees are particularly tricky to understand when the network is unreliable.

## Partitions

Formal proofs of distributed systems often assume that the network is asynchronous, which means the network may arbitrarily duplicate, drop, delay, or reorder messages between nodes. This is a weak hypothesis: some physical networks can do better than this, but in practice IP networks will encounter all of these failure modes, so the theoretical limitations of the asynchronous network apply to real-world systems as well.
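To make the asynchronous model concrete, here is a toy channel in Python. It can drop, duplicate, and reorder messages, and arbitrary delay shows up only as a change in delivery order. It is an illustrative sketch with made-up probabilities, not a network simulator.

```python
import random


def asynchronous_network(messages, seed=0, p_drop=0.2, p_dup=0.2):
    """Toy asynchronous channel: may drop, duplicate, delay, or reorder messages.

    Delay is modelled only as a change in delivery order; the probabilities
    are arbitrary illustration values.
    """
    rng = random.Random(seed)
    in_flight = []
    for message in messages:
        if rng.random() < p_drop:
            continue                   # dropped
        in_flight.append(message)
        if rng.random() < p_dup:
            in_flight.append(message)  # duplicated
    rng.shuffle(in_flight)             # delayed and reordered
    return in_flight


print(asynchronous_network(range(10), seed=7))
```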

In practice, the TCP state machine allows nodes to reconstruct “reliable” ordered delivery of messages between nodes. TCP sockets guarantee that our messages will arrive without drops, duplication, or reordering. However, there can still be arbitrary delays–which would ordinarily cause the distributed system to lock indefinitely. Since computers have finite memory and latency bounds, we introduce timeouts, which close the connection when expected messages fail to arrive within a given time frame. Calls to read() on sockets will simply block, then fail.
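Python's standard `socket` module makes the timeout behaviour easy to see. A connected socket pair whose peer never writes looks exactly like a dead or partitioned peer. The only signal the reader gets is its own timeout. The helper name `read_with_timeout` is hypothetical.

```python
import socket


def read_with_timeout(timeout_s: float) -> str:
    """Block on a read from a peer that never answers, then give up."""
    reader, silent_peer = socket.socketpair()  # connected, but the peer never writes
    try:
        reader.settimeout(timeout_s)
        try:
            reader.recv(1)
            return "received"
        except socket.timeout:
            # Slow, dead, or partitioned: from here they are indistinguishable.
            return "timed out"
    finally:
        reader.close()
        silent_peer.close()


print(read_with_timeout(0.1))  # timed out
```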

Detecting network failures is hard. Since our only knowledge of the other nodes passes through the network, delays are indistinguishable from failure. This is the fundamental problem of the network partition: latency high enough to be considered a failure. When partitions arise, we have no way to determine what happened on the other nodes: are they alive? Dead? Did they receive our message? Did they try to respond? Literally no one knows. When the network finally heals, we'll have to re-establish the connection and try to work out what happened–perhaps recovering from an inconsistent state.

Many systems handle partitions by entering a special degraded mode of operation. The CAP theorem tells us that we can either have consistency (technically, linearizability for a read-write register), or availability (all nodes can continue to handle requests), but not both. What's more, few databases come close to CAP's theoretical limitations; many simply drop data.

In this series, I'm going to demonstrate how some real distributed systems behave when the network fails. We'll start by setting up a cluster and a simple application. In each subsequent post, we'll explore that application written for a particular database, and how that system behaves under partition.

## Setting up a cluster

You can create partitions at home! For these demonstrations, I'm going to be running a five node cluster of Ubuntu 12.10 machines, virtualized using LXC–but you can use real computers, virtual private servers, EC2, etc. I've named the nodes n1, n2, n3, n4, and n5: it's probably easiest to add these entries to /etc/hosts on your computer and on each of the nodes themselves.

We're going to need some configuration for the cluster, and client applications to test their behavior. You can clone http://github.com/aphyr/jepsen to follow along.

To run commands across the cluster, I'm using Salticid (http://github.com/aphyr/salticid). I've set my ~/.salticidrc to point to configuration in the Jepsen repo:

load ENV['HOME'] + '/jepsen/salticid/*.rb'

If you take a look at this file, you'll see that it defines a group called :jepsen, with hosts n1 … n5. The user and password for each node are both 'ubuntu'; you'll probably want to change this if you're running your nodes on the public internet.

Try salticid -s salticid to see all the groups, hosts, and roles defined by the current configuration:

$ salticid -s salticid
Groups
jepsen
Hosts: n1 n2 n3 n4 n5
Roles
base riak mongo redis postgres jepsen net
Top-level tasks

First off, let's set up these nodes with some common software–compilers, network tools, etc.

salticid base.setup

The base role defines some basic operating system functions. base.reboot will reboot the cluster, and base.shutdown will unpower it.

The jepsen role defines tasks for simulating network failures. To cause a partition, run salticid jepsen.partition. That command causes nodes n1 and n2 to drop IP traffic from n3, n4, and n5–essentially by running

iptables -A INPUT -s n3 -j DROP
iptables -A INPUT -s n4 -j DROP
iptables -A INPUT -s n5 -j DROP

That's it, really. To check the current network status, run salticid jepsen.status. salticid jepsen.heal will reset the iptables chains to their defaults, resolving the partition.
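The partition itself is just a set of rules per node. This hypothetical Python helper generates the same style of `iptables` commands for any split of the cluster into two components. It only builds strings and runs nothing.

Dropping inbound traffic on the minority side is enough to break TCP connections in both directions. A handshake needs packets from each side, so one dropped direction stops it.

```python
def partition_rules(minority: list[str], majority: list[str]) -> dict[str, list[str]]:
    """iptables commands for each minority node, dropping traffic from the majority."""
    return {
        node: [f"iptables -A INPUT -s {peer} -j DROP" for peer in majority]
        for node in minority
    }


for node, rules in partition_rules(["n1", "n2"], ["n3", "n4", "n5"]).items():
    print(node)
    for rule in rules:
        print("  " + rule)
```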

To simulate slow networks, or networks which drop packets, we can use tc to adjust the ethernet interface. Jepsen assumes the inter-node interface is eth0. salticid jepsen.slow will add latency to the network, making it easier to reproduce bugs which rely on a particular message being dropped. salticid jepsen.flaky will probabilistically drop messages. Adjusting the inter-node latency and lossiness simulates the behavior of real-world networks under congestion, and helps expose timing dependencies in distributed algorithms–like database replication.

## A simple distributed system

In order to test a distributed system, we need a workload–a set of clients which make requests and record their results for analysis. For these posts, we're going to work with a simple application which writes several numbers to a list in a database. Each client app will independently write some integers to the DB. With five clients, client 0 writes 0, 5, 10, 15, …; client 1 writes 1, 6, 11, and so on.
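The interleaving is simple modular arithmetic. This Python sketch shows which integers each client writes. It also shows why `value % n_clients` recovers the client that wrote any surviving number. The real apps are Clojure, and `client_values` is a hypothetical name.

```python
def client_values(client: int, n_clients: int, total: int) -> list[int]:
    """Integers written by `client` when `n_clients` clients share 0..total-1."""
    return list(range(client, total, n_clients))


n_clients, total = 5, 20
for c in range(n_clients):
    print(c, client_values(c, n_clients, total))
```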

For each write we record whether the database acknowledged the write successfully or whether there was an error. At the end of the run, we ask the database for the full set. If acknowledged writes are missing, or unacknowledged writes are present, we know that the system was inconsistent in some way: that the client application and the database disagreed about the state of the system.
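The end-of-run bookkeeping fits in a few lines of Python. The field names follow the printed summaries in these posts. The denominators are inferred from those numbers rather than taken from the Jepsen source:

- the Redis loss rate 0.5635636 is 1126/1998;
- the Postgres unacknowledged-but-successful rate 0.002105263 is 2/950.

So both appear to be fractions of acknowledged writes. `analyze` is a hypothetical name, and the example input is synthetic, built only to match the Postgres run's counts.

```python
def analyze(attempted, acknowledged, survivors):
    """Compare what clients were told with what the database kept."""
    attempted, acknowledged, survivors = set(attempted), set(acknowledged), set(survivors)
    lost = acknowledged - survivors               # acknowledged, then vanished
    unacked_survivors = survivors - acknowledged  # reported as failed, yet present
    acked_count = max(len(acknowledged), 1)       # avoid dividing by zero
    return {
        "total": len(attempted),
        "acknowledged": len(acknowledged),
        "survivors": len(survivors),
        "lost": sorted(lost),
        "unacknowledged survivors": sorted(unacked_survivors),
        "ack rate": len(acknowledged) / max(len(attempted), 1),
        "loss rate": len(lost) / acked_count,
        "unacknowledged but successful rate": len(unacked_survivors) / acked_count,
    }


# Synthetic input with the Postgres run's counts: 50 writes error, 2 of them land anyway.
errored = set(range(215, 265))
acked = set(range(1000)) - errored
result = analyze(range(1000), acked, acked | {215, 218})
print(result["unacknowledged survivors"], result["ack rate"], result["loss rate"])
```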

In this series of blog posts, we're going to run this app against several distributed databases, and cause partitions during its run. In each case, we'll see how the system responds to the uncertainty of dropped messages. As the song might go:

I've written several implementations of this workload in Clojure. jepsen/src/jepsen/set_app.clj defines the application. (defprotocol SetApp ...) lists the functions an app has to implement, and (run n apps) sets up the apps, runs them in parallel, collects results, and shows any inconsistencies. Particular implementations live in src/jepsen/riak.clj, pg.clj, redis.clj, and so forth.

You'll need a JVM and Leiningen 2 to run this code. Once you've installed lein, and added it to your path, we're ready to go!

Next up on Jepsen, we take a look at how PostgreSQL's transaction protocol handles network failures.