Write contention occurs when two people try to update the same piece of data at the same time.
We know several ways to handle write contention, and they fall along a spectrum. For strong consistency (or what CAP might term “CP”) you can use explicit locking, perhaps provided by a central server; or optimistic concurrency, where writes proceed through independent transactions but can fail on conflicting commits. These approaches need not be centralized: agreement protocols like Paxos or two-phase commit allow a cluster of machines to agree on an isolated transaction–with either pessimistic or optimistic locking–even in the face of some failures and partitions.
On the other end of the spectrum are the AP solutions, where both writes are accepted, but are resolved at a later time. If the resolution process is monotonic (always progresses towards a stable value regardless of order), we call the system “eventually consistent”. Given sufficient time to repair itself, some correct value will emerge.
What kind of resolution function should we use? Well, our writes happen over time, and newer values are more correct than old ones. We could pick the most recently written value, as determined by a timestamp. Assuming our clocks are synchronized more tightly than the time between conflicting writes, this guarantees that the last write wins.
But wait! If I’m storing a list of the 500,000 people who follow, say, @scoblizer, and two people follow him at the same time… and last-write-wins chooses the most recently written set of followers… I’ll have lost a write! That’s bad! It’s rare–at Showyou we saw conflicts in ~1% of follower lists–but it still disrupts the user experience enough that I care about solving the problem correctly. My writes should never be lost so long as Riak is doing its thing.
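To make the lost write concrete, here is a minimal sketch in Python. It is not Riak's implementation: the `Version` type and the `lww` function are names I made up for illustration, and I'm assuming the whole follower set is stored as a single value with a wall-clock timestamp.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    timestamp: float      # wall-clock time of the write (assumed synchronized)
    followers: frozenset  # the entire follower set, stored as one value


def lww(a: Version, b: Version) -> Version:
    """Last-write-wins: keep whichever version carries the higher timestamp."""
    return a if a.timestamp >= b.timestamp else b


base = Version(0.0, frozenset({"alice", "bob"}))

# Two clients read `base` at about the same time; each adds one follower.
w1 = Version(1.0, base.followers | {"carol"})
w2 = Version(1.1, base.followers | {"dave"})

winner = lww(w1, w2)
print(sorted(winner.followers))  # ['alice', 'bob', 'dave']
```

Both clients were told their write succeeded, but carol's follow is gone: the resolution function compares whole sets, not individual elements.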
Well, Riak is naturally AP, and could accept both writes simultaneously, and you could resolve them somehow. OR-sets are provably correct, fit quite naturally to how you expect following to work, and reduce the probability of any contention to single elements, rather than the entire set. But maybe we haven’t heard of CRDTs yet, or thought (as I did for some time) that 2P sets were as good as it gets. That’s OK; CRDTs have only been formally described for a couple of years.
Serializing writes with mutexes
So what if we used locking on top of Riak?
Before writing an object, I acquire a lock with a reliable network service. It guarantees that I, and I alone, hold the right to write to /followers-of/scoblizer. While holding that lock I write to Riak, and when the write is successful, I release the lock. No two clients write at the same time, so write contention is eliminated! Our clocks are tightly synchronized by GPS or atomic clocks, and we aggressively monitor clock skew.
This does not prevent write contention.
When we issue a write to Riak, the coordinating node–the one we’re talking to as a client–computes the preflist for the key we’re going to write. If our replication factor N is 3, it sends a write to each of 3 vnodes. Then it waits for responses. By default it waits for W = (N/2 + 1) “write confirmed” responses (N/2 rounds down, so 2 of 3), and also for DW = (N/2 + 1) confirmations that the write is saved on disk, not just in memory. If one node crashes, or is partitioned during this time, our write still succeeds and all is well.
That’s OK–there was no conflict, because the lock service prevented it! And our write is (assuming clocks are correct) more recent than whatever that crashed node had, so when the crashed node comes back we’ll know to discard the old value, the minority report. All we need for that is to guarantee that every read checks at least R = (N/2 + 1) copies, so at least one of our new copies will be available to win a conflict resolution. If we ever read with R=1 we could get that solitary old copy from the failed node. Maybe really old data, like days old, if the node was down for that long. Then we might write that data back to Riak, obliterating days’ worth of writes. That would definitely be bad.
OK, so read with quorum and write with quorum. Got it.
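The arithmetic behind that rule fits in a few lines. This is a sketch under one big assumption: the N replicas are a fixed set, so every read and write lands on the same N vnodes. The helper names `quorum` and `overlap` are mine, not Riak's.

```python
def quorum(n: int) -> int:
    """Smallest majority of n replicas: floor(n/2) + 1."""
    return n // 2 + 1


def overlap(n: int, r: int, w: int) -> int:
    """Minimum number of replicas that any read set and write set must share."""
    return max(0, r + w - n)


n = 3
w = quorum(n)
print(w)                             # 2
print(overlap(n, r=quorum(n), w=w))  # 1: every quorum read sees a fresh copy
print(overlap(n, r=1, w=w))          # 0: an R=1 read can miss the write entirely
```

With R + W > N, at least one replica in every read has seen the latest write. With R=1 that guarantee is gone.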
Partitions
What if that node failed to respond–not because it crashed–but because of a partition? What if, on the other side of the network curtain, there are other parts of our app also trying to add new followers?
Let’s say we read the value “0”, and go to write version “1”. The Riak coordinator node completes two writes, but a partition occurs and the third doesn’t complete. Since we used W=quorum, this still counts as success and we think the write completed.
Meanwhile, Riak has detected the partition, and both sides shuffle new vnodes into rotation to recover. There are now two complete sets of vnodes, one on each side. Clients talking to either side will see their writes succeed, even with W=DW=quorum (or W=all, for that matter).
When the partition is resolved, we take the most recent timestamp to decide which cluster’s copy wins. All the writes on the other side of the partition will be lost–even though we, as clients, were told they were successful. W & DW >= quorum is not sufficient; we need PW >= quorum to consider writes to those fallback copies of vnodes as failures.
Well, that tells the client there was a failure, at least. But hold on… those fallback vnodes still accepted the writes and wrote them down dutifully. They’re on disk and are just as valid as the writes on the “majority” side of the partition! We can read them back immediately, and when the partition is resolved those “failed” writes have a ~1 in 2 chance (assuming an even distribution of writes between partitions) of winning last-write-wins–again, obliterating writes to the primary vnodes which were claimed to be successful! Or maybe the majority side was more recently written, and the minority writes are lost. Either way, we are guaranteed to lose data.
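Here is a toy simulation of that scenario. It is a deliberately crude model, not Riak: each side of the partition is a single `(timestamp, followers)` pair, the lock service is reduced to strictly increasing integer timestamps, and the follower names are invented.

```python
def lww(a, b):
    """Pick the (timestamp, followers) pair with the higher timestamp."""
    return a if a[0] >= b[0] else b


# Both sides of the partition start from the same value, written at time 0.
initial = (0, frozenset({"alice"}))
sides = {"majority": initial, "minority": initial}

# The lock service hands out strictly increasing timestamps, so no two
# writes are ever simultaneous according to the clock.
writes = [(1, "majority", "bob"), (2, "minority", "carol"),
          (3, "majority", "dave"), (4, "minority", "erin")]

acknowledged = set()
for ts, side, follower in writes:
    _, current = sides[side]                   # read from our side only
    sides[side] = (ts, current | {follower})   # read-modify-write
    acknowledged.add(follower)                 # the client is told "success"

# The partition heals and last-write-wins picks one side's value.
_, survivors = lww(sides["majority"], sides["minority"])
lost = acknowledged - survivors

print(sorted(survivors))  # ['alice', 'carol', 'erin']
print(sorted(lost))       # ['bob', 'dave']
```

Every write held the lock and every timestamp is distinct, yet two acknowledged follows vanish when the sides merge.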
Nowhere in the above scenario did two writes have to happen “simultaneously”. The locking service is doing its job and keeping all writes neatly sequential. This illustrates an important point about distributed systems:
"Simultaneous" is about causality, not clocks.
The lock service can only save us if it knows the shape of the partition. It has to understand that events on both sides of the partition happen concurrently, from the point of view of Riak, and to grant any locks to the minority side would no longer provide a mutual exclusion of writes in the logical history of the Riak object.
The lock service, therefore, must be distributed. It must also be isomorphic to the Riak topology, so that when Riak is partitioned, the lock service is partitioned in the same way; and can tell us which nodes are “safe” and which are “dead”. Clients must obtain locks not just for keys, but for hosts as well.
One way to provide these guarantees is to build the locking service into Riak itself: jtuple has been thinking about exactly this problem. Alternatively, we could run a consensus service like ZooKeeper on the Riak nodes and trust that partitions will affect both kinds of traffic the same way.
So finally, we’ve succeeded: in a partition, the smaller side shuts down. We are immediately Consistent and no longer Available, in the CAP sense; but our load balancer could distribute requests to the majority partition and all is well.
What’s a quorum, anyway?
OK, so maybe I wasn’t quite honest there. It’s not clear which side of the partition is “bigger”.
Riak has vnodes. Key “A” might be stored on nodes 1, 2, and 3, “B” on 3, 4, and 5, and “C” on 6, 7, and 8. If a partition separates nodes 1-4 from 5-8, nodes 1-4 will have all copies of A, two of B, and none of C. There are actually M ensembles at work, where M = ring_size. Riak will spin up fallback vnodes for the missing sections of the ring, but they may have no data. It’s a good thing we’re using PR >= quorum, because if we didn’t, we could read an outdated copy of an object–or even get a not_found when one really exists!
This is absolutely critical. If we read a not_found, interpret it as an empty set of followers, add one user, and save, we could obliterate the entire record when the partition resolves–except for the single user we just added. A half-million followers gone, just like that.
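A sketch of that failure, with invented names: `add_follower` stands in for the app's read-modify-write, `None` stands in for a not_found from an empty fallback vnode, and the timestamps are arbitrary.

```python
def add_follower(read_result, follower, now):
    """Naive read-modify-write: treat a missing object as an empty set."""
    followers = read_result[1] if read_result is not None else frozenset()
    return (now, followers | {follower})


# The real record, written at time 100 on the other side of the partition.
real = (100, frozenset(f"user{i}" for i in range(500_000)))

fallback_read = None  # an empty fallback vnode answers not_found
written = add_follower(fallback_read, "newbie", now=101)

# When the partition heals, last-write-wins keeps the newer timestamp.
merged = max(real, written, key=lambda version: version[0])
print(len(merged[1]))  # 1
```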
Since there are M ensembles distributed across the partition in varying ways, we can’t pick an authoritative side to keep running. We can either shut down entirely, or allow some fraction of our requests (depending on the ring size, number of nodes, and number of replicas, and the partition shape) to fail. If our operations involve touching multiple keys, the probability of failure grows rapidly.
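We can check the example partition by hand. The sketch below uses the hand-picked preflists from the text (real Riak derives them from the ring) and a made-up helper, `side_with_quorum`, that asks which side still holds a majority of each key's primaries.

```python
preflists = {"A": {1, 2, 3}, "B": {3, 4, 5}, "C": {6, 7, 8}}
sides = {"left": {1, 2, 3, 4}, "right": {5, 6, 7, 8}}


def quorum(n):
    """Smallest majority of n replicas."""
    return n // 2 + 1


def side_with_quorum(preflist):
    """Name the side that can reach a quorum of this key's primaries, if any."""
    for name, nodes in sides.items():
        if len(preflist & nodes) >= quorum(len(preflist)):
            return name
    return None


owners = {key: side_with_quorum(pl) for key, pl in preflists.items()}
print(owners)  # {'A': 'left', 'B': 'left', 'C': 'right'}
```

Neither side can serve every key, so there is no single "majority partition" to keep running.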
This system cannot be said to be highly available, even if the coordination service works perfectly. Whenever a partition cuts a key off from a quorum (N/2 + 1) of its replicas, we must fail partly or completely.
In fact, strictly speaking, we don’t even have the option of partial failure. If the cluster partitions after we read a value, a subsequent write will still go through to the fallback vnodes and we have no way to stop ourselves. PW doesn’t prevent writes from being accepted; it just returns a failure code. Now our write is sitting in the minority cluster, waiting to conflict with its sibling upon resolution.
The only correct solution is to shut down everything during the partition. Assuming we can reliably detect the partition. We can do that, right?
Partitions never happen!
It’s not like partitions happen in real life. We’re running in EC2, and their network is totally reliable.
OK so we decide to move to our own hardware. Everything in the same rack, redundant bonded interfaces to a pair of fully meshed agg switches, redundant isolated power on all the nodes, you got it. This hardware is bulletproof. We use IP addresses so DNS faults can’t partially partition nodes.
Then, somehow, someone changes the Erlang cookie on a node. OK, well now we know a great way to test how our system responds to partitions.
Our ops team is paranoid about firewalls. Those are a great way to cause asymmetric partitions. Fun times. After our hosting provider’s support team fat-fingered our carefully crafted interface config for the third time, we locked them out of the OS. And we replaced our Brand X NICs with Intel ones after discovering what happens to the driver in our particular kernel version under load.
We carefully isolate our Riak nodes from the app when restarting, so the app can’t possibly write to them. It’s standard practice to bring a node back online in isolation from the cluster, to allow it to stabilize before rejoining. That’s a partition, and any writes during that time could create havoc. So we make sure the app can’t talk to nodes under maintenance at all; allow them to rejoin the cluster, perform read-repair, and then reconnect the clients.
Riak can (in extreme cases) partition itself. If a node hangs on some expensive operation like compaction or list-keys, and requests time out, other nodes can consider it down and begin failover. Sometimes that failover causes other timeouts, and nodes continually partition each other in rolling brownouts that last for hours. I’m not really sure what happens in this case–whether writes can get through to the slow nodes and cause data loss. Maybe there’s a window during ring convergence where the cluster can’t decide what to do. Maybe it’s just a normal failure and we’re fine.
Recovery from backups is a partition, too. If our n_val is 3, we restore at most one node at a time, and make sure to read-repair every key afterwards to wipe out that stale data. If we recovered two old nodes (or forgot to repair every key on those failed nodes), we could read 2 old copies of old data from the backups, consider it quorum, and write it back.
I have done this. To millions of keys. It is not a fun time.
The moral of these stories is that partitions are about lost messages, not about the network; and they can crop up in surprisingly sneaky ways. If we plan for and mitigate these conditions, I’m pretty confident that Riak is CP. At least to a decent approximation.
Theory
You know, when we started this whole mess, I thought Riak was eventually consistent. Our locking service is flawless. And yet… there are all these ways to lose data. What gives?
Eventually consistent systems are monotonic. That means that over time, they only grow in some way, towards a most-recent state. The flow of messages pushes the system inexorably towards the future. It will never lose our causal connection to the past, or (stably) regress towards a causally older version. Riak’s use of vector clocks to identify conflicts, coupled with a monotonic conflict-resolution function, guarantees our data will converge.
And Last Write Wins is monotonic. It’s associative: LWW(a, LWW(b, c)) = LWW(LWW(a, b), c). It’s commutative: LWW(a, b) = LWW(b, a). And it’s idempotent: merging in a version we’ve already seen changes nothing, LWW(LWW(a, b), b) = LWW(a, b). It doesn’t matter what order versions arrive in: Riak will converge on the version with the highest timestamp, always.
The problem is that LWW doesn’t fit with our desired semantics. These three properties guarantee monotonicity, but don’t prevent us from losing writes. f(a, b) = 0 is associative, commutative, and idempotent, but obviously not information-preserving. Like burning the Library at Alexandria, Last Write Wins is a monotonic convergence function that can destroy knowledge–regardless of order, the results are the same.
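These properties are easy to check by brute force over a handful of versions. In this sketch a version is a `(timestamp, value)` tuple, `lww` breaks timestamp ties by comparing values (my assumption, so the function is a total order), and idempotence is tested in the sense that matters for convergence: re-merging something already merged changes nothing.

```python
from itertools import product


def lww(a, b):
    """Last-write-wins over (timestamp, value) pairs; ties broken by value."""
    return max(a, b)


def zero(a, b):
    """A merge function that throws everything away."""
    return (0, "")


def converges(f, samples):
    """Check associativity, commutativity, and idempotence over the samples."""
    assoc = all(f(a, f(b, c)) == f(f(a, b), c)
                for a, b, c in product(samples, repeat=3))
    comm = all(f(a, b) == f(b, a) for a, b in product(samples, repeat=2))
    idem = all(f(f(a, b), b) == f(a, b) for a, b in product(samples, repeat=2))
    return assoc and comm and idem


samples = [(1, "x"), (2, "y"), (2, "z"), (3, "x")]
print(converges(lww, samples))   # True
print(converges(zero, samples))  # True
```

Both functions pass. Convergence says nothing about whether the value you converge on still contains your writes.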
Conclusions
Last-write-wins is an appropriate choice when the client knows that the current state is correct. But most applications don’t work this way: their app servers are stateless. State, after all, is the database’s job. The client simply reads a value, makes a small change (like adding a follower), and writes the result back. Under these conditions, last-write-wins is the razor’s edge of history. We cannot afford a single mistake if we are to preserve information.
What about eventual consistency?
If we had instead merged our followers list with set union, every failure mode I’ve discussed here disappears. The system’s eventual consistency preserves our writes, even in the face of partitions. You can still do deletes (over some time horizon) by using the lock service and a queue.
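As a sketch of what that looks like, here are two diverged replicas and an empty fallback copy merged with plain set union. The follower names are invented, and this is a grow-only set, so it cannot express an unfollow on its own.

```python
def merge(a: frozenset, b: frozenset) -> frozenset:
    """Resolve a conflict between two follower sets by taking their union."""
    return a | b


majority = frozenset({"alice", "bob", "dave"})
minority = frozenset({"alice", "carol", "erin"})
empty_fallback = frozenset()  # what a not_found read would have produced

merged = merge(merge(majority, minority), empty_fallback)
print(sorted(merged))  # ['alice', 'bob', 'carol', 'dave', 'erin']

# Order of arrival does not matter.
print(merged == merge(empty_fallback, merge(minority, majority)))  # True
```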
We could write the follower set as a list of follow or unfollow operations with timestamps, and take the union of those lists on conflict. This is the approach provided by statebox et al. If the timestamps are provided by the coordination service, we recover full CP semantics, apparent serializability, and Riak’s eventual consistency guarantees preserve our writes up to N-1 failures (depending on r/pr/w/pw/dw choices). That system can also be highly available, if paths to both sides of the partition are preserved and load balancers work correctly. Even without a coordination service, we can guarantee correctness under write contention to the accuracy of our clocks. GPS can provide 100-nanosecond-level precision. NTP might be good enough.
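A rough sketch of the operation-log idea follows. This is not statebox's API (statebox is an Erlang library); the tuple layout `(timestamp, op, user)` and the function names are assumptions for illustration, and timestamps are assumed to be unique enough to order the operations.

```python
def merge(a: frozenset, b: frozenset) -> frozenset:
    """Conflict resolution: union the two operation logs."""
    return a | b


def followers(ops) -> set:
    """Replay operations in timestamp order to compute the current set."""
    state = set()
    for _ts, op, user in sorted(ops):
        if op == "follow":
            state.add(user)
        elif op == "unfollow":
            state.discard(user)
    return state


# Two replicas share bob's follow, then diverge during a partition.
replica_a = frozenset({(1, "follow", "bob"), (3, "unfollow", "bob")})
replica_b = frozenset({(1, "follow", "bob"), (2, "follow", "carol")})

merged = merge(replica_a, replica_b)
print(sorted(followers(merged)))  # ['carol']
```

Neither carol's follow nor bob's unfollow is lost, because the merge keeps every operation and only the replay depends on timestamps.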
Or you could use OR-sets. It’s 130 lines of ruby. You can write an implementation and test its correctness in a matter of days, or use an existing library like Eric Moritz’ CRDT or knockbox. CRDTs require no coordination service and work even when clocks are unreliable: they are the natural choice for highly available, eventually consistent systems.
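For a feel of how small an OR-set can be, here is a minimal Python sketch. It is not the Ruby implementation mentioned above and not the API of any of those libraries: the `ORSet` class and its methods are my own names. Each add gets a unique tag, a remove tombstones only the tags that replica has observed, and merge is a union of both sets.

```python
import uuid


class ORSet:
    """Observed-remove set: an add survives a concurrent remove."""

    def __init__(self):
        self.adds = set()     # (element, unique tag) pairs
        self.removes = set()  # tombstoned (element, tag) pairs

    def add(self, element):
        self.adds.add((element, uuid.uuid4().hex))

    def remove(self, element):
        # Tombstone only the tags this replica has actually observed.
        self.removes |= {(e, t) for (e, t) in self.adds if e == element}

    def merge(self, other):
        merged = ORSet()
        merged.adds = self.adds | other.adds
        merged.removes = self.removes | other.removes
        return merged

    def value(self):
        return {e for (e, _tag) in self.adds - self.removes}


a = ORSet()
a.add("bob")
b = a.merge(ORSet())  # b starts as a copy of a

a.remove("bob")       # one side of the partition sees an unfollow
b.add("bob")          # the other sees a fresh follow, with a new tag
b.add("carol")

merged = a.merge(b)
print(sorted(merged.value()))  # ['bob', 'carol']
```

No clocks and no lock service are involved. The cost is that tombstones accumulate, so a real implementation needs some form of garbage collection.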
Good post. Especially great at showing the perils of using client-side locking for building stronger semantics into Riak. More on that later.
On your conclusion, queuing timestamped operations which are then processed in order does allow for essentially arbitrary types of data transformation that appear to be serializably consistent. The main problem is that you never know when you have read the final value. All partitions must heal and all data propagate before you’ll have an entire history to process. It’s still eventual consistency at the end of the day. This approach is also susceptible to write failures. As you know, Riak only provides positive guarantees. If a write succeeds, you know it was written. If a write fails, the write may or may not actually have been written.
In any case, a fine approximate solution that works well in many cases. However, I still believe stronger semantics remain necessary. The work I presented at RICON aims at providing immediate serializable consistency along with sane semantics around write failures. Yes, requests will be refused during partitions when necessary – it’s an A vs C tradeoff after all.
Back to locking and quorum issues, I’m glad I’m not the only one trying to educate the world on these issues. I’ve seen way too many attempts at building strong consistency on top of Riak using some sort of client-side coordinator. It’s just not possible.
As you mentioned, my work builds locks directly into Riak, making things aware of cluster topology. That’s step one. The second issue is sloppy quorums. My work uses strict primary-only quorums. This isn’t PW/PR; this is new semantics that had to be added in Riak for this to work. By mapping primary replicas to our consensus ensemble, and requiring two-phase commit to change the membership (e.g. when cluster ownership changes), we can handle writes during a partition. Provided a quorum of non-partitioned primaries exists, of course. Yes, we will refuse reads/writes if a quorum can’t be reached; oh well, we’re CP. At least the ring is broken into multiple ensembles and therefore failures only affect a subset of the overall keyspace.
Again, great post. The more people thinking/writing about this stuff the better.