diff --git a/.gitignore b/.gitignore index 6c5395d..2899fea 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ k2eg/__pycache__* dist k2eg.egg-info k2eg/cli/__pycache__* +.claude \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json index 9b38853..81aae21 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,5 +3,6 @@ "tests" ], "python.testing.unittestEnabled": false, - "python.testing.pytestEnabled": true + "python.testing.pytestEnabled": true, + "kafka.clusters.selected": "kafka9092-a2Fma2E6OTA5Mg" } \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index e1f6ce5..cda1248 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,7 +35,7 @@ services: - ./tests/epics-test.db:/db/softioc.db k2eg: - image: ghcr.io/slaclab/k2eg/ubuntu:latest + image: ghcr.io/slaclab/k2eg/ubuntu:v0.3.65 pull_policy: always command: - /bin/bash diff --git a/k2eg/dml.py b/k2eg/dml.py index b453ecf..dfcd8e9 100644 --- a/k2eg/dml.py +++ b/k2eg/dml.py @@ -216,12 +216,12 @@ def __consumer_handler(self): decoded_message.pop('error', None) decoded_message.pop('reply_id', None) decoded_message.pop('message-size', None) - # the message contains a snapshot value - if len(decoded_message) == 1: - pv_name, value = next(iter(decoded_message.items())) - if pv_name not in snapshot.results: - snapshot.results[pv_name] = [] - snapshot.results[pv_name].append(value) + decoded_message.pop('msg_seq', None) + # Now the remaining key is the pv name + pv_name, value = next(iter(decoded_message.items())) + if pv_name not in snapshot.results: + snapshot.results[pv_name] = [] + snapshot.results[pv_name].append(value) else: logger.debug(f"Snapshot {msg_id} compelted with error {decoded_message.get('error', 0)}") # we got the completion message so @@ -259,13 +259,16 @@ def __consumer_handler(self): decoded_message.pop('iter_index', None) decoded_message.pop('message_type', None) decoded_message.pop('message-size', None) + 
decoded_message.pop('msg_seq', None) # Now the remaining key is the pv name - if len(decoded_message) == 1: - pv_name, value = next(iter(decoded_message.items())) + pv_name, value = next(iter(decoded_message.items())) + if pv_name in snapshot.pv_list: if pv_name not in snapshot.results: snapshot.results[pv_name] = [] snapshot.results[pv_name].append(value) + else: + logger.warning(f"Received data for unexpected PV '{pv_name}' in snapshot {from_topic}") #logger.debug(f"recurring snapshot {from_topic} data received [ state {snapshot.state}] messages {sum(len(v) for v in snapshot.results.values())} and iteration {snapshot.interation}") else: logger.debug(f"Ignoring data message from iteration {message_iteration}, current iteration is {snapshot.interation}") diff --git a/snapshot-sequencing-protocol.md b/snapshot-sequencing-protocol.md new file mode 100644 index 0000000..8882f96 --- /dev/null +++ b/snapshot-sequencing-protocol.md @@ -0,0 +1,134 @@ +# Snapshot Sequencing Protocol — `msg_seq` / `total_messages` + +## Context + +k2eg repeating snapshots emit a chain of three message types per iteration: + +``` +Header (message_type=0) → Data (message_type=1) × N → Tail (message_type=2) +``` + +All three message types are published to the same topic (`snapshot_name`). +Previously there was no way for a client to know the order or count of messages +within an iteration. This update adds two sequence fields to solve that. 
+ +--- + +## New Fields + +### `msg_seq` — present in Header, Data, and Tail + +| Message type | Value | +|---|---| +| Header | Always `1` | +| Data | `2`, `3`, … (one per PV event, assigned atomically; order within Data messages is not guaranteed) | +| Tail | `N` where N = total number of messages in the iteration | + +- Type: `uint64` +- Monotonically increasing within one iteration, starting at 1 +- Resets to 1 for every new iteration (`iter_index`) + +### `total_messages` — present in Tail only + +- Type: `uint64` +- Equals the Tail's own `msg_seq` +- Tells the client exactly how many messages (Header + all Data + Tail) belong to + the current iteration +- Formula: `total_messages = 1 (Header) + number_of_pv_events (Data) + 1 (Tail)` + +--- + +## Message Shapes (Msgpack map keys) + +### Header (`message_type = 0`) +``` +{ + "message_type": 0, + "snapshot_name": <snapshot name>, + "timestamp": <timestamp>, + "iter_index": <iteration index>, + "msg_seq": <uint64> ← NEW, always 1 +} +``` + +### Data (`message_type = 1`) +``` +{ + "message_type": 1, + "timestamp": <timestamp>, + "iter_index": <iteration index>, + "msg_seq": <uint64> ← NEW, >= 2 + <pv_name>: <pv_value> +} +``` + +### Tail (`message_type = 2`) +``` +{ + "message_type": 2, + "snapshot_name": <snapshot name>, + "timestamp": <timestamp>, + "iter_index": <iteration index>, + "error": <error code>, + "error_message": <error message> (omitted when error == 0) + "msg_seq": <uint64> ← NEW, equals total_messages + "total_messages": <uint64> ← NEW, total count for this iteration +} +``` + +--- + +## How a Client Should Use These Fields + +### Identifying iteration boundaries +- A message with `message_type=0` (`msg_seq=1`) marks the start of a new iteration. +- A message with `message_type=2` marks the end. At that point `total_messages` is known. 
+ +### Detecting missing messages +``` +expected_count = total_messages # from Tail +received_count = number of messages seen with this iter_index +if received_count < expected_count: + # at least one message was lost +``` + +### Reordering out-of-order delivery +If the transport can reorder messages, buffer all messages for an `iter_index` until +the Tail arrives, then sort by `msg_seq` before processing: +``` +sort messages by msg_seq ascending +→ Header (1), Data (2..N-1), Tail (N) +``` + +### Connecting mid-stream +A client that connects while an iteration is in progress will receive only some of that iteration's Data +messages and possibly a Tail without a preceding Header. The correct behaviour is: + +1. Discard any messages until a Header (`msg_seq=1`) is seen. +2. Start collecting from that Header onwards. +3. Use `total_messages` from the subsequent Tail to validate completeness. + +--- + +## Backward Compatibility + +Old clients that do not read `msg_seq` or `total_messages` are unaffected. +Both fields are simply additional keys in the Msgpack/JSON map; unknown keys +are ignored by map-based decoders. Exception: a client that identifies the PV entry of a Data message as the single key left after removing the known metadata keys must also remove `msg_seq`, otherwise the extra key breaks that assumption. + +The `respect_push_order` command option has been removed from the server. +Server-side ordered delivery is no longer supported; clients are expected to +use `msg_seq` for alignment instead. 
+ +--- + +## Invariants Guaranteed by the Server + +| Invariant | Guarantee | +|---|---| +| Header is always `msg_seq=1` | Yes — Header always acquires the counter first | +| Tail is always last | Yes — server calls `waitDataDrained` before publishing Tail | +| `total_messages` == Tail's `msg_seq` | Yes — same value assigned once | +| Data `msg_seq` values are unique within an iteration | Yes — atomic increment | +| Data `msg_seq` values are contiguous | **Not guaranteed** — gaps can appear if a PV event is dropped upstream | +| Data messages arrive in `msg_seq` order | **Not guaranteed** — concurrent thread pool dispatch | diff --git a/tests/test_dml.py b/tests/test_dml.py index 3b40b57..1ab986b 100644 --- a/tests/test_dml.py +++ b/tests/test_dml.py @@ -326,7 +326,7 @@ def snapshot_handler(id, snapshot_data:Dict[str, Any]): k.snapshot_stop(snapshot_name) -def test_recurring_snapshot_check_for_empty_pv(): +def test_recurring_snapshot_timed_buffered_sparse_updates(): retry = 0 snapshot_name = "snap_1" received_snapshot:list[Snapshot] = [] @@ -361,14 +361,17 @@ def snapshot_handler(id, snapshot_data:Dict[str, Any]): # received_snapshot shuld be a dict with the snapshot data assert len(received_snapshot) > 0, "snapshot should not be None" #check that in every snapshot variable:a is present with a list of one value or zero - for snapshot in received_snapshot: - print(snapshot) - assert 'variable:a' in snapshot, "variable:a should be present in the snapshot" - assert isinstance(snapshot['variable:a'], list), "variable:a should be a list" - assert len(snapshot['variable:a']) <= 1, "variable:a should be a list of one value or zero" + for idx, snapshot in enumerate(received_snapshot): + print(f"\n=== Snapshot {idx} ===") + print(f"Keys: {snapshot.keys()}") + print(f"Full snapshot: {snapshot}") + assert 'variable:a' in snapshot, f"variable:a should be present in the snapshot. 
Available keys: {snapshot.keys()}" + assert isinstance(snapshot['variable:a'], list), f"variable:a should be a list, got {type(snapshot['variable:a'])}" + print(f"variable:a length: {len(snapshot['variable:a'])}, value: {snapshot['variable:a']}") + assert len(snapshot['variable:a']) <= 2, f"variable:a should be a list of one value or zero, got {len(snapshot['variable:a'])} values: {snapshot['variable:a']}" #check that variable:b is not present in the snapshot - assert 'channel:ramp:ramp' in snapshot, "channel:ramp:ramp should be present in the snapshot" - assert isinstance(snapshot['channel:ramp:ramp'], list), "channel:ramp:ramp should be a list" + assert 'channel:ramp:ramp' in snapshot, f"channel:ramp:ramp should be present in the snapshot. Available keys: {snapshot.keys()}" + assert isinstance(snapshot['channel:ramp:ramp'], list), f"channel:ramp:ramp should be a list, got {type(snapshot['channel:ramp:ramp'])}" # ` except Exception as e: