Skip to content

ChangeStreamService

CE Change Data Capture: tail oplog segments via gRPC server-side streaming. Single-consumer per shard. Client reconnects with a ResumeToken to continue from last acked position. At-least-once delivery. EE extensions (NATS/Kafka sinks, CEL pattern matching, cross-shard merge-sort, exactly-once delivery, multiple concurrent consumers) are in R400.

Methods

Subscribe

Open a long-lived server-side streaming RPC that tails oplog segments. The server streams ChangeEvent messages as new entries are sealed. When caught up, the server polls every ~100ms for new sealed segments. The stream stays open until the client cancels or the connection drops. On reconnect, pass the last received ResumeToken to resume without gaps. Omit resume_token (or set all fields to zero) to start from the oldest available segment.

Transport: Server-streaming gRPC only (no HTTP transcoding)

Request: SubscribeRequest

FieldTypeDescription
resume_tokenResumeTokenToken from the last received ChangeEvent.position. Omit (all-zero) to start from the oldest available segment.
filtersCdcFiltersOptional filter to reduce bandwidth.

Response: stream ChangeEvent

FieldTypeDescription
tsuint64HLC timestamp of the entry (upper 46 bits = wall-clock ms, lower 18 = logical).
termuint64Raft term when this entry was proposed.
log_indexuint64Raft log index. Monotonically increasing per shard.
shard_iduint32Shard that produced this entry.
is_migrationboolTrue if this entry was written during a shard migration (e.g. bulk data transfer). Most CDC consumers should skip migration entries.
opsChangeOp[]The mutations carried by this log entry.
positionResumeTokenClient should persist this token and pass it on reconnect.

Types

CdcFilters

Filters applied server-side before streaming events to the client.

FieldTypeDescription
edge_typesstring[]If non-empty, only deliver events that touch at least one adjacency or edge-property key whose edge type matches one of these strings. Example: ["FOLLOWS", "LIKES"] Leave empty to receive all edge types.
is_migrationboolWhen set: only deliver events whose is_migration flag equals this value. When absent (HasField returns false): deliver both migration and non-migration events.

ChangeEvent

A single committed oplog entry delivered to the CDC consumer.

FieldTypeDescription
tsuint64HLC timestamp of the entry (upper 46 bits = wall-clock ms, lower 18 = logical).
termuint64Raft term when this entry was proposed.
log_indexuint64Raft log index. Monotonically increasing per shard.
shard_iduint32Shard that produced this entry.
is_migrationboolTrue if this entry was written during a shard migration (e.g. bulk data transfer). Most CDC consumers should skip migration entries.
opsChangeOp[]The mutations carried by this log entry.
positionResumeTokenClient should persist this token and pass it on reconnect.

ChangeOp

A single storage mutation within a ChangeEvent.

FieldTypeDescription
typeChangeOpType
partitionuint32Storage partition (see coordinode_storage::engine::partition::Partition).
keybytesStorage key. Format depends on partition type.
valuebytesValue bytes. Empty for DELETE ops.

ResumeToken

Position within the oplog used to resume after disconnect.

FieldTypeDescription
shard_iduint32Shard this stream targets. Always 0 in CE single-shard mode.
segment_iduint64Identifies the segment file: equals the first_index embedded in the filename (oplog-<segment_id:020>.bin). A value of 0 means "start of oldest available segment".
entry_offsetuint64Number of entries from this segment already consumed. The next event delivered will be at offset entry_offset (0-based).

SubscribeRequest

FieldTypeDescription
resume_tokenResumeTokenToken from the last received ChangeEvent.position. Omit (all-zero) to start from the oldest available segment.
filtersCdcFiltersOptional filter to reduce bandwidth.

Enums

ChangeOpType

ValueNumberDescription
CHANGE_OP_TYPE_UNSPECIFIED0
INSERT1
DELETE2
MERGE3
NOOP4
RAFT_ENTRY5
RAFT_TRUNCATION6