ChangeStreamService
Change Data Capture (CDC) for CE: tails oplog segments over gRPC server-side streaming. One consumer per shard. A client reconnects with a ResumeToken to continue from its last acknowledged position. Delivery is at-least-once, so consumers should tolerate duplicates. EE extensions (NATS/Kafka sinks, CEL pattern matching, cross-shard merge-sort, exactly-once delivery, multiple concurrent consumers) are in R400.
Proto source
Methods
Subscribe
Open a long-lived server-side streaming RPC that tails oplog segments. The server streams ChangeEvent messages as new entries are sealed. When caught up, the server polls every ~100ms for new sealed segments. The stream stays open until the client cancels or the connection drops. On reconnect, pass the last received ResumeToken to resume without gaps. Omit resume_token (or set all fields to zero) to start from the oldest available segment.
Transport: Server-streaming gRPC only (no HTTP transcoding)
Request: SubscribeRequest
| Field | Type | Description |
|---|---|---|
| resume_token | ResumeToken | Token from the last received ChangeEvent.position. Omit (all-zero) to start from the oldest available segment. |
| filters | CdcFilters | Optional filter to reduce bandwidth. |
Response: stream ChangeEvent
| Field | Type | Description |
|---|---|---|
| ts | uint64 | HLC timestamp of the entry (upper 46 bits = wall-clock ms, lower 18 = logical). |
| term | uint64 | Raft term when this entry was proposed. |
| log_index | uint64 | Raft log index. Monotonically increasing per shard. |
| shard_id | uint32 | Shard that produced this entry. |
| is_migration | bool | True if this entry was written during a shard migration (e.g. bulk data transfer). Most CDC consumers should skip migration entries. |
| ops | ChangeOp[] | The mutations carried by this log entry. |
| position | ResumeToken | Client should persist this token and pass it on reconnect. |
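The reconnect behaviour described for Subscribe can be sketched from the client side as follows. This is a minimal illustration, not the generated client: `consume`, the `subscribe` callable, and the two dataclasses are hypothetical stand-ins for the gRPC stub and the protobuf messages, and only the fields the example needs are modelled. Because delivery is at-least-once, the sketch also skips events whose log_index it has already handled for that shard.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterator, Optional


@dataclass(frozen=True)
class ResumeToken:
    """Hypothetical mirror of the ResumeToken message."""
    shard_id: int = 0
    segment_id: int = 0
    entry_offset: int = 0


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical mirror of ChangeEvent, reduced to the fields used here."""
    shard_id: int
    log_index: int
    position: ResumeToken


def consume(
    subscribe: Callable[[Optional[ResumeToken]], Iterator[ChangeEvent]],
    handle: Callable[[ChangeEvent], None],
    max_reconnects: int = 5,
) -> Optional[ResumeToken]:
    """Drain a stream, reconnecting with the last received position."""
    token: Optional[ResumeToken] = None   # None: start from the oldest segment
    last_index: Dict[int, int] = {}       # highest log_index handled per shard
    reconnects = 0
    while True:
        try:
            for event in subscribe(token):
                # log_index increases per shard, so anything not above the
                # last handled index is a redelivery and is skipped.
                if event.log_index > last_index.get(event.shard_id, -1):
                    handle(event)
                    last_index[event.shard_id] = event.log_index
                # A real client would persist this token durably.
                token = event.position
            return token  # the stand-in stream ended; a live stream stays open
        except ConnectionError:
            reconnects += 1
            if reconnects > max_reconnects:
                raise
```

In a real client the `subscribe` callable would wrap the server-streaming call and translate transport errors; the token should be persisted only after `handle` has finished, so that a crash leads to a redelivery rather than a lost event.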
Types
CdcFilters
Filters applied server-side before streaming events to the client.
| Field | Type | Description |
|---|---|---|
| edge_types | string[] | If non-empty, only deliver events that touch at least one adjacency or edge-property key whose edge type matches one of these strings. Example: ["FOLLOWS", "LIKES"]. Leave empty to receive all edge types. |
| is_migration | bool | When set: only deliver events whose is_migration flag equals this value. When absent (HasField returns false): deliver both migration and non-migration events. |
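The filter semantics above can be modelled in a few lines. This is a sketch under stated assumptions rather than the server's implementation: `passes_filters` is a hypothetical helper, the edge types an event touches are passed in already extracted (how the server derives them from storage keys is not described here), and combining the two filters with AND is an assumption. `None` stands for an absent is_migration field.

```python
from typing import Iterable, Optional, Sequence


def passes_filters(
    event_edge_types: Iterable[str],
    event_is_migration: bool,
    edge_types: Sequence[str] = (),
    is_migration: Optional[bool] = None,
) -> bool:
    """Return True if an event would be delivered under the given filters."""
    # is_migration is tri-state: None (absent) lets both kinds through,
    # True or False requires the event flag to match exactly.
    if is_migration is not None and event_is_migration != is_migration:
        return False
    # An empty edge_types list means "all edge types"; otherwise at least
    # one edge type touched by the event must be in the list.
    if edge_types and not set(edge_types) & set(event_edge_types):
        return False
    return True
```

Note that leaving is_migration absent is different from setting it to false: the former delivers migration entries, the latter suppresses them.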
ChangeEvent
A single committed oplog entry delivered to the CDC consumer.
| Field | Type | Description |
|---|---|---|
| ts | uint64 | HLC timestamp of the entry (upper 46 bits = wall-clock ms, lower 18 = logical). |
| term | uint64 | Raft term when this entry was proposed. |
| log_index | uint64 | Raft log index. Monotonically increasing per shard. |
| shard_id | uint32 | Shard that produced this entry. |
| is_migration | bool | True if this entry was written during a shard migration (e.g. bulk data transfer). Most CDC consumers should skip migration entries. |
| ops | ChangeOp[] | The mutations carried by this log entry. |
| position | ResumeToken | Client should persist this token and pass it on reconnect. |
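The ts layout (46 bits of wall-clock milliseconds above 18 bits of logical counter) can be unpacked with a shift and a mask. The helper names below are illustrative, not part of the API.

```python
from typing import Tuple

LOGICAL_BITS = 18                        # lower 18 bits: logical counter
LOGICAL_MASK = (1 << LOGICAL_BITS) - 1   # 0x3FFFF


def split_hlc(ts: int) -> Tuple[int, int]:
    """Split a 64-bit HLC timestamp into (wall_clock_ms, logical)."""
    return ts >> LOGICAL_BITS, ts & LOGICAL_MASK


def join_hlc(wall_clock_ms: int, logical: int) -> int:
    """Inverse of split_hlc; rejects values that do not fit the layout."""
    if not 0 <= wall_clock_ms < (1 << 46):
        raise ValueError("wall_clock_ms does not fit in 46 bits")
    if not 0 <= logical <= LOGICAL_MASK:
        raise ValueError("logical does not fit in 18 bits")
    return (wall_clock_ms << LOGICAL_BITS) | logical
```

Because the wall-clock part occupies the high bits, comparing two ts values as plain integers orders them by wall-clock time first and by logical counter second.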
ChangeOp
A single storage mutation within a ChangeEvent.
| Field | Type | Description |
|---|---|---|
| type | ChangeOpType | Kind of mutation (see ChangeOpType). |
| partition | uint32 | Storage partition (see coordinode_storage::engine::partition::Partition). |
| key | bytes | Storage key. Format depends on partition type. |
| value | bytes | Value bytes. Empty for DELETE ops. |
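As an illustration of how a consumer might replay ops, the sketch below applies them to an in-memory dictionary keyed by (partition, key). It is hypothetical throughout: the classes mirror the message and enum described on this page, INSERT is treated as an upsert (an assumption), and every op type other than INSERT and DELETE is only counted, because this page does not document how those types should be applied.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import Dict, Iterable, Tuple


class ChangeOpType(IntEnum):
    """Mirror of the ChangeOpType enum numbers listed on this page."""
    CHANGE_OP_TYPE_UNSPECIFIED = 0
    INSERT = 1
    DELETE = 2
    MERGE = 3
    NOOP = 4
    RAFT_ENTRY = 5
    RAFT_TRUNCATION = 6


@dataclass(frozen=True)
class ChangeOp:
    """Hypothetical mirror of the ChangeOp message."""
    type: ChangeOpType
    partition: int
    key: bytes
    value: bytes = b""   # empty for DELETE ops


State = Dict[Tuple[int, bytes], bytes]


def apply_ops(state: State, ops: Iterable[ChangeOp]) -> int:
    """Apply INSERT and DELETE ops to state; return how many ops were skipped."""
    skipped = 0
    for op in ops:
        slot = (op.partition, op.key)
        if op.type == ChangeOpType.INSERT:
            state[slot] = op.value    # assumption: INSERT overwrites
        elif op.type == ChangeOpType.DELETE:
            state.pop(slot, None)     # deleting a missing key is a no-op
        else:
            skipped += 1              # semantics not documented here
    return skipped
```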
ResumeToken
Position within the oplog used to resume after disconnect.
| Field | Type | Description |
|---|---|---|
| shard_id | uint32 | Shard this stream targets. Always 0 in CE single-shard mode. |
| segment_id | uint64 | Identifies the segment file: equals the first_index embedded in the filename (oplog-<segment_id:020>.bin). A value of 0 means "start of oldest available segment". |
| entry_offset | uint64 | Number of entries from this segment already consumed. The next event delivered will be at offset entry_offset (0-based). |
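The segment filename pattern (oplog-<segment_id:020>.bin) corresponds to a 20-digit zero-padded decimal. The two helpers below are illustrative names for mapping between segment_id and filename, for example when inspecting an oplog directory by hand; they are not part of the service.

```python
PREFIX = "oplog-"
SUFFIX = ".bin"


def segment_filename(segment_id: int) -> str:
    """Format a segment_id as its zero-padded, 20-digit filename."""
    if segment_id < 0:
        raise ValueError("segment_id is unsigned")
    return f"{PREFIX}{segment_id:020d}{SUFFIX}"


def segment_id_from_filename(name: str) -> int:
    """Recover the segment_id (the first_index) from a segment filename."""
    if not (name.startswith(PREFIX) and name.endswith(SUFFIX)):
        raise ValueError(f"not an oplog segment file: {name!r}")
    digits = name[len(PREFIX):-len(SUFFIX)]
    if len(digits) != 20 or not digits.isdigit():
        raise ValueError(f"malformed segment id in {name!r}")
    return int(digits)
```

Twenty digits are enough for any uint64 value, so zero-padded filenames sort lexicographically in the same order as their segment_id.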
SubscribeRequest
| Field | Type | Description |
|---|---|---|
| resume_token | ResumeToken | Token from the last received ChangeEvent.position. Omit (all-zero) to start from the oldest available segment. |
| filters | CdcFilters | Optional filter to reduce bandwidth. |
Enums
ChangeOpType
| Value | Number | Description |
|---|---|---|
| CHANGE_OP_TYPE_UNSPECIFIED | 0 | - |
| INSERT | 1 | - |
| DELETE | 2 | - |
| MERGE | 3 | - |
| NOOP | 4 | - |
| RAFT_ENTRY | 5 | - |
| RAFT_TRUNCATION | 6 | - |
