Asynchronous replication with failover

In response to my earlier post on Redis inconsistency, Antirez was kind enough to help clarify some points about Redis Sentinel's design.

First, I'd like to reiterate my respect for Redis. I've used Redis extensively in the past with good results. It's delightfully fast, simple to operate, and offers some of the best documentation in the field. Redis is operationally predictable. Data structures and their performance behave just how you'd expect. I hear nothing but good things about the clarity and quality of Antirez' C code. This guy knows his programming.

I think Antirez and I agree with each other, and we're both saying the same sorts of things. I'd just like to expand on some of these ideas a bit, and generalize to a broader class of systems.

First, the distributed system composed of Redis and Redis Sentinel cannot be characterized as consistent. Nor can MongoDB with anything less than WriteConcern.MAJORITY, or MySQL with asynchronous replication, for that matter. Antirez writes:

What I'm saying here is that just the goal of the system is:

1) To promote a slave into a master if the master fails.
2) To do so in a reliable way.

Redis Sentinel does reliably promote secondaries into primaries. It is so good at this that it can promote two, three, or all of your secondaries into primaries concurrently, and keep them in that state indefinitely. As we've seen, having causally unconnected primaries in this kind of distributed system allows for conflicts–and since Redis Sentinel will destroy the state on an old primary when it becomes visible to a quorum of Sentinels, this can lead to arbitrary loss of acknowledged writes to the system.

Ok I just made clear enough that there is no such goal in Sentinel to turn N Redis instances into a distributed store,

If you use any kind of failover, your Redis system is a distributed store. Heck, reading from secondaries makes Redis a distributed store.

So you can say, ok, Sentinel has a limited scope, but could you add a feature so that when the master feels in the minority it no longer accept writes? I don't think it's a good idea. What it means to be in the minority for a Redis master monitored by Sentinels (especially given that Redis and Sentinel are completely separated systems)?

Do you want your Redis master stopping to accept writes when it is no longer able to replicate to its slaves?

Yes. This is required for a CP system with failover. If you don't do it, your system can and will lose data. You cannot achieve consistency in the face of a partition without sacrificing availability. If you want Redis to be AP, then don't destroy the data on the old primaries by demoting them. Preserve conflicts and surface them to the clients for merging.

You could do this as an application developer by setting every Redis node to be a primary, and writing a proxy layer which uses, say, consistent hashing and active anti-entropy to replicate writes between nodes. Take a look at Antirez' own experiments in this direction. If you want a CP system, you could follow Datomic's model and use immutable shared-structure values in Redis, combined with, say, ZooKeeper for mutable state.

Why topology matters

Antirez recommends a different approach to placing Sentinels than I used in my Redis experiments:

… place your Sentinels and set your quorum so that you are defensive enough against partitions. This way the system will activate only when the issue is really the master node down, not a network problem. Fear data loss and partitions? Have 10 Linux boxes? Put a Sentinel in every box and set quorum to 8.

I… can't parse this statement in a way that makes sense. Adding more boxes to a distributed system doesn't reduce the probability of partitions–and more to the point, trying to determine the state of a distributed system from outside the system itself is fundamentally flawed.

I mentioned that having the nodes which determine the cluster state (the Sentinels) be separate from the nodes which actually perform the replication (the Redis servers) can lead to worse kinds of partitions. I'd like to explain a little more, because I'm concerned that people might actually be doing this in production.

In this image, S stands for Sentinel, R stands for a Redis server, and C stands for Client. A box around an R indicates that node is a primary, and where it is able to replicate data to a secondary Redis server, an arrow is shown on that path. Lines show open network connections, and the jagged border shows a network partition.

Let's say we place our sentinels on 3 nodes to observe a three-node cluster. In the left-hand scenario, a majority of the Sentinels, along with two of the servers, are partitioned away from the clients. They promote node 2 to be a new primary, and it begins replicating to node 3. Node 1, however, is still a primary. Clients will continue writing to node 1, even though a.) its durability guarantees are greatly diminished–if it dies, all writes will be lost, and b.) the node doesn't have a quorum, so it cannot safely accept writes. When the partition resolves, the Sentinels will demote node 1 to a secondary and replace its data with the copy from node 2, effectively destroying all writes made during the partition.

On the right-hand side, a fully connected group of Sentinels can only see one Redis node. It's not safe to promote that node, because it doesn't have a majority and servers won't demote themselves when isolated, but the sentinels do it anyway. This scenario could be safely available to clients because a majority is present, but Redis Sentinel happily creates a split-brain and obliterates the data on the first node at some later time.

If you take Antirez' advice and colocate the sentinels with your clients, we can still get into awful states. On the left, an uneven partition between clients and servers means we elect a minority Redis server as the primary, even though it can't replicate to any other nodes. The majority component of the servers can still accept writes, but they're doomed: when the clients are able to see those nodes again, they'll wipe out all the writes that took place on those 2 nodes.

On the right, we've got the same partition topology I demonstrated in the Redis post. Same deal: split brain means conflicting writes and throwing away data.

If you encounter intermittent or rolling partitions (which can happen in the event of congestion and network failover), shifting quorums coupled with the inability of servers to reason about their own cluster state could yield horrifying consequences, like every node being a primary at the same time. You might be able to destroy not only writes that took place during the partition, but all data ever written–not sure if the replication protocol allows this or if every node just shuts down.

Bottom line: if you're building a distributed system, you must measure connectivity in the distributed system itself, not by what you can see from the outside. Like we saw with MongoDB and Riak, it's not the wall-clock state that matters–it's the logical messages in the system. The further you get from those messages, the wider your windows for data loss.

It's not just Sentinel

I assert that any system which uses asynchronous primary-secondary replication, and can change which node is the primary, is inconsistent. Why? If you write an operation to the primary, and then failover occurs before the operation is replicated to the node which is about to become the new primary, the new primary won't have that operation. If your replication strategy is to make secondaries look like the current primary, the system isn't just inconsistent, but can actually destroy acknowledged operations.

Here's a formal model of a simple system which maintains a log of operations. At any stage, one of three things can happen: we can write an operation to the primary, replicate the log of the primary to the secondary, or fail over:

```tla
------------------------------ MODULE failover ------------------------------
EXTENDS Naturals, Sequences, TLC

CONSTANT Ops

\* N1 and N2 are the lists of writes made against each node
VARIABLES n1, n2

\* The list of writes acknowledged to the client
VARIABLE acks

\* The current primary node
VARIABLE primary

\* The types we allow variables to take on
TypeInvariant == /\ primary \in {1, 2}
                 /\ n1 \in Seq(Ops)
                 /\ n2 \in Seq(Ops)
                 /\ acks \in Seq(Ops)

\* An operation is acknowledged if it has an index somewhere in acks.
IsAcked(op) == \E i \in DOMAIN acks : acks[i] = op

\* The system is *consistent* if every acknowledged operation appears,
\* in order, in the current primary's oplog:
Consistency == acks = SelectSeq((IF primary = 1 THEN n1 ELSE n2), IsAcked)

\* We'll say the system is *potentially consistent* if at least one node
\* has a superset of our acknowledged writes in order.
PotentialConsistency == \/ acks = SelectSeq(n1, IsAcked)
                        \/ acks = SelectSeq(n2, IsAcked)

\* To start out, all oplogs are empty, and the primary is n1.
Init == /\ primary = 1
        /\ n1 = <<>>
        /\ n2 = <<>>
        /\ acks = <<>>

\* A client can send an operation to the primary. The write is immediately
\* stored on the primary and acknowledged to the client.
Write(op) == IF primary = 1
               THEN /\ n1' = Append(n1, op)
                    /\ acks' = Append(acks, op)
                    /\ UNCHANGED <<n2, primary>>
               ELSE /\ n2' = Append(n2, op)
                    /\ acks' = Append(acks, op)
                    /\ UNCHANGED <<n1, primary>>

\* For clarity, we'll have the client issue unique writes
WriteSomething == \E op \in Ops : ~IsAcked(op) /\ Write(op)

\* The primary can *replicate* its state by forcing another node
\* into conformance with its oplog
Replicate == IF primary = 1
               THEN /\ n2' = n1
                    /\ UNCHANGED <<n1, acks, primary>>
               ELSE /\ n1' = n2
                    /\ UNCHANGED <<n2, acks, primary>>

\* Or we can failover to a new primary.
Failover == /\ IF primary = 1 THEN primary' = 2 ELSE primary' = 1
            /\ UNCHANGED <<n1, n2, acks>>

\* At each step, we allow the system to either write, replicate, or fail over
Next == \/ WriteSomething
        \/ Replicate
        \/ Failover
```

This is written in the TLA+ language for describing algorithms, which encodes a good subset of ZF axiomatic set theory with first-order logic and the Temporal Logic of Actions. We can explore this specification with the TLC model checker, which takes our initial state and evolves it by executing every possible state transition until it hits an error:

This protocol is inconsistent. The fields in red show the state changes during each transition: in the third step, the primary is n2, but n2's oplog is empty, instead of containing the list <<2>>. In fact, this model fails the PotentialConsistency invariant shortly thereafter, if replication or a write occurs. We can also test for the total loss of writes; it fails that invariant too.

That doesn't mean primary-secondary failover systems must be inconsistent. You just have to ensure that writes are replicated before they're acknowledged:

```tla
\* We can recover consistency by making the write protocol synchronous
SyncWrite(op) == /\ n1' = Append(n1, op)
                 /\ n2' = Append(n2, op)
                 /\ acks' = Append(acks, op)
                 /\ UNCHANGED primary

\* This new state transition satisfies both consistency constraints
SyncNext == \/ \E op \in Ops : SyncWrite(op)
            \/ Replicate
            \/ Failover
```
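For readers without a TLA+ toolchain, the same exploration can be imitated in a few dozen lines of Python. This is a hypothetical, hand-written breadth-first explorer, not TLC: it assumes a two-operation universe (`OPS = (1, 2)`), and, unlike the `SyncNext` spec, it keeps the unique-write guard on synchronous writes so that the search stays finite. The names `async_next`, `sync_next`, and `check` are illustrative only.

```python
from collections import deque

# Hypothetical Python analogue of exploring the failover spec with TLC.
# A state is (primary, n1, n2, acks); logs are tuples of operations.
OPS = (1, 2)  # assumption: a two-operation universe, like a tiny TLC model


def acked_only(log, acks):
    # SelectSeq(log, IsAcked): keep acknowledged entries, preserving order
    return tuple(op for op in log if op in acks)


def consistency(state):
    primary, n1, n2, acks = state
    return acks == acked_only(n1 if primary == 1 else n2, acks)


def potential_consistency(state):
    _, n1, n2, acks = state
    return acks in (acked_only(n1, acks), acked_only(n2, acks))


def replicate_or_failover(state):
    primary, n1, n2, acks = state
    # Replicate: the primary forces the other node to match its log
    if primary == 1:
        yield (primary, n1, n1, acks)
    else:
        yield (primary, n2, n2, acks)
    # Failover: the other node becomes primary
    yield (2 if primary == 1 else 1, n1, n2, acks)


def async_next(state):
    primary, n1, n2, acks = state
    for op in OPS:
        if op not in acks:  # WriteSomething: unique writes only
            if primary == 1:
                yield (primary, n1 + (op,), n2, acks + (op,))
            else:
                yield (primary, n1, n2 + (op,), acks + (op,))
    yield from replicate_or_failover(state)


def sync_next(state):
    primary, n1, n2, acks = state
    for op in OPS:
        if op not in acks:  # unique-write guard keeps the search finite
            yield (primary, n1 + (op,), n2 + (op,), acks + (op,))
    yield from replicate_or_failover(state)


def check(next_fn, invariant, init=(1, (), (), ())):
    """Breadth-first search; returns the shortest violating trace, or None."""
    parent = {init: None}
    queue = deque([init])
    while queue:
        state = queue.popleft()
        if not invariant(state):
            trace = []
            while state is not None:
                trace.append(state)
                state = parent[state]
            return trace[::-1]
        for succ in next_fn(state):
            if succ not in parent:
                parent[succ] = state
                queue.append(succ)
    return None


for step in check(async_next, consistency):
    print(step)
# (1, (), (), ())
# (1, (1,), (), (1,))
# (2, (1,), (), (1,))
print(check(sync_next, consistency), check(sync_next, potential_consistency))
# None None
```

The shortest violating trace has the same shape as the TLC trace described above: an acknowledged write lands on n1, failover happens before replication, and the new primary's log is missing it. In the synchronous variant both logs always equal `acks`, so neither invariant can fail.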

And in fact, we don't have to replicate to all nodes before ack to achieve consistency–we can get away with only writing to a quorum, if we're willing to use a more complex protocol like Paxos.

The important bit

So you skimmed the proof; big deal, right? The important thing is that it doesn't matter how you actually decide to do the failover: Sentinel, Mongo's gossip protocol, Heartbeat, Corosync, Byzantine Paxos, or a human being flipping the switch. Redis Sentinel happens to be more complicated than it needs to be, and it leaves much larger windows for write loss than it has to, but even if it were perfect the underlying Redis replication model is fundamentally inconsistent. We saw the same problem in MongoDB when we wrote with less than WriteConcern.MAJORITY. This affects asynchronous replication in MySQL and Postgres. It affects DRBD (yeaaaahhh, this can happen to your filesystem). If you use any of this software, you are building an asynchronous distributed system, and there are eventualities that have to be acknowledged.

Look guys, there's nothing new here. This is an old proof and many mature software projects (for instance, DRBD or RabbitMQ) explain the inconsistency and data-loss consequences of a partition in their documentation. However, not everyone knows. In fact, a good number of people seem shocked.

Why is this? I think it might be because software engineering is a really permeable field. You can start out learning Rails, and in two years wind up running four distributed databases by accident. Not everyone chose or could afford formal education, or was lucky enough to have a curmudgeonly mentor, or happened to read the right academic papers or find the right blogs. Now they might be using Redis as a lock server, or storing financial information in MongoDB. Is this dangerous? I honestly don't know. Depends on how they're using the system.

I don't view this so much as an engineering problem as a cultural one. Knives still come with sharp ends. Instruments are still hard for beginners to play. Not everything can or should be perfectly safe–or accessible. But I think we should warn people about what can happen, up front.

Tangentially: like many cultures, much of our collective understanding about what is desirable or achievable in distributed systems is driven by advertising. Yeah, MongoDB. That means you. ;-)

Bottom line

I don't mean to be a downer about all this. Inconsistency and even full-out data loss aren't the end of the world. Asynchronous replication is a good deal faster, both in bulk throughput and client latencies. I just think we lose sight, occasionally, of what that means for our production systems. My goal in writing Jepsen has been to push folks to consider their consistency properties carefully, and to explain them clearly to others. I think that'll help us all build safer systems. :)

Call me maybe: final thoughts

Previously in Jepsen, we discussed Riak. Now we'll review and integrate our findings.

We started this series with an open problem.

Notorious computer expert Joe Damato explains: “Literally no one knows.”

We've pushed the boundaries of our knowledge a little, though. By building a simple application which models a sequence of causally dependent writes, recording a log of that app's view of the world, and comparing that log to the final state of the database, we were able to verify–and challenge–our assumptions about the behavior of various distributed systems. In this talk we discussed one particular type of failure mode: a stable network partition which isolated one or more primary nodes–and explored its consequences in depth.

In each case, the system did something… odd. Maybe we hadn't fully thought through the consequences of the system, even if they were documented. Maybe the marketing or documentation were misleading, or flat-out lies. We saw design flaws, like the Redis Sentinel protocol. Some involved bugs, like MongoDB's WriteConcern.MAJORITY treating network errors as successful acknowledgements. Other times we uncovered operational caveats, like Riak's high latencies before setting up fallback vnodes. In each case, the unexpected behavior led to surprising new information about the challenge of building correct distributed systems.

In this series, we chose a simple network failure which we know happens to real production systems. The test encoded specific assumptions about concurrency, throughput, latency, timeout, error handling, and conflict resolution. The results demonstrate one point in a high-dimensional parameter space. The fraction of dropped writes in these Jepsen demos can vary wildly for all these reasons, which means we can't make general assertions about how bad the possibility of write loss really is. Mongo could lose almost all your writes, or none at all. It completely depends on the nature of your network, application, server topology, hardware, load, and the failure itself.

To apply these findings to your systems–especially in fuzzy, probabilistic ways–you'll need to measure your assumptions about how your system behaves. Write an app that hits your API and records responses. Cause some failures and see whether the app's log of what happened lines up with the final state of the system. The results may be surprising.
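The comparison step can be small. Below is a minimal sketch, assuming your test app records every value it tried to write, which of those the database acknowledged, and the set of values actually present once the cluster has healed. The function name `analyze` and its field names are hypothetical; the rates are defined the way the Jepsen summaries in the Riak section report them (loss rate is lost writes divided by acknowledged writes).

```python
def analyze(attempted, acknowledged, final_state):
    """Compare a client's log of writes with the database's final state.

    attempted:    every value the client tried to write
    acknowledged: values the database reported as successfully written
    final_state:  values actually present after the cluster has healed
    """
    attempted, acked, survivors = set(attempted), set(acknowledged), set(final_state)
    lost = sorted(acked - survivors)  # acknowledged, but gone
    # Indeterminate writes (timeouts, errors) that nevertheless took effect
    unacked_found = sorted((survivors & attempted) - acked)
    return {
        "total": len(attempted),
        "acknowledged": len(acked),
        "survivors": len(survivors),
        "lost": lost,
        "unacknowledged_found": unacked_found,
        "ack_rate": len(acked) / len(attempted),
        "loss_rate": len(lost) / len(acked) if acked else 0.0,
        "unacked_success_rate": len(unacked_found) / len(acked) if acked else 0.0,
    }


report = analyze(range(1, 11), [1, 2, 3, 4, 5, 6, 7, 8], [1, 2, 5, 6, 9])
print(report["lost"], report["unacknowledged_found"], report["loss_rate"])
# [3, 4, 7, 8] [9] 0.5
```

Keeping lost acknowledged writes separate from unacknowledged-but-present ones matters: the first is data loss, while the second is the normal, expected consequence of writes whose outcome was unknown.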

Measurement isn't something you do just once. Ideally, your production systems should be instrumented continuously for performance and correctness. Some of these failure modes leave traces you can detect.

Some people claim that partitions don't happen to them. If you run in EC2 or other virtualized environments, noisy neighbors and network congestion/failures are a well-known problem. Running your own hardware doesn't make you immune either: Amazon, with some of the best datacenter engineers on the planet, considers partitions such a major problem that they were willing to design and build Dynamo. You are probably not Amazon.

Even if your network is reliable, logical failures can be partitions, too. Nodes which become so busy they fail to respond to heartbeats are a common cause of failover. Virtual machines can do all kinds of weird things to your network and clocks. Restoring from a backup can look like a partition resolving. These failures are hard to detect, so many people don't know they even occurred. You just… get slow for a while, or run across data corruption, weeks or years later, and wonder what happened.

Aiming for correctness

We've learned a bunch of practical lessons from these examples, and I'd like to condense them briefly:

Network errors mean “I don't know,” not “It failed.” Make the difference between success, failure, and indeterminacy explicit in your code and APIs. Consider extending consistency algorithms through the boundaries of your systems. Hand TCP clients ETags or vector clocks. Extend CRDTs to the browser itself.
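One way to make indeterminacy explicit is a three-valued result type instead of a boolean. This is a minimal sketch: `Outcome` and `perform` are hypothetical names, and the mapping from exceptions to outcomes assumes a direct connection to the server with no proxy or automatic retry in between.

```python
import enum


class Outcome(enum.Enum):
    OK = "ok"            # the operation definitely took effect
    FAIL = "fail"        # the operation definitely did not take effect
    UNKNOWN = "unknown"  # indeterminate: it may or may not have taken effect


def perform(send):
    """Call a (hypothetical) client function and report what we actually know."""
    try:
        send()
    except ConnectionRefusedError:
        # We never reached a server, so the request cannot have been applied.
        return Outcome.FAIL
    except (TimeoutError, ConnectionResetError):
        # The server may have applied the request before the reply was lost.
        return Outcome.UNKNOWN
    return Outcome.OK


def refused():
    raise ConnectionRefusedError("no server listening")


def timed_out():
    raise TimeoutError("no response within 5 seconds")


print(perform(lambda: None), perform(refused), perform(timed_out))
# Outcome.OK Outcome.FAIL Outcome.UNKNOWN
```

Callers then have to decide what `UNKNOWN` means for them: retry only if the operation is idempotent, or record it as indeterminate and reconcile later.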

Even well-known, widely deployed algorithms like two-phase commit have some caveats, like false negatives. SQL transactional consistency comes in several levels. You're probably not using the stronger ones, and if you are, your code needs to handle conflicts. It's not usually a big deal, but keep it on your mental checklist.

Certain problems are hard to solve well, like maintaining a single authoritative record of data with primary failover. Consistency is a property of your data, not of your nodes. Avoid systems which assume node consensus implies data consistency.

Wall clocks are only useful for ensuring responsiveness in the face of deadlock, and even then they're not a positive guarantee of correctness. Our clocks were completely synchronized in this demo and we still lost data. Even worse things can happen if a clock gets out of sync, or a node pauses for a while. Use logical clocks on your data. Distrust systems which rely on the system time, unless you're running GPS or atomic clocks on your nodes. Measure your clock skew anyway.
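The simplest logical clock is a Lamport clock: a counter that every event advances, and that jumps past any timestamp it receives. The sketch below is illustrative (the class name is hypothetical) and shows that ordering follows message flow rather than wall time. Lamport timestamps order causally related events, but they cannot tell you that two writes were concurrent; detecting that requires something richer, such as the vector clocks Dynamo uses.

```python
class LamportClock:
    """A Lamport logical clock: orders events without consulting wall time."""

    def __init__(self):
        self.time = 0

    def tick(self):
        # Local event, including sending a message: advance the counter.
        self.time += 1
        return self.time

    def receive(self, remote_time):
        # On receipt, jump past anything the sender had already seen.
        self.time = max(self.time, remote_time) + 1
        return self.time


a, b = LamportClock(), LamportClock()
for _ in range(5):
    a.tick()                 # node a is busy: its counter reaches 5
stamp = a.tick()             # a sends a write stamped 6
print(b.receive(stamp))      # 7: b's next event is ordered after a's write
```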

Avoid home-grown distributed algorithms. Where correctness matters, rely on techniques with a formal proof and review in the literature. There's a huge gulf between theoretically correct algorithm and living breathing software–especially with respect to latency–but a buggy implementation of a correct algorithm is typically better than a correct implementation of a terrible algorithm. Bugs you can fix. Designs are much harder to re-evaluate.

Choose the right design for your problem space. Some parts of your architecture demand consistency, and there is software for that. Other parts can sacrifice linearizability while remaining correct, like CRDTs. Sometimes you can afford to lose data entirely. There is often a tradeoff between performance and correctness: think, experiment, and find out.

Restricting your system with particular rules can make it easier to attain safety. Immutability is an incredibly useful property, and can be combined with a mutable CP data store for powerful hybrid systems. Use idempotent operations as much as possible: it enables all sorts of queuing and retry semantics. Go one step further, if practical, and use full CRDTs.
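As a small illustration of what a "full CRDT" looks like, here is a grow-only counter (G-Counter), one of the standard CRDTs. This is a minimal sketch with hypothetical names, not any particular library's API. Each node increments only its own slot, and merging takes the per-node maximum, so replaying or reordering merges never changes the result.

```python
class GCounter:
    """Grow-only counter CRDT: each node increments only its own slot."""

    def __init__(self, node_id, counts=None):
        self.node_id = node_id
        self.counts = dict(counts or {})

    def increment(self, amount=1):
        # amount must be non-negative: a G-Counter can only grow
        self.counts[self.node_id] = self.counts.get(self.node_id, 0) + amount

    def value(self):
        return sum(self.counts.values())

    def merge(self, other):
        # Elementwise max is associative, commutative, and idempotent, so
        # replicas can exchange state in any order, any number of times.
        nodes = self.counts.keys() | other.counts.keys()
        merged = {n: max(self.counts.get(n, 0), other.counts.get(n, 0)) for n in nodes}
        return GCounter(self.node_id, merged)


# Two replicas accept increments on opposite sides of a partition.
a, b = GCounter("a"), GCounter("b")
a.increment()
a.increment()
b.increment(3)
healed = a.merge(b)
print(healed.value(), healed.merge(b).value())  # 5 5
```

Merging `b` a second time leaves the count at 5: re-delivered state is harmless, which is exactly the property that makes queuing and retries safe.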

Preventing write loss in some weakly consistent databases, like Mongo, requires a significant latency tradeoff. It might be faster to just use Postgres. Sometimes buying ridiculously reliable network and power infrastructure is cheaper than scaling out. Sometimes not.

Replication between availability zones or between data centers is much more likely to fail than a rack or agg switch in your DC. Microsoft estimates their WAN links offer 99.5% availability, IIRC, and their LANs at 99.95%. Design your system accordingly.
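To put those approximate, remembered figures in concrete terms, here is the expected downtime over a 365-day year of 8,760 hours, treating availability as a simple fraction of wall-clock time:

$$
\text{downtime per year} = (1 - A) \times 8760\,\text{h}
$$

$$
\text{WAN: } (1 - 0.995) \times 8760\,\text{h} = 43.8\,\text{h}, \qquad \text{LAN: } (1 - 0.9995) \times 8760\,\text{h} = 4.38\,\text{h}
$$

That is ten times as much expected outage on the WAN links.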

Embracing failure

All this analysis, measuring, and designing takes hard work. You may not have the money, experience, hardware, motivation, or time. Every system entails risk, and not quantifying that risk is a strategy in itself.

With that in mind, consider allowing your system to drop data. Spew data everywhere and repair it gradually with bulk processes. Garbage-collect structures instead of ensuring their correctness every time. Not everyone needs correct behavior right now. Some people don't ever need correct behavior. Look at the Facebook feed, or Twitter's DM light.

Code you can reason about is better than code you can't. Rely on libraries written and tested by other smart people to reduce the insane quantity of stuff you have to understand. If you don't get how to test that your merge function is associative, commutative, and idempotent, maybe you shouldn't be writing your own CRDTs just yet. Implementing two-phase commit on top of your database may be a warning sign.
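A cheap first test of a merge function is to check the three laws exhaustively over a handful of sample states. The sketch below (hypothetical helper `check_merge`) does that; passing on small samples is evidence, not proof, and a property-based testing tool such as Hypothesis can search a much larger space.

```python
from itertools import product


def check_merge(merge, samples):
    """Return the names of CRDT merge laws that fail on the given samples."""
    failures = set()
    for a, b, c in product(samples, repeat=3):
        if merge(merge(a, b), c) != merge(a, merge(b, c)):
            failures.add("associative")
        if merge(a, b) != merge(b, a):
            failures.add("commutative")
        if merge(a, a) != a:
            failures.add("idempotent")
    return sorted(failures)


samples = [frozenset(), frozenset({1}), frozenset({2}), frozenset({1, 3})]
print(check_merge(frozenset.union, samples))       # []
# Adding counts is a tempting but broken merge: duplicate delivery double-counts.
print(check_merge(lambda a, b: a + b, [0, 1, 2]))  # ['idempotent']
```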

Consistent, highly available systems are usually slow. There are proofs about the minimum number of network hops required to commit an operation in a CP system. You may want to trade correctness for performance for cost reasons, or to deliver a more responsive user experience.

I hope this work inspires you to test and improve your own distributed systems. The only reason I can talk about these mistakes is because I keep making them, over and over again. We're all in this together. Good luck. :)

http://github.com/aphyr/jepsen

Thanks

Jepsen has consumed almost every hour of my life outside work for the last three months. I'm several hundred hours into the project now–and I couldn't have done it without the help and encouragement of friends and strangers.

My sincerest thanks to my fellow Boundary alumni Dietrich Featherston and Joe Damato for the conversations which sparked this whole endeavor. Salvatore Sanfilippo, Jordan West, Evan Vigil-McClanahan, Jared Rosoff, and Joseph Blomstedt were instrumental in helping me understand how these databases actually work. Stephen Strowes and someone whose name I've sadly forgotten helped me demonstrate partitions on a local cluster in the first place. My deepest appreciation to the Postgres team, the Redis project, 10Gen and Basho for their support, and for making cool databases available to everyone for free.

Sean Cribbs and Reid Draper clued me in to CRDTs and the problems of LWW. Tom Santero and Mark Phillips invited me to give this talk at RICON East. Jepsen wouldn't have existed without their encouragement, and I am continuously indebted to the pair. Zach Tellman, John Muellerleile, Josh O'Brien, Jared Morrow, and Ryan Zezeski helped refine my arguments and slides.

Hope I didn't forget anyone–if so, please drop me a line. Thanks for reading.

Call me maybe: Riak

Previously in Jepsen, we discussed MongoDB. Today, we'll see how last-write-wins in Riak can lead to unbounded data loss.

So far we've examined systems which aimed for the CP side of the CAP theorem, both with and without failover. We learned that primary-secondary failover is difficult to implement safely (though it can be done; see, for example, ZAB or Raft). Now I'd like to talk about a very different kind of database–one derived from Amazon's Dynamo model.

Amazon designed Dynamo with the explicit goals of availability and partition tolerance–and partition-tolerant systems automatically handle node failure. It's just a special kind of partition. In Dynamo, all nodes are equal participants in the cluster. A given object is identified by a key, which is consistently hashed into N slots (called “partitions”; not to be confused with a network partition) on a ring. Those N slots are claimed by N (hopefully distinct) nodes in the cluster, which means the system can, once data is replicated, tolerate up to N-1 node failures without losing data.
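A simplified model of key placement may help. In this sketch (all names hypothetical, and not Riak's actual claim algorithm), the ring has a fixed number of slots claimed round-robin by nodes, a key is hashed to a slot with SHA-1, and the key's replicas live on that slot and the next N-1 slots clockwise. Whether those N slots land on distinct nodes depends on how slots were claimed, hence the "hopefully" above.

```python
import hashlib

RING_SIZE = 64  # hypothetical: number of partitions (slots) on the ring
N = 3           # replicas per key


def partition_for(key):
    # Hash the key onto the ring, then map it to a slot index.
    h = int.from_bytes(hashlib.sha1(key.encode()).digest(), "big")
    return h % RING_SIZE


def claim(nodes):
    # Naive round-robin claim: slot i belongs to nodes[i % len(nodes)].
    return [nodes[i % len(nodes)] for i in range(RING_SIZE)]


def preference_list(key, owners):
    # The key's slot plus the next N-1 slots clockwise hold its replicas.
    start = partition_for(key)
    return [(slot, owners[slot]) for slot in ((start + i) % RING_SIZE for i in range(N))]


owners = claim(["n1", "n2", "n3", "n4", "n5"])
print(preference_list("cart:alice", owners))
```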

When a client reads from a Dynamo system, it specifies an R value: the number of nodes required to respond for a read to be successful. When it writes, it can specify W: the number of nodes which have to acknowledge the write. There's also DW for “durable write”, and others. Riak has sometimes referred to these as “tunable CAP controls”: if you choose R=W=1, your system will be available even if all but one node fail–but you may not read the latest copy of data. If R + W is greater than N, you're “guaranteed to read acknowledged writes”, with caveats. The defaults tend to be R=W=quorum, where quorum is a majority of N (N/2+1, rounded down).
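The R + W > N rule is the pigeonhole principle: if a read set and a write set together name more than N replicas, they must share at least one. A brute-force check over every possible read and write set (hypothetical helper `quorums_intersect`) shows the boundary for N=3. It assumes reads and writes are drawn from the same N replicas; fallback vnodes, discussed below, break that assumption.

```python
from itertools import combinations


def quorums_intersect(n, r, w):
    """True if every possible R-node read set overlaps every W-node write set."""
    nodes = range(n)
    return all(set(rs) & set(ws)
               for rs in combinations(nodes, r)
               for ws in combinations(nodes, w))


for r, w in [(1, 1), (2, 1), (2, 2), (1, 3)]:
    print(f"N=3 R={r} W={w} R+W>N={r + w > 3} overlap={quorums_intersect(3, r, w)}")
# N=3 R=1 W=1 R+W>N=False overlap=False
# N=3 R=2 W=1 R+W>N=False overlap=False
# N=3 R=2 W=2 R+W>N=True overlap=True
# N=3 R=1 W=3 R+W>N=True overlap=True
```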

Dynamo handles partitions by healing the ring. Each connected set of machines establishes a set of fallback vnodes, to handle the portions of the ring which are no longer accessible. Once failover is complete, a Dynamo cluster split into two disjoint components will have two complete hash rings, and (eventually, as repair completes) 2 * N copies of the data (N in each component). When the partition heals, the fallback vnodes engage in hinted handoff, giving their data back to the original “primary” vnodes.

Since any node can accept writes for its portion of the keyspace, a Dynamo system can theoretically achieve 100% availability, even when the network fails entirely. This comes with two drawbacks. First, if no copy of a given object is available in an isolated set of nodes, that part of the cluster can accept writes for that object, but the first reads will return 404. If you're adding items to a shopping cart and a partition occurs, your cart might appear to be empty. You could add an item to that empty cart, and it'd be stored, but depending on which side of the partition you talk to, you might see 20 items or just one.

When the partition heals, we have a new problem: it's not clear which version of an object is authoritative. Dynamo employs a causality-tracing algorithm called vector clocks, which means it knows which copies of an object have been overwritten by updates, and which copies are actually conflicts–causally unconnected–due to concurrent writes.
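The core comparison behind vector clocks is small. In this sketch a clock is a dict from node to counter (Riak's real representation carries more, such as timestamps for pruning), and `compare` and `increment` are hypothetical helpers. One version supersedes another only if it is at least as large in every entry; if each is larger somewhere, the writes were concurrent and both must be kept as siblings.

```python
def compare(a, b):
    """Compare two vector clocks (dicts of node -> counter)."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # b has seen everything a has: a was overwritten
    if b_le_a:
        return "after"
    return "concurrent"      # causally unconnected: a true conflict


def increment(clock, node):
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


base = increment({}, "x")              # {'x': 1}
left = increment(base, "x")            # written on one side of a partition
right = increment(base, "y")           # written on the other side
print(compare(base, left), compare(left, right))  # before concurrent
```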

Concurrent. We were talking about partitions, right? Two writes are concurrent if they happen in different components and can't see each other's changes, because the network didn't let them communicate.

Well that's interesting, because we're also used to concurrency being a property of normal database systems. If two people read an object, then write it back with changes, those writes will also conflict. In a very real sense, partitions are just really big windows of concurrency. We often handle concurrent writes in relational databases with multi-version concurrency control or locks, but we can't use locks here because the time horizons could be minutes or hours, and there's no safe way to distribute a lock algorithm over a partition. We need a different approach. We need to be able to merge arbitrary conflicting objects for Dynamo to work. From the paper:

For instance, the application that maintains customer shopping carts can choose to “merge” the conflicting versions and return a single unified shopping cart. Despite this flexibility, some application developers may not want to write their own conflict resolution mechanisms and choose to push it down to the data store, which in turn chooses a simple policy such as “last write wins”.

Last write wins. That sounds like a timestamp. Didn't we learn that Clocks Are Not To Be Trusted? Let's try it and find out!

Riak with last-write-wins

Riak is an excellent open-source adaptation of the Dynamo model. It includes a default conflict resolution mode of last-write-wins, which means that every write includes a timestamp, and when conflicts arise, it picks the one with the higher timestamp. If our clocks are perfectly synchronized, this ensures we pick the most recent value.

To be clear: there are actually two settings in Riak which affect conflict resolution: lww=true, which turns off vector clock analysis entirely, and allow-mult=false, which uses vector clocks but picks the sibling with the highest timestamp. Allow-mult=false is safer, and that's the setting I'm referring to by “last write wins.” All cases of data loss in this post apply to both settings, though.

First, let's install Riak, join the nodes together, and tell the cluster to commit the change:

```
salticid riak.setup
salticid riak.join
salticid riak.commit
```

You can watch the logs with salticid riak.tail. Watch salticid riak.transfers until there are no handoffs remaining. The cluster is now in a stable state.

For this particular application we'll be adding numbers to a list stored in a single Riak object. This is a typical use case for Dynamo systems–the atomic units in the system are keys, not rows or columns. Let's run the app with last-write-wins consistency:

```
lein run riak lww-sloppy-quorum
Writes completed in 5.119 seconds

2000 total
2000 acknowledged
566 survivors
1434 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
1 2 3 4 6 8 ... 1990 1991 1992 1995 1996 1997
1.0 ack rate
0.717 loss rate
```

Riak lost 71% of acknowledged writes on a fully-connected, healthy cluster. No partitions. Why?

Remember how partitions and concurrency are essentially the same problem? Simultaneous writes are causally disconnected. If two clients write values which descend from the same object, Riak just picks the write with the higher timestamp, and throws away the other write. This is a classic data race, and we know how to fix those: just add a mutex. We'll wrap all operations against Riak in a perfectly consistent, available distributed lock.
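The race is easy to reproduce in miniature. In this sketch `Version`, `lww_merge`, and `union_merge` are hypothetical stand-ins, not Riak's client API: two clients read the same list and each append a number, and last-write-wins keeps only one sibling. A merge that unions the siblings keeps both appends, though a plain union cannot express removals (that needs something like an OR-Set).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Version:
    value: tuple      # the list of numbers stored in the object
    timestamp: float  # wall-clock time attached by the writer


def lww_merge(a, b):
    # Last-write-wins: keep the sibling with the higher timestamp, drop the other.
    return a if a.timestamp >= b.timestamp else b


def union_merge(a, b):
    # An application-level merge that keeps both siblings' elements.
    return Version(tuple(sorted(set(a.value) | set(b.value))), max(a.timestamp, b.timestamp))


stored = Version((1, 2), timestamp=100.0)
# Two clients read the same object and each append a number concurrently.
client_a = Version(stored.value + (3,), timestamp=101.0)
client_b = Version(stored.value + (4,), timestamp=101.5)

print(lww_merge(client_a, client_b).value)    # (1, 2, 4): write 3 is gone
print(union_merge(client_a, client_b).value)  # (1, 2, 3, 4)
```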

“But you can't do that! That violates the CAP theorem!”

Clever girl. Jepsen lets us pretend, though:

```
lein run lock riak-lww-sloppy-quorum
Writes completed in 21.475 seconds

2000 total
2000 acknowledged
2000 survivors
All 2000 writes succeeded. :-D
```

Problem solved! No more write conflicts. Now let's see how it behaves under a partition by running salticid jepsen.partition during a run:

```
237 :ok
242 :ok
247 :ok
252 :ok
257 :ok
262 timeout
85 :ok
204 timeout
203 timeout
106 :ok
209 timeout
267 timeout
90 :ok
```

The first thing you'll notice is that our writes start to lag hard. Some clients are waiting to replicate a write to a majority of nodes, but one side of the partition doesn't have a majority available. Even though Riak is an AP design, it can functionally become unavailable while nodes are timing out.

Those requests time out until Riak determines those nodes are inaccessible, and sets up fallback vnodes. Once the fallback vnodes are in place, writes proceed on both sides of the cluster, because both sides have a majority of vnodes available. This is by design in Dynamo. Allowing both components to see a majority is called a sloppy quorum, and it allows both components to continue writing data with full multi-node durability guarantees. If we didn't set up fallback vnodes, a single node failure could destroy our data.

Before collecting results, let's heal the cluster: salticid jepsen.heal. Remember to wait for Riak to recover, by waiting until salticid riak.transfers says there's no data left to hand off.

```
Writes completed in 92.773 seconds

2000 total
1985 acknowledged
176 survivors
1815 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
85 90 95 100 105 106 ... 1994 1995 1996 1997 1998 1999
6 unacknowledged writes found! ヽ(´ー｀)ノ
(203 204 218 234 262 277)
0.9925 ack rate
0.91435766 loss rate
0.00302267 unacknowledged but successful rate
```

91% data lost. This is fucking catastrophic, guys.

What happened? When the partition healed, Riak had essentially two versions of the list: one from each side of the partition (plus some minorly divergent copies on each side). Last-write-wins means we pick the one with the higher timestamp. No matter what you do, all the writes from one side or the other will be discarded.

If your Riak cluster partitions, and you write to a node which can't reach any of the original copies of the data, that write of a fresh object can overwrite the original record–destroying all the original data.
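Here's a minimal sketch of that failure, not Riak's implementation: each sibling is a (timestamp, value) pair, and the hypothetical `lww_merge` keeps whichever one has the higher timestamp. The timestamps and list contents are made up for illustration.

```python
from typing import FrozenSet, Tuple

# A sibling is (wall-clock timestamp, value). Timestamps are illustrative.
Sibling = Tuple[float, FrozenSet[int]]


def lww_merge(a: Sibling, b: Sibling) -> Sibling:
    """Last-write-wins: keep whichever sibling has the higher timestamp.

    The other sibling's value is discarded entirely, even if it holds
    writes the winner never saw.
    """
    return a if a[0] >= b[0] else b


# Both sides of a partition start from the same list and append independently.
base = frozenset({1, 2, 3})
left: Sibling = (100.0, base | {4, 6, 8})   # writes accepted on one side
right: Sibling = (100.5, base | {5, 7})     # writes accepted on the other side

merged = lww_merge(left, right)
lost = (left[1] | right[1]) - merged[1]
print(sorted(merged[1]))  # [1, 2, 3, 5, 7]
print(sorted(lost))       # [4, 6, 8]
```

Whichever side's clock reads later wins outright: writes 4, 6, and 8 were acknowledged on the other side and simply vanish.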

Strict quorum

The problem is that we allowed writes to proceed on both sides of the partition. Riak has two more settings for reads and writes: PR and PW, for primary read and write, respectively. PR means you have to read a value from at least that many of the original owners of a key: fallback vnodes don't count. PW is the same requirement for writes. If we set PR and PW to a quorum (a majority of the key's primary owners), operations against a given key will only be able to proceed on one component of a partitioned cluster. That's a CP system, right?

lein run lock riak-lww-quorum
274 :ok
1250 :ok
279 com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pw_val_unsatisfied,2,1}
1381 :ok
277 com.basho.riak.client.RiakRetryFailedException: com.basho.riak.pbc.RiakError: {pr_val_unsatisfied,2,1}

Here we see the cluster denying a write and a read, respectively, to clients which can't see a majority of the primary nodes for a key. Note that because the quorums are spread around the nodes, a Dynamo system will be partially available in this mode. In any given component, you'll be able to read and write some fraction of the keys, but not others.

2000 total
1971 acknowledged
170 survivors
1807 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
86 91 95 96 100 101 ... 1994 1995 1996 1997 1998 1999
6 unacknowledged writes found! ヽ(´ー｀)ノ
(193 208 219 237 249 252)
0.9855 ack rate
0.9167935 loss rate
0.00304414 unacknowledged but successful rate

PR=PW=R=W=quorum still allowed 92% write loss. We reported failure for more writes than before, so that's a start–but what gives? Shouldn't this have been CP?

The problem is that failed writes may still be partially successful. Dynamo is designed to preserve writes as much as possible. Even though a node might return “PW val unsatisfied” when it can't replicate to the primary vnodes for a key, it may have been able to write to one primary vnode–or any number of fallback vnodes. Those values will still be exchanged during read-repair, considered as conflicts, and the timestamp used to discard the older value–meaning all writes from one side of the cluster.

This means the minority component's failing writes can destroy all of the majority component's successful writes. Repeat after me: Clocks. Are. Evil.

Is there no hope? Is there anything we can do to preserve my writes in Riak?

Yes. We can use CRDTs.

If we enable allow-mult in Riak, the vector clock algorithms will present both versions to the client. We can combine those objects together using a merge function. If the merge function is associative, commutative, and idempotent over that type of object, we can guarantee that it always converges to the same value regardless of the order of writes. If the merge function doesn't discard data (like last-write-wins does), then it will preserve writes from both sides.
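A minimal sketch of the idea, assuming siblings arrive as plain Python frozensets rather than through any Riak client API: `merge` is set union, and the hypothetical `resolve` folds however many siblings the database hands back. The asserts check the three properties named above on sample values.

```python
from functools import reduce
from itertools import permutations
from typing import FrozenSet, Iterable


def merge(a: FrozenSet[int], b: FrozenSet[int]) -> FrozenSet[int]:
    """Merge two siblings by set union: no element from either side is dropped."""
    return a | b


def resolve(siblings: Iterable[FrozenSet[int]]) -> FrozenSet[int]:
    """Fold any number of siblings (as presented with allow-mult) into one value."""
    return reduce(merge, siblings, frozenset())


x, y, z = frozenset({1, 2}), frozenset({2, 3}), frozenset({4})

# The three properties that make the result independent of delivery order:
assert merge(merge(x, y), z) == merge(x, merge(y, z))  # associative
assert merge(x, y) == merge(y, x)                      # commutative
assert merge(x, x) == x                                # idempotent

# Every ordering of the siblings, including a duplicated one, converges.
results = {resolve(p) for p in permutations([x, y, z, y])}
print(results)  # {frozenset({1, 2, 3, 4})}
```

The duplicated `y` stands in for a sibling delivered twice; idempotence means it changes nothing.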

In this case, we're accumulating a set of numbers. We can use set union as our merge function, or 2P sets, or OR sets, if we need to remove numbers.
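If numbers also need to be removed, a 2P set is one option. Here's a minimal in-memory sketch; the class and method names are illustrative, not a Riak data type. It keeps a grow-only add-set and a grow-only remove-set (tombstones), so merging is just two unions.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class TwoPhaseSet:
    """2P set: an element is present if it was added and never removed.

    Once removed, an element can't be re-added; that is the price of a
    merge that never loses data.
    """
    added: Set[int] = field(default_factory=set)
    removed: Set[int] = field(default_factory=set)

    def add(self, x: int) -> None:
        self.added.add(x)

    def remove(self, x: int) -> None:
        # Only tombstone elements this replica has observed being added.
        if x in self.added:
            self.removed.add(x)

    def value(self) -> Set[int]:
        return self.added - self.removed

    def merge(self, other: "TwoPhaseSet") -> "TwoPhaseSet":
        # Union both components: associative, commutative, and idempotent.
        return TwoPhaseSet(self.added | other.added, self.removed | other.removed)


# Two replicas diverge during a partition.
a, b = TwoPhaseSet(), TwoPhaseSet()
for n in (1, 2, 3):
    a.add(n)
    b.add(n)
a.remove(2)   # one side removes 2
b.add(4)      # the other side adds 4

merged = a.merge(b)
print(sorted(merged.value()))  # [1, 3, 4]
```

Both the removal on one side and the addition on the other survive the merge. If elements must be re-addable, an OR set is the usual alternative.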

lein run riak-crdt
Writes completed in 80.918 seconds
2000 total
1948 acknowledged
2000 survivors
All 2000 writes succeeded. :-D

CRDTs preserve 100% of our writes. We still have false negatives in this demo, because the client timed out on a few writes which Riak was still propagating, when the partition first began. False negatives are OK, though, because state-based CRDTs are idempotent. We can repeat our writes arbitrarily many times, in any order, without duplicating data.

Moreover, CRDTs are an AP design: we can write safely and consistently even when the cluster is totally partitioned–for example, when no majority exists. They're also eventually consistent (in a safe, data-preserving sense) when components are partitioned away from all copies of a given object and are forced to start from scratch.

Strategies for working with Riak

Enable allow-mult. Use CRDTs.

Seriously. LWW never should have been the standard behavior for a Dynamo system, but Basho made it the default after customers complained that they didn't like the complexity of reasoning about siblings. Customers are the only reason Riak exists, and this behavior is gonna seem OK until you start experiencing partitions (and remember, fault tolerance is the reason you chose Riak in the first place), so we're stuck with a default config which promotes simple-yet-dangerous behavior.

As a consequence of that decision, community resources which people rely on to learn how to use Riak are often aimed towards last-write-wins. Software isn't just an artifact, but a culture around its use. I don't really know what we can learn from this, besides the fact that engineering and culture are tough problems.

CRDTs may be too large, too complex, or too difficult to garbage-collect for your use case. However, even if you can't structure your data as a full CRDT, writing a hacked-together merge function which just takes care of a couple important fields (say, set union over your friend list and logical OR over the other fields) can go a long way towards preventing catastrophic data loss.
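Here's what such a partial merge might look like, assuming a hypothetical user record with a friend set and a couple of boolean flags. `UserRecord` and `merge_users` are illustrative names, not part of any Riak client.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class UserRecord:
    # Hypothetical fields, for illustration only.
    friends: FrozenSet[str]
    verified: bool
    opted_in: bool


def merge_users(a: UserRecord, b: UserRecord) -> UserRecord:
    """Resolve two siblings field by field instead of picking one whole object.

    Set union keeps every friend added on either side; logical OR keeps a
    flag set if either side set it. Neither handles removals; un-friending
    or clearing a flag would need tombstones.
    """
    return UserRecord(
        friends=a.friends | b.friends,
        verified=a.verified or b.verified,
        opted_in=a.opted_in or b.opted_in,
    )


left = UserRecord(frozenset({"ann", "bob"}), verified=True, opted_in=False)
right = UserRecord(frozenset({"ann", "cat"}), verified=False, opted_in=True)
print(merge_users(left, right))
```

This isn't a full CRDT, but every field it covers keeps the writes from both sides rather than discarding one sibling wholesale.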

There are cases where last-write-wins is a safe strategy. If your data is immutable, then it doesn't matter which copy you choose. If your writes mean “I know the full correct state of this object at this time”, it's safe. Many caches and backup systems look like this. If, however, your writes mean “I am changing something I read earlier,” then LWW is unsafe.

Finally, you can decide to accept dropped data. All databases will fail, in different ways, and with varying probabilities. Riak's probability distribution might be OK for you.

Introducing locks is a bad idea. Even if they did prevent data loss–and as we saw, they don't–you'll impose a big latency cost. Moreover, locks restrict your system to being CP, so there's little advantage to having an AP database. However, some really smart folks at Basho are working on adding Paxos rounds for writes which need to be CP. Having a real consensus protocol will allow Riak's distributed writes to be truly atomic.

So: we've seen that Riak's last-write-wins is fundamentally unsafe in the presence of network partitions. You can lose not only writes made during the partition, but all writes made at any time prior. Riak is an AP system, and its tunable CAP controls only allow you to detect some forms of write loss–not prevent it. You can't add consistency to a database by tacking on a lock service because wall clock time doesn't matter: consistency is a causal property of the relationships between the writes themselves. AP systems involve fundamentally different kinds of data structures, with their own unique tradeoffs.

In the next post, we'll review what we've learned from these four distributed systems, and where we go from here.

Call me maybe: MongoDB

Previously in Jepsen, we discussed Redis. In this post, we'll see MongoDB drop a phenomenal amount of data.

MongoDB is a document-oriented database with a similar distribution design to Redis. In a replica set, there exists a single writable primary node which accepts writes, and asynchronously replicates those writes as an oplog to N secondaries. However, there are a few key differences.

First, Mongo builds in its leader election and replicated state machine. There's no separate system which tries to observe a replica set in order to make decisions about what it should do. The replica set decides among itself which node should be primary, when to step down, how to replicate, etc. This is operationally simpler and eliminates whole classes of topology problems.

Second, Mongo allows you to ask that the primary confirm successful replication of a write by its disk log, or by secondary nodes. At the cost of latency, we can get stronger guarantees about whether or not a write was successful.

What happens when a primary becomes inaccessible?

The remaining secondaries will gradually detect the failed connection and attempt to come to a consensus about what to do. If they have a majority (and remember, there can be only one majority in a cluster, so this suggests we're heading towards a CP system), they'll select the node with the highest optime (a monotonic clock maintained by each node) and promote it to be a new primary. Simultaneously, the minority nodes will detect that they no longer have a quorum, and demote the primary to a secondary so it can't accept writes.

If our primary is on n1, and we cut off n1 and n2 from the rest of the cluster, we expect either n3, n4, or n5 to become the new primary. Because this architecture demotes the original primary on n1, we won't find ourselves in the same split-brain problem we saw with Redis.
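The election rule described above can be sketched as follows. This is a deliberate simplification, assuming only the majority check and the optime comparison matter; real MongoDB elections weigh more factors. The optime values are borrowed from the mongo.rs_stat output shown later in this post, and `elect` is a hypothetical helper.

```python
from typing import Dict, Optional, Set, Tuple

# Optime modeled as (seconds, increment), compared lexicographically; this
# loosely mirrors values like "1368941963/11" in the rs_stat output.
Optime = Tuple[int, int]


def elect(visible: Set[str], cluster_size: int,
          optimes: Dict[str, Optime]) -> Optional[str]:
    """Simplified election sketch (not MongoDB's code).

    A component may elect a primary only if it holds a strict majority of
    the replica set; it then picks the visible member with the highest optime.
    """
    if len(visible) <= cluster_size // 2:
        return None  # minority component: no primary, so no writes here
    return max(visible, key=lambda node: optimes[node])


optimes = {"n1": (1368941926, 66), "n2": (1368941961, 70),
           "n3": (1368941962, 9), "n4": (1368941961, 45),
           "n5": (1368941963, 11)}
print(elect({"n1", "n2"}, 5, optimes))        # None
print(elect({"n3", "n4", "n5"}, 5, optimes))  # n5
```

Note what the sketch doesn't track: whether n5 ever received the writes n1 accepted after the partition began. The highest optime in the majority is not the same thing as the latest acknowledged write.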

Consistency

So is MongoDB CP? There's a popular notion that MongoDB is a CP system, including exchanges like this, where all kinds of nuanced technical assertions about strong consistency are thrown around. At the same time, Mongo's documentation for replica sets explains carefully that Mongo may “revert operations”:

In some failover situations primaries will have accepted write operations that have not replicated to the secondaries after a failover occurs. This case is rare and typically occurs as a result of a network partition with replication lag. When this member (the former primary) rejoins the replica set and attempts to continue replication as a secondary the former primary must revert these operations or “roll back” these operations to maintain database consistency across the replica set.

“Revert” certainly doesn't sound like linearizability to me, but that bit about “maintain[ing] database consistency” doesn't sound so bad. What actually happens? Let's find out!

For this example, we'll be adding integers to a list in a MongoDB document by using the update command in a CaS loop–just like you'd use with any transactionally isolated database. Yes, we could use $addToSet, but I'm using this app as an example of atomic updates in general, and they have different oplog dynamics.

Unacknowledged

Up until recently, clients for MongoDB didn't bother to check whether or not their writes succeeded, by default: they just sent them and assumed everything went fine. This goes about as well as you'd expect.

lein run mongo-unsafe -n 6000
salticid jepsen.partition

For a while, writes continue to complete against n1. Then we see errors as the replica set fails over, like

3186 No replica set members available in [
{ address:'n3/10.10.3.101:27017', ok:true, ping:0.8954104, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
{ address:'n4/10.10.3.95:27017', ok:true, ping:0.681164, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
{ address:'n5/10.10.3.32:27017', ok:true, ping:0.6231328, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
{ address:'n2/10.10.3.52:27017', ok:true, ping:0.51316977, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, },
{ address:'n1/10.10.3.242:27017', ok:true, ping:0.37008655, isMaster:false, isSecondary:true, setName:rs0, maxBsonObjectSize:16777216, }
] for { "mode" : "primary"}

During this time, the majority nodes (n3, n4, n5) are still secondaries, but they've agreed that the old primary is inaccessible. They compare optimes and race to elect a leader:

$ salticid mongo.rs_stat
22:09:08 Starting...
22:09:08 MongoDB shell version: 2.4.1
22:09:08 connecting to: test
22:09:08 n1:27017 (not reachable/healthy) 1368940104/56
22:09:08 n2:27017 (not reachable/healthy) 1368940103/458
22:09:08 n3:27017 SECONDARY 1368940104/89
22:09:08 n4:27017 SECONDARY 1368940104/89
22:09:08 n5:27017 SECONDARY 1368940104/102
22:09:08 true
22:09:08 Finished

22:09:23 n1:27017 (not reachable/healthy) 1368941926/66
22:09:23 n2:27017 (not reachable/healthy) 1368941961/70
22:09:23 n3:27017 SECONDARY 1368941962/9
22:09:23 n4:27017 SECONDARY 1368941961/45
22:09:23 n5:27017 PRIMARY 1368941963/11

N5 wins the race, and proceeds to accept writes. If we heal the partition with salticid jepsen.heal, and wait a few seconds, the nodes will detect the fully connected cluster and the new primary will step down, to allow n1 to resume its place. Now that the cluster has stabilized, we hit enter to check how many of our writes survived:

Hit enter when ready to collect results.
Writes completed in 93.608 seconds
6000 total
5700 acknowledged
3319 survivors
2381 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
469 474 479 484 489 494 ... 3166 3168 3171 3173 3178 3183
0.95 ack rate
0.4177193 loss rate
0.0 unacknowledged but successful rate

42% write loss. Well, to some extent, this shouldn't be surprising, because we weren't checking to see whether the server was successful in applying our writes. Those 300 errors only came about when we tried to write to a secondary. But we never actually crashed a node, and we didn't see any signs of a split-brain condition with two simultaneous primaries–so why did Mongo drop data?

Remember those writes that completed on n1 just after the partition started? Those writes are still on n1, but never made it to n5. N5 proceeded without them. Now n1 and n5 are comparing notes, and n1 realizes that n5's optime is higher. N1 figures out the last point where the two agreed on the oplog, and rolls back to that point.

22:09:33 Sun May 19 05:09:33.032 [rsHealthPoll] replSet member n5:27017 is now in state PRIMARY
22:09:33 Sun May 19 05:09:33.207 [initandlisten] connection accepted from 10.10.3.95:37718 #6154 (23 connections now open)
22:09:33 Sun May 19 05:09:33.417 [rsBackgroundSync] replSet syncing to: n5:27017
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet our last op time fetched: May 19 05:08:37:2
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replset source's GTE: May 19 05:09:26:1
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet rollback 0
22:09:33 Sun May 19 05:09:33.438 [rsBackgroundSync] replSet ROLLBACK
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet rollback 1
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet rollback 2 FindCommonPoint
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback our last optime: May 19 05:08:37:2
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback their last optime: May 19 05:09:33:32
22:09:33 Sun May 19 05:09:33.439 [rsBackgroundSync] replSet info rollback diff in end of log times: -56 seconds
22:09:35 Sun May 19 05:09:33.621 [initandlisten] connection accepted from 10.10.3.32:59066 #6155 (24 connections now open)
22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet rollback found matching events at May 19 05:08:24:66
22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet rollback findcommonpoint scanned : 3798
22:09:35 Sun May 19 05:09:35.221 [rsBackgroundSync] replSet replSet rollback 3 fixup
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 3.5
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 4 n:1
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet minvalid=May 19 05:09:35 51985e8f:19
22:09:35 Sun May 19 05:09:35.222 [rsBackgroundSync] replSet rollback 4.6
22:09:35 Sun May 19 05:09:35.223 [rsBackgroundSync] replSet rollback 4.7
22:09:35 Sun May 19 05:09:35.223 [rsBackgroundSync] replSet rollback 5 d:0 u:1
22:09:35 Sun May 19 05:09:35.224 [rsBackgroundSync] replSet rollback 6
22:09:35 Sun May 19 05:09:35.236 [rsBackgroundSync] replSet rollback 7
22:09:35 Sun May 19 05:09:35.238 [rsBackgroundSync] replSet rollback done
22:09:35 Sun May 19 05:09:35.238 [rsBackgroundSync] replSet RECOVERING

During a rollback, all the writes the old primary accepted after the common point in the oplog are removed from the database and written to a BSON file in Mongo's rollbacks directory. If you're a sysadmin, you could go look at the rollback files to try and reconstruct the writes that the database dropped.
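A toy model of that rollback, assuming an oplog is just a list of (optime, operation) pairs and that the common point is the longest shared prefix. MongoDB's actual FindCommonPoint scans backwards through both oplogs and is more involved; `find_common_point` and `rollback` here are hypothetical helpers.

```python
from typing import List, Tuple

# An oplog entry is (optime, operation); optimes are plain increasing ints here.
Entry = Tuple[int, str]


def find_common_point(ours: List[Entry], theirs: List[Entry]) -> int:
    """Return the length of the longest oplog prefix both nodes share."""
    n = 0
    for a, b in zip(ours, theirs):
        if a != b:
            break
        n += 1
    return n


def rollback(ours: List[Entry],
             theirs: List[Entry]) -> Tuple[List[Entry], List[Entry]]:
    """Truncate our oplog at the common point, then follow the new primary.

    Returns (new oplog, discarded entries). The discarded entries are what a
    rollback file would ideally contain; nothing re-applies them.
    """
    common = find_common_point(ours, theirs)
    return ours[:common] + theirs[common:], ours[common:]


old_primary = [(1, "push 1"), (2, "push 2"), (3, "push 3"), (4, "push 4")]
new_primary = [(1, "push 1"), (2, "push 2"), (5, "push 5"), (6, "push 6")]
log, discarded = rollback(old_primary, new_primary)
print(discarded)  # [(3, 'push 3'), (4, 'push 4')]
```

Entries 3 and 4 may well have been acknowledged to clients by the old primary; after the rollback they exist only in `discarded`, if anywhere.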

Well, theoretically. In my tests, it only does this in 1 out of 5 runs or so. Mostly, it just throws those writes away entirely: no rollback files, no nothing. I don't really know why.

This leads to an important discovery: it doesn't matter whether or not there were two primaries at the same time. We can still get conflicting writes if the old primary's state is causally unconnected from the new primary. A primary/secondary system, by itself, is not sufficient. We have to actually track causality on the writes themselves in order to be CP. Otherwise, newly elected primaries could diverge from the old one.

Safe

Aha! But that was with the old “unsafe” write concern! We should use the Safe write concern!

lein run mongo-safe -n 6000
...
6000 total
5900 acknowledged
3692 survivors
2208 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
458 463 468 473 478 483 ... 3075 3080 3085 3090 3095 3100
0.98333335 ack rate
0.3742373 loss rate
0.0 unacknowledged but successful rate

Replicas-safe

WriteConcern.SAFE only verifies that the write was accepted by the primary. We need to make sure that the replicas have received our write before considering it a success.

lein run mongo-replicas-safe -n 6000
...
6000 total
5695 acknowledged
3768 survivors
1927 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
712 717 722 727 732 737 ... 2794 2799 2804 2809 2814 2819
0.94916666 ack rate
0.338367 loss rate
0.0 unacknowledged but successful rate

Mongo still rolled back our writes. Why? Because REPLICAS_SAFE only checks to see if the write took place against two replicas. Our cluster has five nodes, so it's possible for writes to exist only on n1 and n2. A new primary can be elected without having seen our write. We need to wait until our write has been acknowledged by a majority of nodes.
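The overlap argument can be checked by brute force. This is a small sketch assuming the five-node cluster from this test; `can_elect_without` is a hypothetical helper that asks whether some strict majority of voters could consist entirely of nodes that never saw a write.

```python
from itertools import combinations

NODES = ["n1", "n2", "n3", "n4", "n5"]


def can_elect_without(acked: set, nodes=NODES) -> bool:
    """True if some strict majority of nodes contains no node holding the
    write, i.e. a new primary could be elected by voters that never saw it."""
    majority = len(nodes) // 2 + 1
    return any(acked.isdisjoint(group) for group in combinations(nodes, majority))


# Acknowledged by two nodes, as with REPLICAS_SAFE:
print(can_elect_without({"n1", "n2"}))        # True
# Acknowledged by a majority of three nodes:
print(can_elect_without({"n1", "n2", "n3"}))  # False
```

With two acknowledgements out of five, {n3, n4, n5} can elect a primary among themselves without the write. Any two majorities of five nodes share at least one member (3 + 3 > 5), so a majority acknowledgement guarantees every electing majority contains a node holding the write. The sketch only checks that overlap; it doesn't model which node actually wins.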

Majority

lein run mongo -n 6000

Using WriteConcern.MAJORITY, we notice an improvement! When we cause the partition, writes pause immediately. The clients are blocked, waiting for the primary to confirm acknowledgement on nodes which will never respond. Eventually they time out. This is a hallmark of a CP system: we shouldn't be able to make progress without talking to a majority of nodes.

Writes completed in 157.425 seconds
6000 total
5700 acknowledged
5701 survivors
2 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
(596 598)
3 unacknowledged writes found! ヽ(´ー｀)ノ
(562 653 3818)
0.95 ack rate
1.754386E-4 loss rate
5.2631577E-4 unacknowledged but successful rate

So 3 writes which supposedly failed actually succeeded. That's not so bad. On the other hand, Mongo still dropped two “successful” writes. Writes which were supposedly acknowledged by a majority of nodes.

I've been talking with 10gen, and they think this is a bug. When the network partitions, the server just checks off the “OK” field for the client's WriteConcern request, and sends it back. The client sees the “OK” message and… sensibly presumes the write was OK. This should be fixed in master, but is still present in 2.4.3, the most recent release.

Even if this bug is fixed, Mongo still isn't consistent. Those three writes which “failed” but showed up in the result set? Those are writes which were replicated to a node in the majority component just prior to the partition, but never had the chance to be acknowledged. Single writes are not atomic without a proper consensus protocol: those failed writes could materialize never, now, or some time in the future, potentially overwriting valid data.

Strategies for working with Mongo

On the one hand, Mongo advocates usually tell me “but network partitions are exceedingly rare in practice.” Then I talk to Mongo users who report their cluster fails over on a weekly basis. One thing to keep in mind is that heavy load–like seasonal writes, recovering from a crash, or performing a rollback–can slow a node down to the point where other nodes declare it dead. This is a partition. I've seen my test cluster perform dozens of rollbacks as nodes go unavailable attempting to elect a new primary. You should probably instrument your cluster to watch for these events in production.

As we've discussed before, one option is simply to accept data loss. Not all applications need consistency.

At the same time, you should watch those rollback files. Sometimes they don't appear even though they're supposed to, and not all data types will actually be rolled back. Conflicts in capped collections, for example, appear to simply discard all data in the collection past the conflict point by design.

People use capped collections for distributed queues. Think about that for a minute.

Moreover, a rollback file doesn't give you enough information to actually reconstruct the correct state of the system–at least in general. It's just a snapshot of “some state” the database had to discard. Because there's no well-defined ordering for these writes, you'll have to decide what that means for your particular data structures. If you can structure your documents as CRDTs and write a merge function, you'll be able to safely merge. If there's no conflicting copy of the document in the database, and you never delete those kinds of documents, you can restore it automatically. Immutable records can always be recovered, too.

Finally, you can drastically reduce the probability of write loss by using WriteConcern.MAJORITY. This is gonna impose a big performance hit. That's another hallmark of more-available CP systems.

To recap: MongoDB is neither AP nor CP. The defaults can cause significant loss of acknowledged writes. The strongest consistency offered has bugs which cause false acknowledgements, and even if they're fixed, doesn't prevent false failures.

In the next post, we'll talk about a database which emphasizes availability and partition tolerance: Riak.

Call me maybe: Redis

Previously on Jepsen, we explored two-phase commit in Postgres. In this post, we demonstrate Redis losing 56% of writes during a partition.

Redis is a fantastic data structure server, typically deployed as a shared heap. It provides fast access to strings, lists, sets, maps, and other structures with a simple text protocol. Since it runs on a single server, and that server is single-threaded, it offers linearizable consistency by default: all operations happen in a single, well-defined order. There's also support for basic transactions, which are atomic and isolated from one another.

Because of this easy-to-understand consistency model, many users treat Redis as a message queue, lock service, session store, or even their primary database. Redis running on a single server is a CP system, so it is consistent for these purposes.

Redis offers asynchronous primary->secondary replication. A single server is chosen as the primary, which can accept writes. It relays its state changes to secondary servers, which follow along. Asynchronous means that you don't have to wait for a write to be replicated before the primary returns a response to the client. Writes will eventually arrive on the secondaries, if we wait long enough. In our application, all 5 clients will read from the primary on n1, and n2–n5 will be secondaries.

This is still a CP system, so long as we never read from the secondaries. If you do read from the secondaries, it's possible to read stale data. That's just fine for something like a cache! However, if you read data from a secondary, then write it to the primary, you could inadvertently destroy writes which completed but weren't yet replicated to the secondaries.
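A toy model of asynchronous replication makes both hazards visible: stale reads from secondaries, and acknowledged writes that no secondary holds yet. This is an in-memory sketch with illustrative names (`AsyncPrimary`, `replicate`), not Redis's replication protocol.

```python
from typing import List


class AsyncPrimary:
    """Toy model of asynchronous primary->secondary replication.

    write() acknowledges immediately and only queues the change;
    replicate() ships queued changes to every secondary later.
    """

    def __init__(self, n_secondaries: int) -> None:
        self.data: List[int] = []
        self.secondaries: List[List[int]] = [[] for _ in range(n_secondaries)]
        self.backlog: List[int] = []

    def write(self, x: int) -> str:
        self.data.append(x)
        self.backlog.append(x)  # not yet on any secondary
        return "ok"             # acknowledged before replication

    def replicate(self) -> None:
        for s in self.secondaries:
            s.extend(self.backlog)
        self.backlog.clear()


p = AsyncPrimary(n_secondaries=4)
for i in range(3):
    p.write(i)
p.replicate()
acks = [p.write(i) for i in range(3, 6)]  # acknowledged, still in the backlog

print(p.secondaries[0])  # [0, 1, 2]
# If the primary fails now and secondary 0 is promoted, writes 3-5 are gone:
promoted = list(p.secondaries[0])
print(sorted(set(p.data) - set(promoted)))  # [3, 4, 5]
```

A client reading from secondary 0 sees [0, 1, 2]; writing a value derived from that read back to the primary would clobber writes 3 through 5 just as surely as a failover would.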

What happens if the primary fails? We need to promote one of the secondary servers to a new primary. One option is to use Heartbeat or a STONITH system which keeps a link open between two servers, but if the network partitions we don't have any way to tell whether the other side is alive or not. If we don't promote a secondary, there could be no active servers. If we do, there could be two active servers. We need more nodes.

If one connected component of the network contains a majority (more than N/2) of nodes, we call it a quorum. We're guaranteed that at most one quorum exists at any point in time–so if a majority of nodes can see each other, they know that they're the only component in that state. That group of nodes (also termed a “component”) has the authority to promote a new primary.
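A small sketch of why the majority threshold matters, assuming nodes are identified by name and a partition is a list of disjoint components; `quorums` is a hypothetical helper. Two disjoint sets can't both contain more than N/2 of N nodes, so at most one component qualifies. A threshold at or below N/2 loses that guarantee, which is also the hazard with a Sentinel quorum configured below a majority of sentinels.

```python
from typing import List, Set


def quorums(components: List[Set[str]], threshold: int) -> List[Set[str]]:
    """Return the components large enough to act, e.g. to promote a primary."""
    return [c for c in components if len(c) >= threshold]


partition = [{"n1", "n2"}, {"n3", "n4", "n5"}]
n = 5
majority = n // 2 + 1  # more than N/2, i.e. 3 of 5

print(len(quorums(partition, majority)))  # 1: only {n3, n4, n5}
# A threshold at or below N/2 lets both sides act at once:
print(len(quorums(partition, 2)))         # 2: split-brain is possible
```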

Redis has a system called Sentinel, which, when configured correctly, will try to establish a quorum between Sentinel nodes, agree on which Redis servers are alive, and, if the primary appears to have failed, promote a secondary in its place. If we colocate the Sentinel nodes with the Redis nodes, this should allow us to promote a new primary in the majority component (should one exist).

What are the consistency and availability properties of Sentinel? Antirez, the author of Redis, says:

Redis Cluster for instance is a system biased towards consistency rather than availability. Redis Sentinel itself is an HA solution with the dogma of consistency and master slave setups.

So we expect this system to be CP. Nodes in the minority component will become unavailable during the partition, and the majority component will elect a new primary. The Sentinels will then order clients to abandon the old primary and reconnect to the new one.

Before we begin, it's important to recognize that Redis does not guarantee durability. Since writes to disk and replication to secondaries are asynchronous, we can lose up to N seconds of the most recent writes. We should not, however, see gaps in the write log. If write n is present, so are writes 0, 1, … n-2, n-1.
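That property is easy to check mechanically. A minimal sketch, assuming writes are the consecutive integers 0, 1, 2, … as in the application used in this post; `has_gaps` is a hypothetical helper, not part of Jepsen.

```python
from typing import Set


def has_gaps(survivors: Set[int]) -> bool:
    """A log of writes 0, 1, ..., n is gap-free iff it contains every integer
    up to its maximum. Losing a recent suffix is consistent with Redis's
    durability model; a hole in the middle is not."""
    if not survivors:
        return False
    return survivors != set(range(max(survivors) + 1))


print(has_gaps({0, 1, 2, 3}))     # False: only a suffix may be missing
print(has_gaps({0, 1, 3, 4, 5}))  # True: write 2 vanished but 3-5 survived
```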

Partitioning the cluster

Here's a simple application which writes a list of numbers to a Redis set. At this time Carmine, the Clojure Redis client, doesn't yet support failover using Sentinel. I've implemented a stricter version of the Sentinel client algorithm here: asking the server for a new primary before every write. Sentinel actually states that clients should only select new primaries when their connection is closed, which leaves a wider window for clients to disagree about which primary to use–leading to the possibility of more conflicting writes.

Let's give it a shot. First, set up Redis:

salticid redis.setup

Then, in two terminals, start up Redis and Redis Sentinel:

salticid redis.start
salticid redis.sentinel

You should see messages go by as the sentinels discover one another and ensure all the nodes are properly configured. You can check the replication status with salticid redis.replication. salticid redis.stop will shut down the Redis servers and sentinels alike.

Now let's run our application with lein run redis, then partition nodes n1 and n2 away from n3, n4, and n5 by running salticid jepsen.partition.

376 :ok
378 :ok
382 :ok
384 :ok
380 :ok
381 :ok
383 :ok
389 :ok
385 :ok

The first thing you'll notice is that even though n1 can't possibly be replicating its writes to n3, n4, and n5, writes against it are still completing successfully. N1 still thinks it's the primary, and since replication is asynchronous, it's acknowledging writes before they're sent to others in the cluster. The sentinels notice the failure, and n3, n4, and n5's sentinels promote a new primary:

19 May 00:37:36.314 # +sdown master mymaster 10.10.3.242 6379
19 May 00:37:36.616 # +sdown slave 10.10.3.52:6379 10.10.3.52 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.52:26379 10.10.3.52 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.242:26379 10.10.3.242 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:37.521 # +odown master mymaster 10.10.3.242 6379 #quorum 3/3
19 May 00:37:48.041 # +failover-detected master mymaster 10.10.3.242 6379
19 May 00:37:48.142 * +slave-reconf-inprog slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:48.143 * +slave-reconf-inprog slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.243 # +failover-end master mymaster 10.10.3.242 6379

Now n5 is a new primary–but n1 is still a primary too! Run salticid redis.replication to see the replication status of all nodes. We have two primary nodes, one in each component of the system. During this time both primaries are accepting writes independently. This is a classic split-brain scenario–and it violates the C in CP. Writes (and reads) in this state are not linearizable, because clients will see different results based on which node they're talking to.

Healing the partition

What happens when the network comes back online? salticid jepsen.heal repairs the partition, and the Sentinel nodes will discover each other again.

Redis Sentinel used to leave both primaries running indefinitely, which meant that any scenario like a partition or crash leading to failover would result in permanent split-brain. That's fixed in version 2.6.13, which came out last week. Now, Sentinel demotes the old primary on n1 when it comes back into contact with the majority component. The client sees:

1687 :ok
1686 READONLY You can't write against a read only slave.
1690 READONLY You can't write against a read only slave.
1693 :ok

… since n1 stepped down just after a Sentinel told us it was a primary. Clients are a part of the distributed system too. If a system's correctness depends on clients choosing specific nodes at specific times, the clients are now engaged in a distributed consensus problem–not to mention a clock synchronization problem. This is damn hard to do correctly.

Results

1991 :ok
1995 :ok
1996 :ok
Hit enter when ready to collect results.
Writes completed in 42.002 seconds
2000 total
1998 acknowledged
872 survivors
1126 acknowledged writes lost! (╯°□°）╯︵ ┻━┻
50 51 52 53 54 55 ... 1671 1675 1676 1680 1681 1685
0.999 ack rate
0.5635636 loss rate
0.0 unacknowledged but successful rate

Out of 2000 writes, Redis claimed that 1998 of them completed successfully. However, only 872 of those integers were present in the final set. Redis threw away 56% of the writes it told us succeeded.
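For readers reproducing these numbers, here's a sketch of how the summary rates can be recomputed from the sets of acknowledged and surviving writes. The definitions are inferred from the printed figures (for example 1126 / 1998 ≈ 0.5635636, and in the Riak runs 6 / 1985 ≈ 0.00302267), not taken from Jepsen's source; `summarize` is a hypothetical helper.

```python
from typing import Dict, Set


def summarize(total: int, acked: Set[int], survivors: Set[int]) -> Dict[str, float]:
    """Recompute the summary lines of a run.

    Loss and unacknowledged-but-successful rates are relative to the number
    of acknowledged writes, which matches the printed results.
    """
    lost = acked - survivors
    unacked_ok = survivors - acked
    return {
        "ack rate": len(acked) / total,
        "loss rate": len(lost) / len(acked),
        "unacknowledged but successful rate": len(unacked_ok) / len(acked),
    }


# Same counts as the Redis run: 2000 writes, 1998 acknowledged, 872 survivors.
# Which particular integers were lost is made up here; only the counts match.
acked = set(range(1998))
survivors = set(range(872))
print(summarize(2000, acked, survivors))
```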

There are two problems at work here. First, notice that all the clients lost writes at the beginning of the partition: (50, 51, 52, 53, …). That's because they were all writing to n1 when the network dropped–and since n1 was demoted later, any writes made during that window were destroyed.

The second problem was caused by split-brain: both n1 and n5 were primaries up until the partition healed. Depending on which node they were talking to, some clients might have their writes survive, and others have their writes lost. The last few lost writes, mod 5, are all 0 and 1: they came from the clients which kept using n1 as a primary, in the minority partition.

Note that both of these failure modes violate the durability guarantees we claimed earlier for Redis, because there are gaps in the write log.

Redis strategies

So you're running a distributed Redis install, and have realized that the design of Redis Sentinel (or, for that matter, any other failover system on top of an asynchronously replicated primary-secondary design) means you can lose a lot of data when a partition occurs. What can you do?

From an operations perspective, I recommend you try to understand the Sentinel consensus algorithm. I don't, and I've read it a dozen times.

I tried to write a formal verification of the algorithm in TLA+, and failed. There are dozens of interacting rules which can lead to phenomenally complex edge cases. The whole thing relies on clocks–and a special mode, TILT, which tries to detect sudden clock skew. You can specify a quorum which is smaller than the number of sentinels, allowing multiple quorums to operate simultaneously. Since the system auto-discovers peers, you've got to make sure nobody lets a new sentinel talk to your cluster, or you might find yourself with a quorum smaller than N/2. Client, sentinel, and Redis server topologies are all different things, which (I think) means…

• Sentinels could promote a node no clients can see
• Sentinels could demote the only node clients can actually reach
• Sentinels could assume a totally connected group of servers is unavailable
• Sentinels could promote an isolated node in a minority component, then destroy data on the majority by demoting their primary later
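The quorum concern above comes down to counting: two groups of sentinels that each reach the configured quorum must share at least one member only if the quorum is a strict majority. The Python sketch below models that counting argument and nothing else; `quorums_can_be_disjoint` and `example_split` are hypothetical helpers, and the sketch deliberately ignores the rest of Sentinel's failover rules.

```python
def quorums_can_be_disjoint(n_sentinels: int, quorum: int) -> bool:
    """Two non-overlapping groups of size `quorum` fit among n sentinels
    exactly when 2 * quorum <= n, i.e. when the quorum is not a majority."""
    return 2 * quorum <= n_sentinels


def example_split(n_sentinels: int, quorum: int):
    """Return two disjoint groups of sentinel ids that each reach quorum,
    or None if every pair of quorums must overlap."""
    if not quorums_can_be_disjoint(n_sentinels, quorum):
        return None
    ids = list(range(1, n_sentinels + 1))
    return ids[:quorum], ids[quorum:2 * quorum]


for n, q in [(5, 3), (5, 2), (6, 3), (7, 3)]:
    print(n, q, quorums_can_be_disjoint(n, q), example_split(n, q))
```

With five sentinels and a quorum of 3, any two quorums overlap; drop the quorum to 2, or let a sixth sentinel join with the quorum still at 3, and two sides of a partition can each assemble one.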

I (tentatively) recommend running exactly one sentinel on each server node, to force server and sentinel network topologies to align. That holds unless the partition happens not in the network, but somewhere above layer 3. Let's not talk about that possibility.

As an application developer working with Redis, one option is simply to estimate and accept your data loss. Not all applications have to be consistent. Microsoft estimates their WAN links have about 99.5% availability, and their datacenter networks are about 10x more reliable, going down for about 4 hours per year. Not all network failures result in this kind of partition. If you're running good network hardware in redundant configurations in real datacenters (i.e. not EC2), you cut your probability of partition down pretty far. Plenty of important applications can tolerate data loss for a few hours a year.
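The downtime figures follow directly from the availability numbers. A quick Python check, assuming a 365-day year and reading "about 10x more reliable" as one tenth of the WAN's unavailability:

```python
HOURS_PER_YEAR = 365 * 24  # 8760; ignores leap years


def downtime_hours_per_year(availability: float) -> float:
    """Expected hours per year a link with the given availability is down."""
    return (1 - availability) * HOURS_PER_YEAR


wan = downtime_hours_per_year(0.995)           # about 43.8 hours
dc_unavailability = (1 - 0.995) / 10           # assumption: 10x fewer down-hours
dc = dc_unavailability * HOURS_PER_YEAR        # about 4.4 hours
print(f"WAN: {wan:.1f} h/yr, datacenter: {dc:.1f} h/yr")
```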

If you can't tolerate data loss, Redis Sentinel (and by extension Redis Cluster) is not safe for use as:

• A lock service
• A queue
• A database

If you use Redis as a lock service, this type of partition means you can take out the same lock twice–or up to N times for N nodes! Or maybe multiple times concurrently, against the same node, if you want to get weird about it. Write loss means locks can be resurrected from the dead, or vanish even when supposedly held. Bottom line: distributed lock services must be CP. Use a CP consensus system, like ZooKeeper.
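A toy model makes the double-lock scenario concrete. This Python sketch does not use any Redis client: `Primary` and `set_nx` are hypothetical stand-ins that mimic SET-if-not-exists semantics on two primaries which cannot see each other during the partition, and the lock key name is made up.

```python
class Primary:
    """Toy stand-in for one Redis primary: a dict with SET-if-not-exists."""

    def __init__(self, name):
        self.name = name
        self.data = {}

    def set_nx(self, key, value):
        """Store value only if key is absent; report whether we stored it."""
        if key in self.data:
            return False
        self.data[key] = value
        return True


# During the partition, n5 has been promoted while n1 is still accepting writes.
n1, n5 = Primary("n1"), Primary("n5")
minority_client = n1.set_nx("lock:job-42", "client-a")
majority_client = n5.set_nx("lock:job-42", "client-b")
print(minority_client, majority_client)  # True True: both clients "hold" the lock

# When the partition heals, n1 is demoted and its state replaced with n5's.
n1.data = dict(n5.data)
print(n1.data["lock:job-42"])  # client-b: client-a's lock silently vanished
```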

If you use Redis as a queue, it can drop enqueued items. However, it can also re-enqueue items which were removed. An item might be delivered zero, one, two, or more times. Most distributed queue services can provide reliable at-most-once or at-least-once delivery. CP queue systems can provide reliable exactly-once delivery with higher latency costs. Use them if message delivery is important.
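If you keep a log of the IDs you enqueued and the IDs your consumers received, tallying each item's delivery count shows which of these outcomes you actually hit. A minimal Python sketch, with `classify_deliveries` as a hypothetical helper and made-up example IDs:

```python
from collections import Counter


def classify_deliveries(enqueued, delivered):
    """Group enqueued item ids by how many times a consumer received them."""
    counts = Counter(delivered)
    result = {"lost": [], "once": [], "duplicated": []}
    for item in enqueued:
        n = counts[item]
        if n == 0:
            result["lost"].append(item)
        elif n == 1:
            result["once"].append(item)
        else:
            result["duplicated"].append(item)
    return result


print(classify_deliveries([1, 2, 3, 4], [1, 3, 3, 4, 4, 4]))
# {'lost': [2], 'once': [1], 'duplicated': [3, 4]}
```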

If you use Redis as a database, be prepared for clients to disagree about the state of the system. Batch operations will still be atomic (I think), but you'll have no inter-write linearizability, which almost all applications implicitly rely on. If you successfully write A, then B, you expect that any client which can see B can also see A. This is not the case. Be prepared for massive write loss during a partition, depending on client, server, and sentinel topology.
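One way to spot this anomaly in test results is to check, for any client that sees a later write, whether it also sees every write acknowledged before it. The Python sketch below checks only that one property, not full linearizability, and `violates_write_order` is a hypothetical helper:

```python
def violates_write_order(acknowledged_order, observed):
    """Return (earlier, later) pairs where `later` is visible but `earlier` is not.

    acknowledged_order: writes in the order one client had them acknowledged.
    observed: the set of writes another client can currently read.
    """
    violations = []
    for i, later in enumerate(acknowledged_order):
        if later in observed:
            for earlier in acknowledged_order[:i]:
                if earlier not in observed:
                    violations.append((earlier, later))
    return violations


print(violates_write_order(["A", "B"], {"B"}))       # [('A', 'B')]
print(violates_write_order(["A", "B"], {"A", "B"}))  # []
```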

Because Redis does not have a consensus protocol for writes, it can't be CP. Because it relies on quorums to promote secondaries, it can't be AP. What it can be is fast, and that's an excellent property for a weakly consistent best-effort service, like a cache. Redis Sentinel can do a great job of keeping your caches warm even in the face of network and node failure, and helping clients to gradually discover the correct nodes to interact with. Use Redis Sentinel for caching, sampling, statistics, and messaging where getting the wrong answer doesn't hurt much. Occasional windows of 50% write loss may be just fine for your user feeds, hit counters, or upvotes.

In the next post, we'll learn about a database with a related replication architecture: MongoDB.

Call me maybe: Postgres

Previously on Jepsen, we introduced the problem of network partitions. Here, we demonstrate that a few transactions which “fail” during the start of a partition may have actually succeeded.

PostgreSQL is a terrific open-source relational database. It offers a variety of consistency guarantees, from read uncommitted to serializable. Because Postgres only accepts writes on a single primary node, we think of it as a CP system in the sense of the CAP theorem. If a partition occurs and you can't talk to the server, the system is unavailable. Because transactions are ACID, we're always consistent.

Right?

Well… almost. Even though the Postgres server is always consistent, the distributed system composed of the server and client together may not be consistent. It's possible for the client and server to disagree about whether or not a transaction took place.

Postgres' commit protocol, like that of most relational databases, is a special case of two-phase commit, or 2PC. In the first phase, the client votes to commit (or abort) the current transaction, and sends that message to the server. The server checks to see whether its consistency constraints allow the transaction to proceed, and if so, it votes to commit. It writes the transaction to storage and informs the client that the commit has taken place (or failed, as the case may be). Now both the client and server agree on the outcome of the transaction.

What happens if the message acknowledging the commit is dropped before the client receives it? Then the client doesn't know whether the commit succeeded or not! The 2PC protocol says that we must wait for the acknowledgement message to arrive in order to decide the outcome. If it doesn't arrive, 2PC deadlocks. It's not a partition-tolerant protocol. Waiting forever isn't realistic for real systems, so at some point the client will time out and declare an error occurred. The commit protocol is now in an indeterminate state.
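The ambiguity shows up as soon as you enumerate where a message can be lost. This Python sketch is a deliberately simplified model, assuming the server always votes to commit; `commit_over_unreliable_network` is hypothetical. Two of the three cases look identical to the client ("error") but leave the server in different states.

```python
def commit_over_unreliable_network(drop_request: bool, drop_ack: bool):
    """Return (server_committed, client_view) for one commit attempt.

    The client reports "ok" only if the acknowledgement arrives; otherwise it
    eventually times out and can only report "error".
    """
    if drop_request:
        return False, "error"            # commit message never reached the server
    server_committed = True              # server votes yes and writes the transaction
    if drop_ack:
        return server_committed, "error"  # committed, but the client never hears back
    return server_committed, "ok"


for drop_request, drop_ack in [(False, False), (True, False), (False, True)]:
    print(commit_over_unreliable_network(drop_request, drop_ack))
```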

To demonstrate this, we'll need an install of Postgres to work with.

salticid postgres.setup

This installs Postgres from apt, uploads some config files from jepsen/salticid/postgres, and creates a database for Jepsen. Then we'll run a simple application which writes a single row for each number, inside a transaction.

cd salticid
lein run pg -n 100

If all goes well, you'll see something like

...
85 :ok
91 :ok
90 :ok
95 :ok
96 :ok
Hit enter when ready to collect results.
Writes completed in 0.317 seconds
100 total
100 acknowledged
100 survivors
All 100 writes succeeded. :-D

Each line shows the number being written, followed by whether it was OK or not. In this example, all five nodes talk to a single Postgres server on n1. Out of 100 writes, the clients reported that all 100 succeeded–and at the end of the test, all 100 numbers were present in the result set.

Now let's cause a partition. Since this failure mode only arises when the connection drops after the server decides to acknowledge, but before the client receives it, there's only a short window in which to begin the partition. We can widen that window by slowing down the network:

salticid jepsen.slow

Now, we start the test:

lein run pg

And while it's running, cut off all postgres traffic to and from n1:

salticid jepsen.drop_pg

If we're lucky, we'll manage to catch one of those acknowledgement packets in flight, and the client will log an error like:

217 An I/O error occurred while sending to the backend.
Failure to execute query with SQL:
INSERT INTO "set_app" ("element") VALUES (?) :: [219]
PSQLException:
 Message: An I/O error occured while sending to the backend.
 SQLState: 08006
 Error Code: 0
218 An I/O error occured while sending to the backend.

After that, new transactions will just time out; the client will correctly log these as failures:

220 Connection attempt timed out.
222 Connection attempt timed out.

We can resolve the partition with salticid jepsen.heal, and wait for the test to complete.

1000 total
950 acknowledged
952 survivors
2 unacknowledged writes found! ヽ(´ー｀)ノ
(215 218)
0.95 ack rate
0.0 loss rate
0.002105263 unacknowledged but successful rate

So out of 1000 attempted writes, 950 were successfully acknowledged, and all 950 of those writes were present in the result set. However, two writes (215 and 218) succeeded, even though they threw an exception claiming that a failure occurred! Note that this exception doesn't guarantee that the write succeeded or failed: 217 also threw an I/O error while sending, but because the connection dropped before the client's commit message arrived at the server, the transaction never took place.
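The bookkeeping behind that summary can be sketched with set differences. This is a Python approximation, not Jepsen's Clojure code: `analyze` is a hypothetical helper, its denominators are chosen so the rates match the output above (2/950 ≈ 0.002105263), which may not be exactly how Jepsen computes them, and the inputs are synthetic, built only to reproduce the reported counts.

```python
def analyze(attempted, acknowledged, survivors):
    """Compare what clients were told with what the database actually kept."""
    acknowledged, survivors = set(acknowledged), set(survivors)
    lost = acknowledged - survivors            # acknowledged, but missing at the end
    unacknowledged = survivors - acknowledged  # reported as failed, but present anyway
    return {
        "ack rate": len(acknowledged) / attempted,
        "loss rate": len(lost) / len(acknowledged),
        "unacknowledged but successful rate": len(unacknowledged) / len(acknowledged),
        "lost": sorted(lost),
        "unacknowledged": sorted(unacknowledged),
    }


# Synthetic inputs mirroring the counts above: 1000 attempts, 50 reported
# failures, two of which (215 and 218) nonetheless committed.
failed = set(range(215, 265))
acknowledged = set(range(1000)) - failed
survivors = acknowledged | {215, 218}

report = analyze(1000, acknowledged, survivors)
print(report["ack rate"], report["loss rate"], report["unacknowledged"])
# 0.95 0.0 [215, 218]
```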

There is no way to distinguish these cases from the client. A network partition–and indeed, most network errors–doesn't mean a failure. It means the absence of information. Without a partition-tolerant commit protocol, like extended three-phase commit, we cannot assert the state of the system for these writes.

2PC strategies

Two-phase commit protocols aren't just for relational databases. They crop up in all sorts of consensus problems. MongoDB's documents essentially comprise an asynchronous network, and many users implement 2PC on top of their Mongo objects to obtain multi-key transactions.

If you're working with two-phase commit, there are a few things you can do. One is to accept false negatives. In most relational databases, the probability of this failure occurring is low–and it can only affect writes which were in-flight at the time the partition began. It may be perfectly acceptable to return failures to clients even if there's a small chance the transaction succeeded.

Alternatively, you can use consistency guarantees or other data structures to allow for idempotent operations. When you encounter a network error, just retry them blindly. A highly available queue with at-least-once delivery is a great place to put repeatable writes which need to be retried later.
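Blind retries are safe when repeating an operation cannot change the outcome, as with adding an element to a set. The Python sketch below assumes a toy store in which the write always lands and only the acknowledgement can be lost; `FlakySetConnection` and `add_with_retries` are hypothetical names, not a real driver API.

```python
import random


class FlakySetConnection:
    """Hypothetical set-valued store whose acknowledgements are sometimes lost.

    The write itself always lands; only the reply can go missing, which is
    exactly the ambiguous case described above.
    """

    def __init__(self, ack_drop_probability, seed=0):
        self.stored = set()      # server-side state; set semantics make adds idempotent
        self.attempts = 0        # how many times a client sent a write
        self.ack_drop_probability = ack_drop_probability
        self.rng = random.Random(seed)

    def add(self, element):
        self.attempts += 1
        self.stored.add(element)
        if self.rng.random() < self.ack_drop_probability:
            raise TimeoutError("acknowledgement lost")


def add_with_retries(conn, element, max_attempts=5):
    """Retry blindly on network errors; safe only because add is idempotent."""
    for _ in range(max_attempts):
        try:
            conn.add(element)
            return True
        except TimeoutError:
            continue
    return False


conn = FlakySetConnection(ack_drop_probability=0.3, seed=1)
for i in range(20):
    add_with_retries(conn, i)
print(len(conn.stored), conn.attempts)  # 20 elements, despite extra attempts
```

Retrying a non-idempotent operation the same way (say, appending to a list or incrementing a counter) would instead duplicate every write whose acknowledgement was lost.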

Finally, within some databases you can obtain strong consistency by taking note of the current transaction ID, and writing that ID to the database during the transaction. When the partition is resolved, the client can either retry or cancel the transaction at a later time, by checking whether or not that transaction ID was written. Again, this relies on having some sort of storage suitable for the timescales of the partition: perhaps a local log on disk, or an at-least-once queue.

In the next post, we look at a very different kind of consistency model: Redis Sentinel.

Call me maybe: Carly Rae Jepsen and the perils of network partitions

Carly Rae Jepsen may be singing about the cute guy next door, but she's also telling a story about the struggle to communicate with someone who doesn't even know you're alive. The suspense of observation: did he see me? Did he see me see him? The risks of speaking your mind and being shot down–or worse, ignored. The fundamental unknowability of The Other, as Lacan would have it. In short, this is a song about distributed systems.

Modern software systems are composed of dozens of components which communicate over an asynchronous, unreliable network. Understanding the reliability of a distributed system's dynamics requires careful analysis of the network itself. Like most hard problems in computer science, this one comes down to shared state. A set of nodes separated by the network must exchange information: “Did I like that post?” “Was my write successful?” “Will you thumbnail my image?” “How much is in my account?”

At the end of one of these requests, you might guarantee that the requested operation…

• will be visible to everyone from now on
• will be visible to your connection now, and others later
• may not yet be visible, but is causally connected to some future state of the system
• is visible now, but might not be later
• may or may not be visible: ERRNO_YOLO

These are some examples of the complex interplay between consistency and durability in distributed systems. For instance, if you're writing CRDTs to one of two geographically replicated Riak clusters with W=2 and DW=1, you can guarantee that write…

• is causally connected to some future state of the system
• will survive the total failure of one node
• will survive a power failure of all nodes (assuming fsync works)
• will survive the destruction of an entire datacenter, given a few minutes to replicate

If you're writing to ZooKeeper, you might have a stronger set of guarantees: for instance, that the write is visible now to all participants, and that it will survive the total failure of any minority of the n nodes (up to ⌊(n - 1)/2⌋ of them). If you write to Postgres, depending on your transaction's consistency level, you might be able to guarantee that the write will be visible to everyone, just to yourself, or “eventually”.

These guarantees are particularly tricky to understand when the network is unreliable.

Partitions

Formal proofs of distributed systems often assume that the network is asynchronous, which means the network may arbitrarily duplicate, drop, delay, or reorder messages between nodes. This is a weak hypothesis: some physical networks can do better than this, but in practice IP networks will encounter all of these failure modes, so the theoretical limitations of the asynchronous network apply to real-world systems as well.

In practice, the TCP state machine allows nodes to reconstruct “reliable” ordered delivery of messages between nodes. TCP sockets guarantee that our messages will arrive without drops, duplication, or reordering. However, there can still be arbitrary delays–which would ordinarily cause the distributed system to lock indefinitely. Since computers have finite memory and latency bounds, we introduce timeouts, which close the connection when expected messages fail to arrive within a given time frame. Calls to read() on sockets will simply block, then fail.
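Here is what a read timeout looks like from the application's side. This Python sketch uses a local socket pair in which one end never sends anything, so `recv` blocks until the timeout fires and then raises; the 200 ms timeout is an arbitrary choice for illustration.

```python
import socket

# A connected pair of local sockets. `peer` never sends anything, standing in
# for a remote node whose replies are lost somewhere in a partition.
peer, local = socket.socketpair()
local.settimeout(0.2)  # give up after 200 ms instead of blocking forever

try:
    local.recv(1024)
    outcome = "received data"
except socket.timeout:
    # All we learn is that nothing arrived in time: not whether the peer
    # is alive, dead, or still processing our last request.
    outcome = "timed out"
finally:
    peer.close()
    local.close()

print(outcome)  # timed out
```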

Detecting network failures is hard. Since our only knowledge of the other nodes passes through the network, delays are indistinguishable from failure. This is the fundamental problem of the network partition: latency high enough to be considered a failure. When partitions arise, we have no way to determine what happened on the other nodes: are they alive? Dead? Did they receive our message? Did they try to respond? Literally no one knows. When the network finally heals, we'll have to re-establish the connection and try to work out what happened–perhaps recovering from an inconsistent state.

Many systems handle partitions by entering a special degraded mode of operation. The CAP theorem tells us that, when a partition occurs, we can have either consistency (technically, linearizability for a read-write register) or availability (all nodes can continue to handle requests), but not both. What's more, few databases come close to CAP's theoretical limitations; many simply drop data.

In this series, I'm going to demonstrate how some real distributed systems behave when the network fails. We'll start by setting up a cluster and a simple application. In each subsequent post, we'll explore that application written for a particular database, and how that system behaves under partition.

Setting up a cluster

You can create partitions at home! For these demonstrations, I'm going to be running a five-node cluster of Ubuntu 12.10 machines, virtualized using LXC–but you can use real computers, virtual private servers, EC2, etc. I've named the nodes n1, n2, n3, n4, and n5: it's probably easiest to add these entries to /etc/hosts on your computer and on each of the nodes themselves.

We're going to need some configuration for the cluster, and client applications to test their behavior. You can clone http://github.com/aphyr/jepsen to follow along.

To run commands across the cluster, I'm using Salticid (http://github.com/aphyr/salticid). I've set my ~/.salticidrc to point to configuration in the Jepsen repo:

load ENV['HOME'] + '/jepsen/salticid/*.rb'

If you take a look at this file, you'll see that it defines a group called :jepsen, with hosts n1 … n5. The user and password for each node is 'ubuntu'–you'll probably want to change this if you're running your nodes on the public internet.

Try salticid -s salticid to see all the groups, hosts, and roles defined by the current configuration:

$ salticid -s salticid
Groups
  jepsen
    Hosts: n1 n2 n3 n4 n5
Roles
  base
  riak
  mongo
  redis
  postgres
  jepsen
  net
Top-level tasks

First off, let's set up these nodes with some common software–compilers, network tools, etc.

salticid base.setup

The base role defines some basic operating system functions. base.reboot will reboot the cluster, and base.shutdown will unpower it.

The jepsen role defines tasks for simulating network failures. To cause a partition, run salticid jepsen.partition. That command causes nodes n1 and n2 to drop IP traffic from n3, n4, and n5–essentially by running

iptables -A INPUT -s n3 -j DROP
iptables -A INPUT -s n4 -j DROP
iptables -A INPUT -s n5 -j DROP

That's it, really. To check the current network status, run jepsen.status. jepsen.heal will reset the iptables chains to their defaults, resolving the partition.

To simulate slow networks, or networks which drop packets, we can use tc to adjust the ethernet interface. Jepsen assumes the inter-node interface is eth0. salticid jepsen.slow will add latency to the network, making it easier to reproduce bugs which rely on a particular message being dropped. salticid jepsen.flaky will probabilistically drop messages. Adjusting the inter-node latency and lossiness simulates the behavior of real-world networks under congestion, and helps expose timing dependencies in distributed algorithms–like database replication.

A simple distributed system

In order to test a distributed system, we need a workload–a set of clients which make requests and record their results for analysis. For these posts, we're going to work with a simple application which writes several numbers to a list in a database. Each client app will independently write some integers to the DB. With five clients, client 0 writes 0, 5, 10, 15, …; client 1 writes 1, 6, 11, and so on. For each write we record whether the database acknowledged the write successfully or whether there was an error.
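The workload can be sketched in a few lines. This is a Python approximation of the idea, not the Clojure code in the Jepsen repo; `assign_writes`, `run_client`, and `toy_write` are hypothetical, and `toy_write` stands in for a real database call that sometimes fails.

```python
def assign_writes(total, n_clients=5):
    """Client k writes k, k + n_clients, k + 2*n_clients, ... below `total`."""
    return {k: list(range(k, total, n_clients)) for k in range(n_clients)}


def run_client(values, write):
    """Attempt each write in order, recording "ok" if acknowledged, "error" otherwise."""
    results = {}
    for value in values:
        try:
            write(value)
            results[value] = "ok"
        except Exception:  # any error means we did not get an acknowledgement
            results[value] = "error"
    return results


plan = assign_writes(20)
print(plan[0])  # [0, 5, 10, 15]
print(plan[1])  # [1, 6, 11, 16]

toy_db = set()


def toy_write(value):
    # Hypothetical stand-in for a database call: fails on multiples of 3.
    if value % 3 == 0:
        raise OSError("simulated network error")
    toy_db.add(value)


print(run_client(plan[0], toy_write))  # {0: 'error', 5: 'ok', 10: 'ok', 15: 'error'}
```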
At the end of the run, we ask the database for the full set. If acknowledged writes are missing, or unacknowledged writes are present, we know that the system was inconsistent in some way: that the client application and the database disagreed about the state of the system.

In this series of blog posts, we're going to run this app against several distributed databases, and cause partitions during its run. In each case, we'll see how the system responds to the uncertainty of dropped messages. As the song might go:

I've written several implementations of this workload in Clojure. jepsen/src/jepsen/set_app.clj defines the application. (defprotocol SetApp ...) lists the functions an app has to implement, and (run n apps) sets up the apps, runs them in parallel, collects results, and shows any inconsistencies. Particular implementations live in src/jepsen/riak.clj, pg.clj, redis.clj, and so forth.

You'll need a JVM and Leiningen 2 to run this code. Once you've installed lein and added it to your path, we're ready to go!

Next up on Jepsen, we take a look at how PostgreSQL's transaction protocol handles network failures.

Riemann 0.2.0

Riemann 0.2.0 is ready. There's so much left that I want to build, but this release includes a ton of changes that should improve usability for everyone, and I'm excited to announce its release.

Version 0.2.0 is a fairly major improvement in Riemann's performance and capabilities. Many things have been solidified, expanded, or tuned, and there are a few completely new ideas as well. There are a few minor API changes, mostly to internal structure–but a few streams are involved as well. Most functions will continue to work normally, but log a deprecation notice when used.

I dedicated the past six months to working on Riemann full-time. I was fortunate to receive individual donations as well as formal contracts with Blue Mountain Capital, SevenScale, and Iovation during that time.
That money gave me months of runway to help make these improvements–but even more valuable was the feedback I received from production users, big and small. I've used your complaints, frustrations, and ideas to plan Riemann's roadmap, and I hope this release reflects that.

This release includes contributions from a broad cohort of open-source developers, and I want to recognize everyone who volunteered their time and energy to make Riemann better. In particular, I'd like to call out Pierre-Yves Ritschard, lwf, Ben Black, Thomas Omans, Dave Cottlehuber, and, well, the list goes on and on. You rock.

These months have seen improvements not only to Riemann itself, but also to the dashboard, clients, and integration packages. While I'm spending most of my time working on the core Riemann server, it's really this peripheral software that makes Riemann useful for instrumenting production systems. There's no way I could hope to understand, let alone write and test, the code to integrate with all these technologies–which makes your work particularly valuable.

This week I started my new job at Factual. I won't be able to work 10 hours each day on Riemann any more, but I'm really happy with what we've built together, and I'll definitely keep working on the next release. To all Riemann's users and contributors, thank you. Here's to 0.2.0.

New features

• Arbitrary key-value (string) pairs on events
• Hot config reloading
• Integrated nrepl server
• streams/sdo: bind together multiple streams as one
• streams/split: like (cond), dispatch an event to the first matching stream
• streams/splitp: like split, but on the basis of a specific predicate
• config/delete-from-index: explicitly remove (similar) events from the index
• streams/top: streaming top-k
• streams/tag: add tags to events
• RPM packaging
• Init scripts, proper log dirs, and users for Debian and RPM packages. Yeah, this means you can /etc/init.d/riemann reload, and Stuff Just Works ™.
• folds/difference, product, and quotient.
• Folds come in sloppy and strict variants which should “Do What I Mean” in most contexts.
• Executor Services for asynchronous queued processing of events.
• streams/exception-stream: captures exceptions and converts them to events.

Improvements

• http://riemann.io site
• Lots more documentation and examples
• Config file syntax errors are detected early
• Cleaned up server logging
• Helpful messages (line numbers! filenames!) for configuration errors
• Silence closed channel exceptions
• Cores can preserve services like pubsub, the index, etc through reloads
• Massive speedups in TCP and UDP server throughput
• streams/rate works in real-time: no need for fill-in any more
• Graphite client is faster, more complete
• Config files can include other files by relative path
• streams/coalesce passes on expired events
• riemann.email/mailer can take custom :subject and :body functions
• riemann.config includes some common time/scheduling functions
• streams/where returns whether it matched an event, which means (where) is now re-usable as a predicate in lots of different contexts.
• streams/tagged-any and tagged-all return whether they matched
• streams/counter is resettable to a particular metric, and supports expiry
• Bring back “hyperspace core online”
• Update to netty 3.6.1
• Reduced the number of threadpools used by the servers
• Massive speedup in Netty performance by re-organizing execution handlers
• core/reaper takes a :keep-keys option to specify which fields on an event are preserved
• streams/smap ignores nil values for better use with folds
• Update to aleph 0.3.0-beta15
• Config files ship with emacs modelines, too

Bugfixes

• Fixed a bug in part-time-fast causing undercounting under high contention
• Catch exceptions while processing expired events
• Fix a bug escaping metric names for librato
• riemann.email/mailer can talk to SMTP relays again
• graphite-path-percentiles will convert decimals of three or more places to percentile strings
• streams/rollup is much more efficient; doesn't leak tasks
• streams/rollup aggregates and forwards expired events instead of stopping
• Fixed a threadpool leak from Netty
• streams/coalesce: fixed a bug involving lazy persistence of transients
• streams/ddt: fixed a few edge cases

Internals

• Cleaned up the test suite's logging
• Pluggable transports for netty servers
• Cores are immutable
• Service protocol: provides lifecycle management for internal components
• Tests for riemann.config
• riemann.periodic is gone; replaced by riemann.time
• Tried to clean up some duplicated functions between core, config, and streams
• riemann.common/deprecated
• Cleaned up riemann.streams, removing unused commented-out code
• Lots of anonymous functions have names now, to help with profiling
• Composing netty pipeline factories is much simpler
• Clojure 1.5

Known bugs

• Passing :host to websocket-server does nothing: it binds to * regardless.
• Folds/mean throws when it receives empty lists
• graphite-server has no tests
• Riemann will happily overload browsers via websockets
• streams/rate doesn't stop its internal poller correctly when self-expiring
• When Netty runs out of filehandles, it'll hang new connections

65K messages/sec

The Netty redesign of riemann-java-client made it possible to expose an end-to-end asynchronous API for writes, which dramatically improves throughput for messages with a small number of events. By introducing a small queue of pipelined write promises, riemann-clojure-client can now push 65K events per second, as individual messages, over a single TCP socket. That works out to about 120 Mbps of sustained traffic.

I'm really happy about the bulk throughput too: three threads using a single socket, sending messages of 100 events each, can push around 185-200K events/sec, at over 200 Mbps. That throughput took 10 sockets and hundreds of threads to achieve in earlier tests.

This isn't a particularly useful feature as far as clients go; it's unlikely most users will want to push this much from a single client. It is critical, however, for optimizing Riemann's server performance. The server, running the bulk test, consumes about 115% CPU on my 2.5GHz Q8300. I believe this puts a million events/sec within reach for production hardware, though at that throughput CAS contention in the streams may become a limiting factor. If I can find a box (and network) powerful enough to test, I'd love to give it a shot!

This is the last major improvement for Riemann 0.2.0. I'll be focusing on packaging and documentation tomorrow. :)

Timelike 2: everything fails all the time

In the previous post, I described an approximation of Heroku's Bamboo routing stack, based on their blog posts. Hacker News, as usual, is outraged that the difficulty of building fast, reliable distributed systems could prevent Heroku from building a magically optimal architecture.
Coda Hale quips:

Really enjoying @RapGenius’s latest mix tape, “I Have No Idea How Distributed Systems Work”.

Coda understands the implications of the CAP theorem. This job is too big for one computer–any routing system we design must be distributed. Distribution increases the probability of a failure, both in nodes and in the network itself. These failures are usually partial, and often take the form of degradation rather than the system failing as a whole. Two nodes may be unable to communicate with each other, though a client can see both. Nodes can lie to each other. Time can flow backwards.

CAP tells us that under these constraints, we can pick two of three properties (and I'm going to butcher them in an attempt to be concise):

1. Consistency: nodes agree on the system's state.
2. Availability: the system accepts requests.
3. Partition tolerance: the system runs even when the network delays or drops some messages.

In the real world, partitions are common, and failing to operate during a partition is essentially a failure of availability. We must choose CP or AP, or some probabilistic blend of the two.

There's a different way to talk about the properties of a distributed system–and I think Peter Bailis explains it well. Liveness means that at every point, there exists a sequence of operations that allows the “right thing” to happen–e.g. “threads are never deadlocked” or “you never get stuck in an infinite loop”. Safety means the system never does anything bad. Together, safety and liveness ensure the system does good things on time.

With this in mind, what kind of constraints apply to HTTP request routing?

1. The system must be partition tolerant.
2. The system must be available–as much as possible, anyway. Serving web pages slower is preferable to not serving them at all. In the language of CAP, our system must be AP.
3. But we can't wait too long, because requests which take more than a minute to complete are essentially useless. We have a liveness constraint.
4. Requests must complete correctly, or not at all. We can't route an HTTP POST to multiple servers at once, or drop pieces of requests on the floor. We have a safety constraint.

It's impossible to do this perfectly. If all of our data centers are nuked, there's no way we can remain available. If the network lies to us, it can be impractical to guarantee correct responses. And we can let latencies rise to accommodate failure: the liveness constraint is flexible.

Finally, we're real engineers. We're going to make mistakes. We have limited time and money, limited ability to think, and must work with existing systems which were never designed for the task at hand. Complex algorithms are extraordinarily difficult to prove–let alone predict–at scale, or under the weird failure modes of distributed systems. This means it's often better to choose a dumb but predictable algorithm over an optimal but complex one.

What I want to make clear is that Heroku is full of smart engineers–and if they're anything like the engineers I know, they're trying their hardest to adapt to a rapidly changing problem, fighting fires and designing new systems at the same time. Their problems don't look anything like yours or mine. Their engineering decisions are driven by complex and shifting internal constraints which we can't really analyze or predict. When I talk about “improved routing models” or “possible alternatives”, please understand that those models may be too complex, incompatible, or unpredictable to build in a given environment.

Dealing with unreliability

Returning to our Bamboo stack simulation, I'd like to start by introducing failure dynamics. Real nodes fail. We'll make our dynos unreliable with the faulty function, which simulates a component which stays online for an exponentially-distributed time before crashing, then returns error responses instead of allowing requests to pass through.
After another exponentially-distributed outage time, it recovers, and the process continues. You can interpret this as a physical piece of hardware, or a virtual machine, or a hot-spare scenario where another node spins up to take the downed one's place, etc. This is a fail-fast model: the node returns failure immediately instead of swallowing messages indefinitely. Since the simulations we're running are short-lived, I'm going to choose relatively short failure times so we can see what happens under changing dynamics.

(defn faulty-dyno []
  (cable 2
    ; Mean time before failure of 20 seconds, and
    ; mean time before resolution of one second.
    (faulty 20000 1000
      (queue-exclusive
        (delay-fixed 20
          (delay-exponential 100
            (server :rails)))))))

Again, we're using a pool of 250 dynos and a Poisson-distributed load function. Let's compare an even load balancer with a pool of perfect dynos vs a pool of faulty ones:

(test-node "Reliable min-conn -> pool of faulty dynos."
  (lb-min-conn
    (pool pool-size
      (faulty-dyno))))

                        Ideal dynos        95% available dynos
Total reqs:             100000             100000
Selected reqs:          50000              50000
Successful frac:        1.0                0.62632
Request rate:           678.2972 reqs/s    679.6156 reqs/s
Response rate:          673.90894 reqs/s   676.74567 reqs/s
Latency distribution:
  Min:                  24.0               4.0
  Median:               93.0               46.5
  95th %:               323.0              272.0
  99th %:               488.0              438.0
  Max:                  1044.0             914.0

Well, that was unexpected. Even though our pool is 95% available, over a third of all requests fail. Because our faulty nodes fail immediately, they have smaller queues on average–and the min-conns load balancer routes more requests to them.

Real load balancers like HAProxy keep track of which nodes fail and avoid routing requests to them. HAProxy uses active health checks, but for simplicity I'll introduce a passive scheme: when a request fails, don't decrement that host's connection counter immediately. Instead, wait for a while–say 1 second, the mean time to resolution for a given dyno.
We can still return the error response immediately, so this doesn't stop the load balancer from failing fast, but it will reduce the probability of assigning requests to broken nodes.

```clojure
(lb-min-conn :lb {:error-hold-time 1000}
  (pool pool-size
    (faulty-dyno)))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      0.98846
Request rate:         678.72076 reqs/s
Response rate:        671.3302 reqs/s
Latency distribution:
  Min:                4.0
  Median:             92.0
  95th %:             323.0
  99th %:             486.0
  Max:                1157.0
```

Throughput is slightly lower than the ideal, perfect pool of dynos, but we've achieved 98.8% reliability over a pool of nodes which is only 95% available, and done it without any significant impact on latencies. This system is more than the sum of its parts.

This system has an upper bound on its reliability: some requests must fail in order to determine which dynos are available. Can we do better? Let's wrap the load balancer with a system that retries requests on error, up to three requests total:

```clojure
(test-node "Retry -> min-conn -> faulty pool"
  (retry 3
    (lb-min-conn :lb {:error-hold-time 1000}
      (pool pool-size
        (faulty-dyno)))))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      0.99996
Request rate:         676.8098 reqs/s
Response rate:        670.16046 reqs/s
Latency distribution:
  Min:                12.0
  Median:             94.0
  95th %:             320.0
  99th %:             484.0
  Max:                944.0
```

The combination of retries, least-conns balancing, and diverting requests away from failing nodes allows us to achieve 99.996% availability with minimal latency impact. This is a great building block to work with. Now let's find a way to compose it into a large-scale distributed system.

Multilayer routing

Minimum-connections and round-robin load balancers require coordinated state. If the machines which comprise our load balancer are faulty, we might try to distribute the load balancer itself in a highly available fashion. That would require state coordination with low latency bounds–and the CAP theorem tells us this is impossible to guarantee when the network partitions.
We'd need to make probabilistic tradeoffs under partitions, like allowing multiple requests to flow to the same backend. What if we punt on AP min-conns load balancers? What if we make them single machines, or CP clusters? As soon as the load balancer encountered a problem, it would become completely unavailable.

```clojure
(defn faulty-lb [pool]
  (faulty 20000 1000
    (retry 3
      (lb-min-conn :lb {:error-hold-time 1000}
        pool))))
```

Let's model the Bamboo architecture again: a stateless, random routing layer on top, which allocates requests to a pool of 10 faulty min-conns load balancers, all of which route over a single pool of faulty dynos:

```clojure
(test-node "Random -> 10 faulty lbs -> One pool"
  (let [dynos (dynos pool-size)]
    (lb-random
      (pool 10
        (cable 5
          (faulty-lb dynos))))))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      0.9473
Request rate:         671.94366 reqs/s
Response rate:        657.87744 reqs/s
Latency distribution:
  Min:                10.0
  Median:             947.0
  95th %:             1620.0
  99th %:             1916.0
  Max:                3056.0
```

Notice that our availability dropped to 95% in the two-layer distributed model, and latencies are far worse: the median rose to 947 ms, compared with about 94 ms behind a single least-conns balancer. This is a consequence of state isolation: because the individual least-conns routers don't share any state, they can't communicate about which nodes are down. That increases the probability that we'll allocate requests to broken dynos. A load balancer which performed active health checks wouldn't have this problem, but we can work around it by adding a second layer of retries on top of the stateless random routing layer:

```clojure
(let [dynos (pool pool-size (faulty-dyno))]
  (retry 3
    (lb-random
      (pool 10
        (cable 5
          (faulty-lb dynos))))))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      0.99952
Request rate:         686.97363 reqs/s
Response rate:        668.2616 reqs/s
Latency distribution:
  Min:                30.0
  Median:             982.0
  95th %:             1639.0
  99th %:             1952.01
  Max:                2878.0
```

This doesn't help our latency problem, but it does provide three nines of availability! Not bad for a stateless routing layer on top of a 95% available pool. However, we can do better.
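Both retry layers above follow the same idea: resend a failed request, up to some total number of tries. Here is a minimal sketch of such a wrapper in plain Clojure. It assumes a hypothetical convention where a node is any function from a request map to a response map and a failed response carries an :error key; retry-node and flaky-node are illustrative names, not Timelike's actual retry or data model.

```clojure
;; Hypothetical convention for this sketch: a "node" is a function from a
;; request map to a response map, and a failed response has an :error key.
(defn retry-node
  "Wraps node so each request is tried up to max-tries times in total.
  Returns the first response without an :error, or the last error."
  [max-tries node]
  (fn [req]
    (loop [attempt 1]
      (let [resp (node req)]
        (if (and (:error resp) (< attempt max-tries))
          (recur (inc attempt))
          resp)))))

(defn flaky-node
  "A hypothetical node that fails its first n calls, then succeeds."
  [n]
  (let [calls (atom 0)]
    (fn [req]
      (if (<= (swap! calls inc) n)
        {:error :down}
        {:status :ok, :req req}))))

;; Two failures are absorbed by three tries; three failures are not.
(println ((retry-node 3 (flaky-node 2)) {:id 1}))  ; prints {:status :ok, :req {:id 1}}
(println ((retry-node 3 (flaky-node 3)) {:id 2}))  ; prints {:error :down}
```

Under the strong assumption that each attempt fails independently with probability p, all n tries fail with probability p^n. Outages in the simulation last about a second, so attempts that land on the same broken dyno or router are correlated, and the real improvement can be smaller than that estimate.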
Isolating the least-conns routers from each other is essential to preserve liveness and availability. On the other hand, it means that they can't share state about how to efficiently allocate requests over the same dynos–so they'll encounter more failures, and queue multiple requests on the same dyno independently. One way to resolve this problem is to ensure that each least-conns router has a complete picture of its backends' state. We isolate the dynos from one another, giving each least-conns router its own block of dynos.

This has real tradeoffs! For one, an imbalance in the random routing topology means that some min-conns routers will have more load than their neighbors–and they can't re-route requests to dynos outside their pool. And since our min-conns routers are CP systems in this architecture, when they fail, an entire block of dynos is unroutable. We have to strike a balance between more dynos per block (efficient least-conns routing) and more min-conn blocks (reduced impact of a router failure). Let's try 10 blocks of 25 dynos each:

```clojure
(test-node "Retry -> Random -> 10 faulty lbs -> 10 pools"
  (retry 3
    (lb-random
      (pool 10
        (cable 5
          (faulty-lb
            (pool (/ pool-size 10)
              (faulty-dyno))))))))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      0.99952
Request rate:         681.8213 reqs/s
Response rate:        677.8099 reqs/s
Latency distribution:
  Min:                30.0
  Median:             104.0
  95th %:             335.0
  99th %:             491.0
  Max:                1043.0
```

Whoa! We're still 99.9% available, even with a stateless random routing layer on top of ten 95%-available routers. Throughput is about the same, and our median latency is roughly nine times lower than with the single shared dyno pool (104 ms versus 982 ms).

I think system composition is important in distributed design. Every one of these components is complex. It helps to approach each task as an isolated system, and enforce easy-to-understand guarantees about that component's behavior. Then you can compose different systems together to make something bigger and more useful.
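The block topology from the last test can be sketched in plain Clojure as two choices: a stateless random choice of block, then a least-connections choice inside that block. The names make-blocks, least-conn, and route are hypothetical, the data model (a map of dyno id to open connections) is an assumption, and the sketch only picks a dyno; it doesn't track connections or failures the way Timelike does.

```clojure
;; Sketch of "random layer -> one least-conns router per block of dynos".
;; Hypothetical data model: a block is a map of dyno-id -> open connections.
(defn make-blocks
  "Splits dyno ids 0 .. n-dynos-1 into n-blocks equal blocks, each dyno
  starting with zero open connections. Assumes n-blocks divides n-dynos."
  [n-dynos n-blocks]
  {:pre [(zero? (mod n-dynos n-blocks))]}
  (->> (range n-dynos)
       (partition (quot n-dynos n-blocks))
       (mapv (fn [ids] (zipmap ids (repeat 0))))))

(defn least-conn
  "Dyno id with the fewest open connections in one block (one router's view)."
  [block]
  (key (apply min-key val block)))

(defn route
  "Stateless random layer: choose a block uniformly at random, then let that
  block's least-conns router choose within it. Returns [block-index dyno-id]."
  [blocks]
  (let [b (rand-int (count blocks))]
    [b (least-conn (nth blocks b))]))

;; If one block's router is down, 1/n-blocks of the dynos are unroutable:
;; 10 blocks of 25 strand 25 of 250 dynos; 2 blocks of 125 would strand 125.
(def blocks (make-blocks 250 10))
(println (count blocks) (map count (take 2 blocks)))  ; prints 10 (25 25)
(println (route blocks))                              ; random, e.g. [block dyno]
```

Because each router sees only its own block, its view of that block is complete, which is what lets least-conns work well here; the price is that a dead router strands its whole block.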
In these articles, we composed an efficient (but nonscalable) CP system with an inefficient (but scalable) AP system to provide a hybrid of the two. If you have awareness of your network topology and are designing for single-threaded, queuing backends, this kind of routing system makes sense. However, it's only going to be efficient if you can situate your dynos close to their least-conns load balancer. One obvious design is to put one load balancer in each rack, and hook it directly to the rack's switch. If blocks are going to fail as a group, you want to keep those blocks within the smallest network area possible. If you're working in EC2, you may not have clear network boundaries to take advantage of, and correlated failures across blocks could be a real problem.

This architecture also doesn't make sense for concurrent servers–and that's a growing fraction of Heroku's hosted applications. I've also ignored the problem of dynamic pools, where dynos are spinning up and exiting pools constantly. Sadly I'm out of time to work on this project, but perhaps a reader will chime in with a model for distributed routing over concurrent servers–maybe with a nonlinear load model for server latencies?

Thanks for exploring networks with me!

Timelike: a network simulator

For more on Timelike and routing simulation, check out part 2 of this article: everything fails all the time. There's also more discussion on Reddit.

RapGenius is upset about Heroku's routing infrastructure. RapGenius, like many web sites, uses Rails, and Rails is notoriously difficult to operate in a multithreaded environment. Heroku operates at large scale, and made engineering tradeoffs which gave rise to high latencies–latencies with adverse effects on customers. I'd like to explore why Heroku's Bamboo architecture behaves this way, and help readers reason about their own network infrastructure.

To start off with, here's a Rails server.
Since we're going to be discussing complex chains of network software, I'll write it down as an s-expression:

```clojure
(server :rails)
```

Let's pretend that server has some constant request-parsing overhead–perhaps 20 milliseconds–and an exponentially-distributed processing time with a mean of 100 milliseconds.

```clojure
(delay-fixed 20
  (delay-exponential 100
    (server :rails)))
```

Heroku runs a Rails application in a virtual machine called a Dyno, on EC2. Since the Rails server can only do one thing at a time, the dyno keeps a queue of HTTP requests, and applies them sequentially to the Rails application. We'll talk to the dyno over a 2-millisecond-long network cable.

```clojure
(defn dyno []
  (cable 2
    (queue-exclusive
      (delay-fixed 20
        (delay-exponential 100
          (server :rails))))))
```

This node can work through an unbounded queue of requests, one at a time; a request that doesn't have to wait takes 124 milliseconds on average (2 + 20 + 100 + 2). But some requests take longer than others. What happens if your request lands behind a different, longer request? How long do you, the user, have to wait?

Introducing Timelike

Surprise! This way of describing network systems is also executable code. Welcome to Timelike. (cable 2 ...) returns a function which accepts a request, sleeps for 2 milliseconds, then passes the request to a child function–in this case, a queuing function returned by queue-exclusive. Then cable sleeps for 2 more milliseconds to simulate the return trip, and returns the response from queue-exclusive. The request (and response) are just a list of events, each one timestamped. The return value of each function, or “node”, is the entire history of a request as it passes through the pipeline. Network node composition is function composition–and since they're functions, we can run them.

```clojure
(let [responses (future*
                  ; In a new thread, generate poisson-distributed
                  ; requests. We want 10,000 total, spaced roughly
                  ; 150 milliseconds apart. Apply them to a single
                  ; dyno.
                  (load-poisson 10000 150 req (dyno)))]
  (prn (first @responses))
  (pstats @responses))
```

Timelike doesn't actually sleep for 150 milliseconds between requests. The OpenJDK and Oracle schedulers are unreliable as it stands–and we don't actually need to wait that long to compute the value of this function. We just virtualize time for every thread in the network (in this case, a thread per request). All operations complete “immediately” according to the virtual clock, and the clock only advances when threads explicitly sleep. We can still exploit parallelism whenever two threads wake up at the same time, and advance the clock whenever there's no more work to be done at a given time. The scheduler will even detect deadlocks and allow the clock to advance when active threads are blocked waiting to acquire a mutex held by a thread which won't release it until the future… though that's a little slow. ;-)

The upside of all this ridiculous Lisp is that you can simulate concurrent systems where the results are independent of wall-clock time, which makes it easier to compare parallel systems at different scales. You can simulate one machine or a network of thousands, and the dynamics are the same.

Here's an example request, and some response statistics. We discard the first and last parts of the request logs to avoid measuring the warm-up or cool-down period of the dyno queue.

```clojure
[{:time 0} {:node :rails, :time 66}]
```

```
Total reqs:           10000
Selected reqs:        5000
Successful frac:      1.0
Request rate:         6.6635394 reqs/s
Response rate:        6.653865 reqs/s
Latency distribution:
  Min:                22.0
  Median:             387.0
  95th %:             1728.0
  99th %:             2894.11
  Max:                3706.0
```

Since the request and response rates are close, we know the dyno was stable during this time–it wasn't overloaded or draining its queue. But look at that latency distribution! Our median request took about three times the 124 ms average for an unqueued request, and some requests blocked for multiple seconds.
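For a rough analytic cross-check of these single-dyno numbers, one can treat the dyno as an M/G/1 queue, assuming Poisson arrivals with a mean gap of 150 ms, first-come-first-served service, and a service time of 20 ms plus an exponential with mean 100 ms. The Pollaczek-Khinchine formula then gives the mean time spent waiting in the queue. The helper mg1-mean-latency is hypothetical and not part of Timelike:

```clojure
;; Back-of-the-envelope check, ASSUMING the single dyno behaves like an
;; M/G/1 queue: Poisson arrivals, FIFO, one request served at a time.
;; Service time S = fixed + Exp(exp-mean), so
;;   E[S]   = fixed + exp-mean
;;   E[S^2] = Var[S] + E[S]^2 = exp-mean^2 + E[S]^2
;; Pollaczek-Khinchine mean queueing delay: Wq = lambda * E[S^2] / (2 (1 - rho)).
(defn mg1-mean-latency
  "Mean end-to-end latency in ms: queueing delay + service + cable round trip."
  [mean-gap-ms fixed-ms exp-mean-ms cable-ms]
  (let [lambda (/ 1.0 mean-gap-ms)               ; arrivals per ms
        es     (+ fixed-ms exp-mean-ms)           ; E[S]
        es2    (+ (* exp-mean-ms exp-mean-ms)     ; E[S^2]
                  (* es es))
        rho    (* lambda es)                      ; utilization
        wq     (/ (* lambda es2) (* 2 (- 1 rho)))]
    (assert (< rho 1) "queue is unstable when utilization >= 1")
    (+ wq es (* 2 cable-ms))))

(println (mg1-mean-latency 150 20 100 2))  ; about 530.7 ms (rho = 0.8)
```

With utilization ρ = 120/150 = 0.8, the expected queueing delay alone is more than three times the 120 ms mean service time, and the (1 - ρ) term makes it grow without bound as load approaches capacity. The simulation reports a median rather than a mean, and a long right tail usually pulls the mean above the median, so treat this only as a sanity check on magnitude.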
Requests which stack up behind each other have to wait, even if they could complete quickly. We need a way to handle more than one request at a time. How do you do that with a single-threaded Rails? You run more server processes at once. In Heroku, you add more dynos. Each runs in parallel, so with n dynos you can (optimally) process n requests at a time.

```clojure
(defn dynos
  "A pool of n dynos"
  [n]
  (pool n (dyno)))
```

There are those funny macros again. Now you have a new problem: how do you get requests to the right dynos? Remember, whatever routing system we design needs to be distributed–multiple load balancers have to coordinate about the environment.

Random routing

Random load balancers are simple. When you get a new request, you pick a random dyno and send the request over there. In the infinite limit this is fine; uniform random choice will distribute an infinite number of requests evenly across the cluster. But our systems aren't infinite. A random LB will sometimes send two, or even a hundred requests to the same dyno even when its neighbors go unused. That dyno's queue will back up, and everyone in that queue has to wait for all the requests ahead of them.

```clojure
(lb-random (dynos 250))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      1.0
Request rate:         1039.7172 reqs/s
Response rate:        1012.6787 reqs/s
Latency distribution:
  Min:                22.0
  Median:             162.0
  95th %:             631.0
  99th %:             970.0
  Max:                1995.0
```

A cool thing about random LBs is that they require little coordinated state. You don't have to agree with your peers about where to route a request. They also compose freely: a layer of random load balancers over another layer of random load balancers has exactly the same characteristics as a single random load balancer, assuming perfect concurrency. On the other hand, leaving nodes unused while piling up requests on a struggling dyno is silly. We can do better.

Round-robin routing

Round-robin load balancers write down all their backends in a circular list (also termed a “ring”).
The first request goes to the first backend in the ring; the second request to the second backend, and so forth, around and around. This has the advantage of evenly distributing requests, and it's relatively simple to manage the state involved: you only need to know a single number, telling you which element in the list to point to.

```clojure
(lb-rr (dynos 250))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      1.0
Request rate:         1043.9939 reqs/s
Response rate:        1029.6116 reqs/s
Latency distribution:
  Min:                22.0
  Median:             105.0
  95th %:             375.0
  99th %:             560.0
  Max:                1173.0
```

We cut our 95th percentile latency by about 40% (from 631 ms to 375 ms), and median request time by roughly a third (from 162 ms to 105 ms).

RR balancers have a drawback, though. Most real-world requests–like the ones in our model–take a variable amount of time. When that variability is large enough (relative to pool saturation), round-robin balancers can put two long-running requests on the same dyno. Queues back up again.

Least-connections routing

A min-conn LB algorithm keeps track of the number of connections it has opened on each particular backend. When a new connection arrives, you find the backend with the fewest current connections. For single-threaded servers, this also corresponds to the server with the shortest queue (in terms of request count, not time).

```clojure
(lb-min-conn (dynos 250))
```

```
Total reqs:           100000
Selected reqs:        50000
Successful frac:      1.0
Request rate:         1049.7806 reqs/s
Response rate:        1041.1244 reqs/s
Latency distribution:
  Min:                22.0
  Median:             92.0
  95th %:             322.0
  99th %:             483.0
  Max:                974.0
```

Our 95th percentile latency has gone from 631 ms, to 375 ms, to 322 ms. This algorithm is significantly more efficient over our simulated dynos than random or round-robin balancing–though it's still not optimal. An optimal algorithm would predict the future and figure out how long the request will take before allocating it–so it could avoid stacking two long-running requests in the same queue.
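The two selection rules above can be sketched in a few lines of Clojure. These are hypothetical single-process illustrations, not Timelike's lb-rr or lb-min-conn: backends are plain indices, state lives in atoms, and nothing is shared across balancers. Notice that round-robin needs one counter while least-connections needs a count per backend, which is the extra state discussed next.

```clojure
;; Minimal, single-process sketches of the two selection rules. Backends are
;; identified by index 0..n-1 and each balancer keeps its state in an atom.
;; These are illustrations, not Timelike's lb-rr or lb-min-conn.
(defn round-robin
  "Returns a no-argument chooser cycling 0, 1, ..., n-1, 0, ...
  The only state is one counter."
  [n]
  (let [counter (atom -1)]
    (fn [] (mod (swap! counter inc) n))))

(defn least-conns
  "Returns [acquire! release!]. acquire! picks the backend with the fewest
  open connections and counts a new connection on it; release! should be
  called with that index when the response comes back."
  [n]
  (let [conns (atom (vec (repeat n 0)))
        pick  (fn [cs] (apply min-key cs (range (count cs))))]
    [(fn acquire! []
       ;; swap-vals! returns [old new]; pick is deterministic, so picking
       ;; from the old vector recovers the index that was incremented.
       (let [[old _] (swap-vals! conns #(update % (pick %) inc))]
         (pick old)))
     (fn release! [i]
       (swap! conns update i dec))]))

(def rr (round-robin 3))
(println (repeatedly 4 rr))   ; prints (0 1 2 0)

(let [[acquire! release!] (least-conns 3)]
  (dotimes [_ 3] (acquire!))  ; one open connection on each backend
  (release! 1)                ; backend 1's request finishes
  (println (acquire!)))       ; prints 1, the only idle backend
```

One detail: Clojure's min-key returns the last of several equally small candidates, so ties here go to the highest-indexed backend. That choice is arbitrary.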
Least-conns also means keeping track of lots of state: a number for every dyno, at least. All that state has to be shared between the load balancers in a given cluster, which can be expensive. On the other hand, we could afford up to a 200-millisecond delay on each connection, and still be more efficient than a random balancer. That's a fair bit of headroom.

Meanwhile, in the real world

Heroku can't use round-robin or min-conns load balancers for their whole infrastructure–it's just too big a problem to coordinate. Moreover, some of the load balancers are far apart from each other so they can't communicate quickly or reliably. Instead, Heroku uses several independent least-conns load balancers for their Bamboo stack. This has a drawback: with two least-conns routers, you can load the same dyno with requests from both routers at once–which increases the queue depth variability.

Let's hook up a random router to a set of min-conns routers, all backed by the same pool of 250 dynos. We'll separate the random routing layer from the min-conns layer by a 5-millisecond-long network cable.

```clojure
(defn bamboo-test [n]
  (test-node (str "Bamboo with " n " routers")
    (let [dynos (dynos pool-size)]
      (lb-random
        (pool n
          (cable 5
            (lb-min-conn dynos)))))))

(deftest ^:bamboo bamboo-2 (bamboo-test 2))
(deftest ^:bamboo bamboo-4 (bamboo-test 4))
(deftest ^:bamboo bamboo-8 (bamboo-test 8))
(deftest ^:bamboo bamboo-16 (bamboo-test 16))
```

This plot sums up, in a nutshell, why RapGenius saw terrible response times. Latencies in this model–especially those killer 95th and 99th percentile times–rise linearly with additional least-conns routers (asymptotically bounded by the performance of a random router). As Heroku's Bamboo cluster grew, so did the variability of dyno queue depths.

This is not the only routing topology available. In part 2, I explore some other options for distributed load balancing. If you want to experiment with Timelike for yourself, check out the GitHub project.
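The mechanism behind the Bamboo result can be illustrated with a deliberately simplified Clojure model. It assumes requests are dealt round-robin across routers, that none complete during the burst, and that each router counts only the connections it opened itself, standing in for independent least-conns routers with no shared state. burst and pick-least are hypothetical names, not part of Timelike:

```clojure
;; Sketch of why independent least-conns routers stack requests on the same
;; dynos. ASSUMPTIONS: requests are dealt round-robin across routers, no
;; request completes during the burst, and each router counts only the
;; connections it opened itself (no shared state between routers).
(defn pick-least
  "Index of the smallest count; ties go to the lowest index."
  [counts]
  (reduce (fn [best i] (if (< (counts i) (counts best)) i best))
          0
          (range (count counts))))

(defn burst
  "Sends n-requests through n-routers private least-conns routers over
  n-dynos shared dynos. Returns the true number of requests on each dyno."
  [n-routers n-dynos n-requests]
  (loop [views  (vec (repeat n-routers (vec (repeat n-dynos 0))))
         actual (vec (repeat n-dynos 0))
         r      0]
    (if (= r n-requests)
      actual
      (let [router (mod r n-routers)
            d      (pick-least (views router))]
        (recur (update-in views [router d] inc)  ; only this router learns
               (update actual d inc)             ; but the dyno really queues
               (inc r))))))

(println (burst 1 4 4))  ; prints [1 1 1 1]: one router spreads the load
(println (burst 2 4 4))  ; prints [2 2 0 0]: two routers double up
(println (burst 4 4 4))  ; prints [4 0 0 0]
```

With one router the four requests land on four different dynos; with more independent routers they pile onto the same few dynos while others sit idle. In the real simulation requests complete and counts drain, so the effect shows up as higher queue-depth variability rather than this extreme.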
A typical Riemann contract

I'm not a big fan of legal documents. I just don't have the resources or ability to reasonably defend myself from a lawsuit; retaining a lawyer for a dozen hours would literally bankrupt me. Even if I were able to defend myself against legal challenge, standard contracts for software consulting are absurd. Here's a section I encounter frequently:

Ownership of Work Product. All Work Product (as defined below) and benefits thereof shall immediately and automatically be the sole and absolute property of Company, and Company shall own all Work Product developed pursuant to this Agreement. “Work Product” means each invention, modification, discovery, design, development, improvement, process, software program, work of authorship, documentation, formula, data, technique, know-how, secret or intellectual property right whatsoever or any interest therein (whether or not patentable or registrable under copyright or similar statutes or subject to analogous protection) that is made, conceived, discovered, or reduced to practice by Contractor (either alone or with others) and that (i) relates to Company’s business or any customer of or supplier to Company or any of the products or services being developed, manufactured or sold by Company or which may be used in relation therewith, (ii) results from the services performed by Contractor for Company or (iii) results from the use of premises or personal property (whether tangible or intangible) owned, leased or contracted for by Company.

These paragraphs essentially state that any original thoughts I have during the course of the contract are the company's property. If the ideas are defensible under an IP law, I could be sued for using them in another context later. One must constantly weigh the risk of thinking under such a contract.
“If I consider this idea now, I run the risk of inventing something important which I can never use again.”

If you're contracted to work on an open-source project, the ramifications are bigger than just your life. Any code you write or data structure you invent is the company's property. You've got to trust that the company will make that code available under the project's license. If they don't do that, you're stuck: you can never implement that idea in the OSS project without running the risk of a lawsuit. Any work you do for the contract is potentially toxic, and must be withheld from the project and all its users, not to mention your future employers.

You'd think IP lawyers would realize this is counterproductive, right? Contracts like this give you huge incentives to ignore the client's problems, to not listen to their ideas, to not think about solutions, because every novel thought carries an unknown risk of being locked away forever.

I prefer informal contracts–an agreement that tries to express the reasonable obligations of two parties to each other in clear, sensible language. It's legally indefensible, I'm sure. I just want to understand my obligation to the company, and express my dedication to that task, my abilities and shortcomings, as well as possible. I also have an obligation to the open-source community–especially Riemann's users–to make improvements widely available. Balancing those takes care.

So here's an example of the sort of agreement I usually propose, instead:

Hello there! This is a contract between Kyle Kingsbury (I, me), and FooCorp.

Time

I'm going to help you instrument your systems with Riemann. I'll do my best to be available from 1000 to 1800 Pacific, every weekday, to speak with FooCorp's engineers, and may be available during other times as well. We can negotiate together to figure out what schedule makes sense.

FooCorp will probably ask for features, research, documentation, or other improvements to Riemann.
In addition to regular business hours, I may work on these problems “whenever I feel like it”–nights, weekends, etc., so long as the FooCorp employee I'm working with approves. I'll do my best to provide realistic time estimates for any significant undertaking, and suggest alternatives where sensible. I'll keep a daily log of the hours I work, and a high-level overview of what I accomplished each day.

Termination of contract

I'll complete up to 80 hours of work specifically for FooCorp. At any time, either I or FooCorp may terminate this agreement for any reason–for instance, if I complete all the work FooCorp asks for, if my work is unsatisfactory, or if I accept a job offer which prohibits outside employment. I'll clearly communicate if any circumstances like this arise, and ask FooCorp to kindly do the same. If this happens I'll do my best to reach a good stopping point, and continue supporting FooCorp through Riemann's open-source channels.

Ownership of work

Riemann is an open-source project. Any code, documentation, features, etc. I produce which are suitable for the whole community will be integrated into Riemann's codebase, published on GitHub, and licensed under the EPL. FooCorp may, at their discretion, be thanked on the web site for their feedback, advice, financial support, etc. “This feature brought to you by…”, that sort of thing.

Riemann has specific design goals. If FooCorp requests features which don't make sense for Riemann's design, I may refuse to make those changes. However, I'll do my best to suggest an alternate design (e.g. a library or standalone program) and help build that, instead.

Whenever FooCorp requests, I can create works (documentation, software, etc.) which are not released as a part of the open-source project. This closed-source work will be delivered to FooCorp and will be their responsibility to maintain.
I assign to FooCorp full ownership of this closed-source work, and unlimited reproduction rights, distribution rights, sublicensability, transferability, etc. Open-source code will likely be easier for FooCorp to maintain, and will receive community-generated improvements. For instance, I may fix bugs in open-source features later, and you can take advantage of those improvements.

Ownership of ideas

I may sign a nondisclosure agreement with FooCorp. Since all of Riemann's ideas are open-source, there is no risk in disclosing those ideas to FooCorp. I will not, to the best of my abilities, make use of or disclose proprietary or secret information from FooCorp in any context other than our work together.

I may make use of proprietary or secret information to improve the open-source Riemann, but not in a way which discloses that information. For instance, I might discover that your company needs to push information about 2 million users through Riemann, and improve performance to allow that. I won't disclose that you have 2 million users–but I will write and release the code to make it possible.

Any unique information, algorithms, data structures, vague notions, etc. I invent or research during this contract are not FooCorp's property, and may be disclosed or integrated into my work at any time. For example, if I realize I can improve performance by reorganizing streams in a certain way, FooCorp can't sue me if I make that performance improvement after our contract is over.

No warranty

I make no guarantee as to the correctness, safety, performance, etc. of any works produced, but I'll certainly do my best during the contract. My goal is to get clean, fast, tested, runnable code into your hands. FooCorp is welcome to ask for help after our contract ends, through open-source, personal, or business channels–but I am under no contractual obligation to fulfill those requests.
In practical terms, if I build something for you, let's test it during the contract and make sure it works! That way I can fix it if there's something wrong.

Payment

When our contract is over (due to early termination or at the end of 80 hours), or every 30 days, whichever comes first, I'll email an invoice to FooCorp for the hours of work completed. FooCorp will mail a check to Kyle Kingsbury within 30 days of that email receipt. My hourly rate for this contract is $100/hr.

Thanks for your consideration, and I look forward to working with you!

–Kyle

My contracts are significantly shorter than the standardized consulting contracts I usually see. They also emphasize different things. Typical contracts spend a lot of time listing rights, which matters because in a legal dispute you need to point to those exact words in the document. They give little guidance, however, about how the relationship should work. I try to emphasize the role of good communication–pointing out places where we might disagree, or where it's important to come to a shared understanding as the relationship evolves. I try to suggest specific hours for my availability, which is something many contracts don't address at all. I also try to give examples to justify the terms I'd like to use–under the hypothesis that if we understand the spirit of the agreement, we're less likely to argue over technicalities.