Jepsen is a library for writing tests of concurrent systems: everything from single-node data structures to distributed databases and queues. A key part of this process is recording a history of operations performed during the test. Jepsen checkers analyze a history to find consistency anomalies and to compute performance metrics. Traditionally Jepsen has stored the history in a Clojure vector (an immutable in-memory data structure like an array), and serialized it to disk at the end of the test. This limited Jepsen to histories on the order of tens of millions of operations. It also meant that if Jepsen crashed during a several-hour test run, it was impossible to recover any of the history for analysis. Finally, saving and loading large tests involved long wait times—sometimes upwards of ten minutes.

Over the last year I’ve been working on ways to resolve these problems. Generators are up to ten times faster, and can emit up to 50,000 operations per second. A new operation datatype makes each operation smaller and faster to access. Jepsen’s new on-disk format allows us to stream histories incrementally to disk, to work with histories of up to a billion operations (far exceeding available memory), to recover safely from crashes, and to load tests almost instantly by deserializing data lazily. New history datatypes support both densely and sparsely indexed histories, and efficiently cache auxiliary indices. They also support lazy disk-backed map and filter. These histories support both linear and concurrent folds, which dramatically improves checker performance on multicore systems: real-world checkers can readily analyze 250,000 to 900,000 operations/sec. Histories support multi-query optimization: when multiple threads fold over the same history, a query planner automatically fuses those folds together to perform them in a single pass. Since Jepsen often folds dozens of times over the same history, this saves a good deal of disk IO and deserialization time. These features are enabled by a new, transactional, dependency-aware task executor.

Background

Jepsen histories are basically a list of operations in time order. The meaning of an operation depends on the system being tested, but an operation could be something like “set x to three”, “read y and z as 2 and 7, respectively”, or “add 64 to the queue”. Each of these logical operations has two operation elements in the history: an invocation when it begins, and a completion when it ends. This tells checkers which operations were performed concurrently, and which took place one after the next. For convenience, we use “operation” to refer both to a single entry in the history, and to an invocation/completion pair—it’s usually clear from context.

For example, here’s a history of transactions from an etcd test:

[{:type :invoke, :f :txn, :value [[:w 2 1]], :time 3291485317, :process 0, :index 0}
 {:type :invoke, :f :txn, :value [[:r 0 nil] [:w 1 1] [:r 2 nil] [:w 1 2]], :time 3296209422, :process 2, :index 1}
 {:type :fail, :f :txn, :value [[:r 0 nil] [:w 1 1] [:r 2 nil] [:w 1 2]], :time 3565403674, :process 2, :index 2, :error [:duplicate-key "etcdserver: duplicate key given in txn request"]}
 {:type :ok, :f :txn, :value [[:w 2 1]], :time 3767733708, :process 0, :index 3}]

This shows two concurrent transactions executed by process 0 and 2 respectively. Process 0’s transaction attempted to write key 2 = 1 and succeeded, but process 2’s transaction tried to read 0, write key 1 = 1, read 2, then write key 1 = 2, and failed.

Each operation is logically a map with several predefined keys. The :type field determines whether an operation is an invocation (:invoke) or a successful (:ok), failed (:fail), or indeterminate (:info) completion. The :f and :value fields indicate what the operation did—their interpretation is up to the test in question. We use :process to track which logical thread performed the operation. :index is a strictly-monotonic unique ID for every operation. In dense histories, indices go 0, 1, 2, …. Filtering or limiting a history to a subset of those operations yields a sparse history, with IDs like 3, 7, 8, 20, …. Operations can also include all kinds of additional fields at the test’s discretion.

During a test Jepsen decides what operation to invoke next by asking a generator: a functional data structure that evolves as operations are performed. Generators include a rich library of combinators that can build up complex sequences of operations, including sequencing, alternation, mapping, filtering, limiting, temporal delays, concurrency control, and selective nondeterminism.
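
As a rough sketch (not a complete test, and the exact combinator options vary), a generator mixing reads and writes might be built like this:

(require '[jepsen.generator :as gen])

;; Functions are valid generators: each call produces a fresh operation map.
(def writes (fn [] {:f :write, :value (rand-int 5)}))
(def reads  (fn [] {:f :read, :value nil}))

;; Randomly alternate reads and writes, space operations out with small
;; random delays (averaging 1/50 s), and stop emitting after sixty seconds.
(->> (gen/mix [reads writes])
     (gen/stagger 1/50)
     (gen/time-limit 60))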

Generator Performance

One of the major obstacles to high-throughput testing with Jepsen was that the generator system simply couldn’t produce enough operations to keep up with the database under test. Much of this time was spent in manipulating the generator context: a data structure that keeps track of which threads are free and which logical processes are being executed by which threads. Contexts are often checked or restricted—for instance, a generator might want to pick a random free process in order to emit an operation, or to filter the set of threads in a context to a specific subset for various sub-generators: two threads making writes, three more performing reads, and so on.

Profiling revealed context operations as a major bottleneck in generator performance—specifically, manipulating the Clojure maps and sets we initially used to represent context data. In Jepsen 0.3.0 I introduced a new jepsen.generator.context namespace with high-performance data structures for common context operations. I wrote a custom translation table to map thread identifiers (e.g. 1, 2, … :nemesis) to and from integers. This let us encode context threads using Java BitSets, which are significantly more compact than hashmap-backed sets and require no pointer chasing. To restrict threads to a specific subset, we precompile a filter BitSet, which turns expensive set intersection into a blazing-fast bitwise and. This bought us an order-of-magnitude performance improvement in a realistic generator performance test—pushing up to 26,000 ops/second through a 1024-thread generator.
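
The core trick, in an illustrative sketch (not Jepsen’s actual context code):

;; Thread identifiers are mapped to small integers, so a set of threads
;; becomes a java.util.BitSet. Restricting a context to a subset of threads
;; is then a single in-place bitwise AND rather than a set intersection.
(let [free    (doto (java.util.BitSet.) (.set 0 1024))      ; threads 0-1023 free
      allowed (doto (java.util.BitSet.) (.set 0) (.set 1))] ; precompiled filter
  (.and free allowed)  ; mutates free: only threads 0 and 1 remain set
  free)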

Based on YourKit profiling I made additional performance tweaks. Jepsen now memoizes function arity checking when functions are used as generators. A few type hints helped avoid reflection in hot paths. Additional micro-optimizations bought further speedups. The function which fills in operation types, times, and processes spent a lot of time manipulating persistent maps, which we avoided by carefully tracking which fields had been read from the underlying map. We replaced expensive mapcat with a transducer, replaced seq with count to optimize empty-collection checks, and replaced a few = with identical? for symbols.

In testing of list-append generators (a reasonably complex generator type) these improvements let us push 50,000 operations per second with 100 concurrent threads.

Off With Her Head

Another challenge in long-running Jepsen tests was a linear increase in memory consumption. This was caused by the test map, which is passed around ubiquitously and contains a reference to the initial generator that the test started with. Since generators are a lot like infinite sequences, “retaining the head” of the generator often resulted in keeping a chain of references to every single operation ever generated in memory.

Because the test map is available during every phase of the test, and could be accessed by all kinds of uncontrolled code, we needed a way to enforce that the generator wouldn’t be retained accidentally. Jepsen now wraps the generator in a special Forgettable reference type that can be explicitly forgotten. Forgetting that reference once the test starts running allows the garbage collector to free old generator states. Tests of 10⁹ operations can now run in just a few hundred megabytes of heap.
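
The idea in miniature (Jepsen’s actual Forgettable is its own reference type; this atom-based version is only a sketch):

(defn forgettable [x] (atom x))
(defn forget! [r] (reset! r nil))

(def gen-ref (forgettable (range)))  ; (range) stands in for a large generator
(forget! gen-ref)                    ; once the test is running, drop the head
@gen-ref                             ; => nil; old generator states can now be GCed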

Operations as Records

We initially represented operations as Clojure maps, which use arrays and hash tables internally. These maps require extra storage space, can only store JVM Objects, and incur a small-but-noticeable lookup cost. Since looking up fields is so frequent, and memory efficiency important, we replaced them with Clojure records. Our new Op datatype stores its index and time as primitive longs, and the type, process, f, and value as Object fields. This is a significantly denser layout in memory, which also aids cache locality. Access to fields still looks like a map lookup (e.g. (:index some-op)), but compiles into a type cache check and a read of that field in the Op object. Much faster!

Clojure records work very much like maps—they store a reference to a hashmap, so tests can still store any number of extra fields on an Op. This preserves compatibility with existing Jepsen tests. The API is slightly different: index and time fields are no longer optional. Jepsen always generates these fields during tests, so this change mainly affects the internal test suites for Jepsen checkers.
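
A sketch of the shape of this datatype (the real Op in jepsen.history stores :index and :time as primitive long fields and carries more machinery; this plain record is only illustrative):

(defrecord Op [index time type process f value])

(def op (->Op 0 3291485317 :invoke 0 :txn [[:w 2 1]]))

(:f op)                      ; => :txn (field access still reads like a map lookup)
(assoc op :error :timed-out) ; extra keys go into the record's extension map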

Streaming Disk Storage

Jepsen initially saved tests, including both histories and the results of analysis, as a single map serialized with Fressian. This worked fine for small tests, but writing large histories could take tens of minutes. Jepsen now uses a custom disk format. Jepsen files consist of a short header followed by an append-only log of immutable blocks. Each block contains a type, length, and checksum header, followed by data. These blocks form a persistent tree structure on disk: changes are written by appending new blocks to the end of the file, then flipping a single offset in the file header to point to the block that encodes the root of the tree.

Blocks can store arbitrary Fressian data; layered, lazily-decoded maps; streaming lists; and chunked vectors, which store pointers to other vectors “stitched together” into a single large vector. Chunked vectors store the index of the first element in each chunk, so you can jump to the correct chunk for any index in constant time.

During a test Jepsen writes each operation into a series of chunks on disk—each a streaming list of Fressian-encoded data. When a chunk is full (by default, 16384 operations), Jepsen seals that chunk, writes its checksum, and constructs a new chunked vector block with pointers to each chunk in the history—re-using existing chunks. It then writes a new root block and flips the root pointer at the start of the file. This ensures that at most the last 16384 operations can be lost during a crash.
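
For a dense history with fixed-size chunks, locating an operation by :index is simple arithmetic (a sketch; unevenly sized or sparse chunks rely on the stored first-element indices instead):

(let [chunk-size 16384
      i          40000]            ; the operation :index we want
  {:chunk  (quot i chunk-size)     ; => 2, i.e. the third chunk
   :offset (rem  i chunk-size)})   ; => 7232, the position within that chunk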

When Jepsen loads a test from disk it reads the header, decodes the root block, finds the base of the test map, and constructs a lazy map which decodes results and the history only when asked for. Results are also encoded as lazy maps, so tests with large results can still be decoded efficiently. This lets Jepsen read the high-level properties of tests—their names, timestamps, overall validity, etc., in a few milliseconds rather than tens of minutes. It also preserves compatibility: tests loaded from disk look and feel like regular hashmaps. Metadata allows those tests to be altered and saved back to disk efficiently, so you can re-analyze them later.

Jepsen loads a test’s history using a lazy chunked vector. The top-level vector keeps a cached count and lookup table to efficiently map indices to chunks. Each chunk is lazily decoded on first read and cached in a JVM SoftReference, which can be reclaimed under memory pressure. The resulting vector looks to users just like an in-memory vector, which ensures compatibility with extant Jepsen checkers.
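
The caching strategy, in sketch form (illustrative only; the real chunked vector also handles decoding, checksums, and index bookkeeping):

(import '[java.lang.ref SoftReference])

(defn soft-cached
  "Wraps a no-argument decode function. Returns a function which decodes on
  first call, caches the result in a SoftReference, and transparently
  re-decodes if the JVM reclaimed the cached chunk under memory pressure."
  [decode]
  (let [cache (atom (SoftReference. nil))]
    (fn []
      (or (.get ^SoftReference @cache)
          (let [chunk (decode)]
            (reset! cache (SoftReference. chunk))
            chunk)))))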

History Datatypes

When working with histories, checkers need to perform certain operations over and over again. For instance, they often need to count operations, look up an operation by its :index field, jump between invocations and completions, and schedule concurrent tasks for analysis. We therefore augment our lazy vectors of operations with an IHistory type which supports all of these. Our histories still act like a vector of operations, but also provide efficient, cached mapping of invocations to completions, a threadpool for scheduling analysis tasks, and a stateful execution planner for folds over the history, which we call a folder.

When Jepsen records a history it assigns each operation a sequential index 0, 1, 2, 3, …. We call these histories dense. This makes it straightforward to look up an operation by its index: (nth history idx). However, users might also filter a history to just some operations, or excerpt a slice of a larger history for detailed analysis. This results in a sparse history. Our sparse histories maintain a lazily-computed mapping of operation indices to vector indices to efficiently support this.

We also provide lazy, history-specific versions of map and filter. Checkers perform these operations on histories over and over again, but using standard Clojure map and filter would be inappropriate: once realized, the entire mapped/filtered sequence is stored in memory indefinitely. They also require linear-time lookups for nth. Our versions push down the map and filter operations into each access, trading off cacheability for reduced memory consumption. This pushdown also enables multiple queries to share the same map or filter function. We also provide constant-time nth on mapped histories, and log-time nth on filtered ones.
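
For example (a sketch using jepsen.history, whose map and filter variants come up again under Compatibility below):

(require '[jepsen.history :as h])

;; Wrap a plain collection of operations in an IHistory.
(def history
  (h/history [{:index 0, :time 0, :process 0, :type :invoke, :f :read, :value nil}
              {:index 1, :time 1, :process 0, :type :ok,     :f :read, :value 3}]))

;; Lazy, index-aware analogues of filter and map: nothing is realized and
;; retained in memory, and lookups by index stay efficient.
(def oks (h/filter (comp #{:ok} :type) history))
(h/map (fn [op] (update op :value str)) oks)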

Concurrent Folds

Because Jepsen’s disk files are divided into multiple chunks we can efficiently parallelize folds (reductions) over histories. Jepsen histories provide their own folding facility, similar to (and, indeed, directly compatible with) Tesser, a library I wrote for composing concurrent folds. Tesser is intended for commutative folds, but almost all of its operators—map, filter, group-by, fuse, facet, etc.—work properly in ordered reductions too. History folds can perform a concurrent reduction over each chunk independently, and then combine the chunks together in a linear reduction. Since modern CPUs can have dozens or even hundreds of cores, this yields a significant speedup. Since each core reads its chunk from the file in a linear scan, it’s also friendly to disk and memory readahead prediction. If this sounds like MapReduce on a single box, you’re right.

A diagram showing how concurrent folds proceed as independent reductions over each chunk, which are then combined in a second linear pass.

To support this access pattern each history comes with an associated Folder: a stateful object that schedules, executes, and returns results from folds. Performing a linear fold over a history is easy: the folder schedules a reduce of the first chunk of the history, then uses that result as the input to a reduce of the second chunk, and so on. A concurrent fold schedules reductions over every chunk concurrently, and a series of combine tasks which fold the results of each chunk reduction in serial.

Folders are careful about thread locality and memory barriers during inter-thread handoff, which means folds can use mutable in-memory structures for their accumulators, not just immutable values. The reducing and combining functions don’t need to acquire locks of their own, and most access is completely unsynchronized. This works particularly well with the new mutable reducers in dom-top.

Folders and histories directly support the interfaces for plain-old Clojure reduce and reducers/fold, so existing checkers immediately take advantage of folder optimizations. They’re translated to folds over the history internally. Transducers work just like you’d expect.
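
For instance, a checker that only needs a count of :ok operations can stay in stock Clojure and still get a concurrent, chunk-by-chunk pass (a sketch relying on that reducers/fold support):

(require '[clojure.core.reducers :as r])

(defn ok-count
  "Counts :ok completions with a reducers fold; over an IHistory this runs
  as a concurrent fold over the history's chunks."
  [history]
  (r/fold +                                        ; combine per-chunk counts
          (fn [acc op]                             ; reduce within a chunk
            (if (= :ok (:type op)) (inc acc) acc))
          history))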

Multi-Query Optimization

Jepsen tests often involve dozens of different checkers, each of which might perform many folds. When a history is larger than memory, some chunks are evicted from cache to make room for new ones. This means that a second fold might end up re-parsing chunks of history from disk that a previous fold already parsed–and forgot due to GC pressure. Running many folds at the same time also reduces locality: each fold likely reads an operation and follows pointers to other objects at different times and on different cores. This burns extra time in memory accesses and evicts other objects from the CPU’s cachelines. It would be much more efficient if we could do many folds in a single pass.

How should this coordination be accomplished? In theory we could have a pure fold composition system and ask checkers to return the folds they intended to perform, along with some sort of continuation to evaluate when results were ready—but this requires rewriting every extant checker and breaking the intuitive path of a checker’s control flow. It also plays terribly with internal concurrency in a checker. Instead, each history’s folder provides a stateful coordination service for folds. Checkers simply ask the folder to fold over the history, and the folder secretly fuses all those folds into as few passes as possible.

When a second fold is run, the folder computes a fused fold and tries to do as much work as possible in a single pass.

Imagine one thread starts a fold (orange) that looks for G1a (aborted read). That fold finishes reducing chunk 0 of the history and is presently reducing chunks 3 and 5, when another thread starts a fold to compute latency statistics (blue). The folder realizes it has an ongoing concurrent fold. It builds a fused fold (gray) which performs both G1a and latency processing in a single step. It then schedules latency reductions for chunks 0, 3, and 5, to catch up with the G1a fold. Chunks 1, 2, 4, and 6+ haven’t been touched yet, so the folder cancels their reductions. In their place it spawns new fused reduction tasks for those chunks, and new combine tasks for their results. It automatically weaves together the independent and fused stages so that they re-use as much completed work as possible. Most of the history is processed in a single pass, and once completed, the folder delivers results from both folds to their respective callers. All of this is done transactionally, ensuring exactly-once processing and thread safety even with unsynchronized mutable accumulators. A simpler join process allows the folder to fuse together linear folds.

The end result of this byzantine plumbing is that callers don’t have to think about how the underlying pass is executed. They just ask to fold over the history, and the folder figures out an efficient plan for executing it. This might involve a few independent passes over the first few chunks, but those chunks are likely retained in cache, which cuts down on deserialization. The rest of the history is processed in a single pass. We get high throughput, reduced IO, and cache locality.

Task Scheduling

To take advantage of multiple cores efficiently, we want checkers to run their analyses in multiple threads—but not too many, lest we impose heavy context-switching costs. In order to ensure thread safety and exactly-once processing during multi-query optimization, we need to ensure that fold tasks are cancelled and created atomically. We also want to ensure that tasks which block on the results of other tasks don’t launch until their dependencies are complete; otherwise we risk threadpool starvation and, in some cases, deadlock: a classic problem with Java threadpools.

To solve these problems Jepsen’s history library includes a transactional, dependency-aware task scheduler. Tasks are functions which receive their dependencies’ results as arguments, and can only start when their dependencies are complete. This prevents deadlock. Cancelling a task also cancels its downstream dependencies. An exception in a task propagates to downstream dependencies, which simplifies error handling. Dereferencing a task returns its result, or throws if an exception occurred. These capabilities are backed by an immutable executor state which tracks the dependency graph, which tasks are ready and presently running, and the next task ID. A miniature effect system applies a log of “side effects” from changes to the immutable state. This approach allows us to support arbitrary transactions over the task executor: one can create and cancel any number of tasks in an atomic step.
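
To make the launch-only-when-ready invariant concrete, here is a toy scheduler with hypothetical names (not Jepsen’s jepsen.history.task API): tasks declare dependencies, receive their results as arguments, and are only submitted to the pool once those results exist, so no worker thread ever blocks on another task.

(import '[java.util.concurrent Executors])

(defn run-tasks!
  "tasks is a map of name -> {:deps [names], :f (fn [dep-results] ...)}.
  Launches each task only once its dependencies have completed, passing
  their results as a vector. Assumes the dependency graph is acyclic.
  Returns a map of name -> result."
  [tasks]
  (let [pool    (Executors/newFixedThreadPool 4)
        results (atom {})]
    (try
      (loop [pending tasks]
        (when (seq pending)
          (let [ready (filterv (fn [[_ task]]
                                 (every? #(contains? @results %) (:deps task)))
                               pending)
                futs  (vec (for [[nm task] ready]
                             [nm (.submit pool
                                          ^java.util.concurrent.Callable
                                          (fn [] ((:f task) (mapv @results (:deps task)))))]))]
            (doseq [[nm fut] futs]
              (swap! results assoc nm (.get fut)))
            (recur (apply dissoc pending (map first ready))))))
      @results
      (finally (.shutdown pool)))))

;; For example:
;; (run-tasks! {:a {:deps [],   :f (fn [_] 1)}
;;              :b {:deps [:a], :f (fn [[a]] (inc a))}})
;; => {:a 1, :b 2}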

This is powered by questionable misuses of Java’s ExecutorService & BlockingQueue: our “queue” is simply the immutable state’s dependency graph. We ignore the ExecutorService’s attempts to add things to the queue and instead mutate the state through our transaction channels. We wake up the executor with trivial runnables when tasks become available. This feels evil, but it does work: executor safety is validated by an extensive series of generative tests which stress both linear and concurrent behavior.

Compatibility

The most significant API change was replacing plain vectors of operations with a first-class IHistory type. Users who constructed their own histories and fed them to built-in checkers—for instance, by calling regular Clojure map or filter on a history—received a blunt warning of type incompatibility. The fix here is straightforward: either use jepsen.history/map and friends, or jepsen.history/history to wrap an existing collection in a new IHistory. These wrappers are designed to work both for the clean histories Jepsen produces as well as the incomplete test histories involved in Jepsen’s internal test suites.

The other significant change was making operation :time and :index mandatory. These are always provided by Jepsen, but might be missing from hand-coded histories used in internal test suites. Operations are automatically promoted to hashmaps by the history wrapper function, and indices can either be validated or automatically assigned for testing.

Otherwise the transition was remarkably successful. Existing code could continue to treat operations as hashmaps, to pretend histories were vectors, and to use standard Clojure collection operations as before. These all worked transparently, despite now being backed by lazily cached on-disk structures. I incrementally rewrote most of the core checkers in Jepsen and Elle to take advantage of the new Op type and fold system, and obtained significant speedups throughout the process.

Dropping :generator from the test map is a recent change. I don’t think that’s going to cause problems; I’m not aware of anything that actually read the generator state other than debugging code, but you never know.

Performance

With Jepsen 0.2.7 an etcd list-append transactional test on a five-node LXC cluster might perform ~7,000 ops/sec (counting both invocations and completions). At this point the bottleneck is etcd, not Jepsen itself. With a 16 GB heap, that test would exhaust memory and crash after roughly two hours. Given 90 minutes, it generated a single Fressian object of 2.2 GB containing 38,076,478 operations; this file was larger than MAX_INT bytes and caused Jepsen to crash.

When limited to an hour that test squeaked in under both heap and serialization limits: 25,461,508 operations (7.07 kHz) in a 1.5 GB Jepsen file. Analyzing that in-memory history for G1a, G1b, and internal consistency took 202 seconds (126 kHz). This timeseries plot of heap use and garbage collections shows the linear growth in memory use required to retain the entire history and generator state in memory.

A timeseries plot of heap use and garbage collections. Memory use rises linearly over the hour-long test run, followed by a spike in heap use and intense collection pressure during the final analysis phase.

Loading the history from the resulting Jepsen file took 366 seconds and a 58 GB heap—even just to read a single operation. There are various reasons for this, including the larger memory footprint of persistent maps and vectors, temporarily retained intermediate representations from the Fressian decoder, pointers and object header overhead, Fressian’s more-compact encodings of primitive types, and its caching of repeated values.

With Jepsen 0.3.3 that same hour-long test consumed only a few hundred megabytes of heap until the analysis phase, where it took full advantage of the 16 GB heap to cache most of the history and avoid parsing chunks multiple times. It produced a history of 40,358,966 elements (11.2 kHz) in a 1.9 GB Jepsen file. The analysis completed in 97 seconds (416 kHz). Even though the analysis phase has to re-parse the entire history from disk, it does so in parallel, which significantly speeds up the process.

A timeseries plot of heap use and garbage collections. Memory use is essentially constant over the hour-long test run, oldgen rising slightly and then falling during a few collections. Memory spikes at the end of the test, when we re-load the history and analyze it.

Given ~25.4 hours that same test suite recorded a billion (1,000,000,000) operations at roughly 11 kHz, producing a 108 GB Jepsen file. Checking that history for G1a, G1b, and internal anomalies took 1 hour and 6 minutes (249 kHz) with a 100 GB heap, and consumed 70-98% of 48 cores. Note that our heap needed to be much larger. G1a and G1b are global anomalies and require computing all failed and intermediate writes in an initial pass over the history, then retaining those index structures throughout a second pass. Under memory pressure, this analysis requires more frequent reloading of history chunks that have been evicted from cache. Simpler folds, like computing totals for ok, failed, and crashed operations, can execute at upwards of 900 kHz.

Thanks to the new file format’s lazy structure, parsing the top-level test attributes (e.g. time, name, validity) took just 20-30 milliseconds. Fetching a single element of the history (regardless of position) took ~500 milliseconds: a few disk seeks to load the root, test, and history block, and then checksumming and parsing the appropriate chunk. The history block itself was ~715 KB, and consisted of pointers to 61,000 chunks of 16384 operations each.

Conclusion

Jepsen 0.3.3 is capable of generating 50,000 operations per second, tackling histories of up to a billion operations and checking close to a million operations per second. Of course, these numbers depend on what the generator, client, and test are doing! Going further may be tricky: many JVM collection APIs are limited to 2³¹ - 1 elements. Many of the checkers in Jepsen have been rewritten to run as concurrent folds, and to use more memory-efficient structures. However, only a few analyses are completely local: many require O(n) memory and may be impossible to scale past a few million operations.

You can start using Jepsen 0.3.3 today. I hope it opens up new territory for distributed systems testers everywhere.

o.yakushev on

Hey Kyle, just watched your London Clojurians talk, got a lot of inspiration and ideas from that one. Thank you for sharing your optimization results and pieces of the code that got you there!

I have a couple of questions if I may:

1. Have you tried clj-async-profiler? With the lower-level optimizations that you perform, you might potentially be running into the safepoint bias problem which all of the three profilers you mentioned suffer from.

2. Have you by chance looked at https://github.com/cnuernber/ham-fisted compared to Bifurcan? I’ve been planning to do the comparison/measurements myself at some point, but maybe this is something you already did and have an opinion on it.

3. Is there a chance you release your disk streaming implementation in one form or another? This sounds incredibly useful. Does anything like that exist outside of Clojure context, do you happen to know?

Aphyr on

I haven’t tried clj-async-profiler, but YourKit does async sampling as well. :-)

I haven’t seen ham-fisted yet. I have caught small bugs in Bifurcan, even with Zach’s extensive generative tests–getting these kinds of libraries completely correct is a big task. Ham-fisted says it’s “minimally tested”, which gives me pause for the sort of large-scale safety testing work I do.

The disk streaming system, like all of Jepsen, is OSS and EPL-licensed. You’ll find it here. It’s coupled to several decisions that make sense in Jepsen’s context but would, if extracted to its own library, probably need configurable parameters or additional safety checks: the use of Fressian, the way tests are a first-class datatype with metadata, the particular write and read patterns Jepsen uses during a test, jepsen.history’s soft chunked vectors, and so on.

