Riemann 0.2.0 is ready. There's so much left that I want to build, but this release includes a ton of changes that should improve usability for everyone, and I'm excited to announce its release.

Version 0.2.0 is a fairly major improvement in Riemann's performance and capabilities. Many things have been solidified, expanded, or tuned, and there are a few completely new ideas as well. There are a few minor API changes, mostly to internal structure–but a few streams are involved as well. Most functions will continue to work normally, but log a deprecation notice when used.

I dedicated the past six months to working on Riemann full-time. I was fortunate to receive individual donations as well as formal contracts with Blue Mountain Capital, SevenScale, and Iovation during that time. That money gave me months of runway to help make these improvements–but even more valuable was the feedback I received from production users, big and small. I've used your complaints, frustrations, and ideas to plan Riemann's roadmap, and I hope this release reflects that.

This release includes contributions from a broad cohort of open-source developers, and I want to recognize everyone who volunteered their time and energy to make Riemann better. In particular, I'd like to call out Pierre-Yves Ritschard, lwf, Ben Black, Thomas Omans, Dave Cottlehuber, and, well, the list goes on and on. You rock.

These months have seen not only improvements to Riemann itself, but to the dashboard, clients, and integration packages. While I'm spending most of my time working on the core Riemann server, it's really this peripheral software that makes Riemann useful for instrumenting production systems. There's no way I could hope to understand, let alone write and test, the code to integrate with all these technologies–which makes your work particularly valuable.

This week I started my new job at Factual. I won't be able to work 10 hours each day on Riemann any more, but I'm really happy with what we've built together, and I'll definitely keep working on the next release.

To all Riemann's users and contributors, thank you. Here's to 0.2.0.

New features

  • Arbitrary key-value (string) pairs on events
  • Hot config reloading
  • Integrated nrepl server
  • streams/sdo: bind together multiple streams as one
  • streams/split: like (cond), dispatch an event to the first matching stream
  • streams/splitp: like split, but on the basis of a specific predicate (see the sketch after this list)
  • config/delete-from-index: explicitly remove (similar) events from the index
  • streams/top: streaming top-k
  • streams/tag: add tags to events
  • RPM packaging
  • Init scripts, proper log dirs, and users for Debian and RPM packages. Yeah, this means you can /etc/init.d/riemann reload, and Stuff Just Works ™.
  • folds/difference, product, and quotient.
  • Folds come in sloppy and strict variants which should “Do What I Mean” in most contexts.
  • Executor Services for asynchronous queued processing of events.
  • streams/exception-stream: captures exceptions and converts them to events.
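
To give a feel for a couple of these, here's a rough sketch of streams/tag and streams/splitp in a config. The thresholds are invented and the terminal streams are just prn, so treat this as an illustration rather than a canonical example:

(streams
  ; Tag every event flowing through this stream, then pick a state based
  ; on the metric. splitp works like (condp): it threads the metric
  ; through a shared predicate and dispatches to the first matching clause.
  (tag "prod"
    (splitp < metric
      0.9 (with :state "critical" prn)
      0.5 (with :state "warning"  prn)
          (with :state "ok"       prn))))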

Improvements

  • http://riemann.io site
  • Lots more documentation and examples
  • Config file syntax errors are detected early
  • Cleaned up server logging
  • Helpful messages (line numbers! filenames!) for configuration errors
  • Silence closed channel exceptions
  • Cores can preserve services like pubsub, the index, etc through reloads
  • Massive speedups in TCP and UDP server throughput
  • streams/rate works in real-time: no need for fill-in any more
  • Graphite client is faster, more complete
  • Config files can include other files by relative path
  • streams/coalesce passes on expired events
  • riemann.email/mailer can take custom :subject and :body functions
  • riemann.config includes some common time/scheduling functions
  • streams/where returns whether it matched an event, which means (where) is now re-usable as a predicate in lots of different contexts (see the sketch after this list).
  • streams/tagged-any and tagged-all return whether they matched
  • streams/counter is resettable to a particular metric, and supports expiry
  • Bring back “hyperspace core online”
  • Update to netty 3.6.1
  • Reduced the number of threadpools used by the servers
  • Massive speedup in Netty performance by re-organizing execution handlers
  • core/reaper takes a :keep-keys option to specify which fields on an event are preserved
  • streams/smap ignores nil values for better use with folds
  • Update to aleph 0.3.0-beta15
  • Config files ship with emacs modelines, too
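
Since (where) reports whether it matched, you can bind one and reuse it as an ordinary predicate. A small sketch (the service name and threshold are made up, and prn stands in for a real downstream):

(let [slow? (where (and (service "api latency")
                        (> metric 0.5)))]
  (streams
    ; Decorate matching events; smap drops the nils produced for
    ; everything else.
    (smap (fn [event]
            (when (slow? event)
              (assoc event :state "slow")))
          #(prn "slow event:" %))))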

Bugfixes

  • Fixed a bug in part-time-fast causing undercounting under high contention
  • Catch exceptions while processing expired events
  • Fix a bug escaping metric names for librato
  • riemann.email/mailer can talk to SMTP relays again
  • graphite-path-percentiles will convert decimals of three or more places to percentile strings
  • streams/rollup is much more efficient; doesn't leak tasks
  • streams/rollup aggregates and forwards expired events instead of stopping
  • Fixed a threadpool leak from Netty
  • streams/coalesce: fixed a bug involving lazy persistence of transients
  • streams/ddt: fixed a few edge cases

Internals

  • Cleaned up the test suite's logging
  • Pluggable transports for netty servers
  • Cores are immutable
  • Service protocol: provides lifecycle management for internal components
  • Tests for riemann.config
  • riemann.periodic is gone; replaced by riemann.time
  • Tried to clean up some duplicated functions between core, config, and streams
  • riemann.common/deprecated
  • Cleaned up riemann.streams, removing unused commented-out code
  • Lots of anonymous functions have names now, to help with profiling
  • Composing netty pipeline factories is much simpler
  • Clojure 1.5

Known bugs

  • Passing :host to websocket-server does nothing: it binds to * regardless.
  • Folds/mean throws when it receives empty lists
  • graphite-server has no tests
  • Riemann will happily overload browsers via websockets
  • streams/rate doesn't stop its internal poller correctly when self-expiring
  • When Netty runs out of filehandles, it'll hang new connections

The Netty redesign of riemann-java-client made it possible to expose an end-to-end asynchronous API for writes, which dramatically improves throughput for messages containing a small number of events. By introducing a small queue of pipelined write promises, riemann-clojure-client can now push 65K events per second, as individual messages, over a single TCP socket. That works out to about 120 Mbps of sustained traffic.

single-events.png

I'm really happy about the bulk throughput too: three threads using a single socket, sending messages of 100 events each, can push around 185-200K events/sec, at over 200 Mbps. That throughput took 10 sockets and hundreds of threads to achieve in earlier tests.

bulk.png

This isn't a particularly useful feature as far as clients go; it's unlikely most users will want to push this much from a single client. It is critical, however, for optimizing Riemann's server performance. The server, running the bulk test, consumes about 115% CPU on my 2.5 GHz Q8300. I believe this puts a million events/sec within reach for production hardware, though at that throughput CAS contention in the streams may become a limiting factor. If I can find a box (and network) powerful enough to test, I'd love to give it a shot!

This is the last major improvement for Riemann 0.2.0. I'll be focusing on packaging and documentation tomorrow. :)

tl;dr Riemann is a monitoring system, so it emphasizes liveness over safety.

Riemann is aimed at high-throughput (millions of events/sec/node), partial-harvest event processing, where it is acceptable to trade completeness for throughput at low latencies. For instance, it's probably fine to drop half of your request latency events on the floor, if you're calculating a lossy histogram with sampling anyway. It's also typically acceptable to have nondeterministic behavior with respect to time windows: if one node's clock is skewed, it's better to process it “soonish” rather than waiting an unbounded amount of time for it to check in.

There is no synchronization or relationship between events. Events are immutable and have a total order, even though a given server or client may only have a fraction of the relevant events for a system. The events are, in a sense, the transaction log–except that the semantics of those transactions depend on the stream configuration.

Riemann is only trivially distributed: clients send events to servers. Servers can act as clients themselves. The protocol provides synchronous acknowledgement of each received event… which could mean “your write is durably stored on disk” or “I threw your write on a queue, good luck have fun”, or any mixture in between, like “I queued your write for use by a windowing stream, I queued it for submission to Librato metrics, and reacted to the failure condition by sending an email which has been acked by the mail system.”

All of these guarantees are present only for a single server. At some point Riemann will need to be available during partitions.

The “Fuck it, no coordination” model, which I have now, allows for degraded harvest and low latencies for data which it's OK to lose some of. A simple strategy is to carpetbomb every Riemann server in the cluster with your events, with the usual tunable write-replica threshold. Each server might have a slightly different view of the world, depending on where and for how long it was partitioned.

Stronger consistency

Some events (which happen infrequently) need strong coordination. We need to guarantee, for example, that of three Riemann servers responsible for this datacenter, exactly one sends the “hey, the web server's broken” email. These events require bounded guarantees of both liveness (“someone must send an email within five seconds”) and safety (“I don't care who, but exactly one of you had better do it”).

I'm pretty sure these constraints on side effects essentially violate CAP, in the face of arbitrary partitions. If a node decides “I'll send it”, sends the email, then explodes just before telling the others “I sent it!”, the remaining nodes have no choice but to send a duplicate message.

In the event of these failure modes (like a total partition), duplicates are preferable to doing nothing. Waaay better to page someone twice than to risk not paging them at all.

However, there are some failure modes where I can provide delivered-once guarantees of side effects: for example, up to floor((n-1)/2) node failures, or a partition which leaves a fully-connected quorum. In these circumstances, 2PC or Paxos can give me strong consistency guarantees, and I can detect (in many cases, I think) the failure modes which would result in sacrificing consistency and requiring a duplicate write. A Riemann server can call someone and say,

“Hey, I just paged you, and this is crazy, but I've got split brain, I'll call twice maybe.”

Since events are values, I can serialize and compare them. That means you might actually be able to write, in the streams config, an expression which means “attempt to ensure these events are processed on exactly one host in the cluster.”

(streams
  (where (state "critical")
    ; This is unsynchronized and proceeds on all nodes concurrently
    #(prn "Uh oh, this thing's broken!" %)

    (master
      ; Any events inside master are executed on exactly one node if
      ; quorum is preserved, or maybe multiple hosts if a node fails before
      ; acking.
      (email "aphyr@aphyr.com"))))

…which is most useful when clients can reach a majority of servers (and allows clients to know whether or not their event was accepted). I can also provide a weaker guarantee along the lines of “Try to prevent all connected peers from sending this event within this time window,” which is useful for scenarios where you want to know about errors which occurred in minority partitions and it's likely that clients will be partitioned with their servers; e.g. one Riemann per agg switch or DC.

This doesn't guarantee all nodes have the same picture of the world which led up to that failure. I think doing that would require full coordination between all nodes about the event stream (and its ordering), which would impose nontrivial synchronization costs. Explicit causal consistency could improve this, but we'd need a way to express and compute those causal relationships between arbitrary stream functions somehow.

Realistically, this may not be a problem. When Riemann sees a quorum loss it can wake someone up, and when the partition is resolved nodes will converge rapidly on “hey, that service still isn't checking in.”

A third path

What I don't know yet is whether there's a role for events which don't need the insane overhead of 2PC or Paxos for every… single… event… but do need some kind of distributed consistency. HAT (Highly Available Transactions) is interesting because it provides reasonably strong consistency guarantees for an AP system, but at the cost of liveness. Is that liveness tradeoff suitable for Riemann, where responding Right Now is critical? Probably not. But it might be useful for historical stores, or expressing distributed multi-event transactions–which currently don't exist. I don't even know what this would mean in an event-oriented context.

Why? Riemann's event model treats events as values. Well-behaved clients provide a total order and identity over events based on their host, service, and timestamps. This means reconstructing any linear subset of the event stream can be done in an eventually consistent way. If Riemann were to become a historical store, reconciling divergent histories would simply be the set union of all received events.

Except for derived events. What happens when a partition separates two Riemann servers measuring request throughput? Each receives half of the events it used to, and their rate streams start emitting events with a metric half as big as they used to. If both Riemann servers are logging these events to a historical store, the store will show only half the throughput it used to.

One option is to log only raw events and reconstruct derived events by replaying the merged event log. What was the rate at noon? Apply all the events from 11:55 to 12:00 to the rate stream and see.

Another option might be for rate streams themselves to be transactional in nature, but I'm not sure how to do that in a way which preserves liveness guarantees.

I've been doing a lot of performance tuning in Riemann recently, especially in the clients–but I'd like to share a particularly spectacular improvement from yesterday.

The Riemann protocol

Riemann's TCP protocol is really simple. Send a Msg to the server, receive a response Msg. Messages might include some new events for the server, or a query; and a response might include a boolean acknowledgement or a list of events matching the query. The protocol is ordered; messages on a connection are processed in-order and responses sent in-order. Each Message is serialized using Protocol Buffers. To figure out how large each message is, you read a four-byte length header, then read length bytes, and parse that as a Msg.

time --->

send: [length1][msg1]                 [length2][msg2]
recv:                 [length1][msg1]                 [length2][msg2]
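
Reading a framed message off the wire follows directly from that description. Here's a minimal sketch, assuming a hypothetical parse-msg function that decodes a byte array into a Msg (this is not the actual server pipeline, which does the same work with Netty frame decoders):

(defn read-msg
  "Reads one length-prefixed Msg from a stream."
  [^java.io.DataInputStream in]
  (let [length (.readInt in)          ; four-byte big-endian length header
        buf    (byte-array length)]
    (.readFully in buf)               ; read exactly length bytes
    (parse-msg buf)))                 ; parse those bytes as a Msg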

The optimization I discussed last time–pipelining requests–allows a client to send multiple messages before receiving their acknowledgements. There are many queues in between a client saying “send a message” and that message actually being parsed in Riemann: Java IO buffers, the kernel TCP stack, the network card, various pieces of networking hardware, the wires themselves… all act like queues. This means throughput is often limited by latency, so by writing messages asynchronously we can achieve higher throughput with only minor latency costs.

The other optimization I've been working on is batching. For various reasons, this kind of protocol performs better when messages are larger. If you can pack 100 events into a message, the server can buffer and parse it in one go, resulting in much higher throughputs at the cost of significantly higher latencies–especially if your event needs to sit in a buffer for a while, waiting for other events to show up so they can be sent in a Msg.

Netty's threadpools

For any given connection, Netty (as used in Riemann) has two threadpools handling incoming bytes: the IO worker pool, and a handler pool which actually handles Riemann events. The IO worker pool is busy shuttling bytes back and forth from the TCP connection buffers through the pipeline–but if an IO worker spends too much time on a single channel, it won't be able to handle other channels and latencies will rise. At some point in the pipeline an ExecutionHandler takes over, handing work off to the handler pool for long-running tasks like processing a Msg.

Earlier versions of Riemann put the ExecutionHandler very close to the end of the pipeline, because all the early operations in the pipeline are really fast. The common advice goes, “Wrap long-running tasks in an execution handler, so they don't block”. OK, makes sense.

(channel-pipeline-factory
  int32-frame-decoder (int32-frame-decoder)           ; Read off 32-bit length headers
  ^:shared int32-frame-encoder (int32-frame-encoder)  ; Add length header on the way out
  ^:shared protobuf-decoder (protobuf-decoder)        ; Decode bytes to a Msg
  ^:shared protobuf-encoder (protobuf-encoder)        ; Encode a Msg to bytes
  ^:shared msg-decoder (msg-decoder)                  ; Convert Msg to a record
  ^:shared msg-encoder (msg-encoder)                  ; Convert a record to a Msg
  ^:shared executor (execution-handler)               ; Switch to handler threadpool
  ^:shared handler (gen-tcp-handler                   ; Actually process the Msg
                     core channel-group tcp-handler))

Now… a motivated or prescient reader might ask, “How, exactly, does the execution handler get data from an IO thread over to a handler thread?”

It puts it on a queue. Like every good queue it's bounded–but not by number of items, since some items could be way bigger than others. It's bounded by memory.

(defn execution-handler
  "Creates a new netty execution handler."
  []
  (ExecutionHandler.
    (OrderedMemoryAwareThreadPoolExecutor.
      16        ; Core pool size
      1048576   ; 1MB per channel queued
      10485760  ; 10MB total queued
      )))

How does the Executor know how much memory is in a given item? It uses a DefaultObjectSizeEstimator, which knows all about Bytes and Channels and Buffers… but absolutely nothing about the decoded Protobuf objects which it's being asked to enqueue. So the estimator goes and digs into the item's fields using reflection:

int answer = 8; // Basic overhead.
for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
    Field[] fields = c.getDeclaredFields();
    for (Field f : fields) {
        if ((f.getModifiers() & Modifier.STATIC) != 0) {
            // Ignore static fields.
            continue;
        }

        answer += estimateSize(f.getType(), visitedClasses);

Of course, I didn't know this at the time. Netty is pretty big, and despite extensive documentation it's not necessarily clear that an OrderedMemoryAwareThreadPoolExecutor is going to try and guess how much memory is in a given object, recursively.

So I'm staring at Yourkit, completely ignorant of everything I've just explained, and wondering why the devil DefaultObjectSizeEstimator is taking 38% of Riemann's CPU time. It takes me ~15 hours of digging through Javadoc and source and blogs and StackOverflow to realize that all I have to do is…

  1. Build my own ObjectSizeEstimator, or
  2. Enqueue things I already know the size of.
(channel-pipeline-factory
  int32-frame-decoder (int32-frame-decoder)
  ^:shared int32-frame-encoder (int32-frame-encoder)
  ^:shared executor (execution-handler)           ; <--+
  ^:shared protobuf-decoder (protobuf-decoder)    ;    |
  ^:shared protobuf-encoder (protobuf-encoder)    ;    |
  ^:shared msg-decoder (msg-decoder)              ;    |
  ^:shared msg-encoder (msg-encoder)              ; ___|
  ^:shared handler (gen-tcp-handler
                     core channel-group tcp-handler))

Just move one line. Now I enqueue buffers with known sizes, instead of complex Protobuf objects. DefaultObjectSizeEstimator runs in constant time. Throughput doubles. Minimum latency drops by a factor of two.

drop tcp event batch throughput.png

drop tcp event batch latency.png

Throughput here is measured in messages, each containing 100 events, so master is processing 200,000–215,000 events/sec. Latency is for synchronous calls to client.sendEvents(anEvent). The dropoff at the tail end of the time series is the pipelining client draining its message queue. Client and server are running on the same quad-core Q8300, pushing about 20 megabytes/sec of traffic over loopback. Here's what the riemann-bench session looks like, if you're curious.

Why didn't you figure this out sooner?

I wrote most of this code, and what code I didn't write, I reviewed and tested. Why did it take me so long to figure out what was going on?

When I started working on this problem, the code looked nothing like the pipeline I showed you earlier.

The Netty pipeline evolved piecemeal, by trial-and-error, and went through several refactorings. The UDP server, TCP server, and Graphite server share much of the same code, but do very different things. I made several changes to improve performance. In making these changes I tried to minimize API disruption–to keep function interfaces the same–which gradually pulled the pipeline into several interacting pieces. Since Netty's API is well-written, flexible Java code, it comes with literally hundreds of names to keep track of. Keeping function and variable names distinct became a challenge.

By the time I started digging into the problem, I was hard-pressed to figure out what a channel pipeline factory was, let alone how it was constructed.

In order to solve the bug I had to understand the code, which meant inventing a new language to talk about pipelines. Once I'd expressed the pipeline clearly, it was obvious how the pieces interacted. Experimenting with new pipelines took a half hour, and I was able to almost double throughput with a single-line change.

I've been putting more work into riemann-java-client recently, since it's definitely the bottleneck in performance testing Riemann itself. The existing RiemannTcpClient and RiemannRetryingTcpClient were threadsafe, but almost fully mutexed; using one essentially serialized all threads behind the client itself. For write-heavy workloads, I wanted to do better.

There are two logical optimizations I can make, in addition to choosing careful data structures, mucking with socket options, etc. The first is to bundle multiple events into a single Message, which the API supports. However, your code may not be structured in a way to efficiently bundle events, so where higher latencies are OK, the client can maintain a buffer of outbound events and flush it regularly.

The second optimization is to take advantage of request pipelining. Riemann's protocol is simple and synchronous: you send a Message over a TCP connection, and receive exactly one TCP message in response. The existing clients, however, forced you to wait n milliseconds for the message to cross the network, be processed by Riemann, and receive an acknowledgement. We can do better by pipelining requests: sending new requests before waiting for the previous responses, and matching up received messages with their corresponding requests later.

ThreadedClient does exactly that. All threads enqueue Messages into a lockfree queue, and receive Promise objects to be fulfilled when their response is available. The standard synchronous API is still available, and allows N threads to pipeline their requests together. Meanwhile, a writer thread sucks messages out of the write queue and sends them to Riemann, enqueuing written messages onto an in-flight queue. A reader thread pulls responses out of the socket and matches them to enqueued messages. Bounded queues provide backpressure, which limits the number of requests that can be in-flight at any time. This allows for reasonable bounds on event loss in the event of failure.
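
The core of the idea fits in a few lines. Here's a rough sketch of the queue-and-promise structure (not the actual ThreadedClient code; write-msg! and read-msg are assumed helpers that write and read one framed Msg):

(import '[java.util.concurrent ArrayBlockingQueue])

(def write-queue (ArrayBlockingQueue. 1024)) ; pending writes; bounded for backpressure
(def in-flight   (ArrayBlockingQueue. 1024)) ; writes awaiting responses, in order

(defn send-async
  "Enqueues a message and returns a promise of its response."
  [msg]
  (let [p (promise)]
    (.put write-queue [msg p])   ; blocks when the queue is full
    p))

; Writer thread: drain the write queue onto the socket, remembering order.
(defn writer [out]
  (loop []
    (let [[msg p] (.take write-queue)]
      (write-msg! out msg)
      (.put in-flight p))
    (recur)))

; Reader thread: the protocol is ordered, so each response resolves the
; oldest outstanding promise.
(defn reader [in]
  (loop []
    (deliver (.take in-flight) (read-msg in))
    (recur)))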

Here's what the naive client (wait for round-trip requests) looks like on loopback:

throughput-tcp.png

And here's the same test with a RiemannThreadedClient:

throughput-threaded.png

I've done no tuning or optimization to this algorithm, and error handling is rough at best. It should perform best across real-world networks where latency is nontrivial. Even on loopback, though, I'm seeing roughly double the throughput at the cost of roughly double per-event latency.

In the just-released riemann-java-client 0.0.6, riemann-clojure-client 0.0.6, riemann-ruby-client 0.0.8, and the upcoming riemann 0.1.4 (presently in master), Riemann will support two new types of metrics for events.

https://github.com/aphyr/riemann-java-client/blob/master/src/main/proto/riemann/proto.proto#L24

// signed 64-bit integers (variable-width-encoded)
optional sint64 metric_sint64 = 13;

// double-precision IEEE 754 floats
optional double metric_d = 14;

Events still have only a single logical metric; this change simply allows the protocol to represent a broader range of numbers. Clients should prefer metric_sint64 for all integer values from -2^63 to 2^63 - 1, and prefer metric_d for double-precision floats. Clients should also write metric_f wherever possible and fall back to reading metric_f, for compatibility with older versions of Riemann.

Here's how the Ruby client does it. Note that Ruby's Float is actually a double.

def metric
  metric_d || metric_sint64 || metric_f
end

def metric=(m)
  if Integer === m and (-(2**63)...2**63) === m
    # Long
    self.metric_sint64 = m
    self.metric_f = m.to_f
  else
    self.metric_d = m.to_f
    self.metric_f = m.to_f
  end
end
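
The same decision in Clojure might look something like this. It's a sketch of the selection logic only (the field names are illustrative, and this is not the actual riemann-clojure-client code):

(defn metric-fields
  "Given a metric m, picks which protobuf fields to populate. metric_f is
  always written for compatibility with older servers."
  [m]
  (if (and (integer? m)
           (<= Long/MIN_VALUE m Long/MAX_VALUE))
    {:metric-sint64 m          :metric-f (float m)}
    {:metric-d      (double m) :metric-f (float m)}))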

Javascript has only one numeric type: double-precision IEEE 754 floats (sort of). Riemann uses Cheshire and will emit JSON numbers which may exceed the scale or resolution of a JS VM, e.g.:

{"metric":12345678901234567890,"time":"1970-01-01T00:00:01.000Z"}

… which in V8 will be parsed as the number 12345678901234567000. Larger values may become +/- Infinity when parsed. Rationals are serialized as decimals.

Since most JS clients are engaged in visualization, loss of precision seems preferable to emitting JSON strings (e.g. 123 -> “123”) and forcing clients to use arbitrary-precision bignum libraries with custom operators. I'd like your feedback. :)

Ready? Grab the tarball or deb from http://aphyr.github.com/riemann/

0.1.3 is a consolidation release, comprising 2812 insertions and 1425 deletions. It includes numerous bugfixes, performance improvements, features–especially integration with third-party tools–and clearer code. This release includes the work of dozens of contributors over the past few months, who pointed out bugs, cleaned up documentation, smoothed over rough spots in the codebase, and added whole new features. I can't say thank you enough, to everyone who sent me pull requests, talked through designs, or just asked for help. You guys rock!

I also want to say thanks to Boundary, Blue Mountain Capital, Librato, and Netflix for contributing code, time, money, and design discussions to this release. You've done me a great kindness.

Bugfixes

  • streams.tagged-all and tagged-any can take single strings now, not just vectors of tags to match.
  • bin/riemann scripts exec java now, instead of launching in a subprocess.
  • Servers bind to 127.0.0.1 by default, instead of (possibly) ipv6 localhost only.
  • Fixed the use of the obsolete :metric_f in the default package configs.
  • Thoroughly commented and restructured the default configs to address common points of confusion.
  • Deb packages will not overwrite /etc/riemann/riemann.config by default, but consider it a conffile.

Major features

  • Librato metrics adapter is now built in.
  • riemann.graphite includes a graphite server which can accept events sent via the graphite protocol.
  • Scheduled tasks are more accurate and consume fewer threads. Riemann's clock can switch between wall-clock and virtual modes, which allows for much faster, more reliable tests.
  • Unified stream window API provides fixed and moving windows over time and events.
  • riemann.time: controllable centralized clocks and task schedulers.
  • riemann.pool: a threadsafe fixed-size bounded-latency regenerating resource pool.

New streams

  • where*: like where, but takes a function instead of an expression.
  • smap: streaming map.
  • sreduce: streaming reduce.
  • fold-interval: reduce over time periods.
  • fixed-time-window: stream which passes on a set of events every n seconds.
  • fixed-event-window: pass on sets of every n disjoint events.
  • moving-time-window: pass on the last n seconds of events.
  • moving-event-window: pass on the last n events.

Enhancements

  • (where) can take an (else) clause for streams which are called when the expression does not match a given event (see the sketch after this list).
  • Converted useless multimethods to regular methods.
  • TCP and UDP servers are roughly 15% faster.
  • New Match protocol for matching predicates like functions, values, and regexes. Used in (where) and (match).
  • streams/match is simpler and more powerful, thanks to Match.
  • Numerous concurrency improvements.
  • Pagerduty adapter is included in config by default.
  • Graphite adapter includes a connection pool, reconnects properly, and has bounded latency.
  • Email formatting shows more state information in the body.
  • Indexes are seqable.
  • Travis-CI support.
  • Unified protocol buffer parsing paths.
  • Clearer, faster tests, especially in streams.
  • New tasks for packaging under lein2.
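
Here's a quick sketch of the (else) clause (the state, address, and downstream prn are placeholders):

(streams
  (where (state "critical")
    (email "oncall@example.com")    ; matching events page someone
    (else
      #(prn "non-critical:" %))))   ; everything else just gets logged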

Experimental

  • riemann.deps provides an experimental dependency resolver. API subject to change. If you're working with dependent services in Riemann, I'd like your feedback.

What's next for Riemann?

We have quite a few new features in riemann-tools master, so that release should be coming up shortly. The dashboard is in a poor state right now, halfway between the old and next-gen interfaces: I need to reach feature parity with the old UI and finish styling, then make a release of riemann-dash. I'm also going to rework the web site to be friendlier to beginners, and add a howto section with cookbook-style directions for solving specific problems in Riemann.

In Riemann itself, I have plans to improve Netty performance, and I want to write some Serious Benchmarks to explore concurrency tuning. After that I plan to tackle a Big Project: either persistent indexes or high availability. Those two features will comprise 0.1.4 and 0.1.5.

If you're interested in funding any of this work, please let me know. :)

For the last three years Riemann (and its predecessors) has been a side project: I sketched designs, wrote code, tested features, and supported the community through nights and weekends. I was lucky to have supportive employers who allowed me to write new features for Riemann as we needed them. And yet, I've fallen behind.

Dozens of people have asked for sensible, achievable Riemann improvements that would help them monitor their systems, and I have a long list of my own. In the next year or two I'd like to build:

  • Protocol enhancements: high-resolution times, groups, pubsub, UDP drop-rate estimation
  • Expanding the websockets dashboard
  • Maintain index state through restarts
  • Expanded documentation
  • Configuration reloading
  • SQL-backed indexes for faster querying and synchronizing state between multiple Riemann servers
  • High-availability Riemann clusters using Zookeeper
  • Some kind of historical data store, and a query interface for it
  • Improve throughput by an order of magnitude

I believe that Riemann has great potential to help others understand and monitor their infrastructure–and as an open-source author I can think of no higher goal than to make this possible. I'm going to work full-time on Riemann for the foreseeable future.

I live in San Francisco. My burn rate (rent, utilities, insurance, and food) is roughly $2800 per month. I made 110K for some time and saved well, so I'm extending myself a $5000 gift: to do what I love, and what I think is important. I'll work on Riemann every day, and get as far as I can. I'll write new documentation and review the rapidly expanding body of pull requests from great contributors like Pyr, Banjiewen, and Perezd. I'll provide full-time support on Freenode (#riemann) and on the mailing list, and meet with users to figure out how I can help them best.

If Riemann has been of value to you or your team, and you'd like to support the project, you can help in three ways:

  1. Volunteer. I need your feature requests, your howto guides, your failing tests, your bugfixes and features. I'll do my best to give every one of them my full consideration.

  2. Employ me. I've been honored to receive some really cool job offers in the past few days, but I also plan to take my time. I want to work with intelligent, creative, and down-to-earth people, in high-level languages, and devote at least half of my time to working on Riemann. If you think your team might be a good fit, and you want direct influence over the project's direction, please get in touch.

  3. Donate money. With funding, I can work on Riemann for longer. If you just want to say “thanks”, that's great. If you need a particular capability, want to build a new visualization for your dash, or would like help integrating Riemann into your stack, you can earmark your donation and I'll devote my full attention to your goal. I can work closely with your team, either in-person or remote. I'm happy to sign any NDAs required, so long as functionality that would help everyone is published in the open-source Riemann projects. Either way, I'll thank you as a sponsor in the Riemann documentation and web site.

To give employers a reasonable timetable for hiring me, I'll only accept a few weeks of earmarked donations at a time, or agree to refund a prorated amount if I accept a job offer. If there's significant interest in funding Riemann independently, I'll block off longer stretches of time.

Let me know what you think: aphyr@aphyr.com.

As a quick follow-up, I managed to squeeze an extra 10% or so out of riemann.server by adding a few type hints.

drop tcp events latency.png

drop tcp events throughput.png

I've been focusing on Riemann client libraries and optimizations recently, both at Boundary and on my own time.

Boundary uses the JVM extensively, and takes advantage of Coda Hale's Metrics. For our applications I've written a Riemann Java UDP and TCP client, which also includes a Metrics reporter. The Metrics reporter (I'll be submitting that to metrics-contrib later) will just send periodic events for each of the metrics in a registry, and optionally some VM statistics as well. It can prefix each service, filter with predicates, and has been reporting for two of our production systems for about a week now.

The Java client has been integrated into Riemann itself, replacing the old Aleph client. It's about on par with the old Aleph client, owing to its use of standard Socket and friends as opposed to Netty. Mårten Gustafson and Edward Ribeiro have been instrumental in getting the Java client up and running, so my sincere thanks go out to both of them.

I also removed the last traces of Aleph from riemann.server, replacing the TCP server with a pure Netty implementation, and swapped Gloss for Netty-provided length header parsers, which cuts down on copying somewhat. Here's the performance of a single-threaded localhost client which sends an event and receives an OK response:

              Aleph                            Raw Netty
Latency:      drop tcp events latency.png      drop tcp events latency 2.png
Throughput:   drop tcp events throughput.png   drop tcp events throughput 2.png

Steady-state throughput with raw Netty is about 2.5 times faster. Median and 95% latency is significantly decreased, though occasional 20ms spikes are still present (I presume due to GC). Please keep in mind these graphs can only be compared with each other; they depend significantly on the hardware and JVM. This also does not represent concurrent performance—I'm trying to optimize the simplest system first before moving up. With that in mind, Riemann's real-world performance with these changes should be “much faster”.

Next up I'll be replacing clojure-protobuf with direct use of the Java protobuf classes; as I'm copying data into a standard Map anyway it should be slightly faster and consolidate codepaths between server and client. I'll also begin type-hinting key sections of the server and message parser to reduce use of reflection.

The initial stable release of Riemann 0.1.0 is available for download. This is the culmination of the 0.0.3 development path and 2 months of production use at Showyou.

Is it production ready? I think so. The fundamental stream operators are in place. A comprehensive test suite checks out. Riemann has never crashed. Its performance characteristics should be suitable for a broad range of scales and applications.

There is a possible memory leak, on the order of 1% per day in our production setup. I can't replicate it under a variety of stress tests. It's not clear to me whether this is legitimate state information (i.e. an increase in tracked data), GC/malloc implementations being greedy, or an actual memory leak. Profiling and understanding this is my top priority for Riemann. If this happens to you, restarting the daemon every few weeks should not be prohibitive; it takes about five seconds to reload. Should you encounter this issue, please drop me a line with your configuration; it may help me identify the cause.

The Riemann talk tonight at Boundary is sold out, but I may deliver another in the next month or so. Thanks for your interest, suggestions, and patches. I hope you enjoy Riemann. :)

When I designed UState, I had a goal of a thousand state transitions per second. I hit about six hundred on my Macbook Pro, and skirted 1000/s on real hardware. Eventmachine is good, but I started to bump up against concurrency limits in MRI's interpreter lock, my ability to generate and exchange SQL with SQLite, and protobuf parse times. So I set out to write a faster server. I chose Clojure for its expressiveness and powerful model of concurrent state–and more importantly, the JVM, which gets me Netty, a mature virtual machine with a decent thread model, and a wealth of fast libraries for parsing, state, and statistics. That project is called Riemann.

Today, I'm pleased to announce that Riemann crossed the 10,000 event/second mark in production. In fact it's skirting 11k in my stress tests. (That final drop in throughput is an artifact of the graph system showing partially-complete data.)

throughput.png

cpu.png

By the way, we push about 200 events/sec through a single Riemann server from all of Showyou's infrastructure. There's a lot of headroom.

I did the dumbest, easiest things possible. No profiling. A heavy abstraction (aleph) on top of netty. I haven't even turned on warn-on-reflection or provided type hints yet. All operations are over synchronous TCP. This benchmark measures Riemann's ability to thread events through a complex set of streams including dozens of (where) filters and updating the index with every received event.

10k.png

I'm in the final stages of packaging Riemann for initial public release this week. Boundary has also kindly volunteered their space for a tech talk on Riemann: Thursday, March 1st, at Boundary's offices, likely at 7 PM. I'll post a Meetup link here and on Twitter shortly.
