Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -7,3 +7,4 @@ k2eg/__pycache__*
dist
k2eg.egg-info
k2eg/cli/__pycache__*
.claude
3 changes: 2 additions & 1 deletion .vscode/settings.json
@@ -3,5 +3,6 @@
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
"python.testing.pytestEnabled": true,
"kafka.clusters.selected": "kafka9092-a2Fma2E6OTA5Mg"
}
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -35,7 +35,7 @@ services:
- ./tests/epics-test.db:/db/softioc.db

k2eg:
image: ghcr.io/slaclab/k2eg/ubuntu:latest
image: ghcr.io/slaclab/k2eg/ubuntu:v0.3.65
pull_policy: always
command:
- /bin/bash
19 changes: 11 additions & 8 deletions k2eg/dml.py
@@ -216,12 +216,12 @@ def __consumer_handler(self):
decoded_message.pop('error', None)
decoded_message.pop('reply_id', None)
decoded_message.pop('message-size', None)
# the message contains a snapshot value
if len(decoded_message) == 1:
pv_name, value = next(iter(decoded_message.items()))
if pv_name not in snapshot.results:
snapshot.results[pv_name] = []
snapshot.results[pv_name].append(value)
decoded_message.pop('msg_seq', None)
# Now the remaining key is the pv name
pv_name, value = next(iter(decoded_message.items()))
if pv_name not in snapshot.results:
snapshot.results[pv_name] = []
snapshot.results[pv_name].append(value)
else:
logger.debug(f"Snapshot {msg_id} completed with error {decoded_message.get('error', 0)}")
# we got the completion message so
@@ -259,13 +259,16 @@ def __consumer_handler(self):
decoded_message.pop('iter_index', None)
decoded_message.pop('message_type', None)
decoded_message.pop('message-size', None)
decoded_message.pop('msg_seq', None)

# Now the remaining key is the pv name
if len(decoded_message) == 1:
pv_name, value = next(iter(decoded_message.items()))
pv_name, value = next(iter(decoded_message.items()))
if pv_name in snapshot.pv_list:
if pv_name not in snapshot.results:
snapshot.results[pv_name] = []
snapshot.results[pv_name].append(value)
else:
logger.warning(f"Received data for unexpected PV '{pv_name}' in snapshot {from_topic}")
#logger.debug(f"recurring snapshot {from_topic} data received [ state {snapshot.state}] messages {sum(len(v) for v in snapshot.results.values())} and iteration {snapshot.interation}")
else:
logger.debug(f"Ignoring data message from iteration {message_iteration}, current iteration is {snapshot.interation}")
134 changes: 134 additions & 0 deletions snapshot-sequencing-protocol.md
@@ -0,0 +1,134 @@
# Snapshot Sequencing Protocol — `msg_seq` / `total_messages`

## Context

k2eg repeating snapshots emit a chain of three message types per iteration:

```
Header (message_type=0) → Data (message_type=1) × N → Tail (message_type=2)
```

All three message types are published to the same topic (`snapshot_name`).
Previously there was no way for a client to know the order or count of messages
within an iteration. This update adds two sequence fields to solve that.

---

## New Fields

### `msg_seq` — present in Header, Data, and Tail

| Message type | Value |
|---|---|
| Header | Always `1` |
| Data | `2`, `3`, … (one per PV event, assigned atomically; order within Data messages is not guaranteed) |
| Tail | `N` where N = total number of messages in the iteration |

- Type: `uint64`
- Monotonically increasing within one iteration, starting at 1
- Resets to 1 for every new iteration (`iter_index`)

### `total_messages` — present in Tail only

- Type: `uint64`
- Equals the Tail's own `msg_seq`
- Tells the client exactly how many messages (Header + all Data + Tail) belong to
the current iteration
- Formula: `total_messages = 1 (Header) + number_of_pv_events (Data) + 1 (Tail)`
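
As a worked example, an iteration carrying three PV events yields five messages. The numbering can be sketched in Python (the helper below is illustrative, not part of the k2eg API):

```python
def sequence_numbers(num_pv_events: int) -> dict:
    """Illustrate how msg_seq is assigned within one snapshot iteration."""
    header_seq = 1                                 # Header always takes the counter first
    data_seqs = list(range(2, 2 + num_pv_events))  # one Data message per PV event
    tail_seq = 1 + num_pv_events + 1               # Tail is the last message
    total_messages = tail_seq                      # equals the Tail's own msg_seq
    return {"header": header_seq, "data": data_seqs,
            "tail": tail_seq, "total_messages": total_messages}

# Three PV events: Header=1, Data=[2, 3, 4], Tail=5, total_messages=5
print(sequence_numbers(3))
```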

---

## Message Shapes (Msgpack map keys)

### Header (`message_type = 0`)
```
{
  "message_type": 0,
  "snapshot_name": <string>,
  "timestamp": <int64, unix ns>,
  "iter_index": <int64>,
  "msg_seq": <uint64>          ← NEW, always 1
}
```

### Data (`message_type = 1`)
```
{
  "message_type": 1,
  "timestamp": <int64, unix ns>,
  "iter_index": <int64>,
  "msg_seq": <uint64>,         ← NEW, >= 2
  <pv_name>: <pv data>
}
```

### Tail (`message_type = 2`)
```
{
  "message_type": 2,
  "snapshot_name": <string>,
  "timestamp": <int64, unix ns>,
  "iter_index": <int64>,
  "error": <int>,
  "error_message": <string>,   ← omitted when error == 0
  "msg_seq": <uint64>,         ← NEW, equals total_messages
  "total_messages": <uint64>   ← NEW, total count for this iteration
}
```
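
After Msgpack decoding, a Data message's payload is the single key left once the known metadata keys are stripped, which is the same approach `k2eg/dml.py` takes. A minimal sketch, assuming the message is already decoded into a plain dict (`extract_pv` and `METADATA_KEYS` are illustrative names):

```python
METADATA_KEYS = {"message_type", "snapshot_name", "timestamp",
                 "iter_index", "msg_seq", "total_messages",
                 "error", "error_message", "message-size"}

def extract_pv(data_msg: dict):
    """Return (pv_name, value) from a decoded Data message (message_type=1)."""
    assert data_msg["message_type"] == 1, "not a Data message"
    remaining = {k: v for k, v in data_msg.items() if k not in METADATA_KEYS}
    # Exactly one non-metadata key should remain: the PV name
    (pv_name, value), = remaining.items()
    return pv_name, value

msg = {"message_type": 1, "timestamp": 0, "iter_index": 7,
       "msg_seq": 2, "variable:a": {"value": 42}}
print(extract_pv(msg))  # → ('variable:a', {'value': 42})
```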

---

## How a Client Should Use These Fields

### Identifying iteration boundaries
- A message with `message_type=0` (`msg_seq=1`) marks the start of a new iteration.
- A message with `message_type=2` marks the end. At that point `total_messages` is known.

### Detecting missing messages
```
expected_count = total_messages # from Tail
received_count = number of messages seen with this iter_index
if received_count < expected_count:
# at least one message was lost
```
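
The check above can be written concretely as follows (illustrative helper, not part of the client API):

```python
def iteration_complete(messages: list) -> bool:
    """True when every message of one iteration has been received."""
    tails = [m for m in messages if m["message_type"] == 2]
    if not tails:
        return False                       # Tail not seen yet, count unknown
    expected = tails[0]["total_messages"]  # Header + all Data + Tail
    return len(messages) >= expected

msgs = [
    {"message_type": 0, "msg_seq": 1},
    {"message_type": 1, "msg_seq": 2},
    {"message_type": 2, "msg_seq": 3, "total_messages": 3},
]
print(iteration_complete(msgs))      # → True
print(iteration_complete(msgs[:2]))  # → False
```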

### Reordering out-of-order delivery
If the transport can reorder messages, buffer all messages for an `iter_index` until
the Tail arrives, then sort by `msg_seq` before processing:
```
sort messages by msg_seq ascending
→ Header (1), Data (2..N-1), Tail (N)
```
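
A sketch of the buffer-then-sort step (illustrative helper):

```python
def reorder_iteration(messages: list) -> list:
    """Sort one iteration's buffered messages into msg_seq order."""
    # After sorting: Header (1), Data (2..N-1), Tail (N)
    return sorted(messages, key=lambda m: m["msg_seq"])

buffered = [
    {"message_type": 1, "msg_seq": 3},
    {"message_type": 2, "msg_seq": 4, "total_messages": 4},
    {"message_type": 0, "msg_seq": 1},
    {"message_type": 1, "msg_seq": 2},
]
print([m["msg_seq"] for m in reorder_iteration(buffered)])  # → [1, 2, 3, 4]
```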

### Connecting mid-stream
A client that connects while an iteration is in progress will receive partial Data
messages and possibly a Tail without a preceding Header. The correct behaviour is:

1. Discard any messages until a Header (`msg_seq=1`) is seen.
2. Start collecting from that Header onwards.
3. Use `total_messages` from the subsequent Tail to validate completeness.
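
These three steps can be sketched as a small collector (illustrative class, not part of the k2eg client):

```python
class MidStreamCollector:
    """Drop messages until a Header is seen, then collect one full iteration."""

    def __init__(self):
        self.collecting = False
        self.buffer = []

    def feed(self, msg: dict):
        """Return the complete iteration when its Tail arrives, else None."""
        if not self.collecting:
            if msg["message_type"] == 0:   # 1. wait for a Header (msg_seq=1)
                self.collecting = True
                self.buffer = [msg]
            return None                    # discard anything before the Header
        self.buffer.append(msg)            # 2. collect from the Header onward
        if msg["message_type"] == 2:       # 3. Tail: validate via total_messages
            complete = len(self.buffer) == msg["total_messages"]
            iteration, self.buffer = self.buffer, []
            self.collecting = False
            return iteration if complete else None
        return None
```

Feeding a stray Data message before any Header returns `None`; once a Header arrives, `feed` returns the buffered iteration only when the Tail's `total_messages` matches the number of messages collected.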

---

## Backward Compatibility

Old clients that do not read `msg_seq` or `total_messages` are unaffected.
Both fields are simply additional keys in the Msgpack/JSON map; unknown keys
are ignored by map-based decoders.

The `respect_push_order` command option has been removed from the server.
Server-side ordered delivery is no longer supported; clients are expected to
use `msg_seq` for alignment instead.

---

## Invariants Guaranteed by the Server

| Invariant | Guarantee |
|---|---|
| Header is always `msg_seq=1` | Yes — Header always acquires the counter first |
| Tail is always last | Yes — server calls `waitDataDrained` before publishing Tail |
| `total_messages` == Tail's `msg_seq` | Yes — same value assigned once |
| Data `msg_seq` values are unique within an iteration | Yes — atomic increment |
| Data `msg_seq` values are contiguous | **Not guaranteed** — gaps can appear if a PV event is dropped upstream |
| Data messages arrive in `msg_seq` order | **Not guaranteed** — concurrent thread pool dispatch |
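
The last two rows matter most for client code: Data `msg_seq` values may arrive out of order and may have gaps, but never duplicates. A sketch that reports both conditions (illustrative helper):

```python
def check_data_seqs(data_msgs: list) -> dict:
    """Report uniqueness and gaps among Data msg_seq values."""
    seqs = sorted(m["msg_seq"] for m in data_msgs)
    unique = len(seqs) == len(set(seqs))  # duplicates would indicate a bug
    # Gaps are allowed (a PV event may be dropped upstream), but report them
    gaps = [s for a, b in zip(seqs, seqs[1:]) for s in range(a + 1, b)]
    return {"unique": unique, "gaps": gaps}

print(check_data_seqs([{"msg_seq": 2}, {"msg_seq": 5}, {"msg_seq": 3}]))
# → {'unique': True, 'gaps': [4]}
```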
19 changes: 11 additions & 8 deletions tests/test_dml.py
@@ -326,7 +326,7 @@ def snapshot_handler(id, snapshot_data:Dict[str, Any]):
k.snapshot_stop(snapshot_name)


def test_recurring_snapshot_check_for_empty_pv():
def test_recurring_snapshot_timed_buffered_sparse_updates():
retry = 0
snapshot_name = "snap_1"
received_snapshot:list[Snapshot] = []
@@ -361,14 +361,17 @@ def snapshot_handler(id, snapshot_data:Dict[str, Any]):
# received_snapshot should be a dict with the snapshot data
assert len(received_snapshot) > 0, "snapshot should not be None"
# check that in every snapshot variable:a is present with a list of at most two values
for snapshot in received_snapshot:
print(snapshot)
assert 'variable:a' in snapshot, "variable:a should be present in the snapshot"
assert isinstance(snapshot['variable:a'], list), "variable:a should be a list"
assert len(snapshot['variable:a']) <= 1, "variable:a should be a list of one value or zero"
for idx, snapshot in enumerate(received_snapshot):
print(f"\n=== Snapshot {idx} ===")
print(f"Keys: {snapshot.keys()}")
print(f"Full snapshot: {snapshot}")
assert 'variable:a' in snapshot, f"variable:a should be present in the snapshot. Available keys: {snapshot.keys()}"
assert isinstance(snapshot['variable:a'], list), f"variable:a should be a list, got {type(snapshot['variable:a'])}"
print(f"variable:a length: {len(snapshot['variable:a'])}, value: {snapshot['variable:a']}")
assert len(snapshot['variable:a']) <= 2, f"variable:a should be a list of at most two values, got {len(snapshot['variable:a'])} values: {snapshot['variable:a']}"
# check that channel:ramp:ramp is present in the snapshot
assert 'channel:ramp:ramp' in snapshot, "channel:ramp:ramp should be present in the snapshot"
assert isinstance(snapshot['channel:ramp:ramp'], list), "channel:ramp:ramp should be a list"
assert 'channel:ramp:ramp' in snapshot, f"channel:ramp:ramp should be present in the snapshot. Available keys: {snapshot.keys()}"
assert isinstance(snapshot['channel:ramp:ramp'], list), f"channel:ramp:ramp should be a list, got {type(snapshot['channel:ramp:ramp'])}"

except Exception as e: