A few weeks ago I criticized a proposal by Antirez for a hypothetical linearizable system built on top of Redis WAIT and a strong coordinator. I showed that the coordinator he suggested was physically impossible to build, and that anybody who tried to actually implement that design would run into serious problems. I demonstrated those problems (and additional implementation-specific issues) in an experiment on Redis' unstable branch.

Antirez' principal objections, as I understand them, are:

  1. Some readers mistakenly assumed that the system I discussed was a proposal for Redis Cluster.
  2. I showed that the proposal was physically impossible, but didn’t address its safety if it were possible.
  3. The impossible parts of the proposed system could be implemented in a real asynchronous network by layering in additional constraints on the leader election process.

I did not assert that this was a design for Redis Cluster, and the term “Redis Cluster” appeared nowhere in the post. To be absolutely clear: at no point in these posts have I discussed Redis Cluster. Antirez acknowledges that Cluster makes essentially no safety guarantees, so I haven’t felt the need to write about it.

I did, however, provide ample references to multiple points in the mailing list thread where Antirez made strong claims about the consistency of hypothetical systems built with Redis WAIT and strong failover coordinators, and cited the gist in question as the canonical example thereof. I also thought it was clear that the system Antirez proposed was physically impossible, and that in addition to those flaws I analyzed weaker, practically achievable designs. However, comments on the post, on Twitter, and on Hacker News suggest a clarification is in order.

If Aphyr was interested in a real discussion, I could just agree about the obvious, that if you can totally control the orchestration of the system then synchronous replication is obviously a building block that can be part of a strong consistent system. Apparently he was just interested in spreading FUD. Congratulations.

Allow me to phrase this unambiguously: not only is this system impossible to build, but even if it were possible, it would not be linearizable.

There are obvious flaws in Antirez’s proposal, but I’m not convinced that simply explaining those flaws will do enough good. This is unlikely to be the last of Antirez'–or anyone else’s–consistency schemes, and I can’t possibly QA all of them! Instead, I’d like to raise the level of discussion around linearizability by showing how to find problems in concurrent algorithms–even if you don’t know where those problems lie.

So here, have a repo.

Knossos

Named after the ruins where Linear B was discovered, Knossos identifies whether or not a history of events from a concurrent system is linearizable. We’re going to be working through knossos.core and knossos.redis in this post. I’ll elide some code for clarity, but it’s all there in the repo.

In Knossos, we analyze histories. A history is a sequence of operations, each of which is a map:

[{:process :c2, :type :invoke, :f :write, :value 850919}
 {:process :c1, :type :ok,     :f :write, :value 850914}
 {:process :c1, :type :invoke, :f :read,  :value 850919}
 {:process :c1, :type :ok,     :f :read,  :value 850919}]

:process is the logical thread performing the operation. :invoke marks the start of an operation, and :ok marks its completion. :f is the kind of operation being invoked, and :value is an arbitrary argument for that operation, e.g. the value of a read or write. The interpretation of :f and :value depends on the datatype we’re modeling: for a set, we might support :add, :remove, and :contains.
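
For instance, an operation against a set model might be invoked like this (purely an illustration; the register model we build below uses only :read and :write):

{:process :c1, :type :invoke, :f :add, :value 42}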

To verify the history, we need a model which verifies that a sequence of operations applied in a particular order is valid. For instance, if we’re describing a register (e.g. a variable in a programming language–a mutable reference that points to a single value), we would like to enforce that every read sees the most recently written value. If we write 1, then write 2, then read, the read should see 2.

We represent a model with the function knossos.core/step which takes a model’s state, and an operation, and returns a new model. If the operation applied to that state would be invalid, step throws.

(defprotocol Model
  (step [model op]))

(defrecord Register [value]
  Model
  (step [r op]
    (condp = (:f op)
      :write (Register. (:value op))
      :read  (if (or (nil? (:value op))      ; We don't know what the read was
                     (= value (:value op)))  ; Read was a specific value
               r
               (throw (RuntimeException.
                        (str "read " (pr-str (:value op))
                             " from register " value)))))))

A Register implements the Model protocol; its step function handles two operations: :write, which returns a modified copy of the register with the new value, and :read, which returns the register itself–if the read corresponds to the current value.

In a real experiment (as opposed to a mathematical model), we may not know what the read’s value will be until it returns. We allow any read with an unknown (nil?) value to succeed; when the read comes back we can re-evaluate the model with the value in mind.
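
Here’s how that plays out against the Register model above, as a quick REPL sketch: a read invoked with an unknown value passes unconditionally, and once the value is known, we check it against the register for real.

user=> (step (Register. 4) {:process :c1, :type :invoke, :f :read, :value nil})
#knossos.core.Register{:value 4}
user=> (step (Register. 4) {:process :c1, :type :ok, :f :read, :value 4})
#knossos.core.Register{:value 4}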

This definition of step lets us reduce a sequence of operations over the model to produce a final state:

user=> (reduce step (Register. 0)
               [{:process :c1, :type :ok, :f :write, :value 4}
                {:process :c2, :type :ok, :f :read,  :value 4}])
#knossos.core.Register{:value 4}

user=> (reduce step (Register. 0)
               [{:process :c1, :type :ok, :f :write, :value 4}
                {:process :c2, :type :ok, :f :read,  :value 7}])
RuntimeException read 7 from register 4  knossos.core.Register (core.clj:43)

Now our problem consists of taking a history with pairs of (invoke, ok) operations, and finding an equivalent history of single operations which is consistent with the model. This equivalent single-threaded history is called a linearization; a system is linearizable if at least one such history exists. The actual definition is a bit more complicated, accounting for unmatched invoke/ok pairs, but this is a workable lay definition.

Finding linearizations

The space of possible histories is really big. If we invoke 10 operations and none of them return OK, any subset of those operations might have taken place. So first we have to take the power set over the incomplete operations: that’s 2^n. Then for each of those subsets we have to compute every possible interleaving of operations. If every operation’s invocation and completion overlap, that means considering every full permutation. That’s m!.

“Did you just tell me to go fuck myself?”

“I believe I did, Bob.”

My initial approach was to construct a radix tree of all possible histories (or, equivalently, a transition graph), and try to exploit degeneracy to prune the state space. Much of the literature on linearizability generates the full set of sequential histories and tests each one separately. Microsoft Research’s PARAGLIDER, in the absence of known linearization points, relies on this brute-force approach using the SPIN model checker.

A straightforward way to automatically check whether a concurrent history has a corresponding linearization is to simply try all possible permutations of the concurrent history until we either find a linearization and stop, or fail to find one and report that the concurrent history is not linearizable. We refer to this approach as Automatic Linearization… Despite its inherent complexity costs, we do use this method for checking concurrent histories of small length (e.g. less than 20). In practice, the space used for concurrent algorithms is typically small because incorrect algorithms often exhibit an incorrect concurrent history which is almost sequential.

In my experiments, enumerating all interleavings and testing each one started to break down around 12 to 16-operation histories.
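
For reference, here’s roughly what that brute-force enumeration looks like in Clojure–a sketch for intuition, not the Knossos implementation–under the same simplifying assumptions as above: every operation overlaps every other, and any of them may have failed to take effect.

(require '[clojure.math.combinatorics :refer [subsets permutations]])

(defn valid-sequential?
  "Does this sequential history satisfy the model?"
  [model ops]
  (try (reduce step model ops)
       true
       (catch RuntimeException _ false)))

(defn linearizable-naive?
  "Try every subset of the operations, in every order, until one
  satisfies the model."
  [model ops]
  (->> (subsets ops)
       (mapcat permutations)
       (some (partial valid-sequential? model))
       boolean))

This answers instantly for a handful of operations, and effectively never for dozens.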

Burckhardt, Dern, Musuvathi, and Tan wrote Line-Up, which verifies working C# algorithms by enumerating all thread interleavings through the CHESS model checker. This limits Line-Up to verifying only algorithms with a small state space–though this tradeoff allows them to do some very cool things around blocking and data races.

Two papers I know of attempt to reduce the search space itself. Golab, Li, and Shah developed a wicked-smart online checker using Gibbons and Korach’s algorithm and dynamic programming, but GK applies only to registers; I’d like to be able to test sets, queues, and other, more complex datatypes. Yang Liu, et al use both state space symmetry and commutative operations to drastically prune the search space for their linearizability analysis, using the PAT model checker.

I haven’t built symmetry reduction yet, but I do have a different trick: pruning the search space incrementally, as we move through the history, by using the model itself. This is a lot more complex than simply enumerating all possible interleavings–but if we can reject a branch early in the history, it saves huge amounts of work later in the search. The goal is to keep the number of possible worlds bounded by the concurrency of the history, not the length of the history.

So let’s do something paradoxical. Let’s make the problem even harder by multiplying the state space by N. Given a history of four invocations

[a b c d]

Let’s consider the N histories

[] [a] [a b] [a b c] [a b c d]

[] is trivially linearizable; nothing happens. [a] has two possible states: in one timeline, a completes. In another, a does not complete–remember, calls can fail. Assuming [a] passes the model, both are valid linearizations.

For the history [a b] we have five options. Neither a nor b can occur, one or the other could occur, or both could occur, in either order.

[] [a] [b] [a b] [b a]

Let’s say the register is initially nil, a is “write 5”, and b is “read 5”. [b] can’t take place on its own because we can’t read nil and get 5, and [b a] is invalid for the same reason. So we test five possibilities and find three linearizations. Not too bad, but we’re starting to see a hint of that n! explosion. By the third juncture we’ll have 16 sequential histories to test:

user=> (mapcat permutations (subsets ['a 'b 'c]))
([] (a) (b) (c) (a b) (b a) (a c) (c a) (b c) (c b)
 (a b c) (a c b) (b a c) (b c a) (c a b) (c b a))

And by the time we have to work with 10 operations concurrently, we’ll be facing 9864101 possible histories to test; it’ll take several minutes to test that many. But here’s the key: only some of those histories are even reachable, and we already have a clue as to which.
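
As an aside, that 9864101 isn’t magic; it’s just the subset-then-permute count from earlier. A quick sketch to reproduce it:

(defn history-count
  "The number of candidate sequential histories for n wholly-concurrent,
  possibly-incomplete operations: every subset, in every order; i.e. the
  k-permutations of n, summed over k."
  [n]
  (->> (range (inc n))
       (map (fn [k] (reduce * (range (inc (- n k)) (inc n)))))
       (reduce +)))

user=> (history-count 3)
16
user=> (history-count 10)
9864101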

The 3-operation histories will include some histories which we already tested. [b a c], for instance, begins with [b a]; so if we already tested [b a] and found it impossible, we don’t even have to test [b a c] at all. The same goes for [b c a]–and every history, of any length, which begins with b.

So instead of testing all six 3-operation histories, we only have to test four. If the model rejects some of those, we can use those prefixes to reject longer histories, and so on. This dramatically cuts the state space, allowing us to test much longer histories in reasonable time.
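
Here’s the prefix-pruning idea in miniature–a sketch for intuition; Knossos does this incrementally rather than by filtering a pre-built list. Once a prefix is known to violate the model, we discard every candidate history that begins with it.

(defn prune
  "Remove candidate histories which begin with a known-bad prefix."
  [bad-prefixes candidates]
  (remove (fn [history]
            (some (fn [prefix]
                    (= prefix (take (count prefix) history)))
                  bad-prefixes))
          candidates))

user=> (prune [[:b]] [[:a :b :c] [:b :a :c] [:b :c :a] [:c :a :b]])
([:a :b :c] [:c :a :b])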

Knossos uses Clojure’s immutable, structure-sharing data structures to implement this search efficiently. We reduce over the history in order, maintaining a set of possible worlds. Every invocation bifurcates the set of worlds into those in which the operation happens immediately, and those in which it is deferred for later. Every completion prunes the set of worlds to only those in which the given operation completed. We can then ask Knossos to produce a set of worlds–linearized histories and the resulting states of their models–which are consistent with a given concurrent history.

Testing a model

Now let’s apply that linearizability checker to a particular system. We could measure a system experimentally, like I’ve done with Jepsen, or we could generate histories based on a formal model of a system. As an example, let’s test the model suggested by Antirez, describing a linearizable system built on top of Redis, WAIT, and a magical coordinator. As I described earlier, this model is physically impossible; it cannot be built, because the coordinator would need to violate the laws of physics. But let’s pretend we live on Planet Terah, and see whether the system is actually sound.

Antirez writes:

There are five nodes, using master-slave replication. When we start A is the master.

The nodes are capable of synchronous replication, when a client writes, it gets as relpy the number or replicas that received the write. A client can consider a write accepted only when “3” or more is returned, otherwise the result is not determined (false negatives are possbile).

Every node has a serial number, called the replication offset. It is always incremented as the replication stream is processed by a replica. Replicas are capable of telling an external entity, called “the controller”, what is the replication offset processed so far.

At some point, the controller, dictates that the current master is down, and decides that a failover is needed, so the master is switched to another one, in this way:

  1. The controller completely partition away the current master.
  2. The controller selects, out of a majority of replicas that are still available, the one with the higher replication offset.
  3. The controller tells all the reachable slaves what is the new master: the slaves start to get new data from the new master.
  4. The controller finally reconfigure all the clients to write to the new master.

So everything starts again. We assume that a re-appearing master, or other slaves that are again available after partitions heal, are capable of understand what the new master is. However both the old master and the slaves can’t accept writes. Slaves are read-only, while the re-apprearing master will not be able to return the majority on writes, so the client will not be able to consider the writes accepted.

In this model, it is possible to reach linearizability? I believe, yes, because we removed all the hard part, for which the strong protocols like Raft use epochs.

If you’ve spotted some of the problems in this approach, good work! But let’s say there were no obvious problems, and we weren’t sure how to find any. To find them, we’ll need a description of the system which is unambiguous and complete–something a computer can understand.

First off, let’s describe a node:

(defn node
  "A node consists of a register, a primary it replicates from, whether it
  is isolated from all other nodes, a local replication offset, and a map
  of node names to known replication offsets."
  [name]
  {:name     name
   :register nil
   :primary  nil
   :isolated false
   :offset   0
   :offsets  {}})

Seems straightforward enough. This is a really simple model of a Redis server–one which only has a single register to read and write. We could extend it with more complex types, like lists and sets, but we’re trying to keep things simple. Notice how things like “Each node has a serial number, called the replication offset” have been translated into a field in a structure. We’ve also encoded things which were implicit in the proposal, like the fact that the WAIT command relies on the node knowing the replication offsets of its peers.

Remember, in proofs we try to deal as much as possible with immutable, pure systems; Clojure, Erlang, ML, and Haskell all lend themselves naturally to this approach. If you’re writing your checker in something like Ruby or Java, try to write immutable code anyway. It may be a bit unnatural, but it’ll really simplify things later.
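
For instance, with the node constructor above, every “update” returns a new map and leaves the original untouched–which is exactly what lets us explore many alternative futures of the same state without defensive copying:

user=> (def n (node :n1))
#'user/n
user=> (:offset (assoc n :offset 1))
1
user=> (:offset n)
0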

(defn client
  "A client is a singlethreaded process which can, at any time, have at
  most one request in-flight to the cluster. It has a primary that it uses
  for reads and writes, and an in-flight request. Clients can be waiting
  for a response, in which case :waiting will be the replication offset
  from the primary they're awaiting. :writing is the value they're waiting
  to write, if conducting a write."
  [name]
  {:name    name
   :node    nil
   :writing nil
   :waiting nil})

We’ll also need a coordinator. This one’s simple:

(defn coordinator
  "A controller is an FSM which manages the election process for nodes.
  It comprises a state (the phase of the election cycle it's in), and the
  current primary."
  [primary]
  {:state   :normal
   :primary primary})

Next we’ll put all these pieces together into a full system. Phrases like “When we start A is the master,” are translated into code which picks the first node as the primary, and code which ensures that primary state is reflected by the other nodes and the coordinator.

(defn system
  "A system is comprised of a collection of nodes, a collection of clients,
  and a coordinator; plus a *history*, which is the set of operations we're
  verifying is linearizable."
  []
  (let [node-names [:n1 :n2 :n3]
        nodes      (->> node-names
                        (map node)
                        ; Fill in offset maps
                        (map (fn [node]
                               (->> node-names
                                    (remove #{(:name node)})
                                    (reduce #(assoc %1 %2 0) {})
                                    (assoc node :offsets)))))
        ; Initial primary/secondary state
        [primary & secondaries] nodes
        nodes      (cons primary
                         (map #(assoc % :primary (:name primary))
                              secondaries))
        ; Construct a map of node names to nodes
        nodes      (->> nodes
                        (map (juxt :name identity))
                        (into {}))
        ; Construct clients
        clients    (->> [:c1 :c2]
                        (map client)
                        (map #(assoc % :node (:name primary)))
                        (map (juxt :name identity))
                        (into {}))]
    {:coordinator (coordinator (:name primary))
     :clients     clients
     :nodes       nodes
     :history     []}))

Note that we’ve introduced, for any given state of the system, the history of operations which brought us to this point. This is the same history that we’ll be evaluating using our linearizability checker.

This formally describes the state of the model. Now we need to enumerate the state transitions which bring the system from one state to another.

State transitions

First, we need a model of Redis reads and writes. Writes have two phases: an invocation and a response to the client–implemented with WAIT.

(def write-state (atom 0))

(defn client-write
  "A client can send a write operation to a node."
  [system]
  (->> system
       clients
       (filter free-client?)
       (filter (partial valid-client? system))
       (map (fn [client]
              (let [; Pick a value to write
                    value  (swap! write-state inc)
                    ; Find the node name for this client
                    node   (:node client)
                    ; And the new offset.
                    offset (inc (get-in system [:nodes node :offset]))]
                (-> system
                    (assoc-in [:nodes node :register] value)
                    (assoc-in [:nodes node :offset] offset)
                    (assoc-in [:clients (:name client) :waiting] offset)
                    (assoc-in [:clients (:name client) :writing] value)
                    (log (invoke-op (:name client) :write value))))))))

client-write is a function which takes a system and returns a sequence of possible systems, each of which corresponds to one client initiating a write to its primary. We encode multiple constraints here (the helper predicates involved are sketched just after this list):

  1. Clients can only initiate a write when they are not waiting for another response–i.e. clients are singlethreaded.
  2. Clients must be connected to a node which is not isolated and thinks that it is a primary. Note that this assumes a false linearization point: in the real world, these checks are not guaranteed to be instantaneous. We are being overly generous to simplify the model.
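
Those two constraints live in a pair of helper predicates. Here’s a plausible sketch of them–the versions in the Knossos repo may differ in detail:

(defn free-client?
  "Does this client have no request in flight?"
  [client]
  (nil? (:waiting client)))

(defn valid-client?
  "Is this client pointed at a node which is a non-isolated primary?
  (In this model, primaries are the nodes whose :primary field is nil.)"
  [system client]
  (let [node (get-in system [:nodes (:node client)])]
    (and node
         (not (:isolated node))
         (nil? (:primary node)))))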

For each of these clients, we generate a unique number to write using (swap! write-state inc), set the primary’s register to that value, increment the primary’s offset, and update the client–it is now waiting for that particular replication offset to be acknowledged by a majority of nodes. We also keep track of the value we wrote, just so we can fill it into the history later.

Finally, we update the history of the system, adding an invocation of a write, from this client, for the particular value.

When a client’s primary determines that a majority of nodes have acknowledged the offset that the client is waiting for, we can complete the write operation.

(defn client-write-complete
  "A reachable primary node can inform a client that its desired
  replication offset has been reached."
  [system]
  (->> system
       clients
       (remove free-client?)
       (filter (partial valid-client? system))
       (keep (fn [client]
               (let [offset (-> system
                                :nodes
                                (get (:node client))
                                majority-acked-offset)]
                 (when (<= (:waiting client) offset)
                   (-> system
                       (assoc-in [:clients (:name client) :waiting] nil)
                       (assoc-in [:clients (:name client) :writing] nil)
                       (log (ok-op (:name client)
                                   :write
                                   (:writing client))))))))))

Again, note the constraints: we can’t always complete writes. Only when the client is waiting, and the client is connected to a valid non-isolated primary, and the replication offset is acked by a majority of nodes, can these transitions take place.

keep is a Clojure function analogous to map and filter combined: only non-nil results appear in the output sequence. We use keep here to compactly express that only clients which have satisfied the majority offset acknowledgement constraint are eligible for completion.
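
For example:

user=> (keep #(when (odd? %) (* % %)) [1 2 3 4 5])
(1 9 25)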

Reads are similar to writes, but we make another generous allowance: reads are assumed to be a linearization point of the model, and therefore take place instantaneously. We add both invocation and completion operations to the log in one step.

(defn client-read
  "A client can read a value from its node, if primary and reachable.
  Reads are instantaneous."
  [system]
  (->> system
       clients
       (filter free-client?)
       (filter (partial valid-client? system))
       (map (fn [client]
              (let [node  (:node client)
                    value (get-in system [:nodes node :register])]
                (-> system
                    (log (invoke-op (:name client) :read nil))
                    (log (ok-op (:name client) :read value))))))))

Replication

Redis replication is asynchronous: in one phase the secondary copies data from the primary, and in a later phase, it updates the primary with its replication offset. We assume that each phase takes place instantaneously. Is replication actually a linearization point in Redis? I don’t know–but we’ll be generous again.

(defn replicate-from-primary
  "A node can copy the state of its current primary, if the primary is
  reachable."
  [system]
  (->> system
       nodes
       (remove :isolated)
       (keep (fn [node]
               (when-let [primary (get-node system (:primary node))]
                 (when-not (:isolated primary)
                   (-> system
                       (assoc-in [:nodes (:name node) :register]
                                 (:register primary))
                       (assoc-in [:nodes (:name node) :offset]
                                 (:offset primary))
                       (log (op (:name node)
                                :info
                                :replicate-from-primary
                                (:primary node))))))))))

Pretty straightforward: each node can, if it has a primary and neither is isolated, copy its register state and offset. We’ll be generous and assume the primary’s total oplog is applied instantly and atomically.

The acknowledgement process is basically the reverse: we update the offset cache in the primary, so long as nodes are connected.

(defn ack-offset-to-primary
  "A node can inform its current primary of its offset, if the primary is
  reachable."
  [system]
  (->> system
       nodes
       (remove :isolated)
       (keep (fn [node]
               (when-let [primary (get-node system (:primary node))]
                 (when-not (:isolated primary)
                   (-> system
                       (assoc-in [:nodes (:primary node) :offsets (:name node)]
                                 (:offset node))
                       (log (op (:name node)
                                :info
                                :ack-offset-to-primary
                                (:primary node))))))))))

Failover

Four functions, corresponding to each of the four steps in the algorithm. We ensure they happen in order by ensuring that a transition can only take place if the coordinator just completed the previous step.

1) The controller completely partition away the current master.

(defn failover-1-isolate
  "If the coordinator is in normal mode, initiates failover by isolating
  the current primary."
  [system]
  (let [coord (:coordinator system)]
    (when (= :normal (:state coord))
      (-> system
          (assoc-in [:coordinator :state] :isolated)
          (assoc-in [:coordinator :primary] nil)
          (assoc-in [:nodes (:primary coord) :isolated] true)
          (log (op :coord :info :failover-1-isolate (:primary coord)))))))

Notice how we formalized the English statement by encoding properties of the network throughout the model: each state transition checks the partitioned state of the nodes involved. This is an oversimplification of the real system, because this part of the algorithm is impossible in an asynchronous network: it modifies the current primary’s state directly instead of sending it a message. We’re being generous by assuming the network propagates messages instantly; a more thorough model would explicitly model the loss and delay of messages.

2) The controller selects, out of a majority of replicas that are still available, the one with the higher replication offset.

Again, translation is straightforward; in the model we can freely violate the laws of physics.

(defn failover-2-select
  "If the coordinator has isolated the old primary, selects a new primary
  by choosing the reachable node with the highest offset."
  [system]
  (let [coord (:coordinator system)]
    (when (= :isolated (:state coord))
      (let [candidates (->> system nodes (remove :isolated))]
        ; Gotta reach a majority
        (when (<= (inc (Math/floor (/ (count (nodes system)) 2)))
                  (count candidates))
          (let [primary (:name (apply max-key :offset candidates))]
            (-> system
                (assoc-in [:coordinator :state] :selected)
                (assoc-in [:coordinator :primary] primary)
                (log (op :coord :info :failover-2-select primary)))))))))

3) The controller tells all the reachable slaves what is the new master: the slaves start to get new data from the new master.

You know the drill. We create a false point of linearization and assume this broadcast is atomic.

(defn failover-3-inform-nodes
  "If the coordinator has selected a new primary, broadcasts that primary
  to all reachable nodes."
  [system]
  (let [coord   (:coordinator system)
        primary (:primary coord)]
    (when (= :selected (:state coord))
      (-> system
          (assoc-in [:coordinator :state] :informed-nodes)
          (assoc :nodes
                 (->> system
                      :nodes
                      (map (fn [[name node]]
                             [name (cond
                                     ; If the node is isolated, state is
                                     ; unchanged.
                                     (:isolated node)
                                     node

                                     ; If this is the new primary node,
                                     ; make it a primary.
                                     (= primary name)
                                     (assoc node :primary nil)

                                     ; Otherwise, set the primary.
                                     :else
                                     (assoc node :primary primary))]))
                      (into {})))
          (log (op :coord :info :failover-3-inform-nodes primary))))))

4) The controller finally reconfigure all the clients to write to the new master.

Here too!

(defn failover-4-inform-clients
  "If the coordinator has informed all nodes of the new primary, update
  all client primaries."
  [system]
  (let [coord   (:coordinator system)
        primary (:primary coord)]
    (when (= :informed-nodes (:state coord))
      (-> system
          (assoc-in [:coordinator :state] :normal)
          (assoc :clients
                 (->> system
                      :clients
                      (map (fn [[name client]]
                             [name (assoc client :node primary)]))
                      (into {})))
          (log (op :coord :info :failover-4-inform-clients primary))))))

At each step there is exactly one failover transition that can happen–since the coordinator is magically sequential and never fails.

(defn failover
  "All four failover stages combined."
  [system]
  (when-let [system' (or (failover-1-isolate        system)
                         (failover-2-select         system)
                         (failover-3-inform-nodes   system)
                         (failover-4-inform-clients system))]
    (list system')))

Putting it all together

We assume that a re-appearing master, or other slaves that are again available after partitions heal, are capable of understand what the new master is.

I struggled with this, and I actually don’t know how to interpret this part of the proposal. Erring on the side of safety, let’s omit any resurrection of isolated nodes. Once a node fails, it stays dead forever. If you let them come back, things get much more dangerous.

Only one last part remains: we need to express, in a single function, every allowable state transition.

(defn step
  "All systems reachable in a single step from a given system."
  [system]
  (concat (client-write           system)
          (client-write-complete  system)
          (client-read            system)
          (replicate-from-primary system)
          (ack-offset-to-primary  system)
          (failover               system)))

OK. So now we can evolve any particular state of the system in various directions. Let’s take a look at a basic system:

user=> (use 'knossos.redis)
nil
user=> (-> (system) pprint)
{:coordinator {:state :normal, :primary :n1},
 :clients {:c1 {:name :c1, :node :n1, :writing nil, :waiting nil},
           :c2 {:name :c2, :node :n1, :writing nil, :waiting nil}},
 :nodes {:n1 {:name :n1, :register nil, :primary nil, :isolated false,
              :offset 0, :offsets {:n3 0, :n2 0}},
         :n2 {:name :n2, :register nil, :primary :n1, :isolated false,
              :offset 0, :offsets {:n3 0, :n1 0}},
         :n3 {:name :n3, :register nil, :primary :n1, :isolated false,
              :offset 0, :offsets {:n2 0, :n1 0}}},
 :history []}

What happens if we do a write?

user=> (-> (system) client-write rand-nth pprint)
{:coordinator {:state :normal, :primary :n1},
 :clients {:c1 {:name :c1, :node :n1, :writing nil, :waiting nil},
           :c2 {:name :c2, :node :n1, :writing 10, :waiting 1}},
 :nodes {:n1 {:name :n1, :register 10, :primary nil, :isolated false,
              :offset 1, :offsets {:n3 0, :n2 0}},
         :n2 {:name :n2, :register nil, :primary :n1, :isolated false,
              :offset 0, :offsets {:n3 0, :n1 0}},
         :n3 {:name :n3, :register nil, :primary :n1, :isolated false,
              :offset 0, :offsets {:n2 0, :n1 0}}},
 :history [{:process :c2, :type :invoke, :f :write, :value 10}]}

Notice that client-write returns two systems: one in which :c1 writes, and one in which :c2 writes. We pick a random possibility using rand-nth. In this case, :c2 wrote the number 10 to :n1, and is waiting for replication offset 1 to be acknowledged. :n1, but not :n2 or :n3, has received the write. Note the history of this system reflects the invocation, but not the completion, of this write.

Let’s try to complete the write:

user=> (-> (system) client-write rand-nth client-write-complete pprint)
()

There are no possible worlds where the write can complete at this point. Why? Because the offset the client is waiting on hasn’t yet been acknowledged by a majority of nodes. This is the whole point of Redis WAIT: we can’t consider a write complete until it’s been acknowledged by enough replicas.

user=> (-> (system)
           client-write rand-nth
           replicate-from-primary first
           ack-offset-to-primary first
           client-write-complete
           pprint)
({:coordinator {:state :normal, :primary :n1},
  :clients {:c1 {:name :c1, :node :n1, :writing nil, :waiting nil},
            :c2 {:name :c2, :node :n1, :writing nil, :waiting nil}},
  :nodes {:n1 {:name :n1, :register 15, :primary nil, :isolated false,
               :offset 1, :offsets {:n3 0, :n2 1}},
          :n2 {:name :n2, :register 15, :primary :n1, :isolated false,
               :offset 1, :offsets {:n3 0, :n1 0}},
          :n3 {:name :n3, :register nil, :primary :n1, :isolated false,
               :offset 0, :offsets {:n2 0, :n1 0}}},
  :history [{:process :c1, :type :invoke, :f :write, :value 15}
            {:process :n2, :type :info, :f :replicate-from-primary, :value :n1}
            {:process :n2, :type :info, :f :ack-offset-to-primary, :value :n1}
            {:process :c1, :type :ok, :f :write, :value 15}]})

A successful write! The value 15 has been replicated to both :n1 and :n2, and with the offset map on :n1 updated, the WAIT request for the client can complete. The history reflects the invocation and completion of :c1’s write request.

Having written down the state of the system, and encoded all possible state transitions in the step function, we can find random trajectories through the system by interleaving calls to step and rand-nth. Because we don’t allow the resurrection of nodes, this system can simply halt, unable to make progress. In that case, we simply return the terminal state.

(defn trajectory
  "Returns a system from a randomized trajectory, `depth` steps away from
  the given system."
  [system depth]
  (if (zero? depth)
    system
    (let [possibilities (step system)]
      (if (empty? possibilities)
        ; Dead end
        system
        ; Descend
        (recur (rand-nth possibilities) (dec depth))))))

Because our trajectory evolution is randomized, the histories it generates will often contain extraneous garbage–repeated sequences of identical reads, for instance, or replicating the same state over and over again. We could go back and re-explore the state space, omitting certain transitions in the search of a simpler trajectory–but for now, we’ll take the random trajectories.

Model checking

We’ve built a simple model of a single-threaded linearizable register, a concurrent model of a hypothetical Redis system, and a verifier which tests that a history is linearizable with respect to a singlethreaded model. Now let’s combine these three elements.

First, a way to show the system that we wound up in, and the history that led us there. We’ll use linearizable-prefix to find the longest prefix of the history that was still linearizable–that’ll help show where, exactly, we ran out of options.

(defn print-system
  [system history]
  (let [linearizable (linearizable-prefix (->Register nil) history)]
    (locking *out*
      (println "\n\n### No linearizable history for system ###\n")
      (pprint (dissoc system :history))
      (println "\nHistory:\n")
      (pprint linearizable)
      (println "\nUnable to linearize past this point!\n")
      (pprint (drop (count linearizable) history)))))

Then we’ll generate a bunch of trajectories of, say, 15 steps apiece, and show any which have nonlinearizable histories.

(deftest redis-test
  (dothreads [i 4] ; hi haters
    (dotimes [i 10000]
      (let [system (trajectory (system) 15)]
        ; Is this system linearizable?
        (let [history (complete (:history system))
              linears (linearizations (->Register nil) history)]
          (when (empty? linears)
            (print-system system history))
          (is (not (empty? linears))))))))

And we’re ready to go. Is the model Antirez proposed linearizable?

$ lein test knossos.redis-test

### No linearizable history for system ###

{:coordinator {:state :normal, :primary :n2},
 :clients {:c1 {:name :c1, :node :n2, :writing nil, :waiting nil},
           :c2 {:name :c2, :node :n2, :writing 9, :waiting 2}},
 :nodes {:n1 {:name :n1, :register 9, :primary nil, :isolated true,
              :offset 2, :offsets {:n3 0, :n2 1}},
         :n2 {:name :n2, :register 5, :primary nil, :isolated false,
              :offset 1, :offsets {:n3 0, :n1 0}},
         :n3 {:name :n3, :register nil, :primary :n2, :isolated false,
              :offset 0, :offsets {:n2 0, :n1 0}}}}

History:

[{:process :c2, :type :invoke, :f :write, :value 5}
 {:process :n2, :type :info, :f :replicate-from-primary, :value :n1}
 {:process :n2, :type :info, :f :ack-offset-to-primary, :value :n1}
 {:process :c2, :type :ok, :f :write, :value 5}
 {:process :n2, :type :info, :f :replicate-from-primary, :value :n1}
 {:process :c2, :type :invoke, :f :write, :value 9}
 {:process :n3, :type :info, :f :ack-offset-to-primary, :value :n1}
 {:process :c1, :type :invoke, :f :read, :value 9}
 {:process :c1, :type :ok, :f :read, :value 9}
 {:process :coord, :type :info, :f :failover-1-isolate, :value :n1}
 {:process :coord, :type :info, :f :failover-2-select, :value :n2}
 {:process :coord, :type :info, :f :failover-3-inform-nodes, :value :n2}
 {:process :coord, :type :info, :f :failover-4-inform-clients, :value :n2}
 {:process :n3, :type :info, :f :ack-offset-to-primary, :value :n2}
 {:process :n3, :type :info, :f :ack-offset-to-primary, :value :n2}
 {:process :c1, :type :invoke, :f :read, :value 5}]

Unable to linearize past this point!

({:process :c1, :type :ok, :f :read, :value 5})

lein test :only knossos.redis-test/redis-test

FAIL in (redis-test) (redis_test.clj:44)
expected: (not (empty? linears))
  actual: (not (not true))

Ran 1 tests containing 38340 assertions.
6 failures, 0 errors.

No, it isn’t.

What happened here?

Knossos generated a state of the system which it believes is possible, under the rules of the Redis model we constructed, but not linearizable with respect to the register model. Up until the final read, Knossos could still construct a world where things made sense, but that last read was inconsistent with every possible interpretation. Why?

Well, in the final state, n1 is isolated with value 9 at offset 2, n2 is a primary with value 5 at offset 1, and n3 thinks the value is nil. n3’s offset is 0; it never participated in this history, so we can ignore it.

  1. First, c2 writes 5 to n1. n2 replicates and acknowledges the write of 5, and c2’s write completes.
  2. n2 initiates a (noop) replication from n1.
  3. c2 initiates a write of 9 to n1. c1 concurrently initiates a read from n1, which will see 9.
  4. n2 completes its replication; state is unchanged.
  5. Then, a failover occurs. The coordinator performs all four steps atomically, so no concurrency questions there. n2 is selected as the new primary and n1 is isolated.
  6. n3 acknowledges its offset of 0 to n2 twice; both of which are noops since n2 already thinks n3’s offset is 0.
  7. Finally, c1 invokes a read from n2 and sees 5. This is the read which proves the system is inconsistent. Up until this point the history has been linearizable–we could have assumed, for instance, that the write of 9 failed and the register has always been 5, but that assumption was invalidated by the successful read of 9 by c1 earlier. We also could have assumed that the final read of 5 failed–but when it succeeded, Knossos ran out of options.

This case demonstrates that reads are a critical aspect of linearizability. Redis WAIT is not transactional. It allows clients to read unreplicated state from the primary node, which is just as invalid as reading stale data from secondaries.

I hope this illustrates beyond any shred of doubt: not only is Antirez’s proposal physically impossible, but even wildly optimistic formal interpretations of his proposal are trivially non-linearizable.

Yeah, this is Fear, Uncertainty, and Doubt. You should be uncertain about algorithms without proofs. You should doubt a distributed system without a formal model. You should be fearful that said system will not live up to its claims.

Now you’ve got another tool to validate that uncertainty for yourself.

Math is hard; let’s go model checking

Proving linearizability is hard. Much harder than proving a system is not linearizable, when you get down to it. All I had to do here was find a single counterexample; but proving linearizability requires showing that every history is valid. Traditionally one does this by identifying all the linearization points of an algorithm–the points where things take place atomically–which is a subtle and complex process, especially where the linearization point depends on runtime behavior or lies outside the code itself.

Moreover, I am not that great at proofs–and I don’t want to exclude readers who don’t have the benefit of formal training. I want to equip ordinary programmers with the motivation and tools to reason about their systems–and for that, model checking is a terrific compromise.

There are many tools available for formal modeling of concurrency. Leslie Lamport’s TLA+ is the canonical tool for concurrency proofs, but its learning curve is steep to say the least and I have a lot of trouble trying to compose its models. Bell Labs' Spin is more accessible for programmers, encoding its models in a language called Promela. Spin has excellent tooling–it can even extract models from C code with assistance. There’s also Erigone, a reimplementation of Spin, and the aforementioned Line-Up for C#.

Knossos is a dead-simple verification system I hacked out in a week; it takes advantage of Clojure’s concise data-structure literals, immutable shared-state data structures, and concise syntax to make designing models and checking their linearizability easier. Knossos probably has some bugs, so be sure to check the failure cases by hand!

No matter which model checker you use, all of these systems let you formalize your algorithms by writing them down in a concise, unambiguous form–either a modeling language or a full programming language–and then verify that those models conform to certain invariants by exploring the state space. Combined with a proof assistant, some of these specifications can go further and prove that the invariants always hold, instead of merely searching for executions in which they fail.

We verified a toy system in this blog post, but all the key elements are there. State, transition functions, invariants, and a model to verify against. We use hierarchical data structures and functions to break up the model into smaller, more manageable pieces. We generated counterexamples from probabilistic trajectories through the model.

Real models look just like this. Take a look at the model and proof sketch of the RAFT consensus algorithm, and see if you can spot the definitions of state, transitions, and invariants. Note that this isn’t a full proof–more like a sketch–and it relies on some propositions like type safety which are not mechanically verified, but this paper illustrates both formal and English proof techniques nicely.

This is the kind of argument you need to make, as a database engineer, before asserting a given system is linearizable. Formal verification will catch both obvious and subtle bugs before you, or your readers, try to implement them.

In a recent blog post, antirez detailed a new operation in Redis: WAIT. WAIT is proposed as an enhancement to Redis' replication protocol to reduce the window of data loss in replicated Redis systems; clients can block awaiting acknowledgement of a write to a given number of nodes (or time out if the given threshold is not met). The theory here is that positive acknowledgement of a write to a majority of nodes guarantees that write will be visible in all future states of the system.
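
Concretely, a client issues its write, then a WAIT with the desired replica count and a timeout; the command blocks until enough replicas have acknowledged the replication stream (or the timeout expires) and returns the number that did. A sketch using carmine–aliased here as redis, with pool and spec as assumed connection settings, just like the Jepsen client later in this post:

(redis/with-conn pool spec
  (redis/set "balance" 100))

; Block until 2 replicas ack, or 1000 ms elapse; returns the ack count.
(redis/with-conn pool spec
  (taoensso.carmine.protocol/send-request! "WAIT" 2 1000))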

As I explained earlier, any asynchronously replicated system with primary-secondary failover allows data loss. Optional synchronous replication, antirez proposes, should make it possible for Redis to provide strong consistency for those operations.

WAIT means that if you run three nodes A, B, C where every node contains a Sentinel instance and a Redis instance, and you “WAIT 1” after every operation to reach the majority of slaves, you get a consistent system.

WAIT can be also used, by improving the failover procedure, in order to have a strong consistent system (no writes to the older master from the point the failure detection is positive, to the end of the failover when the configuration is updated, or alternative, disconnect the majority of slaves you can reach during the failure detection so that every write will fail during this time).

Antirez later qualified these claims:

I understand this not the “C” consistency of “CAP” but, before: the partition with clients and the (old) master partitioned away would receive writes that gets lost. after: under certain system models the system is consistent, like if you assume that crashed instances never start again. Of course, the existence of synchronous replication does not prove that the system is linearizable; only some types of failover preserve the ordering of writes.

As I showed in Call me maybe: Redis, Redis Sentinel will enter split-brain during network partitions, causing significant windows of data loss. Exactly how much data loss depends on the sentinel configuration and the failure topology. Antirez finally suggested that if we replace Redis Sentinel with a strongly consistent coordination service for failover, Redis WAIT could provide full linearizability.

The failover proposal

In a five-node cluster, assume every write is followed by WAIT 2 to ensure that a majority of nodes have received the write. In the event of a failure, a strong external coordinator goes through the following election process:

  1. Totally partition the old primary P1.
  2. Of all reachable nodes, identify the node with the highest replication offset. Let that node be P2.
  3. Promote P2.
  4. Inform all reachable nodes that they are to follow P2.
  5. Have all reachable clients switch to the new primary.

There are several serious problems with this design. I hinted at these issues in the mailing list with limited success. Kelly Sommers pointed out repeatedly that this design has the same issues as Cassandra’s CL.ALL. Replication alone does not ensure linearizability; we have to be able to roll back operations which should not have happened in the first place. If those failed operations can make it into our consistent timeline in an unsafe way, perhaps corrupting our successful operations, we can lose data.

… surprisingly I think that transactional rollbacks are totally irrelevant.

Ultimately I was hoping that antirez and other contributors might realize why their proposal for a custom replication protocol was unsafe nine months ago, and abandon it in favor of an established algorithm with a formal model and a peer-reviewed proof, but that hasn’t happened yet. Redis continues to accrete homegrown consensus and replication algorithms without even a cursory nod to formal analysis.

OK, fine. Let’s talk about the failover coordinator.

The coordinator

Redis Sentinel is not linearizable; nor are its proposed improvements. Whatever failover system you’re planning to use here is going to need something stronger. In fact, we can’t even guarantee safety using a strong coordination service like ZooKeeper to serialize the failover operations, because ZooKeeper cannot guarantee the mutual exclusion of two services in the presence of message delays and clock skews. Let’s paper over that issue by introducing large delays and carefully ordering our timeouts.

It gets worse. Even if we did have a perfect mutex around the coordinator, two coordinators could issue messages to the same Redis nodes which arrive out of order. TCP does not guarantee ordering between two distinct TCP streams, which means we might see coordinator A initiate a failover process then time out halfway; followed by coordinator B which begins the failover process, only to be interrupted on some nodes by messages en-route through the network from coordinator A. Don’t believe me? TCP message delays have been reported in excess of ninety seconds. That one took out Github.

It gets even worse. If the original primary P1 is isolated from the coordinator, the coordinator will not be able to force P1 to step down. Indeed, P1 could remain a primary for the entire duration of a failover, accepting writes, making state changes, and attempting to replicate those changes to other nodes. This is dangerous because we cannot atomically guarantee that the new majority of nodes will reject those writes.

  1. A client writes to P1, which replicates to secondaries S2, S3, S4, and S5.
  2. The coordinator attempts to elect a new primary, and sees S2, S3, S4, and S5.
  3. Without loss of generality, assume S2 has the highest replication offset. The coordinator promotes S2 to P2.
  4. P1 receives acks from S3, S4, and S5, and, having reached a majority, returns success to the client.
  5. The coordinator reparents S3, S4, and S5 to P2, destroying the write.

You might try to solve this by forcing S2–S5 into a read-only, non-replicating mode before attempting to promote a new primary, but that gets into a whole other morass of issues around multiple state transitions and partial failures. Suffice it to say: it’s difficult to solve this by simply pausing nodes first. Maybe impossible? I’m not sure.

Typically, replication protocols solve this problem by guaranteeing that writes from P1 cannot be accepted after S2–S5 acknowledge to the coordinator that they will participate in a new cohort. This often takes the form of a ballot (Paxos), epoch (ZAB, Viewstamped Replication), or term (Raft). Redis has no such construct, and antirez seems to eschew it as unnecessary:

In this model, it is possible to reach linearizability? I believe, yes, because we removed all the hard part, for which the strong protocols like Raft use epochs.

This brings us to a different, but related series of problems.

The servers

By using the offset in the replication log as the determining factor in which nodes are promotable, the proposed failover design opens the door for significant data loss.

Imagine the following sequence:

  1. The primary P1, with log offset O1, becomes isolated from S3, S4, and S5.
  2. Clients writing to P1 see their operations using WAIT 2 fail.
  3. S3 is promoted to P3, with offset O1=O3. Clients writing to P3 see their writes succeed, replicated to S4 and S5.
  4. More operations occur on P1 than on P3. O1 becomes greater than O3.
  5. The partition heals; the coordinator can see both P1 and P3.
  6. The coordinator sees that O1 is higher than O3, and chooses P1 as the new primary.
  7. P3 is demoted, and all its acknowledged writes are destroyed.

Don’t believe me? Here, let’s try it. Here’s a function which implements (more or less) the proposed coordinator algorithm. Note that we’re not demoting the original primary because it may not be reachable.

(defn elect!
  "Forces an election among the given nodes. Picks the node with the
  highest replication offset, promotes it, and re-parents the secondaries."
  [nodes]
  (let [highest (highest-node nodes)]
    (log "Promoting" highest)
    (with-node highest
      (redis/slaveof "no" "one"))
    (doseq [node (remove #{highest} nodes)]
      (log "Reparenting" node "to" highest)
      (with-node node
        (redis/slaveof highest 6379)))))

And in the test, we’ll use WAIT to ensure that only writes which are successfully replicated to 2 or more replicas are considered successful:

(add [app element]
  (try
    (redis/with-conn pool spec
      (redis/sadd key element))
    ; Block for 2 secondaries (3 total) to ack.
    (let [acks (redis/with-conn pool spec
                 (taoensso.carmine.protocol/send-request! "WAIT" 2 1000))]
      (if (< acks 2)
        (do (log "not enough copies: " acks)
            error)
        ok))
    (catch Exception e
      (if (->> e .getMessage (re-find #"^READONLY"))
        error
        (throw e)))))

I’m gonna punt on informing clients which node is the current primary; we’ll just issue set-add requests to each node independently. Jepsen only cares about whether successful writes are lost, so we’ll let those writes fail and log ‘em as unsuccessful.

Initially, the offset for all 5 nodes is 15. Writes complete successfully on P1 and fail on S2–S5.

healthy.png

We cut off P1 and S2 from S3, S4, and S5. S3, S4, and S5 all have equal offsets (1570), so we promote S3 to P3. As soon as the partition takes effect, writes to P1 begin to fail–we see not enough copies: 1, and an :error status for write 110, 115, and so on. Latencies on P1 jump to 1 second, since that’s how long we’re blocking for using WAIT.

failover1.png

Writes complete successfully on P3, since it can see a majority of nodes: itself, S4, and S5. We heal the partition and initiate a second election. Since P1’s offset (8010) is higher than P3’s (6487), we preserve P1 as a primary and demote all other nodes to follow it. All P3’s writes accepted during the partition are silently destroyed.

failover2.png

Note that there’s actually a window here where writes can successfully take place on either of P1 or P3 in a mixed sequence, depending on the order in which the secondaries are reparented. Both 560 and 562 complete successfully, even though 562 was written to S3, which was demoted at that point in time. Some weird opportunity for timing anomalies there.

results.png

These results are catastrophic. In a partition which lasted for roughly 45% of the test, 45% of acknowledged writes were thrown away. To add insult to injury, Redis preserved all the failed writes in place of the successful ones.

Additional issues

Two bugs amplify this problem. Note that this is the unstable branch, so this isn’t a huge deal right now:

First, Redis secondaries return -1 for their offset when they detect the primary is down. Returning a special status code makes sense… but not if you’re using the offset to determine which nodes become the primary. This could cause the highest nodes to appear the lowest, and vice versa. If a fresh node has offset 0, and all other nodes return offset -1, this could cause a cluster to erase all data ever written to it.
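
To see why this is so dangerous, recall that the election step picks the node with the highest reported offset–in the model earlier, via max-key. A hypothetical illustration: a freshly started, empty node at offset 0 outranks every replica that has noticed the primary is down and is reporting -1.

user=> (apply max-key :offset [{:name :fresh, :offset  0, :register nil}
                               {:name :s2,    :offset -1, :register "all the data"}
                               {:name :s3,    :offset -1, :register "all the data"}])
{:name :fresh, :offset 0, :register nil}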

Second, Redis resets the replication offset to zero every time a node is promoted. Again, a reasonable choice in isolation, but it actually maximizes the chances that this particular failure mode will occur. The current design is biased towards data loss.

Even if these bugs were corrected, the problem could still occur. All that’s required is for more operations to happen on P1 than P3 after the two diverge.

Going forward

Distributed systems design is really hard, but engineers continue to assume otherwise:

However I think that distributed systems are not super hard, like kernel programming is not super hard, like C system programming is not super hard. Everything new or that you don’t do in a daily basis seems super hard, but it is actually different concepts that are definitely things everybody here in this list can master.

For sure a few months of exposure will not make you able to provide work like Raft or Paxos, but the basics can be used in order to try to design practical systems, that can be improved over time.

I assert just the opposite: we need formal theory, written proofs, computer verification, and experimental demonstration that our systems make the tradeoffs we think they make. Throughout the Redis criticism thread and discussion on Twitter, I see engineers assuming that they understand the tradeoffs despite the presence of gaping holes in the system’s safety net.

This behavior endangers users.

These list threads and blog posts are the sources that users come to, years later, to understand the safety properties of our systems. They’ll read our conjectures and idle thoughts and tease out some gestalt, and use that to build their systems on top of ours. They’ll miss subtle differences in phrasing and they won’t read every reply. Most won’t do any reading at all; they’re not even aware that these problems could exist.

Engineers routinely characterize Redis’s reliability as “rock solid”.

This is part of why I engage in these discussions so vocally. As systems engineers, we continually struggle to erase the assumption of safety before that assumption causes data loss or downtime. We need to clearly document system behaviors so that users can make the right choices.

We must understand our systems in order to explain them–and distributed systems are hard to understand. That’s why it’s so important that we rely on formal models, on proofs, instead of inventing our own consensus protocols–because much of the hard work of understanding has been done already. We can build on that work. Implementing a peer-reviewed paper is vastly simpler than trying to design and verify an algorithm from scratch–or worse, evolving one piecemeal, comprised of systems which encode subtly different assumptions about their responsibilities to the world. Those designs lead to small gaps which, viewed from the right angle, become big enough to drive a truck through.

I wholeheartedly encourage antirez, myself, and every other distributed systems engineer: keep writing code, building features, solving problems–but please, please, use existing algorithms, or learn how to write a proof.

In response to my earlier post on Redis inconsistency, Antirez was kind enough to help clarify some points about Redis Sentinel's design.

First, I'd like to reiterate my respect for Redis. I've used Redis extensively in the past with good results. It's delightfully fast, simple to operate, and offers some of the best documentation in the field. Redis is operationally predictable. Data structures and their performance behave just how you'd expect. I hear nothing but good things about the clarity and quality of Antirez' C code. This guy knows his programming.

I think Antirez and I agree with each other, and we're both saying the same sorts of things. I'd just like to expand on some of these ideas a bit, and generalize to a broader class of systems.

First, the distributed system comprised of Redis and Redis Sentinel cannot be characterized as consistent. Nor can MongoDB with anything less than WriteConcern.MAJORITY, or MySQL with asynchronous replication, for that matter. Antirez writes:

What I'm saying here is that just the goal of the system is:

1) To promote a slave into a master if the master fails.
2) To do so in a reliable way.

Redis Sentinel does reliably promote secondaries into primaries. It is so good at this that it can promote two, three, or all of your secondaries into primaries concurrently, and keep them in that state indefinitely. As we've seen, having causally unconnected primaries in this kind of distributed system allows for conflicts–and since Redis Sentinel will destroy the state on an old primary when it becomes visible to a quorum of Sentinels, this can lead to arbitrary loss of acknowledged writes to the system.

Ok I just made clear enough that there is no such goal in Sentinel to turn N Redis instances into a distributed store,

If you use any kind of failover, your Redis system is a distributed store. Heck, reading from secondaries makes Redis a distributed store.

So you can say, ok, Sentinel has a limited scope, but could you add a feature so that when the master feels in the minority it no longer accept writes? I don't think it's a good idea. What it means to be in the minority for a Redis master monitored by Sentinels (especially given that Redis and Sentinel are completely separated systems)?

Do you want your Redis master stopping to accept writes when it is no longer able to replicate to its slaves?

Yes. This is required for a CP system with failover. If you don't do it, your system can and will lose data. You cannot achieve consistency in the face of a partition without sacrificing availability. If you want Redis to be AP, then don't destroy the data on the old primaries by demoting them. Preserve conflicts and surface them to the clients for merging.

You could do this as an application developer by setting every Redis node to be a primary, and writing a proxy layer which uses, say, consistent hashing and active anti-entropy to replicate writes between nodes. Take a look at Antirez's own experiments in this direction. If you want a CP system, you could follow Datomic's model and use immutable shared-structure values in Redis, combined with, say, Zookeeper for mutable state.
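To make the AP option concrete, here's a minimal merge sketch in Clojure (my illustration, not code from Redis or the Jepsen repo; the literal sets stand in for values read back from two conflicting primaries after the partition heals):

(require '[clojure.set :as set])

;; Values read back from two primaries that diverged during a partition.
(def from-n1 #{1 2 3 50 51})
(def from-n5 #{1 2 3 52 53})

;; For sets, union is a safe merge: it preserves every acknowledged
;; write from both sides instead of discarding one copy wholesale.
(defn merge-conflicting-sets [& copies]
  (apply set/union copies))

(sort (merge-conflicting-sets from-n1 from-n5))
;; => (1 2 3 50 51 52 53)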

Why topology matters

Antirez recommends a different approach to placing Sentinels than I used in my Redis experiments:

… place your Sentinels and set your quorum so that you are defensive enough against partitions. This way the system will activate only when the issue is really the master node down, not a network problem. Fear data loss and partitions? Have 10 Linux boxes? Put a Sentinel in every box and set quorum to 8.

I… can't parse this statement in a way that makes sense. Adding more boxes to a distributed system doesn't reduce the probability of partitions–and more to the point, trying to determine the state of a distributed system from outside the system itself is fundamentally flawed.
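A toy back-of-the-envelope model (entirely my assumption, not a measurement) of why adding boxes doesn't help:

;; Suppose each node is independently cut off from the rest with
;; probability p during some window. The chance that at least one node
;; is partitioned only grows as you add nodes.
(defn p-some-partition [p n]
  (- 1.0 (Math/pow (- 1.0 p) n)))

(p-some-partition 0.01 3)   ;; => ~0.03
(p-some-partition 0.01 10)  ;; => ~0.096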

I mentioned that having the nodes which determine the cluster state (the Sentinels) be separate from the nodes which actually perform the replication (the Redis servers) can lead to worse kinds of partitions. I'd like to explain a little more, because I'm concerned that people might actually be doing this in production.

In this image, S stands for Sentinel, R stands for a Redis server, and C stands for Client. A box around an R indicates that node is a primary, and where it is able to replicate data to a secondary Redis server, an arrow is shown on that path. Lines show open network connections, and the jagged border shows a network partition.

Sentinels separate from clients and servers

Let's say we place our sentinels on 3 nodes to observe a three-node cluster. In the left-hand scenario, the majority of Sentinels are isolated, with two servers, from the clients. They promote node 2 to be a new primary, and it begins replicating to node 3. Node 1, however, is still a primary. Clients will continue writing to node 1, even though a.) its durability guarantees are greatly diminished–if it dies, all writes will be lost, and b.) the node doesn't have a quorum, so it cannot safely accept writes. When the partition resolves, the Sentinels will demote node 1 to a secondary and replace its data with the copy from N2, effectively destroying all writes during the partition.

On the right-hand side, a fully connected group of Sentinels can only see one Redis node. It's not safe to promote that node, because it doesn't have a majority and servers won't demote themselves when isolated, but the Sentinels promote it anyway. This scenario could be safely available to clients because a majority is present, but Redis Sentinel happily creates a split-brain and obliterates the data on the first node at some later time.

Sentinels with clients

If you take Antirez' advice and colocate the sentinels with your clients, you can still get into awful states. On the left, an uneven partition between clients and servers means we elect a minority Redis server as the primary, even though it can't replicate to any other nodes. The majority component of the servers can still accept writes, but they're doomed: when the clients are able to see those nodes again, they'll wipe out all the writes that took place on those 2 nodes.

On the right, we've got the same partition topology I demonstrated in the Redis post. Same deal: split brain means conflicting writes and throwing away data.

If you encounter intermittent or rolling partitions (which can happen in the event of congestion and network failover), shifting quorums coupled with the inability of servers to reason about their own cluster state could yield horrifying consequences, like every node being a primary at the same time. You might be able to destroy not only writes that took place during the partition, but all data ever written–not sure if the replication protocol allows this or if every node just shuts down.

Bottom line: if you're building a distributed system, you must measure connectivity in the distributed system itself, not by what you can see from the outside. Like we saw with MongoDB and Riak, it's not the wall-clock state that matters–it's the logical messages in the system. The further you get from those messages, the wider your windows for data loss.

It's not just Sentinel

I assert that any system which uses asynchronous primary-secondary replication, and can change which node is the primary, is inconsistent. Why? If you write an operation to the primary, and then failover occurs before the operation is replicated to the node which is about to become the new primary, the new primary won't have that operation. If your replication strategy is to make secondaries look like the current primary, the system isn't just inconsistent, but can actually destroy acknowledged operations.

Here's a formal model of a simple system which maintains a log of operations. At any stage, one of three things can happen: we can write an operation to the primary, replicate the log of the primary to the secondary, or fail over:

------------------------------ MODULE failover ------------------------------
EXTENDS Naturals, Sequences, TLC
CONSTANT Ops

\* N1 and N2 are the list of writes made against each node
VARIABLES n1, n2

\* The list of writes acknowledged to the client
VARIABLE acks

\* The current primary node
VARIABLE primary

\* The types we allow variables to take on
TypeInvariant == /\ primary \in {1, 2}
                 /\ n1 \in Seq(Ops)
                 /\ n2 \in Seq(Ops)
                 /\ acks \in Seq(Ops)

\* An operation is acknowledged if it has an index somewhere in acks.
IsAcked(op) == \E i \in DOMAIN acks : acks[i] = op

\* The system is *consistent* if every acknowledged operation appears,
\* in order, in the current primary's oplog:
Consistency == acks = SelectSeq((IF primary = 1 THEN n1 ELSE n2), IsAcked)

\* We'll say the system is *potentially consistent* if at least one node
\* has a superset of our acknowledged writes in order.
PotentialConsistency == \/ acks = SelectSeq(n1, IsAcked)
                        \/ acks = SelectSeq(n2, IsAcked)

\* To start out, all oplogs are empty, and the primary is n1.
Init == /\ primary = 1
        /\ n1 = <<>>
        /\ n2 = <<>>
        /\ acks = <<>>

\* A client can send an operation to the primary. The write is immediately
\* stored on the primary and acknowledged to the client.
Write(op) == IF primary = 1
             THEN /\ n1' = Append(n1, op)
                  /\ acks' = Append(acks, op)
                  /\ UNCHANGED <<n2, primary>>
             ELSE /\ n2' = Append(n2, op)
                  /\ acks' = Append(acks, op)
                  /\ UNCHANGED <<n1, primary>>

\* For clarity, we'll have the client issue unique writes
WriteSomething == \E op \in Ops : ~IsAcked(op) /\ Write(op)

\* The primary can *replicate* its state by forcing another node
\* into conformance with its oplog
Replicate == IF primary = 1
             THEN /\ n2' = n1
                  /\ UNCHANGED <<n1, acks, primary>>
             ELSE /\ n1' = n2
                  /\ UNCHANGED <<n2, acks, primary>>

\* Or we can failover to a new primary.
Failover == /\ IF primary = 1 THEN primary' = 2 ELSE primary' = 1
            /\ UNCHANGED <<n1, n2, acks>>

\* At each step, we allow the system to either write, replicate, or fail over
Next == \/ WriteSomething
        \/ Replicate
        \/ Failover

This is written in the TLA+ language for describing algorithms, which encodes a good subset of ZF axiomatic set theory with first-order logic and the Temporal Logic of Actions. We can explore this specification with the TLC model checker, which takes our initial state and evolves it by executing every possible state transition until it hits an error:

Invariant Consistency is violated.

This protocol is inconsistent. The model checker's trace shows the state changes at each transition: in the third step, the primary is n2, but n2's oplog is empty instead of containing the list <<2>>. In fact, this model fails the PotentialConsistency invariant shortly thereafter, if replication or a write occurs. We can also test for the total loss of writes; it fails that invariant too.

That doesn't mean primary-secondary failover systems must be inconsistent. You just have to ensure that writes are replicated before they're acknowledged:

\* We can recover consistency by making the write protocol synchronous
SyncWrite(op) == /\ n1' = Append(n1, op)
                 /\ n2' = Append(n2, op)
                 /\ acks' = Append(acks, op)
                 /\ UNCHANGED primary

\* This new state transition satisfies both consistency constraints
SyncNext == \/ \E op \in Ops : SyncWrite(op)
            \/ Replicate
            \/ Failover

And in fact, we don't have to replicate to all nodes before ack to achieve consistency–we can get away with only writing to a quorum, if we're willing to use a more complex protocol like Paxos.

The important bit

So you skimmed the proof; big deal, right? The important thing is that it doesn't matter how you actually decide to do the failover: Sentinel, Mongo's gossip protocol, Heartbeat, Corosync, Byzantine Paxos, or a human being flipping the switch. Redis Sentinel happens to be more complicated than it needs to be, and it leaves much larger windows for write loss than it has to, but even if it were perfect the underlying Redis replication model is fundamentally inconsistent. We saw the same problem in MongoDB when we wrote with less than WriteConcern.MAJORITY. This affects asynchronous replication in MySQL and Postgres. It affects DRBD (yeaaaahhh, this can happen to your filesystem). If you use any of this software, you are building an asynchronous distributed system, and there are eventualities that have to be acknowledged.

Look guys, there's nothing new here. This is an old proof and many mature software projects (for instance, DRBD or RabbitMQ) explain the inconsistency and data-loss consequences of a partition in their documentation. However, not everyone knows. In fact, a good number of people seem shocked.

Why is this? I think it might be because software engineering is a really permeable field. You can start out learning Rails, and in two years wind up running four distributed databases by accident. Not everyone chose or could afford formal education, or was lucky enough to have a curmudgeonly mentor, or happened to read the right academic papers or find the right blogs. Now they might be using Redis as a lock server, or storing financial information in MongoDB. Is this dangerous? I honestly don't know. Depends on how they're using the system.

I don't view this so much as an engineering problem as a cultural one. Knives still come with sharp ends. Instruments are still hard for beginners to play. Not everything can or should be perfectly safe–or accessible. But I think we should warn people about what can happen, up front.

Tangentially: like many cultures, much of our collective understanding about what is desirable or achievable in distributed systems is driven by advertising. Yeah, MongoDB. That means you. ;-)

Bottom line

I don't mean to be a downer about all this. Inconsistency and even full-out data loss aren't the end of the world. Asynchronous replication is a good deal faster, both in bulk throughput and client latencies. I just think we lose sight, occasionally, of what that means for our production systems. My goal in writing Jepsen has been to push folks to consider their consistency properties carefully, and to explain them clearly to others. I think that'll help us all build safer systems. :)

Previously on Jepsen, we explored two-phase commit in Postgres. In this post, we demonstrate Redis losing 56% of writes during a partition.

Redis is a fantastic data structure server, typically deployed as a shared heap. It provides fast access to strings, lists, sets, maps, and other structures with a simple text protocol. Since it runs on a single server, and that server is single-threaded, it offers linearizable consistency by default: all operations happen in a single, well-defined order. There’s also support for basic transactions, which are atomic and isolated from one another.

Because of this easy-to-understand consistency model, many users treat Redis as a message queue, lock service, session store, or even their primary database. Redis running on a single server is a CP system, so it is consistent for these purposes.

What about high availability?


Redis offers asynchronous primary->secondary replication. A single server is chosen as the primary, which can accept writes. It relays its state changes to secondary servers, which follow along. Asynchronous means that you don’t have to wait for a write to be replicated before the primary returns a response to the client. Writes will eventually arrive on the secondaries, if we wait long enough. In our application, all 5 clients will use the primary on n1, and n2–n5 will be secondaries.

This is still a CP system, so long as we never read from the secondaries. If you do read from the secondaries, it’s possible to read stale data. That’s just fine for something like a cache! However, if you read data from a secondary, then write it to the primary, you could inadvertently destroy writes which completed but weren’t yet replicated to the secondaries.
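Here's a tiny sketch of that read-modify-write hazard (pure Clojure; atoms stand in for a primary and a lagging secondary, no real Redis client involved):

;; The primary has acknowledged a counter value of 7; the secondary lags at 5.
(def primary   (atom 7))
(def secondary (atom 5))

;; A client reads from the secondary, then writes the derived value back
;; to the primary:
(let [stale @secondary]
  (reset! primary (inc stale)))

@primary
;; => 6 -- the acknowledged values 6 and 7 on the primary have been
;;         overwritten by a computation based on stale data.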

What happens if the primary fails? We need to promote one of the secondary servers to a new primary. One option is to use Heartbeat or a STONITH system which keeps a link open between two servers, but if the network partitions we don’t have any way to tell whether the other side is alive or not. If we don’t promote a secondary, there could be no active primary. If we do, we could end up with two primaries. We need more nodes.

If one connected component of the network contains a majority (more than N/2) of nodes, we call it a quorum. We’re guaranteed that at most one quorum exists at any point in time–so if a majority of nodes can see each other, they know that they’re the only component in that state. That group of nodes (also termed a “component”) has the authority to promote a new primary.
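The majority test is simple arithmetic; a quick sketch:

;; A component has a quorum iff it contains strictly more than half of
;; the N nodes; at most one such component can exist at any time.
(defn quorum? [n component-size]
  (> component-size (quot n 2)))

(quorum? 5 3) ;; => true
(quorum? 5 2) ;; => false
(quorum? 4 2) ;; => false -- exactly half is not a majority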

Redis has a system called Sentinel, which, when configured correctly, will try to establish a quorum between Sentinel nodes, agree on which Redis servers are alive, and promote a new primary if the current one appears to have failed. If we colocate the Sentinel nodes with the Redis nodes, this should allow us to promote a new primary in the majority component (should one exist).


What are the consistency and availability properties of Sentinel? Antirez, the author of Redis, says:

Redis Cluster for instance is a system biased towards consistency rather than availability. Redis Sentinel itself is an HA solution with the dogma of consistency and master slave setups.

So we expect this system to be CP. Nodes in the minority component will become unavailable during the partition, and the majority component will elect a new primary. The Sentinels will then order clients to abandon the old primary and reconnect to the new one.


Before we begin, it’s important to recognize that Redis does not guarantee durability. Since writes to disk and replication to secondaries are asynchronous, we can lose up to N seconds of the most recent writes. We should not, however, see gaps in the write log. If write n is present, so are writes 0, 1, … n-2, n-1.
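That no-gaps claim is mechanically checkable. A minimal sketch (my helper, assuming the writes are the integers 0..n):

;; Given the set of writes that survived, return any gaps: writes
;; missing even though some later write is present.
(defn gaps [survivors]
  (remove survivors (range (inc (apply max survivors)))))

(gaps #{0 1 2 3 4}) ;; => ()  -- a clean prefix; the durability claim holds
(gaps #{0 1 3 4})   ;; => (2) -- write 2 vanished; the claim is violated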

Partitioning the cluster

Here’s a simple application which writes a list of numbers to a Redis set. At this time Carmine, the Clojure Redis client, doesn’t yet support failover using Sentinel. I’ve implemented a stricter version of the Sentinel client algorithm here: asking the server for a new primary before every write. Sentinel actually states that clients should only select new primaries when their connection is closed, which leaves a wider window for clients to disagree about which primary to use–leading to the possibility of more conflicting writes.
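Roughly, the per-write client logic looks like this sketch (the two helper functions are hypothetical stand-ins, not Carmine API; the real implementation described above is what actually ran):

;; Hypothetical stand-in: ask a Sentinel node which host and port it
;; currently believes is the primary. In real code this issues
;; SENTINEL get-master-addr-by-name mymaster over the Redis protocol.
(defn sentinel-master-addr [sentinel master-name]
  ["10.10.3.242" 6379]) ; network call elided

;; Hypothetical stand-in for sending one write to a specific node.
(defn write-to! [[host port] op]
  :ok) ; network call elided

;; Re-resolve the primary before every single write. Stricter than the
;; documented "re-resolve on disconnect" behavior, but the answer can
;; still be stale by the time the write lands.
(defn write! [sentinel master-name op]
  (write-to! (sentinel-master-addr sentinel master-name) op))

(write! "n1" "mymaster" [:sadd "jepsen-set" 42])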

Let’s give it a shot. First, set up Redis:

salticid redis.setup

Then, in two terminals, start up Redis and Redis Sentinel:

salticid redis.start
salticid redis.sentinel

You should see messages go by as the sentinels discover one another and ensure all the nodes are properly configured. You can check the replication status with salticid redis.replication. salticid redis.stop will shut down the Redis servers and sentinels alike.

Now let’s run our application with lein run redis, then partition nodes n1 and n2 away from n3, n4, and n5 by running salticid jepsen.partition.

376 :ok
378 :ok
382 :ok
384 :ok
380 :ok
381 :ok
383 :ok
389 :ok
385 :ok

The first thing you’ll notice is that even though n1 can’t possibly be replicating its writes to n3, n4, and n5, writes against it are still completing successfully. N1 still thinks it’s the primary, and since replication is asynchronous, it’s acknowledging writes before they’re sent to others in the cluster. The sentinels notice the failure, and n3, n4, and n5’s sentinels promote a new primary:

19 May 00:37:36.314 # +sdown master mymaster 10.10.3.242 6379
19 May 00:37:36.616 # +sdown slave 10.10.3.52:6379 10.10.3.52 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.52:26379 10.10.3.52 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.242:26379 10.10.3.242 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:37.521 # +odown master mymaster 10.10.3.242 6379 #quorum 3/3
19 May 00:37:48.041 # +failover-detected master mymaster 10.10.3.242 6379
19 May 00:37:48.142 * +slave-reconf-inprog slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:48.143 * +slave-reconf-inprog slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.243 # +failover-end master mymaster 10.10.3.242 6379

Now n5 is a new primary–but n1 is still a primary too! Run salticid redis.replication to see the replication status of all nodes. We have two primary nodes, one in each component of the system. During this time both primaries are accepting writes independently. This is a classic split-brain scenario–and it violates the C in CP. Writes (and reads) in this state are not linearizable, because clients will see different results based on which node they’re talking to.


Healing the partition

What happens when the network comes back online? salticid jepsen.heal repairs the partition, and the Sentinel nodes will discover each other again.

Redis Sentinel used to leave both primaries running indefinitely, which meant that any scenario like a partition or crash leading to failover would result in permanent split-brain. That’s fixed in version 2.6.13, which came out last week. Now, Sentinel demotes the old primary on n1 when it comes back into contact with the majority component. The client sees:

1687 :ok
1686 READONLY You can't write against a read only slave.
1690 READONLY You can't write against a read only slave.
1693 :ok

… since n1 stepped down just after a Sentinel told us it was a primary. Clients are a part of the distributed system too. If a system’s correctness depends on clients choosing specific nodes at specific times, the clients are now engaged in a distributed consensus problem–not to mention a clock synchronization problem. This is damn hard to do correctly.

Results

1991 :ok
1995 :ok
1996 :ok
Hit enter when ready to collect results.
Writes completed in 42.002 seconds
2000 total
1998 acknowledged
872 survivors
1126 acknowledged writes lost! (╯°□°)╯︵ ┻━┻
50 51 52 53 54 55 ... 1671 1675 1676 1680 1681 1685
0.999 ack rate
0.5635636 loss rate
0.0 unacknowledged but successful rate

Out of 2000 writes, Redis claimed that 1998 of them completed successfully. However, only 872 of those integers were present in the final set. Redis threw away 56% of the writes it told us succeeded.
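The bookkeeping behind those numbers is just set arithmetic; a small sketch of how one might compute it:

(require '[clojure.set :as set])

;; acked:     the writes Redis acknowledged as successful
;; survivors: the writes actually present in the final set
(defn loss-stats [acked survivors]
  (let [lost (set/difference acked survivors)]
    {:acknowledged (count acked)
     :survivors    (count survivors)
     :lost         (count lost)
     :loss-rate    (double (/ (count lost) (count acked)))}))

;; With this run's numbers -- 1998 acknowledged, 872 survivors --
;; 1126 writes are lost, a loss rate of roughly 0.56.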


There are two problems at work here. First, notice that all the clients lost writes at the beginning of the partition: (50, 51, 52, 53, …). That’s because they were all writing to n1 when the network dropped–and since n1 was demoted later, any writes made during that window were destroyed.

The second problem was caused by split-brain: both n1 and n5 were primaries up until the partition healed. Depending on which node they were talking to, some clients saw their writes survive while others lost theirs. The last few lost writes, mod 5, are all 0 and 1: they belong to the clients which kept using n1 as a primary, in the minority component.

Note that both of these failure modes violate the durability guarantees we claimed earlier for Redis, because there are gaps in the write log.

Redis strategies

So you’re running a distributed Redis install, and have realized that the design of Redis Sentinel (or, for that matter, any other failover system on top of an asynchronously replicated primary-secondary design) means you can lose a lot of data when a partition occurs. What can you do?

From an operations perspective, I recommend you try to understand the Sentinel consensus algorithm. I don’t, and I’ve read it a dozen times.

I tried to write a formal verification of the algorithm in TLA+, and failed. There are dozens of interacting rules which can lead to phenomenally complex edge cases. The whole thing relies on clocks–and a special mode, TILT, which tries to detect sudden clock skew. You can specify a quorum which is smaller than the number of sentinels, allowing multiple quorums to operate simultaneously. Since the system auto-discovers peers, you’ve got to make sure nobody lets a new sentinel talk to your cluster, or you might find yourself with a quorum smaller than N/2. Client, sentinel, and Redis server topologies are all different things, which (I think) means…

  • Sentinels could promote a node no clients can see
  • Sentinels could demote the only node clients can actually reach
  • Sentinels could assume a totally connected group of servers is unavailable
  • Sentinels could promote an isolated node in a minority component, then destroy data on the majority by demoting their primary later

I (tentatively) recommend running exactly one sentinel on each server node, to force server and sentinel network topologies to align. Unless the partition doesn’t happen in the network, but somewhere upwards of layer 3. Let’s not talk about that possibility.

As an application developer working with Redis, one option is simply to estimate and accept your data loss. Not all applications have to be consistent. Microsoft estimates their WAN links have about 99.5% availability, and their datacenter networks are about 10x more reliable, going down for about 4 hours per year. Not all network failures result in this kind of partition. If you’re running good network hardware in redundant configurations in real datacenters (e.g. not EC2), you cut your probability of partition down pretty far. Plenty of important applications can tolerate data loss for a few hours a year.
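A quick sanity check on those availability figures (my arithmetic, using the numbers above):

(def hours-per-year (* 24 365))

;; 99.5% available means down 0.5% of the time:
(* 0.005 hours-per-year)       ;; => 43.8 hours/year for a WAN link
;; "10x more reliable" for the datacenter network:
(* 0.1 0.005 hours-per-year)   ;; => ~4.4 hours/year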

If you can’t tolerate data loss, Redis Sentinel (and by extension Redis Cluster) is not safe for use as:

  • A lock service
  • A queue
  • A database

If you use Redis as a lock service, this type of partition means you can take out the same lock twice–or up to N times for N nodes! Or maybe multiple times concurrently, against the same node, if you want to get weird about it. Write loss means locks can be resurrected from the dead, or vanish even when supposedly held. Bottom line: distributed lock services must be CP. Use a CP consensus system, like Zookeeper.
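Here's a sketch of how the double-acquire plays out (pure Clojure; the two atoms stand in for the two primaries that exist during split-brain, and acquire-lock! mimics a SETNX-style acquire against whichever node a client can reach):

;; During split-brain, n1 and n5 are both primaries, each with its own
;; independent copy of the lock key.
(def n1-locks (atom {}))
(def n5-locks (atom {}))

;; SETNX-style acquire: succeeds iff the key is absent on *that* node.
(defn acquire-lock! [node k owner]
  (= owner
     (get (swap! node (fn [m] (if (contains? m k) m (assoc m k owner)))) k)))

(acquire-lock! n1-locks "the-lock" :client-a) ;; => true
(acquire-lock! n5-locks "the-lock" :client-b) ;; => true -- two lock holders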

If you use Redis as a queue, it can drop enqueued items. However, it can also re-enqueue items which were removed. An item might be delivered zero, one, two, or more times. Most distributed queue services can provide reliable at-most-once or at-least-once delivery. CP queue systems can provide reliable exactly-once delivery with higher latency costs. Use them if message delivery is important.

If you use Redis as a database, be prepared for clients to disagree about the state of the system. Batch operations will still be atomic (I think), but you’ll have no inter-write linearizability, which almost all applications implicitly rely on. If you successfully write A, then B, you expect that any client which can see B can also see A. This is not the case. Be prepared for massive write loss during a partition, depending on client, server, and sentinel topology.

Because Redis does not have a consensus protocol for writes, it can’t be CP. Because it relies on quorums to promote secondaries, it can’t be AP. What it can be is fast, and that’s an excellent property for a weakly consistent best-effort service, like a cache. Redis Sentinel can do a great job of keeping your caches warm even in the face of network and node failure, and helping clients to gradually discover the correct nodes to interact with. Use Redis Sentinel for caching, sampling, statistics, and messaging where getting the wrong answer doesn’t hurt much. Occasional windows of 50% write loss may be just fine for your user feeds, hit counters, or upvotes.

In the next post, we’ll learn about a database with a related replication architecture: MongoDB.

Copyright © 2015 Kyle Kingsbury.
Non-commercial re-use with attribution encouraged; all other rights reserved.