Exactly-once Semantics in Gazette

Overview

Gazette provides exactly-once processing semantics over messages, sometimes also known as “effectively-once”. The formal guarantees that Gazette offers are that:

  • A message A will be processed in exactly one completed consumer transaction.
  • Any updates to the consumer’s stateful store which are derived from A will commit exactly once.
  • Any published messages B, derived from A, will be read exactly once by any downstream read-committed reader (including other consumers).

These guarantees also cascade through multiple consumers, providing an overall end-to-end assurance that the effects of a message will be committed just once regardless of how that message is transformed, decomposed, aggregated, re-combined, etc., as it passes from consumer to consumer.

It’s important to note that Gazette cannot guarantee that the application’s ConsumeMessage or FinalizeTxn functions will be invoked exactly once for a given message. It’s also possible that duplicate read-uncommitted messages B derived from A will be published, or that changes to the store derived from A will be staged multiple times. The precise guarantee Gazette makes is that exactly one of those transactions will go on to commit, and similarly that exactly one message B will be read by a read-committed reader.

Message UUIDs

Journal appends in Gazette are at-least-once, meaning a particular message byte serialization may commit to a journal multiple times.

To account for this, messages in Gazette are sequenced using RFC 4122 v1 UUIDs, which are composed of:

  • A “node ID” which identifies the unique message source (also known within Gazette as a ProducerID).
  • A timestamp, with resolution to 100 nanoseconds.
  • A clock sequence, which provides further bits to distinguish UUIDs having the same node ID and timestamp. Gazette re-purposes some of these bits to use as flags.

Within Gazette, UUID timestamps and clock sequence bits are generated from a strictly monotonic Clock which ticks with each UUID generated. Every Publisher likewise draws a new and random ProducerID.

The combination of these properties allows a reader to efficiently de-duplicate messages by tracking the largest Clock seen for each ProducerID. Read messages having smaller clocks are presumed duplicates and can be discarded.
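
To make this concrete, de-duplication reduces to tracking a high-water Clock per producer. A minimal sketch in Go, where ProducerID and Clock are simplified stand-ins for the types Gazette derives from its UUIDs:

    // ProducerID and Clock are simplified stand-ins for the types
    // Gazette derives from RFC 4122 v1 UUIDs.
    type ProducerID [6]byte // The UUID "node ID".
    type Clock uint64       // Combined timestamp and clock-sequence ticks.

    // dedup tracks the largest Clock observed for each ProducerID.
    type dedup struct {
        maxClock map[ProducerID]Clock
    }

    func newDedup() *dedup {
        return &dedup{maxClock: make(map[ProducerID]Clock)}
    }

    // shouldProcess reports whether a message is new. A message whose
    // Clock is at or below the high-water mark is a presumed duplicate.
    func (d *dedup) shouldProcess(p ProducerID, c Clock) bool {
        if c <= d.maxClock[p] {
            return false // Duplicate: discard.
        }
        d.maxClock[p] = c
        return true
    }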

UUIDs are also used to represent transaction semantics via encoded flags. An application can author a set of messages which will be atomically applied or rolled back, by publishing each “pending” message with flag CONTINUE_TXN and then applying them via an ACK_TXN message, which commits all pending messages having a smaller Clock (and rolls back those having a larger one).
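
The commit rule follows directly from this description. A sketch, reusing Clock from above (the flag values here are illustrative only; Gazette's message package defines the actual encoding within UUID clock-sequence bits):

    // Illustrative transaction flags, re-purposed from UUID
    // clock-sequence bits (values are for sketch purposes only).
    type Flag uint16

    const (
        OUTSIDE_TXN  Flag = 0x0 // Message commits as published.
        CONTINUE_TXN Flag = 0x1 // Message is pending until acknowledged.
        ACK_TXN      Flag = 0x2 // Commits or rolls back pending messages.
    )

    // commitsUnder reports whether a pending CONTINUE_TXN message
    // commits under an ACK_TXN of the same producer: smaller Clocks
    // commit, while larger Clocks roll back.
    func commitsUnder(pending, ack Clock) bool { return pending < ack }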

ReadUncommittedIter reads uncommitted messages from a journal, which may include duplicates as well as pending messages which have not yet committed and may yet be rolled back.

Sequencer observes read-uncommitted messages from journals and sequences them into read-committed messages. For efficiency it uses a tunable ring buffer of read messages so that, in most cases, messages can be directly read from the ring upon observing an ACK_TXN which commits them. In cases where the ring buffer is insufficient, Sequencer will re-read the relevant portion of the journal to deliver acknowledged messages. An advantage of the design is that no head-of-line blocking occurs: committed messages are immediately dequeued upon observing their corresponding ACK_TXN, even if they’re interleaved with still-pending messages of other producers. Gazette is also able to guarantee that downstream consumers of a published-to journal will process the entire set of acknowledged messages within a single consumer transaction (this guarantee does not extend beyond a single journal, however).
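
A sketch of this behavior for a single producer, reusing the stand-ins above (Gazette's actual Sequencer tracks many producers over one shared ring, and implements the journal re-read fallback):

    // sequencer buffers one producer's pending messages in a bounded
    // ring, dequeuing them when an ACK_TXN commits them.
    type sequencer struct {
        ring    []pendingMsg
        maxRing int // Tunable ring capacity.
    }

    type pendingMsg struct {
        clock Clock
        body  []byte // Stand-in for an arbitrary user message.
    }

    // onPending buffers a read-uncommitted CONTINUE_TXN message,
    // evicting the oldest entry if the ring is full. An evicted message
    // must be recovered later by re-reading the journal.
    func (s *sequencer) onPending(m pendingMsg) (evicted bool) {
        if len(s.ring) == s.maxRing {
            s.ring, evicted = s.ring[1:], true
        }
        s.ring = append(s.ring, m)
        return
    }

    // onAck dequeues ring messages committed by an ACK_TXN having
    // Clock |ack|. Pending messages of other producers are unaffected:
    // there is no head-of-line blocking.
    func (s *sequencer) onAck(ack Clock) (committed []pendingMsg) {
        for _, m := range s.ring {
            if commitsUnder(m.clock, ack) {
                committed = append(committed, m)
            } // Messages with larger Clocks roll back, and are dropped.
        }
        s.ring = s.ring[:0]
        return
    }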

ReadCommittedIter composes a ReadUncommittedIter with a Sequencer to read committed messages of a journal.

Gazette messages are arbitrary user-defined types, and journals themselves hold only raw user data. Gazette therefore asks that user Message types help with representation by accepting, serializing, and (when asked) returning UUIDs generated by Gazette. UUIDs may also be directly useful to users, as they’re universally unique and encode a precise publishing timestamp. In some cases a user type may be unable to represent a UUID; the Message interface can then be implemented as no-ops to opt the type out of exactly-once processing, falling back to at-least-once semantics.
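
For instance, a type which can't carry a UUID might opt out with no-op implementations. A sketch following the shape of Gazette's Message interface (the method set shown is abbreviated, and UUID is a stand-in type):

    type UUID [16]byte // Stand-in for an RFC 4122 UUID.

    // RawEvent cannot represent a UUID, and opts out of exactly-once
    // processing with no-op implementations: it's processed with
    // at-least-once semantics and is never de-duplicated.
    type RawEvent struct {
        Payload []byte
    }

    // SetUUID discards the UUID generated by Gazette.
    func (e *RawEvent) SetUUID(UUID) {}

    // GetUUID returns a zero-valued UUID.
    func (e *RawEvent) GetUUID() UUID { return UUID{} }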

Consumer Shard Transaction Lifecycle

Each shard of a consumer Application processes messages in pipelined transactions. While each transaction runs:

  • At least one Message will be processed from a source journal.
  • Reads and modifications of a Store are made, scoped to a transaction provided by the store (e.g., a RocksDB WriteBatch, or a SQL transaction).
  • A number of uncommitted messages may be published to downstream journals, identified by a Publisher and ProducerID which are unique to the shard assignment.

So far, none of the transaction’s effects are observable by a read-committed reader or from the Store itself. Eventually the consumer transaction will begin to commit (e.g., because further messages are not immediately available for processing).
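
A sketch of this in-flight phase, with simplified stand-ins (the staging and publish helpers below are not Gazette's actual consumer API, and pendingMsg is reused from the Sequencer sketch):

    // storeTxn stands in for a store-scoped transaction, e.g. a
    // RocksDB WriteBatch or a SQL transaction.
    type storeTxn struct {
        staged map[string][]byte
    }

    func newStoreTxn() *storeTxn {
        return &storeTxn{staged: make(map[string][]byte)}
    }

    func (t *storeTxn) stage(key string, value []byte) {
        t.staged[key] = value // Buffered; not yet observable.
    }

    // processMessage handles one source message within the running
    // transaction: store updates are staged (not committed), and
    // derived messages are published as pending (CONTINUE_TXN).
    func processMessage(t *storeTxn, m pendingMsg) {
        t.stage("derived/key", m.body) // Scoped to the store txn.
        publishPending(pendingMsg{body: m.body})
    }

    // publishPending stands in for publishing one uncommitted message
    // under the shard's unique Publisher and ProducerID.
    func publishPending(pendingMsg) {}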

At this point a consumer Checkpoint is assembled, which consists of:

  • Offsets of each source journal through which uncommitted messages have been read.
  • ProducerStates of each producer tracked by the Sequencer.
  • “Intents” of messages to be written, which acknowledge uncommitted messages published during the transaction (also known as “ACK intents”).

The checkpoint is added to, and then commits with, the store transaction. Only once the transaction commits (and never before) are the checkpoint’s ACK intents written, informing downstream readers that pending messages have committed.
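
Continuing the stand-ins above, a sketch of this commit ordering (the Checkpoint here is a simplified rendering of Gazette's consumer checkpoint):

    // Checkpoint is a simplified rendering of a consumer checkpoint.
    type Checkpoint struct {
        Offsets    map[string]int64     // Journal -> read-through offset.
        Producers  map[ProducerID]Clock // Sequencer producer states.
        AckIntents map[string][]byte    // Journal -> ACK_TXN intent.
    }

    // encode and appendTo stand in for checkpoint serialization and
    // an at-least-once journal append, respectively.
    func encode(cp Checkpoint) []byte          { return nil }
    func appendTo(journal string, body []byte) {}

    func (t *storeTxn) commit() error { return nil } // Stand-in commit.

    // commitTransaction stages the Checkpoint within the same store
    // transaction as the message effects, commits, and only then
    // writes ACK intents to downstream journals.
    func commitTransaction(t *storeTxn, cp Checkpoint) error {
        t.stage("checkpoint", encode(cp)) // Commits atomically with effects.

        if err := t.commit(); err != nil {
            return err // Nothing observable; pending messages roll back.
        }
        // Only once the transaction commits (never before) are ACK
        // intents written, committing pending messages for readers.
        for journal, intent := range cp.AckIntents {
            appendTo(journal, intent)
        }
        return nil
    }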

It’s possible that a fault may occur after transaction commit, but before ACK intents are written (or after they’re partially written). For this reason, on process assignment / shard startup, shards recover the most recent Checkpoint from their Store and immediately publish (or re-publish) its ACK intents. If a fault previously occurred after commit but before all intents were published, this ensures delivery of those acknowledgements. If a fault instead occurred in the middle of a transaction, this rolls back any pending messages of the abandoned transaction. ACK intents captured in checkpoints thus represent an atomic commit (or roll-back) of messages published in the course of building that checkpoint, even where those messages are large in number and span many journals, or in the presence of arbitrary faults.

Every Publisher uses a unique ProducerID, which must be independently tracked by every Sequencer. While the state for each producer is lightweight (just a byte offset and Clock), it can add up over time. Consumers therefore employ pruning to age out the state of ProducerIDs which are no longer active.

Store Fencing

Shards are assigned (and re-assigned) across many distributed consumer processes which may come, go, and fail arbitrarily. There are races in which a shard re-assigned from process Old to process New may, for a time, be processed by both simultaneously (here, Old is sometimes referred to as a “zombie”). During this race there’s a potential for:

  • Old to commit transaction T1, then
  • New to recover T1, then
  • Old to commit T2, write ACK intents, and exit.

This breaks exactly-once semantics: some derived messages may be written more than once, and messages may be applied to the store multiple times. To account for this, it must be the case that after New recovers T1, it’s no longer possible for Old to commit a transaction. In other words, New must place a write fence on startup.

A Store backed by an external transactional system may leverage that system’s features to implement fences. For example, SQLStore increments a SQL fence during RestoreCheckpoint, causing any future commit by a zombie process to fail.
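
A sketch of the pattern, using a hypothetical checkpoints table and Postgres-style SQL (the schema and queries are illustrative, not SQLStore's actual implementation):

    import (
        "database/sql"
        "errors"
    )

    // restoreCheckpoint increments the shard's fence and returns the
    // last-committed checkpoint. Any zombie still holding the prior
    // fence value can no longer commit.
    func restoreCheckpoint(db *sql.DB, shard string) (fence int64, cp []byte, err error) {
        err = db.QueryRow(`
            UPDATE checkpoints SET fence = fence + 1
            WHERE shard = $1
            RETURNING fence, checkpoint`, shard).Scan(&fence, &cp)
        return
    }

    // commitCheckpoint conditions the commit on the fence being
    // unchanged since restoreCheckpoint. A zombie's stale fence
    // matches zero rows, and its commit fails.
    func commitCheckpoint(txn *sql.Tx, shard string, fence int64, cp []byte) error {
        result, err := txn.Exec(`
            UPDATE checkpoints SET checkpoint = $1
            WHERE shard = $2 AND fence = $3`, cp, shard, fence)
        if err != nil {
            return err
        }
        if n, _ := result.RowsAffected(); n == 0 {
            return errors.New("fence was raised by another process")
        }
        return txn.Commit()
    }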

Local Stores such as JSONFileStore or RocksDB – which make use of recovery logs – use journal “registers” to implement fencing across processes.

Recovery Logs and Register Fencing

Journal Registers are (very small) collections of arbitrary keys and values which are associated to a journal, and participate in that journal’s transactional append machinery. Append RPCs may check that registers match an expectation in order to proceed, and so long as they append at least one byte, they may update the registers of the journal.
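
A sketch of these semantics (simplified, and assuming the errors import above; see the broker protocol's Append RPC for the actual register fields):

    // journal models just the state needed to illustrate registers.
    type journal struct {
        registers map[string]string
        content   []byte
    }

    func newJournal() *journal {
        return &journal{registers: make(map[string]string)}
    }

    // appendRequest is a simplified stand-in for an Append RPC which
    // carries register expectations and updates.
    type appendRequest struct {
        checkRegisters map[string]string // Expected values, else refuse.
        setRegisters   map[string]string // Applied if >= 1 byte appends.
        content        []byte
    }

    func (j *journal) apply(req appendRequest) error {
        for key, expect := range req.checkRegisters {
            if j.registers[key] != expect {
                return errors.New("register mismatch: append refused")
            }
        }
        j.content = append(j.content, req.content...)

        if len(req.content) > 0 {
            for key, value := range req.setRegisters {
                j.registers[key] = value // Updates with the append.
            }
        }
        return nil
    }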

When a shard process assignment becomes primary, it completes recovery log playback by injecting a “handoff” – which takes over sequencing of recorded operations in the log – and places an updated “author” register fence upon it. As it processes transactions, its Recorder verifies the “author” fence it previously placed with each operation it records to the log.

When the next assigned process finishes playback and injects its own handoff, it fences off any further appends of the prior Recorder from applying to the journal, ensuring the old process can no longer commit a consumer transaction checkpoint.
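
Expressed over the register sketch above, author fencing might look like the following (again simplified; Gazette's Recorder and log player implement the real protocol):

    // injectHandoff completes playback by appending a handoff which
    // places a new "author" register, fencing any prior recorder.
    func injectHandoff(j *journal, author string) error {
        return j.apply(appendRequest{
            setRegisters: map[string]string{"author": author},
            content:      []byte("handoff"),
        })
    }

    // recordOp verifies the author fence with each recorded operation.
    // Once a successor injects its own handoff, this check fails and
    // the old primary can no longer commit a checkpoint.
    func recordOp(j *journal, author string, op []byte) error {
        return j.apply(appendRequest{
            checkRegisters: map[string]string{"author": author},
            content:        op,
        })
    }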