# Call me maybe: Redis redux

In a recent blog post, antirez detailed a new operation in Redis: WAIT. WAIT is proposed as an enhancement to Redis' replication protocol to reduce the window of data loss in replicated Redis systems; clients can block awaiting acknowledgement of a write to a given number of nodes (or time out if the given threshold is not met). The theory here is that positive acknowledgement of a write to a majority of nodes guarantees that write will be visible in all future states of the system.

As I explained earlier, any asynchronously replicated system with primary-secondary failover allows data loss. Optional synchronous replication, antirez proposes, should make it possible for Redis to provide strong consistency for those operations.

> WAIT means that if you run three nodes A, B, C where every node contains a Sentinel instance and a Redis instance, and you “WAIT 1” after every operation to reach the majority of slaves, you get a consistent system.

> WAIT can be also used, by improving the failover procedure, in order to have a strong consistent system (no writes to the older master from the point the failure detection is positive, to the end of the failover when the configuration is updated, or alternative, disconnect the majority of slaves you can reach during the failure detection so that every write will fail during this time).

Antirez later qualified these claims:

> I understand this not the “C” consistency of “CAP” but, before: the partition with clients and the (old) master partitioned away would receive writes that gets lost. after: under certain system models the system is consistent, like if you assume that crashed instances never start again.

Of course, the existence of synchronous replication does not prove that the system is linearizable; only some types of failover preserve the ordering of writes.

As I showed in Call me maybe: Redis, Redis Sentinel will enter split-brain during network partitions, causing significant windows of data loss. Exactly how much data loss depends on the sentinel configuration and the failure topology. Antirez finally suggested that if we replace Redis Sentinel with a strongly consistent coordination service for failover, Redis WAIT could provide full linearizability.

## The failover proposal

In a five-node cluster, assume every write is followed by WAIT 2 to ensure that a majority of nodes have received the write. In the event of a failure, a strong external coordinator goes through the following election process:

1. Totally partition the old primary P1.
2. Of all reachable nodes, identify the node with the highest replication offset. Let that node be P2.
3. Promote P2.
4. Inform all reachable nodes that they are to follow P2.
5. Have all reachable clients switch to the new primary.

There are several serious problems with this design. I hinted at these issues in the mailing list with limited success. Kelly Sommers pointed out repeatedly that this design has the same issues as Cassandra’s CL.ALL. Replication alone does not ensure linearizability; we have to be able to roll back operations which should not have happened in the first place. If those failed operations can make it into our consistent timeline in an unsafe way, perhaps corrupting our successful operations, we can lose data.

> … surprisingly I think that transactional rollbacks are totally irrelevant.

Ultimately I was hoping that antirez and other contributors might realize why their proposal for a custom replication protocol was unsafe nine months ago, and abandon it in favor of an established algorithm with a formal model and a peer-reviewed proof, but that hasn’t happened yet. Redis continues to accrete homegrown consensus and replication algorithms without even a cursory nod to formal analysis.

OK, fine. Let’s talk about the failover coordinator.

## The coordinator

Redis Sentinel is not linearizable; nor are its proposed improvements. Whatever failover system you’re planning to use here is going to need something stronger. In fact, we can’t even guarantee safety using a strong coordination service like ZooKeeper to serialize the failover operations, because ZooKeeper cannot guarantee the mutual exclusion of two services in the presence of message delays and clock skews. Let’s paper over that issue by introducing large delays and carefully ordering our timeouts.

It gets worse. Even if we did have a perfect mutex around the coordinator, two coordinators could issue messages to the same Redis nodes which arrive out of order. TCP does not guarantee ordering between two distinct TCP streams, which means we might see coordinator A initiate a failover process then time out halfway; followed by coordinator B which begins the failover process, only to be interrupted on some nodes by messages en-route through the network from coordinator A. Don’t believe me? TCP message delays have been reported in excess of ninety seconds. That one took out Github.
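The hazard is easy to model. Here is a tiny Python sketch of two coordinators whose commands traverse independent TCP streams; delivery order between the streams is not guaranteed, so a stale command can clobber a newer configuration. All names and commands here are illustrative, not any real Redis or coordinator API.

```python
# A toy model of two failover coordinators. Coordinator A times out
# mid-failover; its command sits in the network while coordinator B
# runs a complete failover. A's delayed command then arrives last.

def deliver(node_state, messages):
    """Apply reparent commands to a node in arrival order."""
    for coordinator, new_primary in messages:
        node_state["primary"] = new_primary
        node_state["set_by"] = coordinator
    return node_state

in_flight = [("B", "P2"),   # B's command arrives first
             ("A", "P1")]   # A's delayed command arrives last

node = deliver({"primary": None, "set_by": None}, in_flight)

# The node ends up following A's stale choice, despite B "winning".
print(node)  # {'primary': 'P1', 'set_by': 'A'}
```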

It gets even worse. If the original primary P1 is isolated from the coordinator, the coordinator will not be able to force P1 to step down. Indeed, P1 could remain a primary for the entire duration of a failover, accepting writes, making state changes, and attempting to replicate those changes to other nodes. This is dangerous because we cannot atomically guarantee that the new majority of nodes will reject those writes.

1. A client writes to P1, which replicates to secondaries S2, S3, S4, and S5.
2. The coordinator attempts to elect a new primary, and sees S2, S3, S4, and S5.
3. Without loss of generality, assume S2 has the highest replication offset. The coordinator promotes S2 to P2.
4. P1 receives acks from S3, S4, and S5, and, having reached a majority, returns success to the client.
5. The coordinator reparents S3, S4, and S5 to P2, destroying the write.
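Steps 1–5 can be sketched in a few lines of Python; the node names, logs, and the assumption that S2 wins promotion are all illustrative:

```python
# P1's write w1 reaches S3, S4, and S5 — a majority with P1 itself —
# so the client sees success. But the coordinator, seeing only S2-S5,
# promotes S2 (which never received w1) and reparents the rest.

log = {"P1": ["w1"], "S2": [], "S3": ["w1"], "S4": ["w1"], "S5": ["w1"]}

# P1 counts acks: itself plus S3, S4, S5. WAIT 2 is satisfied.
acks = 1 + sum(1 for n in ("S3", "S4", "S5") if "w1" in log[n])
assert acks >= 3  # acknowledged to the client as successful

# The coordinator promotes S2 and reparents S3-S5, whose logs are
# rewritten to match the new primary.
new_primary = "S2"
for n in ("S3", "S4", "S5"):
    log[n] = list(log[new_primary])

# The acknowledged write now survives only on the deposed P1.
survivors = [n for n, entries in log.items() if "w1" in entries]
print(survivors)  # ['P1']
```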

You might try to solve this by forcing S2–S5 into a read-only, non-replicating mode before attempting to promote a new primary, but that gets into a whole other morass of issues around multiple state transitions and partial failures. Suffice it to say: it’s difficult to solve this by simply pausing nodes first. Maybe impossible? I’m not sure.

Typically, replication protocols solve this problem by guaranteeing that writes from P1 cannot be accepted after S2–S5 acknowledge to the coordinator that they will participate in a new cohort. This often takes the form of a ballot (Paxos), epoch (ZAB, Viewstamped Replication), or term (Raft). Redis has no such construct, and antirez seems to eschew it as unnecessary:

> In this model, it is possible to reach linearizability? I believe, yes, because we removed all the hard part, for which the strong protocols like Raft use epochs.
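For contrast, here is a minimal Python sketch of how an epoch guard fences off a stale primary. This is not Redis's protocol, and it is greatly simplified relative to Paxos, ZAB, or Raft; the class and method names are illustrative.

```python
# Each replica tracks the highest epoch it has promised to honor, and
# rejects writes stamped with any older epoch. Once the coordinator
# moves the cohort to epoch 1, the deposed primary's writes bounce.

class Replica:
    def __init__(self):
        self.epoch = 0
        self.log = []

    def promise(self, epoch):
        """Called by the coordinator when forming a new cohort."""
        self.epoch = max(self.epoch, epoch)

    def append(self, epoch, entry):
        """Accept a write only from a primary in the current epoch."""
        if epoch < self.epoch:
            return False        # stale primary: write rejected
        self.log.append(entry)
        return True

r = Replica()
r.append(0, "w1")       # old primary P1, epoch 0: accepted
r.promise(1)            # failover begins; cohort moves to epoch 1
ok = r.append(0, "w2")  # P1, unaware of the failover, keeps writing
print(ok, r.log)        # False ['w1'] — the stale write cannot sneak in
```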

This brings us to a different, but related series of problems.

## The servers

By using the offset in the replication log as the determining factor in which nodes are promotable, the proposed failover design opens the door for significant data loss.

Imagine the following sequence:

1. The primary P1, with log offset O1, becomes isolated from S3, S4, and S5.
2. Clients writing to P1 see their operations using WAIT 2 fail.
3. S3 is promoted to P3, with offset O1=O3. Clients writing to P3 see their writes succeed, replicated to S4 and S5.
4. More operations occur on P1 than on P3. O1 becomes greater than O3.
5. The partition heals; the coordinator can see both P1 and P3.
6. The coordinator sees that O1 is higher than O3, and chooses P1 as the new primary.
7. P3 is demoted, and all its acknowledged writes are destroyed.
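In miniature, the problem is just arithmetic on replication offsets. A Python sketch, with offset values borrowed from the test run described below; the point is only that P1 can out-write P3 while partitioned, so "highest offset wins" resurrects unacknowledged data:

```python
# Offsets are equal at the moment of partition. Failed (unacknowledged)
# writes still advance P1's log; acknowledged, majority-replicated
# writes advance P3's. P1 happens to take more traffic.

offsets = {"P1": 1570, "P3": 1570}

offsets["P1"] += 6440   # unacknowledged writes: O1 = 8010
offsets["P3"] += 4917   # acknowledged writes:   O3 = 6487

# The partition heals; the coordinator picks the highest offset.
winner = max(offsets, key=offsets.get)
print(winner)  # 'P1' — every write acknowledged on P3 is destroyed
```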

Don’t believe me? Here, let’s try it. Here’s a function which implements (more or less) the proposed coordinator algorithm. Note that we’re not demoting the original primary because it may not be reachable.

```clojure
(defn elect!
  "Forces an election among the given nodes. Picks the node with the
  highest replication offset, promotes it, and re-parents the
  secondaries."
  [nodes]
  (let [highest (highest-node nodes)]
    (log "Promoting" highest)
    (with-node highest
      (redis/slaveof "no" "one"))
    (doseq [node (remove #{highest} nodes)]
      (log "Reparenting" node "to" highest)
      (with-node node
        (redis/slaveof highest 6379)))))
```

And in the test, we’ll use WAIT to ensure that only writes which are successfully replicated to 2 or more replicas are considered successful:

```clojure
(add [app element]
  (try
    (redis/with-conn pool spec
      (redis/sadd key element))
    ; Block for 2 secondaries (3 total) to ack.
    (let [acks (redis/with-conn pool spec
                 (taoensso.carmine.protocol/send-request! "WAIT" 2 1000))]
      (if (< acks 2)
        (do (log "not enough copies: " acks)
            error)
        ok))
    (catch Exception e
      (if (->> e .getMessage (re-find #"^READONLY"))
        error
        (throw e)))))
```

I’m gonna punt on informing clients which node is the current primary; we’ll just issue set-add requests to each node independently. Jepsen only cares about whether successful writes are lost, so we’ll let those writes fail and log ‘em as unsuccessful.

Initially, the offset for all 5 nodes is 15. Writes complete successfully on P1 and fail on S2–S5.

We cut off P1 and S2 from S3, S4, and S5. S3, S4, and S5 all have equal offsets (1570), so we promote S3 to P3. As soon as the partition takes effect, writes to P1 begin to fail–we see not enough copies: 1, and an :error status for write 110, 115, and so on. Latencies on P1 jump to 1 second, since that’s how long we’re blocking for using WAIT.

Writes complete successfully on P3, since it can see a majority of nodes: itself, S4, and S5. We heal the partition and initiate a second election. Since P1’s offset (8010) is higher than P3’s (6487), we preserve P1 as a primary and demote all other nodes to follow it. All P3’s writes accepted during the partition are silently destroyed.

Note that there’s actually a window here where writes can successfully take place on either P1 or P3 in a mixed sequence, depending on the order in which the secondaries are reparented. Both 560 and 562 complete successfully, even though 562 was written to S3, which was demoted at that point in time. Some weird opportunity for timing anomalies there.

These results are catastrophic. In a partition which lasted for roughly 45% of the test, 45% of acknowledged writes were thrown away. To add insult to injury, Redis preserved all the failed writes in place of the successful ones.

Two bugs amplify this problem. Note that this is the unstable branch, so this isn’t a huge deal right now:

First, Redis secondaries return -1 for their offset when they detect the primary is down. Returning a special status code makes sense… but not if you’re using the offset to determine which nodes become the primary. This could cause the highest nodes to appear the lowest, and vice versa. If a fresh node has offset 0, and all other nodes return offset -1, this could cause a cluster to erase all data ever written to it.

Second, Redis resets the replication offset to zero every time a node is promoted. Again, a reasonable choice in isolation, but it actually maximizes the chances that this particular failure mode will occur. The current design is biased towards data loss.
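Both failure modes fall out of a naive max-over-offsets election. A Python sketch with illustrative values:

```python
# Bug 1: secondaries report -1 when they believe the primary is down.
# A fresh, empty node (offset 0) then outranks every node with data.
reported = {"fresh": 0, "S3": -1, "S4": -1, "S5": -1}
bug1_winner = max(reported, key=reported.get)
print(bug1_winner)  # 'fresh' — electing it erases everything

# Bug 2: promotion resets the new primary's offset to zero, so after a
# failover the node holding the acknowledged writes looks *least*
# qualified in the next election.
offsets = {"P3": 6487, "P1": 8010}
offsets["P3"] = 0   # P3 was just promoted
bug2_winner = max(offsets, key=offsets.get)
print(bug2_winner)  # 'P1' — the deposed primary wins again
```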

Even if these bugs were corrected, the problem could still occur. All that’s required is for more operations to happen on P1 than P3 after the two diverge.

## Going forward

Distributed systems design is really hard, but engineers continue to assume otherwise:

> However I think that distributed systems are not super hard, like kernel programming is not super hard, like C system programming is not super hard. Everything new or that you don’t do in a daily basis seems super hard, but it is actually different concepts that are definitely things everybody here in this list can master.
>
> For sure a few months of exposure will not make you able to provide work like Raft or Paxos, but the basics can be used in order to try to design practical systems, that can be improved over time.

I assert just the opposite: we need formal theory, written proofs, computer verification, and experimental demonstration that our systems make the tradeoffs we think they make. Throughout the Redis criticism thread and discussion on Twitter, I see engineers assuming that they understand the tradeoffs despite the presence of gaping holes in the system’s safety net.

This behavior endangers users.

These list threads and blog posts are the sources that users come to, years later, to understand the safety properties of our systems. They’ll read our conjectures and idle thoughts and tease out some gestalt, and use that to build their systems on top of ours. They’ll miss subtle differences in phrasing and they won’t read every reply. Most won’t do any reading at all; they’re not even aware that these problems could exist.

Engineers routinely characterize Redis’s reliability as “rock solid”.

This is part of why I engage in these discussions so vocally. As systems engineers, we continually struggle to erase the assumption of safety before that assumption causes data loss or downtime. We need to clearly document system behaviors so that users can make the right choices.

We must understand our systems in order to explain them–and distributed systems are hard to understand. That’s why it’s so important that we rely on formal models, on proofs, instead of inventing our own consensus protocols–because much of the hard work of understanding has been done already. We can build on that work. Implementing a peer-reviewed paper is vastly simpler than trying to design and verify an algorithm from scratch–or worse, evolving one piecemeal, comprised of systems which encode subtly different assumptions about their responsibilities to the world. Those designs lead to small gaps which, viewed from the right angle, become big enough to drive a truck through.

I wholeheartedly encourage antirez, myself, and every other distributed systems engineer: keep writing code, building features, solving problems–but please, please, use existing algorithms, or learn how to write a proof.

# Call me maybe: Strangeloop Hangout

Since the Strangeloop talks won’t be available for a few months, I recorded a new version of the talk as a Google Hangout.

# Call me maybe: Cassandra

Previously on Jepsen, we learned about Kafka’s proposed replication design.

Cassandra is a Dynamo system; like Riak, it divides a hash ring into several chunks, and keeps N replicas of each chunk on different nodes. It uses tunable quorums, hinted handoff, and active anti-entropy to keep replicas up to date. Unlike the Dynamo paper and some of its peers, Cassandra eschews vector clocks in favor of a pure last-write-wins approach.

## Some Write Loses

If you read the Riak article, you might be freaking out at this point. In Riak, last-write-wins resulted in dropping 30-70% of writes, even with the strongest consistency settings (R=W=PR=PW=ALL), even with a perfect lock service ensuring writes did not occur simultaneously. To understand why, I’d like to briefly review the problem with last-write-wins in asynchronous networks.

In this causality diagram, two clients (far left and far right) add the elements “a”, “b”, and “c” to a set stored in an LWW register (middle line). The left client adds a, which is read by both clients. One client adds b, constructing the set [a b]. The other adds c, constructing the set [a c]. Both write their values back. Because the register is last-write-wins, it preserves whichever arrives with the highest timestamp. In this case, it’s as if the write from the client on the left never even happened. However, it could just as easily have discarded the write from the right-hand client. Without a strong external coordinator, there’s just no way to tell whose data will be preserved, and whose will be thrown away.

Again: in an LWW register, the only conditions under which you can guarantee your write will not be silently ignored are when the register’s value is immutable. If you never change the value, it doesn’t matter which copy you preserve.
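To make this concrete, here is a minimal Python model of the diagram above. The timestamps are illustrative; "merge" keeps whichever copy carries the higher timestamp, exactly like an LWW cell.

```python
# An LWW register holding a set. Both clients read [a], then each
# writes back its own extended copy. The higher timestamp wins.

def lww(a, b):
    """Return the (timestamp, value) pair with the higher timestamp."""
    return a if a[0] > b[0] else b

register = (1, {"a"})       # both clients read [a]

left  = (2, {"a", "b"})     # left client writes [a b] at t=2
right = (3, {"a", "c"})     # right client writes [a c] at t=3

register = lww(lww(register, left), right)
print(sorted(register[1]))  # ['a', 'c'] — "b" is silently discarded
```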

Vector clocks avoid this problem by identifying conflicting writes, and allowing you to merge them together.

Because there’s no well-defined order for potential conflicts, the merge function needs to be associative, commutative, and idempotent. If it satisfies those three properties (in essence, if you can merge any values in any order and get the same result), the system forms a semilattice known as a CRDT, and you recover a type of order-free consistency known as lattice consistency. Last-write-wins is a particular type of CRDT–albeit not a particularly good one, because it destroys information nondeterministically.
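A grow-only set makes those three properties concrete: its merge is plain set union, so replicas converge regardless of delivery order. A minimal Python sketch:

```python
# A G-set (grow-only set) merge: set union is associative, commutative,
# and idempotent, so no write is ever discarded.

def merge(a, b):
    return a | b

left  = {"a", "b"}   # one replica's adds
right = {"a", "c"}   # a concurrent replica's adds

# Any merge order gives the same result.
assert merge(left, right) == merge(right, left)                 # commutative
assert merge(merge(left, right), right) == merge(left, right)   # idempotent
print(sorted(merge(left, right)))  # ['a', 'b', 'c']
```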

Early in its history, Cassandra chose not to implement vector clocks for performance reasons. Vclocks (typically) require a read before each write. By using last-write-wins in all cases, and ignoring the causality graph, Cassandra can cut the number of round trips required for a write from 2 to 1, and obtain a significant speedup. The downside is that there is no safe way to modify a Cassandra cell.

Some people claim you can serialize updates to a cell by perfectly synchronizing your clocks, using ConsistencyLevel.QUORUM or ALL, and using an external lock service to prevent simultaneous operations. Heck, the official Cassandra documentation even claims this:

As we’ll see throughout this post, the Cassandra documentation can be less than accurate. Here’s a Jepsen test which mutates the same cell repeatedly, using perfectly synchronized clocks, QUORUM consistency, and a perfect lock service:

```
lein run lock cassandra
...
Writes completed in 200.036 seconds

2000 total
1009 acknowledged
724 survivors
285 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
1 3 6 8 11 13 ...
1986 1988 1991 1993 1996 1998
0.5045 ack rate
0.2824579 loss rate
0.0 unacknowledged but successful rate
```

Losing 28% of your supposedly committed data is not serializable by any definition. Next question.

## CQL and CRDTs

Without vector clocks, Cassandra can’t safely change a cell–but writing immutable data is safe. Consequently, Cassandra has evolved around those constraints, allowing you to efficiently journal thousands of cells to a single row, and to retrieve them in sorted order. Instead of modifying a cell, you write each distinct change to its own UUID-keyed cell. Then, at read time, you read all the cells back and apply a merge function to obtain a result.

Cassandra’s query language, CQL, provides some collection-oriented data structures around this model: sets, lists, maps, and so forth. They’re CRDTs, though the semantics don’t align with what you’ll find in the INRIA paper–no G-sets, 2P-sets, OR-sets, etc. However, some operations are safe–for instance, adding elements to a CQL set:

```
0 unrecoverable timeouts
Collecting results.
Writes completed in 200.036 seconds

2000 total
2000 acknowledged
2000 survivors
All 2000 writes succeeded. :-D
```

That’s terrific! This is the same behavior we saw with G-sets in Riak. However, not all CQL collection operations are intuitively correct. In particular, I’d be wary of the index-based operations for lists, updating elements in a map, and any type of deletions. Deletes are implemented by writing special tombstone cells, which declare a range of other cells to be ignored. Because Cassandra doesn’t use techniques like OR-sets, you can potentially delete records that haven’t been seen yet–even delete writes from the future. Cassandra users jokingly refer to this behavior as “doomstones”.
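Here is a minimal Python model of why tombstones behave this way under LWW; the timestamps and values are illustrative:

```python
# A delete is just a write of a tombstone cell, and LWW resolution
# compares timestamps only. So a tombstone can suppress a write it has
# never causally observed — including one stamped by a client whose
# clock lags behind the deleter's.

def resolve(cells):
    """LWW resolution: the highest-timestamped cell wins."""
    ts, value = max(cells)
    return None if value == "TOMBSTONE" else value

# A delete stamped t=10 races with an add stamped t=9. The add arrives
# *after* the delete, but the tombstone still wins.
cells = [(10, "TOMBSTONE"), (9, "hello")]
result = resolve(cells)
print(result)  # None — the never-seen write is doomed
```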
The important thing to remember is that because there are no ordering constraints on writes, one’s merge function must still be associative and commutative. Just as we saw with Riak, AP systems require you to reason about order-free data structures. In fact, Cassandra and Riak are (almost) formally equivalent in their consistency semantics–the primary differences are in the granularity of updates, in garbage collection/history compaction, and in performance.

Bottom line: CQL collections are a great idea, and you should use them! Read the specs carefully to figure out whether CQL operations meet your needs, and if they don’t, you can always write your own CRDTs on top of wide rows yourself.

## Counters

If you’re familiar with CRDTs, you might be wondering whether Cassandra’s counter type is a PN-counter–a commutative, monotonic data structure which can be incremented and decremented in an eventually consistent way. The answer is no: Cassandra (via Twitter, politics, etc.) wound up with a less safe type of data structure. Consequently, Cassandra counters will over- or under-count by a wide range during a network partition. If partitioned for about half of the test run, I found counters could drift by up to 50% of the expected value. Here’s a relatively well-behaved run, drifting by less than a percent:

```
10000 total
9700 acknowledged
9921 survivors
```

## Isolation

In Coming up in Cassandra 1.1: Row Level Isolation, and Atomic batches in Cassandra 1.2, DataStax asserts that a write which updates multiple keys in the same row will be atomic and isolated.

> Cassandra 1.1 guarantees that if you update both the login and the password in the same update (for the same row key) then no concurrent read may see only a partial update.

> Full row-level isolation is now in place so that writes to a row are isolated to the client performing the write and are not visible to any other user until they are complete.
From a transactional ACID (atomic, consistent, isolated, durable) standpoint, this enhancement now gives Cassandra transactional AID support. We know what “atomic” means: either all of the changes in the transaction complete, or none of them do. But what does “isolated” mean? Isolated in the sense of ACID? Let’s ask Hacker News what they think Cassandra’s isolation provides:

Peter Bailis pointed me at two really excellent papers on isolation and consistency–I highly recommend digging into them if you’re curious about this problem. Isolation comes in many flavors, or strengths, depending on what sorts of causal histories are allowed. Serializability is one of the strongest: all transactions appear to occur in a single well-defined non-interleaved order. Cursor stability and Snapshot Isolation are somewhat weaker. ANSI SQL defines four levels of isolation, which really have more to do with the historical behavior of various database systems than with behavior that any sane person would consider distinguishable, so I’m not going to get into the details–but suffice it to say that there are a range of phenomena which are prohibited by those isolation levels. In order from least to most awful:

- P4: Lost Update
- P3: Phantom
- P2: Fuzzy read
- P1: Dirty read
- P0: Dirty write

ANSI SQL’s SERIALIZABLE level prohibits P3–P0; REPEATABLE READ prohibits P2 and below; READ COMMITTED prohibits P1 and below; and READ UNCOMMITTED only prohibits P0.

P0, or “dirty write”, is especially important because all isolation levels must prohibit it. In P0, one transaction modifies some data; then a second transaction also modifies that data, before the first transaction commits. We never want writes from two different transactions to be mixed together, because it might violate integrity relationships which each transaction held independently. For instance, we might write [x=1, y=1] in one transaction, and [x=2, y=2] in a different transaction, assuming that x will always be equal to y.
P0 allows those transactions to result in [x=1, y=2], or [x=2, y=1]. Cassandra allows P0.

The key thing to remember here is that in Cassandra, the order of writes is completely irrelevant. Any write made to the cluster could eventually wind up winning, if it has a higher timestamp. But–what happens if Cassandra sees two copies of a cell with the same timestamp? It picks the lexicographically bigger value.

That means that if the values written to two distinct cells don’t have the same sort order (which is likely), Cassandra could pick final cell values from different transactions. For instance, we might write [1 -1] and [2 -2]. 2 is greater than 1, so the first cell will be 2. But -1 is bigger than -2, so -1 wins in the second cell. The result? [2 -1].

“But,” you might protest, “in order for that to happen, you’d need two timestamps to collide. It’s really unlikely that two writes will get the same microsecond-resolution timestamp, right? I’ve never seen it happen in my cluster.”

Well, it depends. If we assume N writes per second by Poisson processes to the same row, the probability of any given read seeing a conflicting value grows as the writes come closer together:

| rate | probability of conflict/read |
|---------|----------|
| 1 | 1.31E-7 |
| 10 | 5.74E-6 |
| 100 | 5.30E-5 |
| 1000 | 5.09E-4 |
| 10000 | 0.00504 |
| 100000 | 0.0492 |
| 1000000 | 0.417 |

So if you do 100,000 writes/sec, on any given read you’ve got a 5% chance of seeing corrupt data. If you do 10 writes/sec and 1 read/sec, you’ve got about a 1/3 chance of seeing corrupt data in any given day.

What if you write many rows over time–maybe 2 writes to each row, separated by a mean delta of 100 milliseconds? Then the theoretical probability of any given row being corrupt is about 5 × 10^-6. That’s a pretty small probability–and remember, most applications can tolerate some small degree of corrupt data. Let’s confirm it with an experiment:

```
10000 total
9899 acknowledged
9942 survivors
58 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
127 253 277 339 423 434 ...
8112 8297 8650 8973 9096 9504
101 unacknowledged writes found! ヽ(´ー｀)ノ
1059 1102 1139 1142 1143 1158 ...
2701 2720 2721 2800 2815 2860
0.9899 ack rate
0.0058591776 loss rate
0.01020305 unacknowledged but successful rate
```

Note that “writes lost” here means corrupted rows: entirely missing rows are treated as successes. Roughly 1 in 200 rows were corrupt! That’s way worse than 10^-6! What gives? It turns out that somewhere in this maze of software, either Cassandra, the DataStax Java driver, or Cassaforte is taking the current time in milliseconds and tacking on three zeroes to the end, calling it good. The probability of millisecond conflicts is significantly higher than microsecond conflicts, which is why we saw so much corrupt data.

Long story short, Cassandra row isolation is probabilistic at best; and remember, the only reason you actually want isolation is because you plan on doing two operations at the same time. If you rely on isolation, in any sense of the word, in Cassandra, you need to consider your tolerance for data corruption, and verify that you’re actually generating timestamps with the expected distribution. A strong external coordinator which guarantees unique timestamps might be of use.

## Lightweight Transactions

In Cassandra 2.0.0, Lightweight Transactions offer linearizable consistency for compare-and-set operations. The implementation is based on naive Paxos–requiring four round trips for each write–but the performance can be improved with time. The important thing is that Cassandra is first to have a distributed linearizable data store, or something.

> That said, sometimes you really do need linearizable operations. That’s why we added lightweight transactions in Cassandra 2.0

> This is a sign of Cassandra maturing — Cassandra 1.0 (released October 2011) was the fulfilment of its designers original vision; Cassandra 2.0 takes it in new directions to make it even more powerful.
> Open source has had the reputation of producing good imitations, but not innovation. Perhaps Cassandra’s origins as a hybrid of Dynamo and Bigtable did not disprove this, but Apache Cassandra’s development of lightweight transactions and CQL are true industry firsts.

The first thing you’ll notice if you try to test the new transaction system is that the Java driver doesn’t support it. It’ll throw some weird exceptions like “unknown consistency level SERIAL”, because it doesn’t support the v2 native Cassandra protocol yet. So you’ll need to use the Python Thrift client, or, in my case, get a patched client from DataStax.

The second thing you’ll notice is deadlocks. In my Jepsen tests, the cluster would go unresponsive after the first 10 or so transactions–and it would never recover. Any further attempts to modify a cell via transaction would spin endlessly in failed transactions, until I manually truncated the system.paxos table. You can’t make this shit up.

So you confer with DataStax for a while, and they manage to reproduce and fix the bug: #6029 (Lightweight transactions race render primary key useless), and #5985 (Paxos replay of in progress update is incorrect). You start building patched versions of Cassandra.

```
git checkout paxos-fixed-hopefully
```

Let’s give it a whirl. In this transaction test, we perform repeated compare-and-set operations against a single cell, retrying failed attempts for up to 10 seconds. The first thing you’ll notice is that those four round-trips aren’t exactly lightweight, which means that at 50 transactions/sec, the majority of transaction attempts time out.

But we’re less concerned with performance or availability than safety. Let’s slow down the test to 5 transactions/sec to reduce contention, and check: are lightweight transactions actually linearizable?

```
2000 total
829 acknowledged
827 survivors
3 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
(102 1628 1988)
1 unacknowledged writes found! ヽ(´ー｀)ノ
(283)
0.4145 ack rate
0.0036188178 loss rate
0.0012062726 unacknowledged but successful rate
```

No. Cassandra lightweight transactions are not even close to correct. Depending on throughput, they may drop anywhere from 1-5% of acknowledged writes–and this doesn’t even require a network partition to demonstrate. It’s just a broken implementation of Paxos. In addition to the deadlock bug, these Jepsen tests revealed #6012 (Cassandra may accept multiple proposals for a single Paxos round) and #6013 (unnecessarily high false negative probabilities).

Paxos is notoriously difficult to implement correctly. The Chubby authors note:

> Our tests start in safety mode and inject random failures into the system. After running for a predetermined period of time, we stop injecting failures and give the system time to fully recover. Then we switch the test to liveness mode. The purpose for the liveness test is to verify that the system does not deadlock after a sequence of failures.
>
> This test proved useful in finding various subtle protocol errors, including errors in our group membership implementation, and our modifications to deal with corrupted disks…. We found additional bugs, some of which took weeks of simulated execution time (at extremely high failure rates) to find.
>
> Our hooks can be used to crash a replica, disconnect it from other replicas for a period of time or force a replica to pretend that it is no longer the master. This test found five subtle bugs in Chubby related to master failover in its first two weeks.

And in particular, I want to emphasize:

> By their very nature, fault-tolerant systems try to mask problems. Thus they can mask bugs or configuration problems while insidiously lowering their own fault-tolerance.

The bugs I found were low-hanging fruit: anyone who ran a few hundred simple transactions could reproduce them, even without causing a single node or network failure. Why didn’t DataStax catch this in the release process?
Why publish glowing blog posts and smug retrospectives if the most fundamental safety properties of the application haven’t been trivially verified? And if I hadn’t reported these bugs, how many users do you suppose would have been subject to silent data loss or corruption in prod?

I can’t say this strongly enough: one way or another, software is always tested–either by the maintainers, by users, or by applications in production. One of my goals in this series is to push database vendors to test their software prior to release, so that we can all enjoy safer, faster systems. If you’re writing a database, please try to verify its correctness experimentally. You don’t need to do a perfect job–testing is tough!–but a little effort can catch 90% of the bugs.

## Final thoughts

DataStax and the open-source community around Cassandra have been working hard on the AP storage problem for several years, and it shows. Cassandra runs on thousand-node clusters and accepts phenomenal write volume. It’s extraordinarily suited for high-throughput capture of immutable or otherwise log-oriented data, and its AAE and tunable durability features work well. It is, in short, a capable AP datastore, and though I haven’t deployed it personally, many engineers I respect recommend it wholeheartedly from their production experience.

Jonathan Ellis, Aleksey Yeschenko, and Patrick McFadin were all very helpful in helping me understand Cassandra’s model, and I hope that I have depicted it accurately here. Any errors are mine alone. I’m especially thankful that they volunteered so much of their time on nights and weekends to help someone tear apart their hard work, and that they’ve fixed the bugs I’ve found so quickly. Reproducing and fixing distributed systems bugs is an especially challenging task, and it speaks to the skill of the entire Cassandra team.
DataStax has adapted some of these Jepsen tests for use in their internal testing process, and, like Basho, may use Jepsen directly to help test future releases. I’m optimistic that they’ll notify users that the transactional features are unsafe in the current release, and clarify their documentation and marketing. Again, there’s nothing technically wrong with many of the behaviors I’ve discussed above–they’re simply subtle, and deserve clear exposition so that users can interpret them correctly. I’m looking forward to watching a good database improve.

# Call me maybe: Kafka

In the last Jepsen post, we learned about NuoDB. Now it’s time to switch gears and discuss Kafka. Up next: Cassandra.

Kafka is a messaging system which provides an immutable, linearizable, sharded log of messages. Throughput and storage capacity scale linearly with nodes, and thanks to some impressive engineering tricks, Kafka can push astonishingly high volume through each node, often saturating disk, network, or both. Consumers use Zookeeper to coordinate their reads over the message log, providing efficient at-least-once delivery–and some other nice properties, like replayability.

In the upcoming 0.8 release, Kafka is introducing a new feature: replication. Replication enhances the durability and availability of Kafka by duplicating each shard’s data across multiple nodes. In this post, we’ll explore how Kafka’s proposed replication system works, and see a new type of failure.

Here’s a slide from Jun Rao’s overview of the replication architecture. In the context of the CAP theorem, Kafka claims to provide both serializability and availability by sacrificing partition tolerance. Kafka can do this because LinkedIn’s brokers run in a datacenter, where partitions are rare.

Note that the claimed behavior isn’t impossible: Kafka could be a CP system, providing “bytewise identical replicas” and remaining available whenever, say, a majority of nodes are connected.
It just can’t be fully available if a partition occurs. On the other hand, we saw that NuoDB, in purporting to refute the CAP theorem, actually sacrificed availability. What happens to Kafka during a network partition?

## Design

Kafka’s replication design uses leaders, elected via Zookeeper. Each shard has a single leader. The leader maintains a set of in-sync replicas: all the nodes which are up-to-date with the leader’s log, and actively acknowledging new writes. Every write goes through the leader and is propagated to every node in the In Sync Replica set, or ISR. Once all nodes in the ISR have acknowledged the request, the leader considers it committed, and can ack to the client.

When a node fails, the leader detects that writes have timed out, and removes that node from the ISR in Zookeeper. Remaining writes only have to be acknowledged by the healthy nodes still in the ISR, so we can tolerate a few failing or inaccessible nodes safely.

So far, so good; this is about what you’d expect from a synchronous replication design. But then there’s this claim from the replication blog posts and wiki: “with f nodes, Kafka can tolerate f-1 failures”. This is of note because most CP systems only claim tolerance to n/2-1 failures; e.g. a majority of nodes must be connected and healthy in order to continue. LinkedIn says that majority quorums are not reliable enough, in their operational experience, and that tolerating the loss of all but one node is an important aspect of the design.

Kafka attains this goal by allowing the ISR to shrink to just one node: the leader itself. In this state, the leader is acknowledging writes which have only been persisted locally. What happens if the leader then loses its Zookeeper claim? The system cannot safely continue–but the show must go on. In this case, Kafka holds a new election and promotes any remaining node–which could be arbitrarily far behind the original leader.
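To make the failure concrete, here’s a toy simulation of the scenario just described. This is an illustrative sketch with invented names, not Kafka’s actual code: replicas are plain lists, and a write counts as “committed” once every current ISR member has appended it.

```python
# Toy model of Kafka-style ISR replication -- an illustrative sketch with
# invented names, not Kafka's actual implementation. Entries are integers;
# a replica's log is a plain list.
class Replica:
    def __init__(self, name):
        self.name = name
        self.log = []

def replicate(isr, entry):
    """Append an entry on every current in-sync replica; once they have
    all 'acknowledged' it, the write counts as committed."""
    for r in isr:
        r.log.append(entry)
    return entry

a, b, c = Replica("A"), Replica("B"), Replica("C")
isr = [a, b, c]                    # A is the leader; everyone is in sync
committed = [replicate(isr, 0)]    # entry 0 reaches all three nodes

# Partition: B and C time out, so the leader shrinks the ISR to itself.
isr = [a]
for entry in (1, 2, 3):
    committed.append(replicate(isr, entry))  # acked, but durable only on A

# The leader loses its Zookeeper claim; B is elected with only entry 0.
new_leader = b
lost = [e for e in committed if e not in new_leader.log]
print("committed:", committed)  # committed: [0, 1, 2, 3]
print("lost:", lost)            # lost: [1, 2, 3]
```

Every entry acknowledged while the ISR was just the leader vanishes when a causally disconnected follower takes over.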
That node begins accepting requests and replicating them to the new ISR.

When the original leader comes back online, we have a conflict. The old leader is identical with the new up until some point, after which they diverge. Two possibilities come to mind: we could preserve both writes, perhaps appending the old leader’s writes to the new–but this would violate the linear ordering property Kafka aims to preserve. Another option is to drop the old leader’s conflicting writes altogether. This means destroying committed data.

In order to see this failure mode, two things have to happen:

1. The ISR must shrink such that some node (the new leader) is no longer in the ISR.
2. All nodes in the ISR must lose their Zookeeper connection.

For instance, a lossy NIC which drops some packets but not others might isolate a leader from its Kafka followers, but break the Zookeeper connection slightly later. Or the leader could be partitioned from the other Kafka nodes by a network failure, and then crash, lose power, or be restarted by an administrator. Or there could be correlated failures across multiple nodes, though this is less likely. In short, two well-timed failures (or, depending on how you look at it, one complex failure) on a single node can cause the loss of arbitrary writes in the proposed replication system.

I want to rephrase this, because it’s a bit tricky to understand. In the causality diagram to the right, the three vertical lines represent three distinct nodes, and time flows downwards. Initially, the Leader (L) can replicate requests to its followers in the ISR. Then a partition occurs, and writes time out. The leader detects the failure and removes nodes 2 and 3 from the ISR, then acknowledges some log entries written only to itself. When the leader loses its Zookeeper connection, the middle node becomes the new leader. What data does it have? We can trace its line upwards in time to see that it only knows about the very first write made.
All other writes on the original leader are causally disconnected from the new leader. This is the reason data is lost: the causal invariant between leaders is violated by electing a new node once the ISR is empty.

I suspected this problem existed from reading the JIRA ticket, but after talking it through with Jay Kreps I wasn’t convinced I understood the system correctly. Time for an experiment!

## Results

First, I should mention that Kafka has some parameters that control write consistency. The default behaves like MongoDB: writes are not replicated prior to acknowledgement, which allows for higher throughput at the cost of safety. In this test, we’ll be running in synchronous mode:

```clj
(producer/producer
  {"metadata.broker.list"     (str (:host opts) ":9092")
   "request.required.acks"    "-1" ; all in-sync brokers
   "producer.type"            "sync"
   "message.send.max_retries" "1"
   "connect.timeout.ms"       "1000"
   "retry.backoff.ms"         "1000"
   "serializer.class"         "kafka.serializer.DefaultEncoder"
   "partitioner.class"        "kafka.producer.DefaultPartitioner"})
```

With that out of the way, our writes should be fully acknowledged by the ISR once the client returns from a write operation successfully. We’ll enqueue a series of integers into the Kafka cluster, then isolate a leader using iptables from the other Kafka nodes. Latencies spike initially, while the leader waits for the missing nodes to respond. A few requests may fail, but the ISR shrinks in a few seconds and writes begin to succeed again.

We’ll allow that leader to acknowledge writes independently, for a time. While these writes look fine, they’re actually only durable on a single node–and could be lost if a leader election occurs. Then we totally partition the leader. ZK detects the leader’s disconnection and the remaining nodes promote a new leader, causing data loss. Again, a brief latency spike:

At the end of the run, Kafka typically acknowledges 98–100% of writes. However, half of those writes (all those made during the partition) are lost.
```
Writes completed in 100.023 seconds

1000 total
987 acknowledged
468 survivors
520 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
130 131 132 133 134 135 ... 644 645 646 647 648 649
1 unacknowledged writes found! ヽ(´ー｀)ノ
(126)
0.987 ack rate
0.52684903 loss rate
0.0010131713 unacknowledged but successful rate
```

## Discussion

Kafka’s replication claimed to be CA, but in the presence of a partition, threw away an arbitrarily large volume of committed writes. It claimed tolerance to f-1 failures, but a single node could cause catastrophe. How could we improve the algorithm?

All redundant systems have a breaking point. If you lose all N nodes in a system which writes to N nodes synchronously, it’ll lose data. If you lose 1 node in a system which writes to 1 node synchronously, that’ll lose data too. There’s a tradeoff to be made between how many nodes are required for a write, and the number of faults which cause data loss. That’s why many systems offer per-request settings for durability. But what choice is optimal, in general? If we wanted to preserve the all-nodes-in-the-ISR model, could we constrain the ISR in a way which is most highly available?

It turns out there is a maximally available number. From Peleg and Wool’s overview paper on quorum consensus:

> It is shown that in a complete network the optimal availability quorum system is the majority (Maj) coterie if p < ½.

In particular, given uniformly distributed element failure probabilities smaller than ½ (which realistically describes most homogeneous clusters), the worst quorum system is the Single coterie (one failure causes unavailability), and the best quorum system is the simple Majority (provided the cohort size is small). Because Kafka keeps only a small number (on the order of 1-10) of replicas, majority quorums are provably optimal in their availability characteristics.
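That result is easy to check numerically. The sketch below (assumed parameters: five replicas and an independent per-node failure probability p = 0.1; `majority_availability` is an invented helper) compares the availability of a single designated node, a majority quorum, and an all-nodes requirement:

```python
from math import comb

def majority_availability(n, p):
    """Probability that at least a majority of n nodes are up, given an
    independent per-node failure probability p."""
    up = 1 - p
    need = n // 2 + 1
    return sum(comb(n, k) * up**k * p**(n - k) for k in range(need, n + 1))

n, p = 5, 0.1
single   = 1 - p           # Single coterie: one designated node must be up
majority = majority_availability(n, p)
all_up   = (1 - p)**n      # ISR-must-include-all: every node must be up

print(f"single:   {single:.5f}")    # single:   0.90000
print(f"majority: {majority:.5f}")  # majority: 0.99144
print(f"all:      {all_up:.5f}")    # all:      0.59049
```

The majority quorum beats both extremes, which is the sense in which it is provably optimal for small cohorts.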
You can reason about this from extreme cases: if we allow the ISR to shrink to 1 node, the probability of a single additional failure causing data loss is high. If we require the ISR include all nodes, any node failure will make the system unavailable for writes. If we assume failures are partially independent, the probability of two failures goes like p², which is much smaller than p. This superlinear failure probability at both ends is why bounding the ISR size in the middle has the lowest probability of failure.

I made two recommendations to the Kafka team:

1. Ensure that the ISR never goes below N/2 nodes. This reduces the probability of a single node failure causing the loss of committed writes.
2. In the event that the ISR becomes empty, block and sound an alarm instead of silently dropping data. It’s OK to make this configurable, but as an administrator, you probably want to be aware when a datastore is about to violate one of its constraints–and make the decision yourself. It might be better to wait until an old leader can be recovered. Or perhaps the administrator would like a dump of the to-be-dropped writes which could be merged back into the new state of the cluster.

Finally, remember that this is pre-release software; we’re discussing a candidate design, not a finished product. Jay Kreps and I discussed the possibility of a “stronger safety” mode which does bound the ISR and halts when it becomes empty–if that mode makes it into the next release, and strong safety is important for your use case, check that it is enabled.

Remember, Jun Rao, Jay Kreps, Neha Narkhede, and the rest of the Kafka team are seasoned distributed systems experts–they’re much better at this sort of thing than I am. They’re also contending with nontrivial performance and fault-tolerance constraints at LinkedIn–and those constraints shape the design space of Kafka in ways I can’t fully understand.
I trust that they’ve thought about this problem extensively, and will make the right tradeoffs for their (and hopefully everyone’s) use case. Kafka is still a phenomenal persistent messaging system, and I expect it will only get better.

The next post in the Jepsen series explores Cassandra, an AP datastore based on the Dynamo model.

# Call me maybe: NuoDB

Previously on Jepsen, we explored Zookeeper. Next up: Kafka.

NuoDB came to my attention through an amazing mailing list thread by the famous database engineer Jim Starkey, in which he argues that he has disproved the CAP theorem:

> The CAP conjecture, I am convinced, is false and can be proven false. The CAP conjecture has been a theoretical millstone around the neck of all ACID systems. Good riddance. This is the first wooden stake for the heart of the noSQL movement. There are more coming.

I, and every database user on the planet, not to mention a good part of the distributed systems research community, would love to find a counterexample which disproves the CAP theorem. For that matter, I’m tremendously excited about the possibilities of causal and lattice consistency, which we know are achievable in asynchronous networks. So I was curious: what was NimbusDB (now named NuoDB) up to? How does their consistency model work?

I usually try to understand a new system by reading the documentation, scanning for words like “safety”, “order”, “serializability”, “linearizability”, “consistency”, “conflict”, and “replica”. I keep notes as I go. Here are a few excerpts from my first six hours trying to figure out NuoDB’s consistency invariants. In particular, I want to draw attention to this excerpt:

> If the CAP theorem means that all surviving nodes must be able to continue processing without communication after a network failure, than NUODB is not partition resistant.
This is kind of an odd statement to make, because Gilbert and Lynch’s proof defines “availability” as “every request received by a non-failing node in the system must result in a response.” That would seem to imply that NuoDB does not satisfy CAP availability.

> If partition resistance includes the possibility for a surviving subset of the chorus to sing on, then NUODB refutes the CAP theorem.

We know systems exist in which a surviving subset of nodes continue processing during a partition. They are consistent with the CAP theorem because in those systems (e.g. Zookeeper) some requests to non-failing nodes do not succeed. Claiming this “refutes the CAP theorem” is incoherent. This isn’t getting us anywhere. To figure out how NuoDB actually behaves, we’ll need to set up a cluster and test it ourselves.

## Operational notes

Setting up a NuoDB cluster turned out to be more difficult than I anticipated. For starters, there are race conditions in the cluster join process. Each node has a seed node to join to, which determines the cluster it will become a part of. If that seed is inaccessible at startup, the node will quietly become a part of a new, independent cluster–and will not, as far as I can tell, join the original cluster even if the seed becomes accessible later. Consequently, performing a cold start is likely to result in several independent clusters, up to and including every node considering itself the sole node in its own cluster.

This is a catastrophic outcome: if any clients manage to connect to one of these isolated clusters, their operations will almost certainly disagree with the other clusters. You’ll see conflicting row values, broken primary keys, invalid foreign key relationships, and so on. I have no idea how you go about repairing that kind of damage without simply dropping all the writes on one side of the split-brain.

You can join a node to itself.
This is easy to do accidentally if you, say, deploy the same seed node to every node’s configuration file. The consequences are… interesting.

There are also race conditions in database creation. For instance, if you create and delete the same simple table a few times in succession, you can back yourself into this corner, where you can neither use, delete, nor recreate a table, short of nuking the entire cluster. I’ve talked with the NuoDB team about these bugs, and they’re working on fixing them. Hopefully they won’t be present in future releases.

Finally, be aware that restarting a crashed NuoDB node does not restore its transaction managers or storage managers; if you do a naive rolling restart, all the data vanishes. In my conversations with NuoDB’s engineering staff, it looks like this is actually intended behavior for their customers' use cases. The cluster also doesn’t set up failover replicas when nodes become unavailable, so it’s easy to accidentally lose all the storage nodes if your membership shifts. NuoDB plans to improve that behavior in future releases.

## What happens during partition?

In this NuoDB test, we check the consistency of compare-and-set updates to a single cell, by having transactions compete at the SERIAL consistency level to read, update, and write a vector of numbers. Note that this test does not check multi-key linearizability, or, for that matter, exclude behaviors like P4 or P3.

During a partition, with the Java driver, you could see a variety of failure modes:

• “Duplicate value in unique index SEQUENCES..PRIMARY_KEY”
• End of stream reached
• Broken pipe
• Connection reset
• Indefinite latency

And I do mean indefinite. I haven’t actually found an upper limit to how long NuoDB will block for. As far as I can tell, when a node is inaccessible, operations will queue up for as long as the partition lasts.
Moreover, they block globally: even though a fully connected majority component existed, no subset of the cluster responded during the partition. Perhaps because all operations are queued without timeout, it takes a long time for NuoDB latencies to recover after the partition resolves. In my tests, latencies continued to spike well into the 30-60 second range for as many as 1500 seconds after the partition ended. I haven’t found an upper limit for this behavior, but eventually, something somewhere must run out of RAM.

## Results

NuoDB typically acknowledged 55% of writes in my tests–most, but not all, writes made during the partition failed due to CaS conflict and were not retried after Jepsen’s internal timeout. The good news is that all acknowledged writes made at the SERIAL consistency level were present in the final dataset: no dropped writes. There were also a trivial fraction of false negatives, which is typical for most CP systems. This indicates that NuoDB is capable of preserving some sort of linear order over CaS operations to a single cell, even in the presence of a partition.

Note that NuoDB isn’t fully CP, because it does not enforce serializability for all write operations–just “local transaction order”. I’m not exactly sure how the local orders interact, and whether there are practical scenarios which would violate serializability but be allowed by NuoDB’s local transaction invariants. So far I haven’t been able to construct a test to demonstrate the difference.

Does NuoDB refute the CAP theorem? Of course it doesn’t. By deferring all operations until the partition resolves, NuoDB is not even close to available. In fact, it’s a good deal less available than more consistent systems: Zookeeper, for example, remains available on all nodes connected to a majority component. NuoDB is another example of the adage that systems which purport to be CA or CAP usually sacrifice availability or consistency when a partition does occur–and often in spectacular ways.
Blocking all writes during partition is, according to the NuoDB team, intended behavior. However, there is experimental liveness detection code in the most recent release, which will hopefully allow NuoDB to begin timing out requests to inaccessible nodes. I haven’t been able to test that code path yet, but future releases may enable it by default.

If you are considering using NuoDB, be advised that the project’s marketing and documentation may exceed its present capabilities. Try to enable the liveness detection code, and set up your own client timeouts to avoid propagating high latencies to other systems. Try to build backpressure hints into your clients to reduce the requests against NuoDB during failure; the latency storm which persists after the network recovers is proportional to the backlog of requests. Finally, be aware of the operational caveats mentioned earlier: monitor your nodes carefully, restart their storage and transaction managers as appropriate, and verify that newly started nodes have indeed joined the cluster before exposing them to clients.

Finally, I want to note (as always) that the presence of bugs does not mean that the NuoDB engineers are incompetent–in fact, I want to assert the opposite. In my discussions with the NuoDB team I’ve found them to be friendly, capable, aware of the product’s limitations, and doing their best to solve a difficult problem within constraints of time, budget, and complexity. Given time, I’m sure they’ll get past these initial hurdles. From one employee:

> I only hope you’ll footnote that crazy CAP rambling with the disclaimer that no one at NuoDB today actually agrees with Jim’s comments in that thread.

In the next post, we’ll learn about Kafka 0.8’s proposed replication model.

# Call me maybe: Zookeeper

In this Jepsen post, we’ll explore Zookeeper. Up next: NuoDB.

Zookeeper, or ZK for short, is a distributed CP datastore based on a consensus protocol called ZAB.
ZAB is similar to Paxos in that it offers linearizable writes and is available whenever a majority quorum can complete a round, but unlike the Paxos papers, it places a stronger emphasis on the role of a single leader in ensuring the consistency of commits.

Because Zookeeper uses majority quorums, in an ensemble of five nodes, any two can fail or be partitioned away without causing the system to halt. Any clients connected to a majority component of the cluster can continue to make progress safely. In addition, the linearizability property means that all clients will see all updates in the same order–although clients may drift behind the primary by an arbitrary duration.

This safety property comes at a cost: writes must be durably written to a disk log on a majority of nodes before they are acknowledged. In addition, the entire dataset must fit in memory. This means that Zookeeper is best deployed for small pieces of state where linearizability and high availability are critical. Often, ZK is used to track consistent pointers to larger, immutable data stored in a different (perhaps AP) system, combining the safety and scalability advantages of both. At the same time, this strategy reduces the availability for writes, since there are two systems to fail, and one of them (ZK) requires majority quorums.

## ZNode linearizability

In this test, five clients use a Curator DistributedAtom to update a list of numbers. The list is stored as a single serialized znode, and updates are applied via a CaS loop: atomically reading, decoding, appending the appropriate number, encoding, and writing back iff the value has not changed.

```clj
(let [curator (framework (str (:host opts) ":2181") "jepsen")
      path    "/set-app"
      state   (distributed-atom curator path [])]
  (reify SetApp
    (setup [app]
      (reset!! state []))

    (add [app element]
      (try
        (swap!! state conj element)
        ok
        (catch org.apache.zookeeper.KeeperException$ConnectionLossException e
          error)))

    (results [app]
      @state)

    (teardown [app]
      (delete! curator path))))
```
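Under the hood, this kind of distributed atom is an optimistic concurrency loop over a versioned znode. Here’s a minimal sketch of that loop against an in-memory stand-in (`FakeZNode` and its method names are invented for illustration; real ZK clients express the same check by passing an expected version to `setData`):

```python
import json

class FakeZNode:
    """In-memory stand-in for a versioned znode: a write succeeds only
    when the caller's expected version matches, mimicking ZK's
    conditional setData semantics."""
    def __init__(self, data):
        self.data, self.version = data, 0

    def read(self):
        return self.data, self.version

    def compare_and_set(self, expected_version, new_data):
        if expected_version != self.version:
            return False  # someone else wrote first; caller should retry
        self.data, self.version = new_data, self.version + 1
        return True

def add_element(znode, element, max_retries=10):
    """Read, decode, append, encode, and write back iff unchanged."""
    for _ in range(max_retries):
        raw, version = znode.read()
        value = json.loads(raw)
        value.append(element)
        if znode.compare_and_set(version, json.dumps(value)):
            return True
    return False  # contention exceeded the retry budget

znode = FakeZNode(json.dumps([]))
for i in range(5):
    add_element(znode, i)
print(json.loads(znode.read()[0]))  # [0, 1, 2, 3, 4]
```

A writer racing against others simply loops: each failed compare-and-set means another client advanced the version, so it re-reads and retries.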

Initially, the ZK leader is n1. During the test, we partition [n1 n2] away from [n3 n4 n5], which means the leader cannot commit to a majority of nodes–and consequently, writes immediately block:

After 15 seconds or so, a new leader is elected in the majority component, and writes may proceed again. However, only the clients which can see one of [n3 n4 n5] can write: clients connected to [n1 n2] time out while waiting to make contact with the leader:

When the partition is resolved, writes on [n1 n2] begin to succeed right away; the leader election protocol is stable, so there is no need for a second transition during recovery.

Consequently, in a short test (~200 seconds, ~70 second partition, evenly distributed constant write load across all nodes) ZK might offer 78% availability, asymptotically converging on 60% (3/5 nodes) availability as the duration of the partition lengthens. ZK has never dropped an acknowledged write in any Jepsen test. It also typically yields 0-2 false negatives: likely due to writes proxied through n1 and n2 just prior to the partition, such that the write committed, but the acknowledgement was not received by the proxying node.
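Those availability numbers square with a back-of-the-envelope model. Assuming a 200-second test, a 70-second partition, a 15-second election window with no leader at all, and writes spread evenly across five nodes (all assumed figures, not measurements), we’d expect:

```python
# Back-of-the-envelope model of ZK write availability under partition.
# All numbers are assumptions matching the test shape described above.
test_duration   = 200.0   # seconds
partition       = 70.0    # seconds with [n1 n2] split from [n3 n4 n5]
election_window = 15.0    # seconds with no leader, so no writes succeed

healthy  = test_duration - partition     # all 5 nodes can write
degraded = partition - election_window   # only clients on n3-n5 (3/5) can
available = healthy + degraded * (3 / 5)

print(round(available / test_duration, 3))  # 0.815
```

As the partition lengthens, the healthy and election terms wash out and the success rate tends toward 3/5, matching the asymptotic 60% figure.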

As with any experiment, we can only disconfirm hypotheses. This test demonstrates that in the presence of a partition and leader election, Zookeeper is able to maintain the linearizability invariant. However, there could be other failure modes or write patterns which would not preserve linearizability–I just haven’t been able to find them so far. Nonetheless, this is a positive result: one that all CP datastores should aim for.

## Recommendations

Use Zookeeper. It’s mature, well-designed, and battle-tested. Because the consequences of its connection model and linearizability properties are subtle, you should, wherever possible, take advantage of tested recipes and client libraries like Curator, which do their best to correctly handle the complex state transitions associated with session and connection loss.

Also keep in mind that linearizable state in Zookeeper (such as leader election) does not guarantee the linearizability of a system which uses ZK. For instance, a cluster which uses ZK for leader election might allow multiple nodes to be the leader simultaneously. Even if there are no simultaneous leaders at the same wall-clock time, message delays can result in logical inconsistencies. Designing CP systems, even with a strong coordinator, requires carefully coupling the operations in the system to the underlying coordinator state.
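One common mitigation is to attach a monotonically increasing epoch number (a fencing token) to each election, and have downstream resources reject requests bearing a stale epoch. A minimal sketch, with invented names rather than any ZK API:

```python
class FencedResource:
    """A downstream resource that accepts writes only from the highest
    election epoch it has seen, fencing off stale 'leaders'."""
    def __init__(self):
        self.highest_epoch = 0
        self.writes = []

    def write(self, epoch, value):
        if epoch < self.highest_epoch:
            return False  # stale leader: rejected by the fence
        self.highest_epoch = epoch
        self.writes.append(value)
        return True

resource = FencedResource()
assert resource.write(1, "from-old-leader")   # epoch-1 leader writes fine

# A new election bumps the epoch; the old leader doesn't know yet.
assert resource.write(2, "from-new-leader")

# The old leader, still believing it holds leadership, tries to write:
assert not resource.write(1, "zombie-write")  # fenced off

print(resource.writes)  # ['from-old-leader', 'from-new-leader']
```

The check only helps if every resource the leader touches enforces it; in practice the epoch might come from a ZK sequential node created during the election.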

Next up: NuoDB.

# Automating Jepsen

If you, as a database vendor, implement a few features in your API, I can probably offer repeatable automated tests of your DB’s partition tolerance through Jepsen.

The outcome of these tests would be a set of normalized metrics for each DB like “supports linearizability”, “available for writes when a majority partition exists”, “available for writes when no majority available”, “fraction of writes successful”, “fraction of writes denied”, “fraction of writes acked then lost”, “95th latency during condition X”, and so forth. I’m thinking this would be a single-page web site–a spreadsheet, really–making it easy to compare and contrast DBs and find one that fits your safety needs.

At a minimum, I need to know:

• After initial startup, when is the database stable and ready to accept writes?
• For a given key, which node (if any) is the primary replica?
• For a given key, which node (if any) are secondary replicas?
• After partitions end, when has the database fully recovered? (e.g. has it completed handoff, replayed oplogs, etc)

I also need totally automated, reliable scripting of DB installation and provisioning. Many DBs make it really tough to join nodes from the shell.

This is gonna take several months of my time and a nontrivial amount of money for hardware. I’m looking at a few options, from physical hardware in my garage to renting EC2 compute nodes. EC2 means anybody could, in theory, run these benchmarks themselves–but there are a ton of moving pieces involved, it takes a lot more work to set up, and VM performance is really variable. Ideally, someone out there has five or six identical boxes they don’t need any more–maybe leftover desktops, 1Us from a decommissioned colo, whatever. They don’t have to be all that fast, but I’m hitting the limits of what I can do on virtualized infrastructure.

If you want to make this happen, and can help make the necessary API improvements, write automation scripts for Jepsen, provide hardware or hosting, etc., please email aphyr@aphyr.com.

# The network is reliable

I’ve been discussing Jepsen and partition tolerance with Peter Bailis over the past few weeks, and I’m honored to present this post as a collaboration between the two of us. We’d also like to extend our sincere appreciation to everyone who contributed their research and experience to this piece.

Network partitions are a contentious subject. Some claim that modern networks are reliable and that we are too concerned with designing for theoretical failure modes. They often accept that single-node failures are common but argue that we can reliably detect and handle them. Conversely, others subscribe to Peter Deutsch’s Fallacies of Distributed Computing and disagree. They attest that partitions do occur in their systems, and that, as James Hamilton of Amazon Web Services neatly summarizes, “network partitions should be rare but net gear continues to cause more issues than it should.” The answer to this debate radically affects the design of distributed databases, queues, and applications. So who’s right?

A key challenge in this dispute is the lack of evidence. We have few normalized bases for comparing network and application reliability–and even less data. We can track link availability and estimate packet loss, but understanding the end-to-end effect on applications is more difficult. The scant evidence we have is difficult to generalize: it is often deployment-specific and closely tied to particular vendors, topologies, and application designs. Worse, even when an organization has a clear picture of their network’s behavior, they rarely share specifics. Finally, distributed systems are designed to resist failure, which means noticeable outages often depend on complex interactions of failure modes. Many applications silently degrade when the network fails, and resulting problems may not be understood for some time–if they are understood at all.

As a result, much of what we know about the failure modes of real-world distributed systems is founded on guesswork and rumor. Sysadmins and developers will swap stories over beers, but detailed, public postmortems and comprehensive surveys of network availability are few and far between. In this post, we’d like to bring a few of these stories together. We believe this is a first step towards a more open and honest discussion of real-world partition behavior, and, ultimately, more robust distributed systems design.

## Rumblings from large deployments

To start off, let’s consider evidence from big players in distributed systems: companies running globally distributed infrastructure with hundreds of thousands of nodes. Of all of the data we have collected, these reports best summarize operation in the large, distilling the experience of operating what are likely the biggest distributed systems ever deployed. Their publications (unlike many of the case studies we will examine later) often capture aggregate system behavior and large-scale statistical trends, and indicate (often obliquely) that partitions are a significant concern in their deployments.

### The Microsoft Datacenter Study

A team from the University of Toronto and Microsoft Research studied the behavior of network failures in several of Microsoft’s datacenters. They found an average failure rate of 5.2 devices per day and 40.8 links per day with a median time to repair of approximately five minutes (and up to one week). While the researchers note that correlating link failures and communication partitions is challenging, they estimate a median packet loss of 59,000 packets per failure. Perhaps more concerning is their finding that network redundancy improves median traffic by only 43%; that is, network redundancy does not eliminate many common causes of network failure.

### HP Enterprise Managed Networks

A joint study between researchers at University of California, San Diego and HP Labs examined the causes and severity of network failures in HP’s managed networks by analyzing support ticket data. “Connectivity”-related tickets accounted for 11.4% of support tickets (14% of which were of the highest priority level), with a median incident duration of 2 hours and 45 minutes for the highest priority tickets and a median duration of 4 hours 18 minutes for all priorities.

### Google Chubby

Google’s paper describing the design and operation of Chubby, their distributed lock manager, outlines the root causes of 61 outages over 700 days of operation across several clusters. Of the nine outages that lasted longer than 30 seconds, four were caused by network maintenance and two were caused by “suspected network connectivity problems.”

### Google’s Design Lessons from Distributed Systems

In Design Lessons and Advice from Building Large Scale Distributed Systems, Jeff Dean suggests that a typical first year for a new Google cluster involves:

• 5 racks going wonky (40-80 machines seeing 50% packet loss)
• 8 network maintenances (4 might cause ~30-minute random connectivity losses)
• 3 router failures (have to immediately pull traffic for an hour)

While Google doesn’t tell us much about the application-level consequences of their network partitions, “Lessons From Distributed Systems” suggests they are a significant concern, citing the challenge of “[e]asy-to-use abstractions for resolving conflicting updates to multiple versions of a piece of state” as useful in “reconciling replicated state in different data centers after repairing a network partition.”

### Amazon Dynamo

Amazon’s Dynamo paper frequently cites the incidence of partitions as a driving design consideration. Specifically, the authors note that they rejected designs from “traditional replicated relational database systems” because they “are not capable of handling network partitions.”

### Yahoo! PNUTS/Sherpa

Yahoo! PNUTS/Sherpa was designed as a distributed database operating out of multiple, geographically distinct sites. Originally, PNUTS supported a strongly consistent “timeline consistency” operation, with one master per data item. However, the developers noted that, in the event of “network partitioning or server failures,” this design decision was too restrictive for many applications:

The first deployment of Sherpa supported the timeline-consistency model — namely, all replicas of a record apply all updates in the same order — and has API-level features to enable applications to cope with asynchronous replication. Strict adherence leads to difficult situations under network partitioning or server failures. These can be partially addressed with override procedures and local data replication, but in many circumstances, applications need a relaxed approach.

## Application-level failures

Not all partitions originate in the physical network. Sometimes dropped or delayed messages are a consequence of crashes, race conditions, OS scheduler latency, or overloaded processes. The following studies highlight the fact that partitions–wherein the system delays or drops messages–can occur at any layer of the software stack.

### CPU use and service contention

Bonsai.io discovered high CPU and memory use on an ElasticSearch node combined with difficulty connecting to various cluster components, likely a consequence of an “excessively high number of expensive requests being allowed through to the cluster.”

They restarted the cluster, but on restart it partitioned itself into two independent components. A subsequent cluster restart resolved the partition, but customers complained they were unable to delete or create indices. The logs revealed that servers were repeatedly trying to recover unassigned indices, which “poisoned the cluster’s attempt to service normal traffic which changes the cluster state.” The failure led to 20 minutes of unavailability and six hours of degraded service.

Bonsai concludes by noting that large-scale ElasticSearch clusters should use dedicated nodes which handle routing and leader election without serving normal requests for data, to prevent partitions under heavy load. They also emphasize the importance of request throttling and setting proper quorum values.
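In ElasticSearch terms, a dedicated master is a node that is master-eligible but holds no data and serves no queries; a minimal sketch of the relevant settings (using the zen-discovery-era parameter names):

```yaml
# elasticsearch.yml sketch for a dedicated master node: it can coordinate
# the cluster and win elections, but holds no data and serves no search
# traffic, so heavy request load can't starve cluster-state management.
node.master: true
node.data: false
```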

### Long GC pauses

Stop-the-world garbage collection can force application latencies on the order of seconds to minutes. As Searchbox.io observed, GC pressure in an ElasticSearch cluster can cause secondary nodes to declare a primary dead and to attempt a new election. Because their configuration used a low value of zen.minimum_master_nodes, ElasticSearch was able to elect two simultaneous primaries, leading to inconsistency and downtime.
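The usual guard is to set `minimum_master_nodes` to a strict majority of master-eligible nodes, so that at most one side of a partition can hold an election. A sketch for a hypothetical cluster with three master-eligible nodes:

```yaml
# elasticsearch.yml sketch: with three master-eligible nodes, a strict
# majority is floor(3/2) + 1 = 2. A node that cannot see at least two
# master-eligible nodes (itself included) refuses to elect a primary,
# so a GC-paused or partitioned minority can't form a second cluster.
discovery.zen.minimum_master_nodes: 2
```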

### MySQL overload and a Pacemaker segfault

Github relies heavily on Pacemaker and Heartbeat: programs which coordinate cluster resources between nodes. They use Percona Replication Manager, a resource agent for Pacemaker, to replicate their MySQL database between three nodes.

On September 10th, 2012, a routine database migration caused unexpectedly high load on the MySQL primary. Percona Replication Manager, unable to perform health checks against the busy MySQL instance, decided the primary was down and promoted a secondary. The secondary had a cold cache and performed poorly. Normal query load on the node caused it to slow down, and Percona failed back to the original primary. The operations team put Pacemaker into maintenance-mode, temporarily halting automatic failover. The site appeared to recover.

The next morning, the operations team discovered that the standby MySQL node was no longer replicating changes from the primary. Operations decided to disable Pacemaker’s maintenance mode to allow the replication manager to fix the problem.

When the operations team attempted to disable maintenance-mode, Pacemaker segfaulted, resulting in a cluster state partition. After the segfault, two nodes (I’ll call them ‘a’ and ‘b’) rejected most messages from the third node (‘c’), while the third node rejected most messages from the other two. Despite the cluster being configured to require a majority of machines to agree on cluster state before taking action, two simultaneous master elections were attempted without proper coordination. In the first cluster, master election was interrupted by messages from the second cluster, and MySQL was stopped.

In the second, single-node cluster, node ‘c’ was elected at 8:19 AM, and any subsequent messages from the other two-node cluster were discarded. As luck would have it, the ‘c’ node was the node that our operations team previously determined to be out of date. We detected this fact and powered off this out-of-date node at 8:26 AM to end the partition and prevent further data drift, taking down all production database access and thus all access to github.com.

The partition caused inconsistency in the MySQL database–both between the secondary and primary, and between MySQL and other data stores like Redis. Because foreign key relationships were not consistent, Github showed private repositories to the wrong users' dashboards and incorrectly routed some newly created repos.

Github thought carefully about their infrastructure design, and were still surprised by a complex interaction of partial failures and software bugs. As they note in the postmortem:

… if any member of our operations team had been asked if the failover should have been performed, the answer would have been a resounding no.

Distributed systems are hard.

## NICs and drivers

### BCM5709 and friends

Unreliable NIC hardware or drivers are implicated in a broad array of partitions. Marc Donges and Michael Chan bring us a thrilling report of the popular Broadcom BCM5709 chipset abruptly dropping inbound but not outbound packets to a machine. Because the NIC dropped inbound packets, the node was unable to service requests. However, because it could still send heartbeats to its hot spare via keepalived, the spare considered the primary alive and refused to take over. The service was unavailable for five hours and did not recover without a reboot.

Sven Ulland followed up, reporting the same symptoms with the BCM5709S chipset on Linux 2.6.32-41squeeze2. Despite pulling commits from mainline which supposedly fixed a similar set of issues with the bnx2 driver, they were unable to resolve the issue until version 2.6.38.

Since Dell shipped a large number of servers with the BCM5709, the impact of these firmware bugs was widely observed. For instance, the 5709 and some related chipsets had a bug in their 802.3x flow control code, causing them to spew PAUSE frames when the chipset crashed or its buffers filled up. This problem was magnified by the BCM56314 and BCM56820 switch-on-a-chip devices (a component in a number of Dell’s top-of-rack switches), which, by default, spewed PAUSE frames at every interface attempting to communicate with the offending 5709 NIC. This led to cascading failures on entire switches or networks.

The bnx2 driver could also cause transient or flapping network failures, as described in this ElasticSearch split brain report. Meanwhile, the Broadcom 57711 was notorious for causing extremely high latencies under load with jumbo frames, a particularly thorny issue for ESX users with iSCSI-backed storage.

### A GlusterFS partition caused by a driver bug

After a scheduled upgrade, CityCloud noticed unexpected network failures in two distinct GlusterFS pairs, followed by a third. Suspecting link aggregation, CityCloud disabled the feature on their switches and allowed self-healing operations to proceed.

Roughly 12 hours later, the network failures returned on one node. CityCloud identified the cause as a driver issue and updated the downed node, returning service. However, the outage resulted in data inconsistency between GlusterFS pairs:

As the servers lost storage abruptly there were certain types of Gluster issues where files did not match each other on the two nodes in each storage pair. There were also some cases of data corruption in the VMs filesystems due to VMs going down in an uncontrolled way.

## Datacenter network failures

Individual network interfaces can fail, but they typically appear as single-node outages. Failures located in the physical network are often more nefarious. Switches are subject to power failure, misconfiguration, firmware bugs, topology changes, cable damage, and malicious traffic. Their failure modes are accordingly diverse:

### Power failure on both redundant switches

As Microsoft’s SIGCOMM paper suggests, redundancy doesn’t always prevent link failure. When a power distribution unit failed and took down one of two redundant top-of-rack switches, Fog Creek lost service for a subset of customers on that rack but remained consistent and available for most users. However, the other switch in that rack also lost power for undetermined reasons. That failure isolated the two neighboring racks from one another, taking down all On Demand services.

### Switch split-brain caused by BPDU flood

During a planned network reconfiguration to improve reliability, Fog Creek suddenly lost access to their network.

A network loop had formed between several switches.

The gateways controlling access to the switch management network were isolated from each other, generating a split-brain scenario. Neither was accessible due to a sudden traffic flood.

The flood was the result of a multi-switch BPDU (bridge protocol data unit) flood, indicating a spanning-tree flap. This is most likely what was changing the loop domain.

According to the BPDU standard, the flood shouldn’t have happened. But it did, and this deviation from the system’s assumptions resulted in two hours of total service unavailability.

### Bridge loops, misconfiguration, broken MAC caches

In an effort to address high latencies caused by a daisy-chained network topology, Github installed a set of aggregation switches in their datacenter. Despite a redundant network, the installation process resulted in bridge loops, and switches disabled links to prevent failure. This problem was quickly resolved, but later investigation revealed that many interfaces were still pegged at 100% capacity.

While investigating that problem, a misconfigured switch triggered aberrant automatic fault detection behavior: when one link was disabled, the fault detector disabled all links. This caused 18 minutes of hard downtime. The problem was later traced to a firmware bug preventing switches from updating their MAC address caches correctly, which forced them to broadcast most packets to every interface.

### Mystery RabbitMQ partitions

Sometimes, nobody knows why a system partitions. This RabbitMQ failure seems like one of those cases: few retransmits, no large gaps between messages, and no clear loss of connectivity between nodes. Upping the partition detection timeout to 2 minutes reduced the frequency of partitions but didn’t prevent them altogether.
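The timeout in question is the Erlang distribution’s `net_ticktime`, which governs inter-node heartbeats; raising it trades slower failure detection for fewer spurious partitions. A sketch of the two-minute setting in `rabbitmq.config`:

```erlang
%% rabbitmq.config sketch: raise the Erlang kernel's net_ticktime from
%% the default 60 seconds to 120. Detection of an unresponsive peer then
%% takes roughly net_ticktime (plus or minus ~25%), so genuine failures
%% are noticed later, but brief stalls no longer look like partitions.
[
  {kernel, [{net_ticktime, 120}]}
].
```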

### DRBD split-brain

When a two-node cluster partitions, there are no cases in which a node can reliably declare itself to be the primary. When this happens to a DRBD filesystem, as one user reported, both nodes can remain online and accept writes, leading to divergent filesystem-level changes. The only realistic option for resolving these kinds of conflicts is to discard all writes not made to a selected component of the cluster.
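DRBD makes this choice explicit in its manual split-brain recovery procedure: the operator designates one node the “victim” and discards its divergent writes. A sketch using the drbdadm 8.4-style syntax for a hypothetical resource `r0`:

```shell
# On the chosen split-brain victim: disconnect, demote, and reconnect
# while discarding all modifications made during the split-brain.
drbdadm disconnect r0
drbdadm secondary r0
drbdadm connect --discard-my-data r0

# On the surviving node (if it dropped to StandAlone), simply reconnect:
drbdadm connect r0
```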

### A NetWare split-brain

Short-lived failures can lead to long outages. In this Usenet post to novell.support.cluster-services, an admin reports that their two-node failover cluster running Novell NetWare experienced transient network outages. The secondary node eventually killed itself, and the primary (though still running) was no longer reachable by other hosts on the network. The post goes on to detail a series of network partition events correlated with backup jobs!

### MLAG, Spanning Tree, and STONITH

Github writes great postmortems, and this one is no exception. On December 22nd, 2012, a planned software update on an aggregation switch caused some mild instability during the maintenance window. In order to collect diagnostic information about the instability, the network vendor killed a particular software agent running on one of the aggregation switches.

Github’s aggregation switches are clustered in pairs using a feature called MLAG, which presents two physical switches as a single layer 2 device. The MLAG failure detection protocol relies on both ethernet link state and a logical heartbeat message exchanged between nodes. When the switch agent was killed, it was unable to shut down the ethernet link. Unlucky timing confused the MLAG takeover, preventing the still-healthy agg switch from handling link aggregation, spanning-tree, and other L2 protocols as normal. This forced a spanning-tree leader election and reconvergence for all links, blocking all traffic between access switches for 90 seconds.

The 90-second network partition caused fileservers using Pacemaker and DRBD for HA failover to declare each other dead, and to issue STONITH (Shoot The Other Node In The Head) messages to one another. The network partition delayed delivery of those messages, causing some fileserver pairs to believe they were both active. When the network recovered, both nodes shot each other at the same time. With both nodes dead, files belonging to the pair were unavailable.

To prevent filesystem corruption, DRBD requires that administrators ensure the original primary node is still the primary node before resuming replication. For pairs where both nodes were primary, the ops team had to examine log files or bring the node online in isolation to determine its state. Recovering those downed fileserver pairs took five hours, during which Github service was significantly degraded.

## Hosting providers

Running your own datacenter can be cheaper and more reliable than using public cloud infrastructure, but it also means you have to be a network and server administrator. What about hosting providers, which rent dedicated or virtualized hardware to users and often take care of the network and hardware setup for you?

### An undetected GlusterFS split-brain

Freistil IT hosts their servers with a colocation/managed-hosting provider. Their monitoring system alerted Freistil to 50–100% packet loss localized to a specific datacenter. The network failure, caused by a router firmware bug, returned the next day. Elevated packet loss caused the GlusterFS distributed filesystem to enter split-brain undetected:

Unfortunately, the malfunctioning network had caused additional problems which we became aware of in the afternoon when a customer called our support hotline because their website failed to deliver certain image files. We found that this was caused by a split-brain situation on the storage cluster “stor02” where changes made on node “stor02b” weren’t reflected on “stor02a” and the self-heal algorithm built into the Gluster filesystem was not able to resolve this inconsistency between the two data sets.

Repairing that inconsistency led to a “brief overload of the web nodes because of a short surge in network traffic.”

### An anonymous hosting provider

From what we can gather informally, all the major managed hosting providers experience regular network failures. One company running 100-200 nodes on a major hosting provider reported that in a 90-day period the provider’s network went through five distinct periods of partitions. Some partitions disabled connectivity between the provider’s cloud network and the public internet, and others separated the cloud network from the provider’s internal managed-hosting network. The failures caused unavailability, but because this company wasn’t running any significant distributed systems across those partitioned networks, there was no observed inconsistency or data loss.

### Pacemaker/Heartbeat split-brain

A post to Linux-HA details a long-running partition between a Heartbeat pair, in which two Linode VMs each declared the other dead and claimed a shared IP for themselves. Successive posts suggest further network problems: emails failed to dispatch due to DNS resolution failure, and nodes reported “network unreachable.” In this case, the impact appears to have been minimal–in part because the partitioned application was just a proxy.

## Cloud networks

Large-scale virtualized environments are notorious for transient latency, dropped packets, and full-blown network partitions, often affecting a particular software version or availability zone. Sometimes the failures occur between specific subsections of the provider’s datacenter, revealing planes of cleavage in the underlying hardware topology.

### An isolated MongoDB primary on EC2

In a comment on Call me maybe: MongoDB, Scott Bessler observed exactly the same failure mode Kyle demonstrated in the Jepsen post:

“Prescient. The w=safe scenario you show (including extra fails during rollback/re-election) happened to us today when EC2 West region had network issues that caused a network partition that separated PRIMARY from its 2 SECONDARIES in a 3 node replset. 2 hours later the old primary rejoined and rolled back everything on the new primary. Our bad for not using w=majority.”

This partition caused two hours of write loss. From our conversations with large-scale MongoDB users, we gather that network events causing failover on EC2 are common. Simultaneous primaries accepting writes for multiple days are not unknown.

### Mnesia split-brain on EC2

EC2 outages can leave two nodes connected to the internet but unable to see each other. This type of partition is especially dangerous, as writes to both sides of a partitioned cluster can cause inconsistency and lost data. That’s exactly what happened to this Mnesia cluster, which diverged overnight. Their state wasn’t critical, so the operations team simply nuked one side of the cluster. They conclude: “the experience has convinced us that we need to prioritize up our network partition recovery strategy”.

### EC2 instability causing MongoDB and ElasticSearch unavailability

Network disruptions in EC2 can affect only certain groups of nodes. For instance, this report of a total partition between the frontend and backend stacks states that the web servers lost their connections to all backend instances for a few seconds, several times a month. Even though the disruptions were short, cluster convergence resulted in 30-45 minute outages and a corrupted index for ElasticSearch. As problems escalated, the outages occurred “2 to 4 times a day.”

### VoltDB split-brain on EC2

One VoltDB user reports regular network failures causing replica divergence but also indicates that their network logs included no dropped packets. Because this cluster had not enabled split-brain detection, both nodes ran as isolated primaries, causing significant data loss.

### ElasticSearch discovery failure on EC2

Another EC2 split-brain: a two-node cluster failed to converge on “roughly 1 out of 10 startups” when discovery messages took longer than three seconds to exchange. As a result, both nodes would start as primaries with the same cluster name. Since ElasticSearch doesn’t demote primaries automatically, split-brain persisted until administrators intervened. Upping the discovery timeout to 15 seconds resolved the issue.
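The fix amounts to one discovery setting (a sketch, using the zen-discovery parameter name from that era):

```yaml
# elasticsearch.yml sketch: give slow EC2 discovery pings more time
# before a starting node gives up and elects itself primary.
discovery.zen.ping.timeout: 15s
```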

### RabbitMQ and ElasticSearch on Windows Azure

There are a few scattered reports of Windows Azure partitions, such as this account of a RabbitMQ cluster which entered split-brain on a weekly basis. There’s also this report of an ElasticSearch split-brain, but since Azure is a relative newcomer compared to EC2, descriptions of its network reliability are limited.

### AWS EBS outage

On April 21st, 2011, Amazon Web Services went down for over 12 hours, causing hundreds of high-profile web sites to go offline. As a part of normal AWS scaling activities, Amazon engineers shifted traffic away from a router in the Elastic Block Store (EBS) network in a single US-East Availability Zone (AZ).

The traffic shift was executed incorrectly and rather than routing the traffic to the other router on the primary network, the traffic was routed onto the lower capacity redundant EBS network. For a portion of the EBS cluster in the affected Availability Zone, this meant that they did not have a functioning primary or secondary network because traffic was purposely shifted away from the primary network and the secondary network couldn’t handle the traffic level it was receiving. As a result, many EBS nodes in the affected Availability Zone were completely isolated from other EBS nodes in its cluster. Unlike a normal network interruption, this change disconnected both the primary and secondary network simultaneously, leaving the affected nodes completely isolated from one another.

The partition coupled with aggressive failure-recovery code caused a mirroring storm, which led to network congestion and triggered a previously unknown race condition in EBS. EC2 was unavailable for roughly 12 hours, and EBS was unavailable or degraded for over 80 hours.

The EBS failure also caused an outage in Amazon’s Relational Database Service. When one AZ fails, RDS is designed to fail over to a different AZ. However, 2.5% of multi-AZ databases in US-East failed to fail over due to “stuck” IO.

The primary cause was that the rapid succession of network interruption (which partitioned the primary from the secondary) and “stuck” I/O on the primary replica triggered a previously un-encountered bug. This bug left the primary replica in an isolated state where it was not safe for our monitoring agent to automatically fail over to the secondary replica without risking data loss, and manual intervention was required.

This correlated failure caused widespread outages for clients relying on AWS. For example, Heroku reported between 16 and 60 hours of unavailability for their users' databases.

## WAN failures

While we have largely focused on failures over local area networks (or near-local networks), wide area network (WAN) failures are also common–if less frequently documented. These failures are particularly interesting because there are often fewer redundant WAN routes and because systems guaranteeing high availability (and disaster recovery) often require distribution across multiple datacenters. Accordingly, graceful degradation under partitions or increased latency is especially important for geographically widespread services.

### PagerDuty

PagerDuty designed their system to remain available in the face of node, datacenter, or even provider failure; their services are replicated between two EC2 regions and a datacenter hosted by Linode. On April 13, 2013, an AWS peering point in northern California degraded, causing connectivity issues for one of PagerDuty’s EC2 nodes. As latencies between AWS availability zones rose, the notification dispatch system lost quorum and stopped dispatching messages entirely.

Even though PagerDuty’s infrastructure was designed with partition tolerance in mind, correlated failures due to a shared peering point between two datacenters caused 18 minutes of unavailability, dropping inbound API requests and delaying queued pages until quorum was re-established.

### CENIC Study

Researchers at the University of California, San Diego quantitatively analyzed five years of operation in the CENIC wide-area network, which contains over two hundred routers across California. By cross-correlating link failures and additional external BGP and traceroute data, they discovered over 508 “isolating network partitions” that caused connectivity problems between hosts. Average partition duration ranged from 6 minutes for software-related failures to over 8.2 hours for hardware-related failures (median 2.7 and 32 minutes; 95th percentile of 19.9 minutes and 3.7 days, respectively).

## Global routing failures

Despite the high level of redundancy in internet systems, some network failures take place on a globally distributed scale.

### CloudFlare

CloudFlare runs 23 datacenters with redundant network paths and anycast failover. In response to a DDoS attack against one of their customers, their operations team deployed a new firewall rule to drop packets of a specific size. Juniper’s FlowSpec protocol propagated that rule to all CloudFlare edge routers–but then:

What should have happened is that no packet should have matched that rule because no packet was actually that large. What happened instead is that the routers encountered the rule and then proceeded to consume all their RAM until they crashed.

Recovering from the failure was complicated by routers which failed to reboot automatically, and inaccessible management ports.

Even though some data centers came back online initially, they fell back over again because all the traffic across our entire network hit them and overloaded their resources.

CloudFlare monitors their network carefully and the ops team had immediate visibility into the failure. However, coordinating globally distributed systems is complex, and calling on-site engineers to find and reboot routers by hand takes time. Recovery began after 30 minutes, and was complete after an hour of unavailability.

### Juniper routing bug

A firmware bug introduced as a part of an upgrade in Juniper Networks’s routers caused outages in Level 3 Communications’s networking backbone. This subsequently knocked services like Time Warner Cable, RIM BlackBerry, and several UK internet service providers offline.

### Global BGP outages

There have been several global Internet outages related to BGP misconfiguration. Notably, in 2008, Pakistan Telecom, responding to a government edict to block YouTube.com, incorrectly advertised its (blocked) route to other providers, which hijacked traffic destined for the site and briefly rendered it unreachable. In 2010, a group of Duke University researchers achieved a similar effect by testing an experimental flag in the BGP protocol. Similar incidents occurred in 2006 (knocking sites like Martha Stewart Living and The New York Times offline), in 2005 (when a misconfigured ISP in Turkey attempted to redirect the entire internet), and in 1997.

## Where do we go from here?

This post is meant as a reference point–to illustrate that, according to a wide range of accounts, partitions occur in many real-world environments. Processes, servers, NICs, switches, local and wide area networks can all fail, and the resulting economic consequences are real. Network outages can suddenly arise in systems that are stable for months at a time, during routine upgrades, or as a result of emergency maintenance. The consequences of these outages range from increased latency and temporary unavailability to inconsistency, corruption, and data loss. Split-brain is not an academic concern: it happens to all kinds of systems–sometimes for days on end. Partitions deserve serious consideration.

On the other hand, some networks really are reliable. Engineers at major financial firms report that despite putting serious effort into designing systems that gracefully tolerate partitions, their networks rarely, if ever, exhibit partition behavior. Cautious engineering (and lots of money) can prevent outages.

However, not all organizations can afford the cost or operational complexity of highly reliable networks. From Google and Amazon (who operate commodity and/or low-cost hardware due to sheer scale) to one-man startups built on shoestring budgets, communication-isolating network failures are a real risk.

It’s important to consider this risk before a partition occurs–because it’s much easier to make decisions about partition tolerance on a whiteboard than to redesign, re-engineer, and upgrade a complex system in a production environment–especially when it’s throwing errors at your users. For some applications, failure is an option–but you should characterize and explicitly account for it as a part of your design.

We invite you to contribute your own experiences with or without network partitions. Open a pull request on https://github.com/aphyr/partitions-post, leave a comment, write a blog post, or release a post-mortem. Data will inform this conversation, future designs, and, ultimately, the availability of the systems you depend on.

# Asynchronous replication with failover

In response to my earlier post on Redis inconsistency, Antirez was kind enough to help clarify some points about Redis Sentinel's design.

First, I'd like to reiterate my respect for Redis. I've used Redis extensively in the past with good results. It's delightfully fast, simple to operate, and offers some of the best documentation in the field. Redis is operationally predictable. Data structures and their performance behave just how you'd expect. I hear nothing but good things about the clarity and quality of Antirez's C code. This guy knows his programming.

I think Antirez and I agree with each other, and we're both saying the same sorts of things. I'd just like to expand on some of these ideas a bit, and generalize to a broader class of systems.

First, the distributed system comprised of Redis and Redis Sentinel cannot be characterized as consistent. Nor can MongoDB with anything less than WriteConcern.MAJORITY, or MySQL with asynchronous replication, for that matter. Antirez writes:

What I'm saying here is that just the goal of the system is:

1) To promote a slave into a master if the master fails.
2) To do so in a reliable way.

Redis Sentinel does reliably promote secondaries into primaries. It is so good at this that it can promote two, three, or all of your secondaries into primaries concurrently, and keep them in that state indefinitely. As we've seen, having causally unconnected primaries in this kind of distributed system allows for conflicts–and since Redis Sentinel will destroy the state on an old primary when it becomes visible to a quorum of Sentinels, this can lead to arbitrary loss of acknowledged writes to the system.

Ok I just made clear enough that there is no such goal in Sentinel to turn N Redis instances into a distributed store,

If you use any kind of failover, your Redis system is a distributed store. Heck, reading from secondaries makes Redis a distributed store.

So you can say, ok, Sentinel has a limited scope, but could you add a feature so that when the master feels in the minority it no longer accept writes? I don't think it's a good idea. What it means to be in the minority for a Redis master monitored by Sentinels (especially given that Redis and Sentinel are completely separated systems)?

Do you want your Redis master stopping to accept writes when it is no longer able to replicate to its slaves?

Yes. This is required for a CP system with failover. If you don't do it, your system can and will lose data. You cannot achieve consistency in the face of a partition without sacrificing availability. If you want Redis to be AP, then don't destroy the data on the old primaries by demoting them. Preserve conflicts and surface them to the clients for merging.

You could do this as an application developer by setting every Redis node to be a primary, and writing a proxy layer which uses, say, consistent hashing and active anti-entropy to replicate writes between nodes. Take a look at Antirez's own experiments in this direction. If you want a CP system, you could follow Datomic's model and use immutable shared-structure values in Redis, combined with, say, Zookeeper for mutable state.
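To make that concrete, here's a minimal sketch (in Python, with invented names) of the placement logic such a proxy layer might use: each node claims many virtual points on a hash ring, and a key is written to the first N distinct nodes clockwise of its hash.

```python
import bisect
import hashlib

def _hash(s):
    # Map a string to a point on the ring.
    return int(hashlib.md5(s.encode()).hexdigest(), 16)

class Ring:
    """A consistent-hash ring: each node claims several virtual points,
    and a key is owned by the first n distinct nodes clockwise of it."""
    def __init__(self, nodes, vnodes=64):
        self.points = sorted((_hash(f"{node}:{i}"), node)
                             for node in nodes for i in range(vnodes))

    def owners(self, key, n=3):
        # Walk clockwise from the key's hash until n distinct nodes are found.
        i = bisect.bisect(self.points, (_hash(key),))
        found = []
        while len(found) < n:
            node = self.points[i % len(self.points)][1]
            if node not in found:
                found.append(node)
            i += 1
        return found

ring = Ring(["redis-a", "redis-b", "redis-c", "redis-d"])
# A proxy would write to every owner, read from every owner, and merge.
print(ring.owners("user:1234"))
```

The proxy would then replicate each write to all owners of its key, and an anti-entropy process would repair divergence between them in the background.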

## Why topology matters

Antirez recommends a different approach to placing Sentinels than I used in my Redis experiments:

… place your Sentinels and set your quorum so that you are defensive enough against partitions. This way the system will activate only when the issue is really the master node down, not a network problem. Fear data loss and partitions? Have 10 Linux boxes? Put a Sentinel in every box and set quorum to 8.

I… can't parse this statement in a way that makes sense. Adding more boxes to a distributed system doesn't reduce the probability of partitions–and more to the point, trying to determine the state of a distributed system from outside the system itself is fundamentally flawed.

I mentioned that having the nodes which determine the cluster state (the Sentinels) be separate from the nodes which actually perform the replication (the Redis servers) can lead to worse kinds of partitions. I'd like to explain a little more, because I'm concerned that people might actually be doing this in production.

In this image, S stands for Sentinel, R stands for a Redis server, and C stands for Client. A box around an R indicates that node is a primary, and where it is able to replicate data to a secondary Redis server, an arrow is shown on that path. Lines show open network connections, and the jagged border shows a network partition.

Let's say we place our sentinels on 3 nodes to observe a three-node cluster. In the left-hand scenario, the majority of Sentinels are isolated, with two servers, from the clients. They promote node 2 to be a new primary, and it begins replicating to node 3. Node 1, however, is still a primary. Clients will continue writing to node 1, even though a.) its durability guarantees are greatly diminished–if it dies, all writes will be lost, and b.) the node doesn't have a quorum, so it cannot safely accept writes. When the partition resolves, the Sentinels will demote node 1 to a secondary and replace its data with the copy from N2, effectively destroying all writes during the partition.

On the right-hand side, a fully connected group of Sentinels can only see one Redis node. It's not safe to promote that node, because it doesn't have a majority and servers won't demote themselves when isolated, but the sentinels do it anyway. This scenario could be safely available to clients because a majority is present, but Redis Sentinel happily creates a split-brain and obliterates the data on the first node at some later time.

If you take Antirez' advice and colocate the sentinels with your clients, we can still get into awful states. On the left, an uneven partition between clients and servers means we elect a minority Redis server as the primary, even though it can't replicate to any other nodes. The majority component of the servers can still accept writes, but they're doomed: when the clients are able to see those nodes again, they'll wipe out all the writes that took place on those 2 nodes.

On the right, we've got the same partition topology I demonstrated in the Redis post. Same deal: split brain means conflicting writes and throwing away data.

If you encounter intermittent or rolling partitions (which can happen in the event of congestion and network failover), shifting quorums coupled with the inability of servers to reason about their own cluster state could yield horrifying consequences, like every node being a primary at the same time. You might be able to destroy not only writes that took place during the partition, but all data ever written–not sure if the replication protocol allows this or if every node just shuts down.

Bottom line: if you're building a distributed system, you must measure connectivity in the distributed system itself, not by what you can see from the outside. Like we saw with MongoDB and Riak, it's not the wall-clock state that matters–it's the logical messages in the system. The further you get from those messages, the wider your windows for data loss.

## It's not just Sentinel

I assert that any system which uses asynchronous primary-secondary replication, and can change which node is the primary, is inconsistent. Why? If you write an operation to the primary, and then failover occurs before the operation is replicated to the node which is about to become the new primary, the new primary won't have that operation. If your replication strategy is to make secondaries look like the current primary, the system isn't just inconsistent, but can actually destroy acknowledged operations.

Here's a formal model of a simple system which maintains a log of operations. At any stage, one of three things can happen: we can write an operation to the primary, replicate the log of the primary to the secondary, or fail over:

```
------------------------------ MODULE failover ------------------------------
EXTENDS Naturals, Sequences, TLC

CONSTANT Ops

\* n1 and n2 are the list of writes made against each node
VARIABLES n1, n2

\* The list of writes acknowledged to the client
VARIABLE acks

\* The current primary node
VARIABLE primary

\* The types we allow variables to take on
TypeInvariant == /\ primary \in {1, 2}
                 /\ n1 \in Seq(Ops)
                 /\ n2 \in Seq(Ops)
                 /\ acks \in Seq(Ops)

\* An operation is acknowledged if it has an index somewhere in acks.
IsAcked(op) == \E i \in DOMAIN acks : acks[i] = op

\* The system is *consistent* if every acknowledged operation appears,
\* in order, in the current primary's oplog:
Consistency == acks = SelectSeq((IF primary = 1 THEN n1 ELSE n2), IsAcked)

\* We'll say the system is *potentially consistent* if at least one node
\* has a superset of our acknowledged writes in order.
PotentialConsistency == \/ acks = SelectSeq(n1, IsAcked)
                        \/ acks = SelectSeq(n2, IsAcked)

\* To start out, all oplogs are empty, and the primary is n1.
Init == /\ primary = 1
        /\ n1 = <<>>
        /\ n2 = <<>>
        /\ acks = <<>>

\* A client can send an operation to the primary. The write is immediately
\* stored on the primary and acknowledged to the client.
Write(op) == IF primary = 1
             THEN /\ n1' = Append(n1, op)
                  /\ acks' = Append(acks, op)
                  /\ UNCHANGED <<n2, primary>>
             ELSE /\ n2' = Append(n2, op)
                  /\ acks' = Append(acks, op)
                  /\ UNCHANGED <<n1, primary>>

\* For clarity, we'll have the client issue unique writes
WriteSomething == \E op \in Ops : ~IsAcked(op) /\ Write(op)

\* The primary can *replicate* its state by forcing another node
\* into conformance with its oplog
Replicate == IF primary = 1
             THEN /\ n2' = n1
                  /\ UNCHANGED <<n1, acks, primary>>
             ELSE /\ n1' = n2
                  /\ UNCHANGED <<n2, acks, primary>>

\* Or we can fail over to a new primary.
Failover == /\ IF primary = 1 THEN primary' = 2 ELSE primary' = 1
            /\ UNCHANGED <<n1, n2, acks>>

\* At each step, the system can either write, replicate, or fail over
Next == \/ WriteSomething
        \/ Replicate
        \/ Failover
```

This is written in the TLA+ language for describing algorithms, which encodes a good subset of ZF axiomatic set theory with first-order logic and the Temporal Logic of Actions. We can explore this specification with the TLC model checker, which takes our initial state and evolves it by executing every possible state transition until it hits an error:

This protocol is inconsistent. The fields in red show the state changes during each transition: in the third step, the primary is n2, but n2's oplog is empty, instead of containing the list <<2>>. In fact, this model fails the PotentialConsistency invariant shortly thereafter, if replication or a write occurs. We can also test for the total loss of writes; it fails that invariant too.

That doesn't mean primary-secondary failover systems must be inconsistent. You just have to ensure that writes are replicated before they're acknowledged:

```
\* We can recover consistency by making the write protocol synchronous
SyncWrite(op) == /\ n1' = Append(n1, op)
                 /\ n2' = Append(n2, op)
                 /\ acks' = Append(acks, op)
                 /\ UNCHANGED primary

\* This new state transition satisfies both consistency constraints
SyncNext == \/ \E op \in Ops : SyncWrite(op)
            \/ Replicate
            \/ Failover
```

And in fact, we don't have to replicate to all nodes before ack to achieve consistency–we can get away with only writing to a quorum, if we're willing to use a more complex protocol like Paxos.

## The important bit

So you skimmed the proof; big deal, right? The important thing is that it doesn't matter how you actually decide to do the failover: Sentinel, Mongo's gossip protocol, Heartbeat, Corosync, Byzantine Paxos, or a human being flipping the switch. Redis Sentinel happens to be more complicated than it needs to be, and it leaves much larger windows for write loss than it has to, but even if it were perfect, the underlying Redis replication model is fundamentally inconsistent. We saw the same problem in MongoDB when we wrote with less than WriteConcern.MAJORITY. This affects asynchronous replication in MySQL and Postgres. It affects DRBD (yeaaaahhh, this can happen to your filesystem). If you use any of this software, you are building an asynchronous distributed system, and there are eventualities that have to be acknowledged.

Look guys, there's nothing new here. This is an old proof and many mature software projects (for instance, DRBD or RabbitMQ) explain the inconsistency and data-loss consequences of a partition in their documentation. However, not everyone knows. In fact, a good number of people seem shocked.

Why is this? I think it might be because software engineering is a really permeable field. You can start out learning Rails, and in two years wind up running four distributed databases by accident. Not everyone chose or could afford formal education, or was lucky enough to have a curmudgeonly mentor, or happened to read the right academic papers or find the right blogs. Now they might be using Redis as a lock server, or storing financial information in MongoDB. Is this dangerous? I honestly don't know. Depends on how they're using the system.

I don't view this so much as an engineering problem as a cultural one. Knives still come with sharp ends. Instruments are still hard for beginners to play. Not everything can or should be perfectly safe–or accessible. But I think we should warn people about what can happen, up front.

Tangentially: like many cultures, much of our collective understanding about what is desirable or achievable in distributed systems is driven by advertising. Yeah, MongoDB. That means you. ;-)

## Bottom line

I don't mean to be a downer about all this. Inconsistency and even full-out data loss aren't the end of the world. Asynchronous replication is a good deal faster, both in bulk throughput and client latencies. I just think we lose sight, occasionally, of what that means for our production systems. My goal in writing Jepsen has been to push folks to consider their consistency properties carefully, and to explain them clearly to others. I think that'll help us all build safer systems. :)

# Call me maybe: final thoughts

Previously in Jepsen, we discussed Riak. Now we'll review and integrate our findings.

We started this series with an open problem.

Notorious computer expert Joe Damato explains: “Literally no one knows.”

We've pushed the boundaries of our knowledge a little, though. By building a simple application which models a sequence of causally dependent writes, recording a log of that app's view of the world, and comparing that log to the final state of the database, we were able to verify–and challenge–our assumptions about the behavior of various distributed systems. In this talk we discussed one particular type of failure mode: a stable network partition which isolated one or more primary nodes–and explored its consequences in depth.

In each case, the system did something… odd. Maybe we hadn't fully thought through the consequences of the system, even if they were documented. Maybe the marketing or documentation were misleading, or flat-out lies. We saw design flaws, like the Redis Sentinel protocol. Some involved bugs, like MongoDB's WriteConcern.MAJORITY treating network errors as successful acknowledgements. Other times we uncovered operational caveats, like Riak's high latencies before setting up fallback vnodes. In each case, the unexpected behavior led to surprising new information about the challenge of building correct distributed systems.

In this series, we chose a simple network failure which we know happens to real production systems. The test encoded specific assumptions about concurrency, throughput, latency, timeout, error handling, and conflict resolution. The results demonstrate one point in a high-dimensional parameter space. The fraction of dropped writes in these Jepsen demos can vary wildly for all these reasons, which means we can't make general assertions about how bad the possibility of write loss really is. Mongo could lose almost all your writes, or none at all. It completely depends on the nature of your network, application, server topology, hardware, load, and the failure itself.

To apply these findings to your systems–especially in fuzzy, probabilistic ways–you'll need to measure your assumptions about how your system behaves. Write an app that hits your API and records responses. Cause some failures and see whether the app's log of what happened lines up with the final state of the system. The results may be surprising.

Measurement isn't something you do just once. Ideally, your production systems should be instrumented continuously for performance and correctness. Some of these failure modes leave traces you can detect.

Some people claim that partitions don't happen to them. If you run in EC2 or other virtualized environments, noisy neighbors and network congestion/failures are a well-known problem. Running your own hardware doesn't make you immune either: Amazon, with some of the best datacenter engineers on the planet, considers partitions such a major problem that they were willing to design and build Dynamo. You are probably not Amazon.

Even if your network is reliable, logical failures can be partitions, too. Nodes which become so busy they fail to respond to heartbeats are a common cause of failover. Virtual machines can do all kinds of weird things to your network and clocks. Restoring from a backup can look like a partition resolving. These failures are hard to detect, so many people don't even know they occurred. You just… get slow for a while, or run across data corruption weeks or years later, and wonder what happened.

## Aiming for correctness

We've learned a bunch of practical lessons from these examples, and I'd like to condense them briefly:

Network errors mean “I don't know,” not “It failed.” Make the difference between success, failure, and indeterminacy explicit in your code and APIs. Consider extending consistency algorithms through the boundaries of your systems. Hand TCP clients ETags or vector clocks. Extend CRDTs to the browser itself.
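As an illustration of handing version tokens to clients, here's a toy compare-and-set store in Python, ETag-style. The API is invented for the example, not any particular database's:

```python
import uuid

class VersionedStore:
    """A toy store where every value carries an opaque version token,
    and a write must present the token it read (like an HTTP ETag)."""
    def __init__(self):
        self.data = {}  # key -> (etag, value)

    def get(self, key):
        return self.data.get(key, (None, None))

    def put(self, key, value, etag):
        # Reject the write if someone else has written since our read.
        current = self.data.get(key, (None, None))[0]
        if current != etag:
            return None  # precondition failed: client must re-read and retry
        new_etag = uuid.uuid4().hex
        self.data[key] = (new_etag, value)
        return new_etag

store = VersionedStore()
tag = store.put("cart", ["socks"], None)         # create
stale = store.put("cart", ["hats"], None)        # lost the race: rejected
tag = store.put("cart", ["socks", "hats"], tag)  # read-modify-write succeeds
```

The point is that the conflict surfaces at the client, where the application can re-read, merge, and retry, instead of being silently resolved inside the store.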

Even well-known, widely deployed algorithms like two-phase commit have some caveats, like false negatives. SQL transactional consistency comes in several levels. You're probably not using the stronger ones, and if you are, your code needs to handle conflicts. It's not usually a big deal, but keep it on your mental checklist.

Certain problems are hard to solve well, like maintaining a single authoritative record of data with primary failover. Consistency is a property of your data, not of your nodes. Avoid systems which assume node consensus implies data consistency.

Wall clocks are only useful for ensuring responsiveness in the face of deadlock, and even then they're not a positive guarantee of correctness. Our clocks were completely synchronized in this demo and we still lost data. Even worse things can happen if a clock gets out of sync, or a node pauses for a while. Use logical clocks on your data. Distrust systems which rely on the system time, unless you're running GPS or atomic clocks on your nodes. Measure your clock skew anyway.
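A Lamport clock is the simplest logical clock. This is a hypothetical sketch showing how receive events push a node's clock past the sender's, so causal order survives arbitrary wall-clock skew:

```python
class LamportClock:
    """A logical clock: ticks on local events, and on receive jumps
    past the sender's timestamp, so causally later events order later."""
    def __init__(self):
        self.time = 0

    def tick(self):
        # A local event: advance by one.
        self.time += 1
        return self.time

    def recv(self, sender_time):
        # A message arrives: jump past whatever the sender had seen.
        self.time = max(self.time, sender_time) + 1
        return self.time

a, b = LamportClock(), LamportClock()
t1 = a.tick()    # a writes something
t2 = b.recv(t1)  # b receives a's message; b's clock jumps past t1
assert t2 > t1   # causal order holds no matter what the wall clocks say
```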

Avoid home-grown distributed algorithms. Where correctness matters, rely on techniques with a formal proof and review in the literature. There's a huge gulf between a theoretically correct algorithm and living, breathing software–especially with respect to latency–but a buggy implementation of a correct algorithm is typically better than a correct implementation of a terrible algorithm. Bugs you can fix. Designs are much harder to re-evaluate.

Choose the right design for your problem space. Some parts of your architecture demand consistency, and there is software for that. Other parts can sacrifice linearizability while remaining correct, like CRDTs. Sometimes you can afford to lose data entirely. There is often a tradeoff between performance and correctness: think, experiment, and find out.

Restricting your system with particular rules can make it easier to attain safety. Immutability is an incredibly useful property, and can be combined with a mutable CP data store for powerful hybrid systems. Use idempotent operations as much as possible: it enables all sorts of queuing and retry semantics. Go one step further, if practical, and use full CRDTs.

Preventing write loss in some weakly consistent databases, like Mongo, requires a significant latency tradeoff. It might be faster to just use Postgres. Sometimes buying ridiculously reliable network and power infrastructure is cheaper than scaling out. Sometimes not.

Replication links between availability zones or data centers are much more likely to fail than a rack or aggregation switch within your DC. Microsoft estimates their WAN links offer 99.5% availability, IIRC, and their LANs 99.95%. Design your system accordingly.

## Embracing failure

All this analysis, measuring, and designing takes hard work. You may not have the money, experience, hardware, motivation, or time. Every system entails risk, and not quantifying that risk is a strategy in itself.

With that in mind, consider allowing your system to drop data. Spew data everywhere and repair it gradually with bulk processes. Garbage-collect structures instead of ensuring their correctness every time. Not everyone needs correct behavior right now. Some people don't ever need correct behavior. Look at the Facebook feed, or Twitter's DM light.

Code you can reason about is better than code you can't. Rely on libraries written and tested by other smart people to reduce the insane quantity of stuff you have to understand. If you don't get how to test that your merge function is associative, commutative, and idempotent, maybe you shouldn't be writing your own CRDTs just yet. Implementing two-phase commit on top of your database may be a warning sign.

Consistent, highly available systems are usually slow. There are proofs about the minimum number of network hops required to commit an operation in a CP system. You may want to trade correctness for performance for cost reasons, or to deliver a more responsive user experience.

I hope this work inspires you to test and improve your own distributed systems. The only reason I can talk about these mistakes is because I keep making them, over and over again. We're all in this together. Good luck. :)

http://github.com/aphyr/jepsen

## Thanks

Jepsen has consumed almost every hour of my life outside work for the last three months. I'm several hundred hours into the project now–and I couldn't have done it without the help and encouragement of friends and strangers.

My sincerest thanks to my fellow Boundary alumni Dietrich Featherston and Joe Damato for the conversations which sparked this whole endeavor. Salvatore Sanfilippo, Jordan West, Evan Vigil-McClanahan, Jared Rosoff, and Joseph Blomstedt were instrumental in helping me understand how these databases actually work. Stephen Strowes and someone whose name I've sadly forgotten helped me demonstrate partitions on a local cluster in the first place. My deepest appreciation to the Postgres team, the Redis project, 10Gen and Basho for their support, and for making cool databases available to everyone for free.

Sean Cribbs and Reid Draper clued me in to CRDTs and the problems of LWW. Tom Santero and Mark Phillips invited me to give this talk at RICON East. Jepsen wouldn't have existed without their encouragement, and I am continuously indebted to the pair. Zach Tellman, John Muellerleile, Josh O'Brien, Jared Morrow, and Ryan Zezeski helped refine my arguments and slides.

Hope I didn't forget anyone–if so, please drop me a line. Thanks for reading.

# Call me maybe: Riak

Previously in Jepsen, we discussed MongoDB. Today, we’ll see how last-write-wins in Riak can lead to unbounded data loss.

So far we’ve examined systems which aimed for the CP side of the CAP theorem, both with and without failover. We learned that primary-secondary failover is difficult to implement safely (though it can be done; see, for example, ZAB or Raft). Now I’d like to talk about a very different kind of database–one derived from Amazon’s Dynamo model.

Amazon designed Dynamo with the explicit goals of availability and partition tolerance–and partition-tolerant systems automatically handle node failure. It’s just a special kind of partition. In Dynamo, all nodes are equal participants in the cluster. A given object is identified by a key, which is consistently hashed into N slots (called “partitions”; not to be confused with a network partition) on a ring. Those N slots are claimed by N (hopefully distinct) nodes in the cluster, which means the system can, once data is replicated, tolerate up to N-1 node failures without losing data.

When a client reads from a Dynamo system, it specifies an R value: the number of nodes required to respond for a read to be successful. When it writes, it can specify W: the number of nodes which have to acknowledge the write. There’s also DW for “durable write”, and others. Riak has sometimes referred to these as “tunable CAP controls”: if you choose R=W=1, your system will be available even if all but one node fail–but you may not read the latest copy of data. If R + W is greater than N, you’re “guaranteed to read acknowledged writes”, with caveats. The defaults tend to be R=W=quorum, where quorum is N/2+1.
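The quorum-overlap arithmetic is easy to check exhaustively for small N. This brute-force sketch (illustrative Python, not Riak code) confirms that every read set must intersect every write set when R + W > N, and may miss it otherwise:

```python
from itertools import combinations

def overlap_guaranteed(n, r, w):
    """True iff every set of r replicas intersects every set of w replicas."""
    nodes = range(n)
    return all(set(reads) & set(writes)
               for reads in combinations(nodes, r)
               for writes in combinations(nodes, w))

n = 3
quorum = n // 2 + 1                            # 2 of 3
assert overlap_guaranteed(n, quorum, quorum)   # R=W=quorum: reads see writes
assert not overlap_guaranteed(n, 1, 1)         # R=W=1: reads can miss writes
```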

Dynamo handles partitions by healing the ring. Each connected set of machines establishes a set of fallback vnodes, to handle the portions of the ring which are no longer accessible. Once failover is complete, a Dynamo cluster split into two disjoint components will have two complete hash rings, and (eventually, as repair completes) 2 * N copies of the data (N in each component). When the partition heals, the fallback vnodes engage in hinted handoff, giving their data back to the original “primary” vnodes.

Since any node can accept writes for its portion of the keyspace, a Dynamo system can theoretically achieve 100% availability, even when the network fails entirely. This comes with two drawbacks. First, if no copy of a given object is available in an isolated set of nodes, that part of the cluster can accept writes for that object, but the first reads will return 404. If you’re adding items to a shopping cart and a partition occurs, your cart might appear to be empty. You could add an item to that empty cart, and it’d be stored, but depending on which side of the partition you talk to, you might see 20 items or just one.

When the partition heals, we have a new problem: it’s not clear which version of an object is authoritative. Dynamo employs a causality-tracing algorithm called vector clocks, which means it knows which copies of an object have been overwritten by updates, and which copies are actually conflicts–causally unconnected–due to concurrent writes.
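The comparison at the heart of vector clocks fits in a few lines (an illustrative Python sketch, not Riak's implementation): a copy whose clock dominates another's supersedes it; clocks that each contain entries the other lacks mark concurrent siblings.

```python
def descends(a, b):
    """True iff clock a has seen everything clock b has: b's history
    is contained in a's."""
    return all(a.get(node, 0) >= t for node, t in b.items())

def relation(a, b):
    if descends(a, b) and descends(b, a):
        return "equal"
    if descends(a, b):
        return "a overwrites b"
    if descends(b, a):
        return "b overwrites a"
    return "concurrent: keep both as siblings"

v1 = {"n1": 2, "n2": 1}
v2 = {"n1": 1, "n2": 1}  # an older copy of the same object
v3 = {"n1": 1, "n2": 2}  # updated independently during a partition
print(relation(v1, v2))  # a overwrites b
print(relation(v1, v3))  # concurrent: keep both as siblings
```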

Concurrent. We were talking about partitions, right? Two writes are concurrent if they happen in different components and can’t see each other’s changes, because the network didn’t let them communicate.

Well that’s interesting, because we’re also used to concurrency being a property of normal database systems. If two people read an object, then write it back with changes, those writes will also conflict. In a very real sense, partitions are just really big windows of concurrency. We often handle concurrent writes in relational databases with multi-version concurrency control or locks, but we can’t use locks here because the time horizons could be minutes or hours, and there’s no safe way to distribute a lock algorithm over a partition. We need a different approach. We need to be able to merge arbitrary conflicting objects for Dynamo to work. From the paper:

For instance, the application that maintains customer shopping carts can choose to “merge” the conflicting versions and return a single unified shopping cart. Despite this flexibility, some application developers may not want to write their own conflict resolution mechanisms and choose to push it down to the data store, which in turn chooses a simple policy such as “last write wins”.

Last write wins. That sounds like a timestamp. Didn’t we learn that Clocks Are Not To Be Trusted? Let’s try it and find out!

## Riak with last-write-wins

Riak is an excellent open-source adaptation of the Dynamo model. It includes a default conflict resolution mode of last-write-wins, which means that every write includes a timestamp, and when conflicts arise, it picks the one with the higher timestamp. If our clocks are perfectly synchronized, this ensures we pick the most recent value.
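Here's a toy illustration (invented values, not Riak code) of why that "if" matters: with last-write-wins, a fast clock on one node silently discards a causally newer write.

```python
def lww_merge(a, b):
    """Resolve a conflict by timestamp: the 'later' write wins,
    whatever the clocks happened to say."""
    return a if a["ts"] >= b["ts"] else b

# Node A's clock runs 30 seconds fast. Node B holds the causally newer
# write, made after A's, but stamped by an accurate clock.
old = {"value": ["socks"], "ts": 1000.0 + 30.0}           # written first
new = {"value": ["socks", "hats"], "ts": 1005.0}          # written later

winner = lww_merge(old, new)
print(winner["value"])  # ['socks'] -- the causally newer write is gone
```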

To be clear: there are actually two settings in Riak which affect conflict resolution: lww=true, which turns off vector clock analysis entirely, and allow-mult=false, which uses vector clocks but picks the sibling with the highest timestamp. Allow-mult=false is safer, and that’s the setting I’m referring to by “last write wins.” All cases of data loss in this post apply to both settings, though.

First, let’s install Riak, join the nodes together, and tell the cluster to commit the change:

```
salticid riak.setup
salticid riak.join
salticid riak.commit
```

You can watch the logs with salticid riak.tail. Watch salticid riak.transfers until there are no handoffs remaining. The cluster is now in a stable state.

For this particular application we’ll be adding numbers to a list stored in a single Riak object. This is a typical use case for Dynamo systems–the atomic units in the system are keys, not rows or columns. Let’s run the app with last-write-wins consistency:

```
lein run riak lww-sloppy-quorum
Writes completed in 5.119 seconds

2000 total
2000 acknowledged
566 survivors
1434 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
1 2 3 4 6 8 ... 1990 1991 1992 1995 1996 1997
1.0 ack rate
0.717 loss rate
```

Riak lost 71% of acknowledged writes on a fully-connected, healthy cluster. No partitions. Why?

Remember how partitions and concurrency are essentially the same problem? Simultaneous writes are causally disconnected. If two clients write values which descend from the same object, Riak just picks the write with the higher timestamp, and throws away the other write. This is a classic data race, and we know how to fix those: just add a mutex. We’ll wrap all operations against Riak in a perfectly consistent, available distributed lock.

“But you can’t do that! That violates the CAP theorem!”

Clever girl. Jepsen lets us pretend, though:

```
lein run lock riak-lww-sloppy-quorum
Writes completed in 21.475 seconds

2000 total
2000 acknowledged
2000 survivors
All 2000 writes succeeded. :-D
```

Problem solved! No more write conflicts. Now let’s see how it behaves under a partition by running salticid jepsen.partition during a run:

```
237 :ok
242 :ok
247 :ok
252 :ok
257 :ok
262 timeout
85  :ok
204 timeout
203 timeout
106 :ok
209 timeout
267 timeout
90  :ok
```

The first thing you’ll notice is that our writes start to lag hard. Some clients are waiting to replicate a write to a majority of nodes, but one side of the partition doesn’t have a majority available. Even though Riak is an AP design, it can functionally become unavailable while nodes are timing out.

Those requests time out until Riak determines those nodes are inaccessible, and sets up fallback vnodes. Once the fallback vnodes are in place, writes proceed on both sides of the cluster, because both sides have a majority of vnodes available. This is by design in Dynamo. Allowing both components to see a majority is called a sloppy quorum, and it allows both components to continue writing data with full multi-node durability guarantees. If we didn’t set up fallback vnodes, a single node failure could destroy our data.

Before collecting results, let’s heal the cluster: salticid jepsen.heal. Remember to wait for Riak to recover, by waiting until salticid riak.transfers says there’s no data left to hand off.

```
Writes completed in 92.773 seconds

2000 total
1985 acknowledged
176 survivors
1815 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
85 90 95 100 105 106 ... 1994 1995 1996 1997 1998 1999
6 unacknowledged writes found! ヽ(´ー｀)ノ
(203 204 218 234 262 277)
0.9925 ack rate
0.91435766 loss rate
0.00302267 unacknowledged but successful rate
```

91% data lost. This is fucking catastrophic, ladies.

What happened? When the partition healed, Riak had essentially two versions of the list: one from each side of the partition (plus some slightly divergent copies on each side). Last-write-wins means we pick the one with the higher timestamp. No matter what you do, all the writes from one side or the other will be discarded.

If your Riak cluster partitions, and you write to a node which can’t reach any of the original copies of the data, that write of a fresh object can overwrite the original record–destroying all the original data.

## Strict quorum

The problem is that we allowed writes to proceed on both sides of the partition. Riak has two more settings for reads and writes: PR and PW, for primary read and write, respectively. PR means you have to read a value from at least that many of the original owners of a key: fallback vnodes don’t count. If we set PR + PW >= quorum, operations against a given key will only be able to proceed on one component of a partitioned cluster. That’s a CP system, right?

```
lein run lock riak-lww-quorum
274  :ok
1250 :ok
279  com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pw_val_unsatisfied,2,1}
1381 :ok
277  com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pr_val_unsatisfied,2,1}
```

Here we see the cluster denying a write and a read, respectively, to clients which can’t see a majority of the primary nodes for a key. Note that because the quorums are spread around the nodes, a Dynamo system will be partially available in this mode. In any given component, you’ll be able to read and write some fraction of the keys, but not others.

```
2000 total
1971 acknowledged
170 survivors
1807 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
86 91 95 96 100 101 ... 1994 1995 1996 1997 1998 1999
6 unacknowledged writes found! ヽ(´ー｀)ノ
(193 208 219 237 249 252)
0.9855 ack rate
0.9167935 loss rate
0.00304414 unacknowledged but successful rate
```

PR=PW=R=W=quorum still allowed 92% write loss. We reported failure for more writes than before, so that’s a start–but what gives? Shouldn’t this have been CP?

The problem is that failed writes may still be partially successful. Dynamo is designed to preserve writes as much as possible. Even though a node might return “PW val unsatisfied” when it can’t replicate to the primary vnodes for a key, it may have been able to write to one primary vnode–or any number of fallback vnodes. Those values will still be exchanged during read-repair, considered as conflicts, and resolved by timestamp–discarding all the writes from one side of the cluster.

This means the minority component’s failing writes can destroy all of the majority component’s successful writes. Repeat after me: Clocks. Are. Evil.

Is there no hope? Is there anything we can do to preserve our writes in Riak?

Yes. We can use CRDTs.

If we enable allow-mult in Riak, the vector clock algorithms will present both versions to the client. We can combine those objects together using a merge function. If the merge function is associative, commutative, and idempotent over that type of object, we can guarantee that it always converges to the same value regardless of the order of writes. If the merge function doesn’t discard data (like last-write-wins does), then it will preserve writes from both sides.

In this case, we’re accumulating a set of numbers. We can use set union as our merge function, or 2P sets, or OR sets, if we need to remove numbers.
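
As a sketch (in Python, rather than the Clojure the Jepsen client actually uses), a grow-only set's merge is just union, and a 2P set pairs an add-set with a remove-set, merging each side independently:

```python
def merge(a, b):
    """G-set merge: set union is associative, commutative, and idempotent,
    so replicas converge regardless of merge order."""
    return a | b

# A 2P-set supports removal by merging add- and remove-sets separately.
def merge_2p(a, b):
    (a_add, a_rem), (b_add, b_rem) = a, b
    return (a_add | b_add, a_rem | b_rem)

def value_2p(s):
    adds, removes = s
    return adds - removes

# Divergent siblings from both sides of a partition merge losslessly:
left, right = {1, 2, 3, 5}, {1, 2, 4, 6}
assert merge(left, right) == merge(right, left) == {1, 2, 3, 4, 5, 6}
assert merge(left, left) == left  # idempotent: safe to retry writes
```

Note the 2P-set's tradeoff: once an element lands in the remove-set it can never be re-added, which is why OR sets exist for workloads that need re-insertion.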

```
lein run riak-crdt
Writes completed in 80.918 seconds
2000 total
1948 acknowledged
2000 survivors
All 2000 writes succeeded. :-D
```

CRDTs preserve 100% of our writes. We still have false negatives in this demo, because the client timed out on a few writes which Riak was still propagating, when the partition first began. False negatives are OK, though, because state-based CRDTs are idempotent. We can repeat our writes arbitrarily many times, in any order, without duplicating data.

Moreover, CRDTs are an AP design: we can write safely and consistently even when the cluster is totally partitioned–for example, when no majority exists. They’re also eventually consistent (in a safe, data-preserving sense) when components are partitioned away from all copies of a given object and are forced to start from scratch.

## Strategies for working with Riak

Enable allow-mult. Use CRDTs.

Seriously. LWW never should have been the standard behavior for a Dynamo system, but Basho made it the default after customers complained that they didn’t like the complexity of reasoning about siblings. Customers are the only reason Riak exists, and this behavior is gonna seem OK until you start experiencing partitions (and remember, fault tolerance is the reason you chose Riak in the first place), so we’re stuck with a default config which promotes simple-yet-dangerous behavior.

As a consequence of that decision, community resources which people rely on to learn how to use Riak are often aimed towards last-write-wins. Software isn’t just an artifact, but a culture around its use. I don’t really know what we can learn from this, besides the fact that engineering and culture are tough problems.

CRDTs may be too large, too complex, or too difficult to garbage-collect for your use case. However, even if you can’t structure your data as a full CRDT, writing a hacked-together merge function which just takes care of a couple important fields (say, set union over your friend list and logical OR over the other fields) can go a long way towards preventing catastrophic data loss.
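
For instance, a hypothetical sibling resolver for a user document (the field names here are invented for illustration) might union the friend list and OR a deactivation flag, while accepting best-effort resolution elsewhere:

```python
def merge_user(a, b):
    """Hypothetical sibling resolver: CRDT-style merges only for the
    fields we can't afford to lose."""
    return {
        "friends":     set(a["friends"]) | set(b["friends"]),   # set union
        "deactivated": a["deactivated"] or b["deactivated"],    # logical OR
    }

sibling1 = {"friends": {"alice", "bob"},  "deactivated": False}
sibling2 = {"friends": {"bob", "carol"},  "deactivated": True}
assert merge_user(sibling1, sibling2) == {
    "friends": {"alice", "bob", "carol"}, "deactivated": True}
```

With allow-mult on, Riak hands both siblings to the client, which writes back the merged value; neither side's friends are lost.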

There are cases where last-write-wins is a safe strategy. If your data is immutable, then it doesn’t matter which copy you choose. If your writes mean “I know the full correct state of this object at this time”, it’s safe. Many caches and backup systems look like this. If, however, your writes mean “I am changing something I read earlier,” then LWW is unsafe.

Finally, you can decide to accept dropped data. All databases will fail, in different ways, and with varying probabilities. Riak’s probability distribution might be OK for you.

Introducing locks is a bad idea. Even if they did prevent data loss–and as we saw, they don’t–you’ll impose a big latency cost. Moreover, locks restrict your system to being CP, so there’s little advantage to having an AP database. However, some really smart folks at Basho are working on adding Paxos rounds for writes which need to be CP. Having a real consensus protocol will allow Riak’s distributed writes to be truly atomic.

So: we’ve seen that Riak’s last-write-wins is fundamentally unsafe in the presence of network partitions. You can lose not only writes made during the partition, but all writes made at any time prior. Riak is an AP system, and its tunable CAP controls only allow you to detect some forms of write loss–not prevent it. You can’t add consistency to a database by tacking on a lock service because wall clock time doesn’t matter: consistency is a causal property of the relationships between the writes themselves. AP systems involve fundamentally different kinds of data structures, with their own unique tradeoffs.

In the next post, we’ll review what we’ve learned from these four distributed systems, and where we go from here.

# Call me maybe: MongoDB

Previously in Jepsen, we discussed Redis. In this post, we'll see MongoDB drop a phenomenal amount of data.

MongoDB is a document-oriented database with a similar distribution design to Redis. In a replica set, there exists a single writable primary node which accepts writes, and asynchronously replicates those writes as an oplog to N secondaries. However, there are a few key differences.

First, Mongo builds in its leader election and replicated state machine. There's no separate system which tries to observe a replica set in order to make decisions about what it should do. The replica set decides among itself which node should be primary, when to step down, how to replicate, etc. This is operationally simpler and eliminates whole classes of topology problems.

Second, Mongo allows you to ask that the primary confirm successful replication of a write by its disk log, or by secondary nodes. At the cost of latency, we can get stronger guarantees about whether or not a write was successful.

What happens when a primary becomes inaccessible?

The remaining secondaries will gradually detect the failed connection and attempt to come to a consensus about what to do. If they have a majority (and remember, there can be only one majority in a cluster, so this suggests we're heading towards a CP system), they'll select the node with the highest optime (a monotonic clock maintained by each node) and promote it to be a new primary. Simultaneously, the minority nodes will detect that they no longer have a quorum, and demote the primary to a secondary so it can't accept writes.
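
That decision procedure can be sketched as follows (a deliberate simplification of Mongo's actual election protocol, with invented names):

```python
def elect(optimes, reachable, cluster_size):
    """Pick a new primary: only a majority component may elect, and it
    promotes the reachable node with the highest optime."""
    if len(reachable) <= cluster_size // 2:
        return None  # minority component: demote, accept no writes
    return max(reachable, key=lambda node: optimes[node])

optimes = {"n1": 105, "n2": 104, "n3": 103, "n4": 102, "n5": 101}
# n1 and n2 are cut off: the majority side elects its freshest node, n3.
assert elect(optimes, {"n3", "n4", "n5"}, 5) == "n3"
# The minority side cannot elect anyone, despite holding higher optimes.
assert elect(optimes, {"n1", "n2"}, 5) is None
```

Note what the example already hints at: the majority's freshest node (n3) can still lag the partitioned-away primary (n1), which is exactly where rollbacks come from.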

If our primary is on n1, and we cut off n1 and n2 from the rest of the cluster, we expect either n3, n4, or n5 to become the new primary. Because this architecture demotes the original primary on n1, we won't find ourselves in the same split-brain problem we saw with Redis.

## Consistency

So is MongoDB CP? There's a popular notion that MongoDB is a CP system, including exchanges like this, where all kinds of nuanced technical assertions about strong consistency are thrown around. At the same time, Mongo's documentation for replica sets explains carefully that Mongo may “revert operations”:

In some failover situations primaries will have accepted write operations that have not replicated to the secondaries after a failover occurs. This case is rare and typically occurs as a result of a network partition with replication lag. When this member (the former primary) rejoins the replica set and attempts to continue replication as a secondary the former primary must revert these operations or “roll back” these operations to maintain database consistency across the replica set.

“Revert” certainly doesn't sound like linearizability to me, but that bit about “maintain[ing] database consistency” doesn't sound so bad. What actually happens? Let's find out!

For this example, we'll be adding integers to a list in a MongoDB document by using the update command in a CaS loop–just like you'd use with any transactionally isolated database. Yes, we could use $addToSet, but I'm using this app as an example of atomic updates in general, and they have different oplog dynamics.

## Unacknowledged

Up until recently, clients for MongoDB didn't bother to check whether or not their writes succeeded, by default: they just sent them and assumed everything went fine. This goes about as well as you'd expect.

```
lein run mongo-unsafe -n 6000
salticid jepsen.partition
```

For a while, writes continue to complete against n1. Then we see errors as the replica set fails over, like

```
3186 No replica set members available in [
  { address:'n3/10.10.3.101:27017', ok:true, ping:0.8954104,  isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
  { address:'n4/10.10.3.95:27017',  ok:true, ping:0.681164,   isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
  { address:'n5/10.10.3.32:27017',  ok:true, ping:0.6231328,  isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
  { address:'n2/10.10.3.52:27017',  ok:true, ping:0.51316977, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
  { address:'n1/10.10.3.242:27017', ok:true, ping:0.37008655, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, }
] for { "mode" : "primary" }
```

During this time, the majority nodes (n3, n4, n5) are still secondaries, but they've agreed that the old primary is inaccessible. They compare optimes and race to elect a leader:

```
$ salticid mongo.rs_stat
22:09:08 Starting...
22:09:08 MongoDB shell version: 2.4.1
22:09:08 connecting to: test
22:09:08 n1:27017 (not reachable/healthy) 1368940104/56
22:09:08 n2:27017 (not reachable/healthy) 1368940103/458
22:09:08 n3:27017 SECONDARY              1368940104/89
22:09:08 n4:27017 SECONDARY              1368940104/89
22:09:08 n5:27017 SECONDARY              1368940104/102
22:09:08 true
22:09:08 Finished
22:09:23 n1:27017 (not reachable/healthy) 1368941926/66
22:09:23 n2:27017 (not reachable/healthy) 1368941961/70
22:09:23 n3:27017 SECONDARY              1368941962/9
22:09:23 n4:27017 SECONDARY              1368941961/45
22:09:23 n5:27017 PRIMARY                1368941963/11
```

N5 wins the race, and proceeds to accept writes. If we heal the partition with salticid jepsen.heal, and wait a few seconds, the nodes will detect the fully connected cluster and the new primary will step down, to allow n1 to resume its place. Now that the cluster has stabilized, we hit enter to check how many of our writes survived:

```
Hit enter when ready to collect results.
Writes completed in 93.608 seconds
6000 total
5700 acknowledged
3319 survivors
2381 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
469 474 479 484 489 494 ... 3166 3168 3171 3173 3178 3183
0.95 ack rate
0.4177193 loss rate
0.0 unacknowledged but successful rate
```

42% write loss. Well, to some extent, this shouldn't be surprising, because we weren't checking to see whether the server was successful in applying our writes. Those 300 errors only came about when we tried to write to a secondary. But we never actually crashed a node, and we didn't see any signs of a split-brain condition with two simultaneous primaries–so why did Mongo drop data?

Remember those writes that completed on n1 just after the partition started? Those writes are still on n1, but never made it to n5. N5 proceeded without them. Now n1 and n5 are comparing notes, and n1 realizes that n5's optime is higher. N1 figures out the last point where the two agreed on the oplog, and rolls back to that point.
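
The rollback procedure boils down to truncating the old primary's oplog at the last shared entry. A simplified sketch, with lists of opaque oplog entries standing in for the real thing:

```python
def rollback(old_primary_oplog, new_primary_oplog):
    """Find the common point and split the old primary's oplog into the
    entries it keeps and the acknowledged writes it must discard."""
    common = 0
    for ours, theirs in zip(old_primary_oplog, new_primary_oplog):
        if ours != theirs:
            break
        common += 1
    return old_primary_oplog[:common], old_primary_oplog[common:]

# n1 accepted writes 4 and 5 during the partition; n5 diverged after 3.
kept, discarded = rollback([1, 2, 3, 4, 5], [1, 2, 3, 6, 7, 8])
assert kept == [1, 2, 3]
assert discarded == [4, 5]  # acknowledged on n1, now thrown away
```

Everything in `discarded` was durable on n1 and acknowledged to clients; after the rollback it exists, at best, in a BSON file on disk.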

```
22:09:33 Sun May 19 05:09:33.032 [rsHealthPoll] replSet member n5:27017 is now in state PRIMARY
22:09:33 Sun May 19 05:09:33.207 [initandlisten] connection accepted from 10.10.3.95:37718 #6154 (23 connections now open)
22:09:33 Sun May 19 05:09:33.417 [rsBackgroundSync] replSet syncing to: n5:27017
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet our last op time fetched: May 19 05:08:37:2
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replset source's GTE: May 19 05:09:26:1
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet rollback 0
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet ROLLBACK
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet rollback 1
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet rollback 2 FindCommonPoint
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback our last optime:   May 19 05:08:37:2
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback their last optime: May 19 05:09:33:32
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback diff in end of log times: -56 seconds
22:09:35 Sun May 19 05:09:33.621 [initandlisten] connection accepted from 10.10.3.32:59066 #6155 (24 connections now open)
22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet rollback found matching events at May 19 05:08:24:66
22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet rollback findcommonpoint scanned : 3798
22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet replSet rollback 3 fixup
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 3.5
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 4 n:1
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet minvalid=May 19 05:09:35 51985e8f:19
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 4.6
22:09:35 Sun May 19 05:09:35.223 [rsBackgroundSync] replSet rollback 4.7
22:09:35 Sun May 19 05:09:35.223 [rsBackgroundSync] replSet rollback 5 d:0 u:1
22:09:35 Sun May 19 05:09:35.224 [rsBackgroundSync] replSet rollback 6
22:09:35 Sun May 19 05:09:35.236 [rsBackgroundSync] replSet rollback 7
22:09:35 Sun May 19 05:09:35.238 [rsBackgroundSync] replSet rollback done
22:09:35 Sun May 19 05:09:35.238 [rsBackgroundSync] replSet RECOVERING
```

During a rollback, all the writes the old primary accepted after the common point in the oplog are removed from the database and written to a BSON file in Mongo's rollbacks directory. If you're a sysadmin, you could go look at the rollback files to try and reconstruct the writes that the database dropped.

Well, theoretically. In my tests, it only does this in 1 out of 5 runs or so. Mostly, it just throws those writes away entirely: no rollback files, no nothing. I don't really know why.

This leads to an important discovery: it doesn't matter whether or not there were two primaries at the same time. We can still get conflicting writes if the old primary's state is causally unconnected from the new primary. A primary/secondary system, by itself, is not sufficient. We have to actually track causality on the writes themselves in order to be CP. Otherwise, newly elected primaries could diverge from the old one.
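
Tracking causality usually means attaching version vectors to writes; comparing two vectors tells you whether one write descends from the other or whether they genuinely conflict. A sketch:

```python
def compare(a, b):
    """Compare two version vectors (dicts of node -> counter)."""
    nodes = set(a) | set(b)
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "ancestor"    # b has seen everything in a: safe to discard a
    if b_le_a:
        return "descendant"
    return "concurrent"      # causally unconnected: a real conflict

# The old primary's unreplicated writes vs. the new primary's history:
assert compare({"n1": 5}, {"n1": 4, "n5": 3}) == "concurrent"
# A write the new primary has already incorporated is safe to discard:
assert compare({"n1": 4}, {"n1": 4, "n5": 3}) == "ancestor"
```

Comparing optimes collapses this partial order into a total one, which is precisely how causally unconnected writes get silently discarded instead of surfaced as conflicts.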

## Safe

Aha! But that was with the old “unsafe” write concern! We should use the Safe write concern!

```
lein run mongo-safe -n 6000
...
6000 total
5900 acknowledged
3692 survivors
2208 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
458 463 468 473 478 483 ... 3075 3080 3085 3090 3095 3100
0.98333335 ack rate
0.3742373 loss rate
0.0 unacknowledged but successful rate
```

## Replicas-safe

WriteConcern.SAFE only verifies that the write was accepted by the primary. We need to make sure that the replicas have received our write before considering it a success.

```
lein run mongo-replicas-safe -n 6000
...
6000 total
5695 acknowledged
3768 survivors
1927 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
712 717 722 727 732 737 ... 2794 2799 2804 2809 2814 2819
0.94916666 ack rate
0.338367 loss rate
0.0 unacknowledged but successful rate
```

Mongo still rolled back our writes. Why? Because REPLICAS_SAFE only checks to see if the write took place against two replicas. Our cluster has five nodes, so it's possible for writes to exist only on n1 and n2. A new primary can be elected without having seen our write. We need to wait until our write has been acknowledged by a majority of nodes.
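
Brute force shows why a majority is the magic number here, using the five nodes of the test cluster:

```python
from itertools import combinations

nodes = {"n1", "n2", "n3", "n4", "n5"}

# REPLICAS_SAFE: an ack from two nodes can miss the electing majority
# entirely, so the new primary may never have seen the write.
assert not ({"n1", "n2"} & {"n3", "n4", "n5"})

# MAJORITY: every 3-node write set intersects every 3-node electorate,
# so at least one voter has seen the write.
assert all(set(w) & set(e)
           for w in combinations(nodes, 3)
           for e in combinations(nodes, 3))
```

Intersection is necessary but not sufficient: the election must also actually prefer the node that holds the write, which is where the next section's surprises come from.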

## Majority

lein run mongo -n 6000

Using WriteConcern.MAJORITY, we notice an improvement! When we cause the partition, writes pause immediately. The clients are blocked, waiting for the primary to confirm acknowledgement on nodes which will never respond. Eventually they time out. This is a hallmark of a CP system: we shouldn't be able to make progress without talking to a majority of nodes.

```
Writes completed in 157.425 seconds
6000 total
5700 acknowledged
5701 survivors
2 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
(596 598)
3 unacknowledged writes found! ヽ(´ー｀)ノ
(562 653 3818)
0.95 ack rate
1.754386E-4 loss rate
5.2631577E-4 unacknowledged but successful rate
```

So 3 writes which supposedly failed actually succeeded. That's not so bad. On the other hand, Mongo still dropped two “successful” writes. Writes which were supposedly acknowledged by a majority of nodes.

I've been talking with 10gen, and they think this is a bug. When the network partitions, the server just checks off the “OK” field for the client's WriteConcern request, and sends it back. The client sees the “OK” message and… sensibly presumes the write was OK. This should be fixed in master, but is still present in 2.4.3, the most recent release.

Even if this bug is fixed, Mongo still isn't consistent. Those three writes which “failed” but showed up in the result set? Those are writes which were replicated to a majority node just prior to the partition, but never had the chance to acknowledge. Single writes are not atomic without a proper consensus protocol: those failed writes could materialize never, now, or some time in the future; potentially overwriting valid data.

## Strategies for working with Mongo

On the one hand, Mongo advocates usually tell me “but network partitions are exceedingly rare in practice.” Then I talk to Mongo users who report their cluster fails over on a weekly basis. One thing to keep in mind is that heavy load–like seasonal writes, recovering from a crash, or performing a rollback–can slow a node down to the point where other nodes declare it dead. This is a partition. I've seen my test cluster perform dozens of rollbacks as nodes go unavailable attempting to elect a new primary. You should probably instrument your cluster to watch for these events in production.

As we've discussed before, one option is simply to accept data loss. Not all applications need consistency.

At the same time, you should watch those rollback files. Sometimes they don't appear even though they're supposed to, and not all data types will actually be rolled back. Conflicts in capped collections, for example, appear to simply discard all data in the collection past the conflict point by design.

People use capped collections for distributed queues. Think about that for a minute.

Moreover, a rollback file doesn't give you enough information to actually reconstruct the correct state of the system–at least in general. It's just a snapshot of “some state” the database had to discard. Because there's no well-defined ordering for these writes, you'll have to decide what that means for your particular data structures. If you can structure your documents as CRDTs and write a merge function, you'll be able to safely merge. If there's no conflicting copy of the document in the database, and you never delete those kinds of documents, you can restore it automatically. Immutable records can always be recovered, too.

Finally, you can drastically reduce the probability of write loss by using WriteConcern.MAJORITY. This is gonna impose a big performance hit. That's another hallmark of more-available CP systems.

To recap: MongoDB is neither AP nor CP. The defaults can cause significant loss of acknowledged writes. The strongest consistency level offered has bugs which cause false acknowledgements, and even once those are fixed, it still doesn't prevent false failures.

In the next post, we'll talk about a database which emphasizes availability and partition tolerance: Riak.

# Call me maybe: Redis

Previously on Jepsen, we explored two-phase commit in Postgres. In this post, we demonstrate Redis losing 56% of writes during a partition.

Redis is a fantastic data structure server, typically deployed as a shared heap. It provides fast access to strings, lists, sets, maps, and other structures with a simple text protocol. Since it runs on a single server, and that server is single-threaded, it offers linearizable consistency by default: all operations happen in a single, well-defined order. There’s also support for basic transactions, which are atomic and isolated from one another.

Because of this easy-to-understand consistency model, many users treat Redis as a message queue, lock service, session store, or even their primary database. Redis running on a single server is a CP system, so it is consistent for these purposes.

Redis offers asynchronous primary->secondary replication. A single server is chosen as the primary, which can accept writes. It relays its state changes to secondary servers, which follow along. Asynchronous means that you don’t have to wait for a write to be replicated before the primary returns a response to the client. Writes will eventually arrive on the secondaries, if we wait long enough. In our application, all 5 clients will read from the primary on n1, and n2–n5 will be secondaries.

This is still a CP system, so long as we never read from the secondaries. If you do read from the secondaries, it’s possible to read stale data. That’s just fine for something like a cache! However, if you read data from a secondary, then write it to the primary, you could inadvertently destroy writes which completed but weren’t yet replicated to the secondaries.
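
That read-modify-write hazard is easy to see with an in-memory sketch (hypothetical data, standing in for a real Redis primary and a lagging secondary):

```python
# The primary has write 3; replication to the secondary lags behind it.
primary   = {"cart": {1, 2, 3}}
secondary = {"cart": {1, 2}}

stale = secondary["cart"]          # read from the lagging secondary...
primary["cart"] = stale | {4}      # ...modify, and write back to the primary

assert 3 not in primary["cart"]    # the completed write of 3 is destroyed
```

No partition and no failover required: a stale read followed by a full-value write is enough to silently undo an acknowledged write.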

What happens if the primary fails? We need to promote one of the secondary servers to a new primary. One option is to use Heartbeat or a STONITH system which keeps a link open between two servers, but if the network partitions we don’t have any way to tell whether the other side is alive or not. If we don’t promote a secondary, there could be no active primary. If we do promote one, there could be two active primaries. We need more nodes.

If one connected component of the network contains a majority (more than N/2) of nodes, we call it a quorum. We’re guaranteed that at most one quorum exists at any point in time–so if a majority of nodes can see each other, they know that they’re the only component in that state. That group of nodes (also termed a “component”) has the authority to promote a new primary.
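
The guarantee that at most one quorum exists follows from simple counting: two disjoint components can't both hold more than N/2 of the nodes. A sketch:

```python
def quorum_components(components, n):
    """Return the components holding a strict majority of the n nodes.
    Disjoint components can never yield more than one such component."""
    return [c for c in components if len(c) > n / 2]

# A 5-node cluster split 3/2: only one side may promote a new primary.
split = [{"n1", "n2"}, {"n3", "n4", "n5"}]
assert quorum_components(split, 5) == [{"n3", "n4", "n5"}]

# Split 2/2/1: no quorum anywhere, so no component may promote at all.
assert quorum_components([{"n1", "n2"}, {"n3", "n4"}, {"n5"}], 5) == []
```

Note the second case: quorum systems buy safety at the price of availability, since some partitions leave every component unable to act.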

Redis has a system called Sentinel, which, when configured correctly, will try to establish a quorum between Sentinel nodes, agree on which Redis servers are alive, and orchestrate failover for any primaries which appear to have failed. If we colocate the Sentinel nodes with the Redis nodes, this should allow us to promote a new primary in the majority component (should one exist).

What are the consistency and availability properties of Sentinel? Antirez, the author of Redis, says:

Redis Cluster for instance is a system biased towards consistency rather than availability. Redis Sentinel itself is an HA solution with the dogma of consistency and master slave setups.

So we expect this system to be CP. Nodes in the minority component will become unavailable during the partition, and the majority component will elect a new primary. The Sentinels will then order clients to abandon the old primary and reconnect to the new one.

Before we begin, it’s important to recognize that Redis does not guarantee durability. Since writes to disk and replication to secondaries are asynchronous, we can lose up to N seconds of the most recent writes. We should not, however, see gaps in the write log. If write n is present, so are writes 0, 1, … n-2, n-1.
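
That no-gaps property can be stated mechanically, and the results collected later can be checked against it. A sketch:

```python
def is_gapless_prefix(surviving_writes):
    """True iff the surviving write IDs form an unbroken prefix 0..k."""
    survivors = set(surviving_writes)
    return survivors == set(range(len(survivors)))

assert is_gapless_prefix([0, 1, 2, 3])      # losing only a suffix is allowed
assert not is_gapless_prefix([0, 1, 3, 4])  # a hole at 2 violates the claim
```

Keep this check in mind: the results below show holes in the middle of the acknowledged sequence, not just a lost suffix.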

## Partitioning the cluster

Here’s a simple application which writes a list of numbers to a Redis set. At this time Carmine, the Clojure Redis client, doesn’t yet support failover using Sentinel. I’ve implemented a stricter version of the Sentinel client algorithm here: asking the server for a new primary before every write. Sentinel actually states that clients should only select new primaries when their connection is closed, which leaves a wider window for clients to disagree about which primary to use–leading to the possibility of more conflicting writes.

Let’s give it a shot. First, set up Redis:

salticid redis.setup

Then, in two terminals, start up Redis and Redis Sentinel:

salticid redis.start
salticid redis.sentinel

You should see messages go by as the sentinels discover one another and ensure all the nodes are properly configured. You can check the replication status with salticid redis.replication. salticid redis.stop will shut down the Redis servers and sentinels alike.

Now let’s run our application with lein run redis, then partition nodes n1 and n2 away from n3, n4, and n5 by running salticid jepsen.partition.

```
376 :ok
378 :ok
382 :ok
384 :ok
380 :ok
381 :ok
383 :ok
389 :ok
385 :ok
```

The first thing you’ll notice is that even though n1 can’t possibly be replicating its writes to n3, n4, and n5, writes against it are still completing successfully. N1 still thinks it’s the primary, and since replication is asynchronous, it’s acknowledging writes before they’re sent to others in the cluster. The sentinels notice the failure, and n3, n4, and n5’s sentinels promote a new primary:

```
19 May 00:37:36.314 # +sdown master mymaster 10.10.3.242 6379
19 May 00:37:36.616 # +sdown slave 10.10.3.52:6379 10.10.3.52 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.52:26379 10.10.3.52 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.242:26379 10.10.3.242 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:37.521 # +odown master mymaster 10.10.3.242 6379 #quorum 3/3
19 May 00:37:48.041 # +failover-detected master mymaster 10.10.3.242 6379
19 May 00:37:48.142 * +slave-reconf-inprog slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:48.143 * +slave-reconf-inprog slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.243 # +failover-end master mymaster 10.10.3.242 6379
```

Now n5 is a new primary–but n1 is still a primary too! Run salticid redis.replication to see the replication status of all nodes. We have two primary nodes, one in each component of the system. During this time both primaries are accepting writes independently. This is a classic split-brain scenario–and it violates the C in CP. Writes (and reads) in this state are not linearizable, because clients will see different results based on which node they’re talking to.

## Healing the partition

What happens when the network comes back online? salticid jepsen.heal repairs the partition, and the Sentinel nodes will discover each other again.

Redis Sentinel used to leave both primaries running indefinitely, which meant that any scenario like a partition or crash leading to failover would result in permanent split-brain. That’s fixed in version 2.6.13, which came out last week. Now, Sentinel demotes the old primary on n1 when it comes back into contact with the majority component. The client sees:

```
1687 :ok
1686 READONLY You can't write against a read only slave.
1690 READONLY You can't write against a read only slave.
1693 :ok
```

… since n1 stepped down just after a Sentinel told us it was a primary. Clients are a part of the distributed system too. If a system’s correctness depends on clients choosing specific nodes at specific times, the clients are now engaged in a distributed consensus problem–not to mention a clock synchronization problem. This is damn hard to do correctly.

## Results

```
1991 :ok
1995 :ok
1996 :ok
Hit enter when ready to collect results.
Writes completed in 42.002 seconds
2000 total
1998 acknowledged
872 survivors
1126 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
50 51 52 53 54 55 ... 1671 1675 1676 1680 1681 1685
0.999 ack rate
0.5635636 loss rate
0.0 unacknowledged but successful rate
```

Out of 2000 writes, Redis claimed that 1998 of them completed successfully. However, only 872 of those integers were present in the final set. Redis threw away 56% of the writes it told us succeeded.

There are two problems at work here. First, notice that all the clients lost writes at the beginning of the partition: (50, 51, 52, 53, …). That’s because they were all writing to n1 when the network dropped–and since n1 was demoted later, any writes made during that window were destroyed.

The second problem was caused by split-brain: both n1 and n5 were primaries up until the partition healed. Depending on which node they were talking to, some clients might have their writes survive, and others have their writes lost. The last few numbers in the set, mod 5, are all 0 and 1: the clients which kept using n1 as a primary, in the minority partition.

Note that both of these failure modes violate the durability guarantees we claimed earlier for Redis, because there are gaps in the write log.

## Redis strategies

So you’re running a distributed Redis install, and have realized that the design of Redis Sentinel (or, for that matter, any other failover system on top of an asynchronously replicated primary-secondary design) means you can lose a lot of data when a partition occurs. What can you do?

From an operations perspective, I recommend you try to understand the Sentinel consensus algorithm. I don’t, and I’ve read it a dozen times.

I tried to write a formal verification of the algorithm in TLA+, and failed. There are dozens of interacting rules which can lead to phenomenally complex edge cases. The whole thing relies on clocks–and a special mode, TILT, which tries to detect sudden clock skew. You can specify a quorum which is smaller than the number of sentinels, allowing multiple quorums to operate simultaneously. Since the system auto-discovers peers, you’ve got to make sure nobody lets a new sentinel talk to your cluster, or you might find yourself with a quorum smaller than N/2. Client, sentinel, and Redis server topologies are all different things, which (I think) means…

• Sentinels could promote a node no clients can see
• Sentinels could demote the only node clients can actually reach
• Sentinels could assume a totally connected group of servers is unavailable
• Sentinels could promote an isolated node in a minority component, then destroy data on the majority by demoting their primary later

I (tentatively) recommend running exactly one sentinel on each server node, to force server and sentinel network topologies to align. Unless the partition doesn’t happen in the network, but somewhere upwards of layer 3. Let’s not talk about that possibility.

As an application developer working with Redis, one option is simply to estimate and accept your data loss. Not all applications have to be consistent. Microsoft estimates their WAN links have about 99.5% availability, and their datacenter networks are about 10x more reliable, going down for about 4 hours per year. Not all network failures result in this kind of partition. If you’re running good network hardware in redundant configurations in real datacenters (e.g. not EC2), you cut your probability of partition down pretty far. Plenty of important applications can tolerate data loss for a few hours a year.

If you can’t tolerate data loss, Redis Sentinel (and by extension Redis Cluster) is not safe for use as:

• A lock service
• A queue
• A database

If you use Redis as a lock service, this type of partition means you can take out the same lock twice–or up to N times for N nodes! Or maybe multiple times concurrently, against the same node, if you want to get weird about it. Write loss means locks can be resurrected from the dead, or vanish even when supposedly held. Bottom line: distributed lock services must be CP. Use a CP consensus system, like Zookeeper.

If you use Redis as a queue, it can drop enqueued items. However, it can also re-enqueue items which were removed. An item might be delivered zero, one, two, or more times. Most distributed queue services can provide reliable at-most-once or at-least-once delivery. CP queue systems can provide reliable exactly-once delivery with higher latency costs. Use them if message delivery is important.

If you use Redis as a database, be prepared for clients to disagree about the state of the system. Batch operations will still be atomic (I think), but you’ll have no inter-write linearizability, which almost all applications implicitly rely on. If you successfully write A, then B, you expect that any client which can see B can also see A. This is not the case. Be prepared for massive write loss during a partition, depending on client, server, and sentinel topology.
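The broken ordering guarantee can be sketched in a few lines of Python. This is a toy model of the split-brain scenario, not real replication code: the client's first write lands only on the old primary, then failover makes the new primary authoritative.

```python
# Sketch: losing inter-write ordering across a failover.
# The client writes A to the old primary before the partition; a new
# primary is promoted without A; the client then writes B to it.

old_primary = ["A"]          # acknowledged, but stranded on the old primary
new_primary = []             # promoted during the partition, missing A
new_primary.append("B")      # client reconnects and writes B

# Readers now follow the new primary.
visible = set(new_primary)

# The implicit invariant: any state containing B should also contain A.
invariant_holds = ("B" not in visible) or ("A" in visible)
print(invariant_holds)  # False
```

Any application logic that assumes "if I can read B, A must exist" silently breaks here.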

Because Redis does not have a consensus protocol for writes, it can’t be CP. Because it relies on quorums to promote secondaries, it can’t be AP. What it can be is fast, and that’s an excellent property for a weakly consistent best-effort service, like a cache. Redis Sentinel can do a great job of keeping your caches warm even in the face of network and node failure, and helping clients to gradually discover the correct nodes to interact with. Use Redis Sentinel for caching, sampling, statistics, and messaging where getting the wrong answer doesn’t hurt much. Occasional windows of 50% write loss may be just fine for your user feeds, hit counters, or upvotes.

In the next post, we’ll learn about a database with a related replication architecture: MongoDB.

# Call me maybe: Postgres

Previously on Jepsen, we introduced the problem of network partitions. Here, we demonstrate that a few transactions which “fail” during the start of a partition may have actually succeeded.

PostgreSQL is a terrific open-source relational database. It offers a variety of consistency guarantees, from read uncommitted to serializable. Because Postgres only accepts writes on a single primary node, we think of it as a CP system in the sense of the CAP theorem. If a partition occurs and you can’t talk to the server, the system is unavailable. Because transactions are ACID, we’re always consistent.

Right?

Well… almost. Even though the Postgres server is always consistent, the distributed system composed of the server and client together may not be consistent. It’s possible for the client and server to disagree about whether or not a transaction took place.

Postgres' commit protocol, like that of most relational databases, is a special case of two-phase commit, or 2PC. In the first phase, the client votes to commit (or abort) the current transaction, and sends that message to the server. The server checks to see whether its consistency constraints allow the transaction to proceed, and if so, it votes to commit. It writes the transaction to storage and informs the client that the commit has taken place (or failed, as the case may be). Now both the client and server agree on the outcome of the transaction.

What happens if the message acknowledging the commit is dropped before the client receives it? Then the client doesn’t know whether the commit succeeded or not! The 2PC protocol says that we must wait for the acknowledgement message to arrive in order to decide the outcome. If it doesn’t arrive, 2PC deadlocks. It’s not a partition-tolerant protocol. Waiting forever isn’t realistic for real systems, so at some point the client will time out and declare an error occurred. The commit protocol is now in an indeterminate state.
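The indeterminate window can be modeled in a few lines of Python. This is an illustrative sketch, not the actual wire protocol: the server commits, the acknowledgement is lost, and the client can only report "unknown".

```python
# Sketch: the indeterminate window in a 2PC-style commit.
# If the server's acknowledgement is lost, the client cannot tell a
# committed transaction from an aborted one; it can only time out.

def commit(server_log, txn, ack_dropped=False):
    server_log.append(txn)            # server decides commit and persists it
    if ack_dropped:
        raise TimeoutError("no acknowledgement received")
    return "committed"

server_log = []
try:
    outcome = commit(server_log, "txn-215", ack_dropped=True)
except TimeoutError:
    outcome = "unknown"               # an I/O error, not a definite failure

# The server committed, but the client can only report "unknown".
print(outcome, server_log)  # unknown ['txn-215']
```

Note that from the client's side, this run is indistinguishable from one where the connection dropped *before* the commit message arrived and nothing was written.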

To demonstrate this, we’ll need an install of Postgres to work with.

salticid postgres.setup

This installs Postgres from apt, uploads some config files from jepsen/salticid/postgres, and creates a database for Jepsen. Then we’ll run a simple application which writes a single row for each number, inside a transaction.

cd salticid
lein run pg -n 100

If all goes well, you’ll see something like

...
85 :ok
91 :ok
90 :ok
95 :ok
96 :ok
Hit enter when ready to collect results.
Writes completed in 0.317 seconds
100 total
100 acknowledged
100 survivors
All 100 writes succeeded. :-D

Each line shows the number being written, followed by whether it was OK or not. In this example, all five nodes talk to a single postgres server on n1. Out of 100 writes, the clients reported that all 100 succeeded–and at the end of the test, all 100 numbers were present in the result set.

Now let’s cause a partition. Since this failure mode only arises when the connection drops after the server decides to acknowledge, but before the client receives it, there’s only a short window in which to begin the partition. We can widen that window by slowing down the network:

salticid jepsen.slow

Now, we start the test:

lein run pg

And while it’s running, cut off all postgres traffic to and from n1:

salticid jepsen.drop_pg

If we’re lucky, we’ll manage to catch one of those acknowledgement packets in flight, and the client will log an error like:

217 An I/O error occurred while sending to the backend.
Failure to execute query with SQL: INSERT INTO "set_app" ("element") VALUES (?) :: [219]
PSQLException:
Message: An I/O error occured while sending to the backend.
SQLState: 08006
Error Code: 0
218 An I/O error occured while sending to the backend.

After that, new transactions will just time out; the client will correctly log these as failures:

220 Connection attempt timed out.
222 Connection attempt timed out.

We can resolve the partition with salticid jepsen.heal, and wait for the test to complete.

1000 total
950 acknowledged
952 survivors
2 unacknowledged writes found! ヽ(´ー｀)ノ
(215 218)
0.95 ack rate
0.0 loss rate
0.002105263 unacknowledged but successful rate

So out of 1000 attempted writes, 950 were successfully acknowledged, and all 950 of those writes were present in the result set. However, two writes (215 and 218) succeeded, even though they threw an exception claiming that a failure occurred! Note that this exception doesn’t guarantee that the write succeeded or failed: 217 also threw an I/O error while sending, but because the connection dropped before the client’s commit message arrived at the server, the transaction never took place.

There is no way to distinguish these cases from the client. A network partition–and indeed, most network errors–doesn’t mean a failure. It means the absence of information. Without a partition-tolerant commit protocol, like extended three-phase commit, we cannot assert the state of the system for these writes.

## 2PC strategies

Two-phase commit protocols aren’t just for relational databases. They crop up in all sorts of consensus problems. MongoDB’s documents essentially comprise an asynchronous network, and many users implement 2PC on top of their Mongo objects to obtain multi-key transactions.

If you’re working with two-phase commit, there are a few things you can do. One is to accept false negatives. In most relational databases, the probability of this failure occurring is low–and it can only affect writes which were in-flight at the time the partition began. It may be perfectly acceptable to return failures to clients even if there’s a small chance the transaction succeeded.

Alternatively, you can use consistency guarantees or other data structures to allow for idempotent operations. When you encounter a network error, just retry them blindly. A highly available queue with at-least-once delivery is a great place to put repeatable writes which need to be retried later.
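The idempotent-retry strategy can be sketched as follows. The function names and failure simulation here are invented for illustration; the point is that adding an element to a set yields the same final state no matter how many attempts actually landed.

```python
# Sketch: blind retries are safe when the operation is idempotent.
# Adding an element to a set gives the same final state whether the
# first attempt committed (and its ack was lost) or never arrived.

database = set()

def write(element, drop_ack=False):
    database.add(element)             # the write itself lands
    if drop_ack:
        raise IOError("connection reset")  # ...but the ack is lost

def write_with_retry(element, attempts=3):
    for i in range(attempts):
        try:
            # Simulate the first acknowledgement being dropped.
            write(element, drop_ack=(i == 0))
            return
        except IOError:
            continue  # idempotent: retrying cannot double-apply

write_with_retry(42)
print(database)  # {42}, regardless of how many attempts ran
```

Contrast this with a non-idempotent operation like incrementing a counter, where a blind retry after a lost ack double-counts.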

Finally, within some databases you can obtain strong consistency by taking note of the current transaction ID, and writing that ID to the database during the transaction. When the partition is resolved, the client can either retry or cancel the transaction at a later time, by checking whether or not that transaction ID was written. Again, this relies on having some sort of storage suitable for the timescales of the partition: perhaps a local log on disk, or an at-least-once queue.
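Here's a rough sketch of the transaction-id technique, using a plain Python list as a stand-in for a relational table. The names (`transact`, `txn_id`) are hypothetical, not a real database API:

```python
# Sketch: resolving an ambiguous commit by writing a client-chosen
# transaction id inside the transaction itself, then checking for it
# once the partition heals.

import uuid

table = []   # stands in for a relational table

def transact(txn_id, row, ack_dropped=False):
    table.append({"txn_id": txn_id, "row": row})  # atomic insert
    if ack_dropped:
        raise IOError("I/O error while sending to the backend")

txn_id = str(uuid.uuid4())
try:
    transact(txn_id, "element=215", ack_dropped=True)
    outcome = "committed"
except IOError:
    outcome = "unknown"               # ambiguous at the time of the error

# Later, after the partition resolves, look the id up to decide:
committed = any(r["txn_id"] == txn_id for r in table)
print(outcome, committed)  # unknown True
```

If the id is absent, the client can safely retry; if present, the write took place and must not be re-applied.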

In the next post, we look at a very different kind of consistency model: Redis Sentinel.

# Call me maybe: Carly Rae Jepsen and the perils of network partitions

Carly Rae Jepsen may be singing about the cute guy next door, but she's also telling a story about the struggle to communicate with someone who doesn't even know you're alive. The suspense of observation: did he see me? Did he see me see him? The risks of speaking your mind and being shot down–or worse, ignored. The fundamental unknowability of The Other, as Lacan would have it. In short, this is a song about distributed systems.

Modern software systems are composed of dozens of components which communicate over an asynchronous, unreliable network. Understanding the reliability of a distributed system's dynamics requires careful analysis of the network itself. Like most hard problems in computer science, this one comes down to shared state. A set of nodes separated by the network must exchange information: “Did I like that post?” “Was my write successful?” “Will you thumbnail my image?” “How much is in my account?”

At the end of one of these requests, you might guarantee that the requested operation…

• will be visible to everyone from now on
• will be visible to your connection now, and others later
• may not yet be visible, but is causally connected to some future state of the system
• is visible now, but might not be later
• may or may not be visible: ERRNO_YOLO

These are some examples of the complex interplay between consistency and durability in distributed systems. For instance, if you're writing CRDTs to one of two geographically replicated Riak clusters with W=2 and DW=1, you can guarantee that write…

• is causally connected to some future state of the system
• will survive the total failure of one node
• will survive a power failure (assuming fsync works) of all nodes
• will survive the destruction of an entire datacenter, given a few minutes to replicate

If you're writing to ZooKeeper, you might have a stronger set of guarantees: the write is visible now to all participants, for instance, and the write will survive the total failure of any minority of nodes–up to ⌊(n-1)/2⌋ of n. If you write to Postgres, depending on your transaction's consistency level, you might be able to guarantee that the write will be visible to everyone, just to yourself, or “eventually”.

These guarantees are particularly tricky to understand when the network is unreliable.

## Partitions

Formal proofs of distributed systems often assume that the network is asynchronous, which means the network may arbitrarily duplicate, drop, delay, or reorder messages between nodes. This is a weak hypothesis: some physical networks can do better than this, but in practice IP networks will encounter all of these failure modes, so the theoretical limitations of the asynchronous network apply to real-world systems as well.

In practice, the TCP state machine allows nodes to reconstruct “reliable” ordered delivery of messages between nodes. TCP sockets guarantee that our messages will arrive without drops, duplication, or reordering. However, there can still be arbitrary delays–which would ordinarily cause the distributed system to lock indefinitely. Since computers have finite memory and latency bounds, we introduce timeouts, which close the connection when expected messages fail to arrive within a given time frame. Calls to read() on sockets will simply block, then fail.

Detecting network failures is hard. Since our only knowledge of the other nodes passes through the network, delays are indistinguishable from failure. This is the fundamental problem of the network partition: latency high enough to be considered a failure. When partitions arise, we have no way to determine what happened on the other nodes: are they alive? Dead? Did they receive our message? Did they try to respond? Literally no one knows. When the network finally heals, we'll have to re-establish the connection and try to work out what happened–perhaps recovering from an inconsistent state.

Many systems handle partitions by entering a special degraded mode of operation. The CAP theorem tells us that we can either have consistency (technically, linearizability for a read-write register), or availability (all nodes can continue to handle requests), but not both. What's more, few databases come close to CAP's theoretical limitations; many simply drop data.

In this series, I'm going to demonstrate how some real distributed systems behave when the network fails. We'll start by setting up a cluster and a simple application. In each subsequent post, we'll explore that application written for a particular database, and how that system behaves under partition.

## Setting up a cluster

You can create partitions at home! For these demonstrations, I'm going to be running a five node cluster of Ubuntu 12.10 machines, virtualized using LXC–but you can use real computers, virtual private servers, EC2, etc. I've named the nodes n1, n2, n3, n4, and n5: it's probably easiest to add these entries to /etc/hosts on your computer and on each of the nodes themselves.

We're going to need some configuration for the cluster, and client applications to test their behavior. You can clone http://github.com/aphyr/jepsen to follow along.

To run commands across the cluster, I'm using Salticid (http://github.com/aphyr/salticid). I've set my ~/.salticidrc to point to configuration in the Jepsen repo:

load ENV['HOME'] + '/jepsen/salticid/*.rb'

If you take a look at this file, you'll see that it defines a group called :jepsen, with hosts n1 … n5. The user and password for each node is 'ubuntu'–you'll probably want to change this if you're running your nodes on the public internet.

Try salticid -s salticid to see all the groups, hosts, and roles defined by the current configuration:

$ salticid -s salticid
Groups
  jepsen
Hosts:
  n1 n2 n3 n4 n5
Roles
  base riak mongo redis postgres jepsen net
Top-level tasks

First off, let's set up these nodes with some common software–compilers, network tools, etc.

salticid base.setup

The base role defines some basic operating system functions. base.reboot will reboot the cluster, and base.shutdown will unpower it.

The jepsen role defines tasks for simulating network failures. To cause a partition, run salticid jepsen.partition. That command causes nodes n1 and n2 to drop IP traffic from n3, n4, and n5–essentially by running

iptables -A INPUT -s n3 -j DROP
iptables -A INPUT -s n4 -j DROP
iptables -A INPUT -s n5 -j DROP

That's it, really. To check the current network status, run jepsen.status. jepsen.heal will reset the iptables chains to their defaults, resolving the partition.

To simulate slow networks, or networks which drop packets, we can use tc to adjust the ethernet interface. Jepsen assumes the inter-node interface is eth0. salticid jepsen.slow will add latency to the network, making it easier to reproduce bugs which rely on a particular message being dropped. salticid jepsen.flaky will probabilistically drop messages. Adjusting the inter-node latency and lossiness simulates the behavior of real-world networks under congestion, and helps expose timing dependencies in distributed algorithms–like database replication.

## A simple distributed system

In order to test a distributed system, we need a workload–a set of clients which make requests and record their results for analysis. For these posts, we're going to work with a simple application which writes several numbers to a list in a database. Each client app will independently write some integers to the DB. With five clients, client 0 writes 0, 5, 10, 15, …; client 1 writes 1, 6, 11, and so on.

For each write we record whether the database acknowledged the write successfully or whether there was an error. At the end of the run, we ask the database for the full set. If acknowledged writes are missing, or unacknowledged writes are present, we know that the system was inconsistent in some way: that the client application and the database disagreed about the state of the system.
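The end-of-run check can be sketched in Python. The helper below is an illustrative reconstruction of the comparison, not Jepsen's actual Clojure code; the sample numbers echo a run of 1000 writes with 950 acknowledged and two "failed" writes that actually landed.

```python
# Sketch of the end-of-run consistency check: compare the writes the
# clients believed succeeded against what the database finally returned.

def check(acknowledged, unacknowledged, survivors):
    lost = acknowledged - survivors          # acked but missing: data loss
    phantom = survivors & unacknowledged     # "failed" writes that landed
    return {
        "acknowledged": len(acknowledged),
        "survivors": len(survivors),
        "lost": sorted(lost),
        "unack_but_present": sorted(phantom),
    }

# Hypothetical run: 1000 attempts, writes 215 and 218 errored on the
# client yet survived, and attempts 952..999 timed out cleanly.
attempts = set(range(1000))
unacked = {215, 218} | set(range(952, 1000))
acked = attempts - unacked
survivors = acked | {215, 218}

result = check(acked, unacked, survivors)
print(result)
```

Either a non-empty `lost` set or a non-empty `unack_but_present` set means the clients and the database disagreed about what happened.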

In this series of blog posts, we're going to run this app against several distributed databases, and cause partitions during its run. In each case, we'll see how the system responds to the uncertainty of dropped messages.

I've written several implementations of this workload in Clojure. jepsen/src/jepsen/set_app.clj defines the application. (defprotocol SetApp ...) lists the functions an app has to implement, and (run n apps) sets up the apps and runs them in parallel, collects results, and shows any inconsistencies. Particular implementations live in src/jepsen/riak.clj, pg.clj, redis.clj, and so forth.

You'll need a JVM and Leiningen 2 to run this code. Once you've installed lein, and added it to your path, we're ready to go!

Next up on Jepsen, we take a look at how PostgreSQL's transaction protocol handles network failures.