tl;dr Riemann is a monitoring system, so it emphasizes liveness over safety.

Riemann is aimed at high-throughput (millions of events/sec/node), partial-harvest event processing, where it is acceptable to trade completeness for throughput at low latencies. For instance, it’s probably fine to drop half of your request latency events on the floor, if you’re calculating a lossy histogram with sampling anyway. It’s also typically acceptable to have nondeterministic behavior with respect to time windows: if one node’s clock is skewed, it’s better to process it “soonish” rather than waiting an unbounded amount of time for it to check in.

There is no synchronization or relationship between events. Events are immutable and have a total order, even though a given server or client may only have a fraction of the relevant events for a system. The events are, in a sense, the transaction log–except that the semantics of those transactions depend on the stream configuration.

Riemann is only trivially distributed: clients send events to servers. Servers can act as clients themselves. The protocol provides synchronous acknowledgement of each received event… which could mean “your write is durably stored on disk” or “I threw your write on a queue, good luck have fun”, or any mixture in between, like “I queued your write for use by a windowing stream, I queued it for submission to Librato metrics, and reacted to the failure condition by sending an email which has been acked by the mail system.”

All of these guarantees are present only for a single server. At some point Riemann will need to be available during partitions.

The “Fuck it, no coordination” model, which I have now, allows for degraded harvest and low latencies for data which it’s OK to lose some of. A simple strategy is to carpetbomb every Riemann server in the cluster with your events with the usable tunable write-replica threshold. Each server might have a slightly different view of the world, depending on where it was partitioned and how long.

Stronger consistency

Some events (which happen infrequently) need strong coordination. We need to guarantee, for example, that of three Riemann servers responsible for this datacenter, exactly one sends the “hey, the web server’s broken” email. These events require bounded guarantees of both liveness: “Someone must send an email in five seconds” and safety: “I don’t care who but one of you better do it”.

I’m pretty sure these constraints on side effects essentially violate CAP, in the face of arbitrary partitions. If a node decides “I’ll send it”, sends the email, then explodes just before telling the others “I sent it!”, the remaining nodes have no choice but to send a duplicate message.

In the event of these failure modes (like a total partition), duplicates are preferable to doing nothing. Waaay better to page someone twice than to risk not paging them at all.

However, there are some failure modes where I can provide delivered-once guarantees of side effects. For example, up to floor(n/2) node failures, or a partition which leaves a fully-connected quorum. In these circumstances, 2PC or Paxos can give me strong consistency guarantees, and I can detect (in many cases, I think) the failure modes which would result in sacrificing consistency and requiring a duplicate write. A Riemann server can call someone and say,

“Hey, I just paged you, and this is crazy, but I’ve got split brain, I’ll call twice maybe.”

Since events are values, I can serialize and compare them. That means you might actually be able to write, in the streams config, an expression which means “attempt to ensure these events are processed on exactly one host in the cluster.”

  (where (state "critical")
    ; This is unsynchronized and proceeds on all nodes concurrently
    #(prn "Uh oh, this thing's broken!" %)

      ; Any events inside master are executed on exactly one node if 
      ; quorum is preserved, or maybe multiple hosts if a node fails before
      ; acking.
      (email ""))))

…which is most useful when clients can reach a majority of servers (and allows clients to guarantee whether or not their event was accepted.) I can also provide a weaker guarantee along the lines of “Try to prevent all connected peers from sending this event within this time window,” which is useful for scenarios where you want to know about errors which occurred in minority partitions and it’s likely that clients will be partitioned with their servers; e.g. one Riemann per agg switch or DC.

This doesn’t guarantee all nodes have the same picture of the world which led up to that failure. I think doing that would require full coordination between all nodes about the event stream (and its ordering), which would impose nontrivial synchronization costs. Explicit causal consistency could improve this, but we’d need a way to express and compute those causal relationships between arbitrary stream functions somehow.

Realistically, this may not be a problem. When Riemann sees a quorum loss it can wake someone up, and when the partition is resolved nodes will converge rapidly on “hey, that service still isn’t checking in.”

A third path

What I don’t know yet is whether there’s a role for events which don’t need the insane overhead of 2PC or paxos for every… single… event… but do need some kind of distributed consistency. HAT is interesting because it provides reasonably strong consistency guarantees for an AP system, but at the cost of liveness. Is that liveness tradeoff suitable for Riemann, where responding Right Now is critical? Probably not. But it might be useful for historical stores, or expressing distributed multi-event transactions–which currently don’t exist. I don’t even know what this would mean in an event-oriented context.

Why? Riemann’s event model treats events as values. Well-behaved clients provide a total order and identity over events based on their host, service, and timestamps. This means reconstructing any linear subset of the event stream can be done in an eventually consistent way. if Riemann were to become a historical store, reconciling divergent histories would simply be the set union of all received events.

Except for derived events. What happens when a partition separates two Riemann servers measuring request throughput? Each receives half of the events it used to, and their rate streams start emitting events with a metric half as big as they used to. If both Riemann servers are logging these events to a historical store, the store will show only half the throughput it used to.

One option is to log only raw events and reconstruct derived events by replaying the merged event log. What was the rate at noon? Apply all the events from 11:55 to 12:00 to the rate stream and see.

Another option might be for rate streams themselves to be transactional in nature, but I’m not sure how to do that in a way which preserves liveness guarantees.

Post a Comment

Comments are moderated. Links have nofollow. Seriously, spammers, give it a rest.

Please avoid writing anything here unless you're a computer. This is also a trap:

Supports Github-flavored Markdown, including [links](, *emphasis*, _underline_, `code`, and > blockquotes. Use ```clj on its own line to start an (e.g.) Clojure code block, and ``` to end the block.