diff --git a/k2eg/dml.py b/k2eg/dml.py index 3337019..e5655e7 100644 --- a/k2eg/dml.py +++ b/k2eg/dml.py @@ -228,66 +228,71 @@ def __consumer_handler(self): snapshot.results ) elif from_topic in self.reply_recurring_snapsthot_message: - # iit should be a recurring snapshot + # it should be a recurring snapshot snapshot = self.reply_recurring_snapsthot_message[from_topic] message_type = decoded_message.get('message_type', None) if message_type is None: continue + + # Get the current iteration from the message + message_iteration = decoded_message.get('iter_index', 0) + if message_type == 0 and (snapshot.state == SnapshotState.INITIALIZED or snapshot.state == SnapshotState.TAIL_RECEIVED): snapshot.state = SnapshotState.HEADER_RECEVED - #get the timestsamp and iteration + # Get the timestamp and iteration snapshot.timestamp = decoded_message.get('timestamp', None) snapshot.interation = decoded_message.get('iter_index', 0) logging.debug(f"recurring snapshot {from_topic} header received [ state {snapshot.state}] and iteration {snapshot.interation}") - elif message_type == 1 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING): - ## we are acquireing data for the snapshtot - snapshot.state = SnapshotState.DATA_ACQUIRING - # remove the timestamp, iter_index, message_type and message_size from the message - decoded_message.pop('timestamp', None) - decoded_message.pop('iter_index', None) - decoded_message.pop('message_type', None) - decoded_message.pop('message-size', None) - #now the remaining key is the pv name and should be used as key - # add the message to the snapshot results and be added to snapshot.results[key] = key-value - if len(decoded_message) == 1: - pv_name, value = next(iter(decoded_message.items())) - if pv_name not in snapshot.results: - snapshot.results[pv_name] = [] - snapshot.results[pv_name].append(value) - logging.debug(f"recurring snapshot {from_topic} data received [ state {snapshot.state}] messages {len(snapshot.results)} and iteration {snapshot.interation}") + + elif message_type == 1 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING): + # Only process data messages that match the current iteration + if message_iteration == snapshot.interation: + snapshot.state = SnapshotState.DATA_ACQUIRING + # Remove metadata from the message + decoded_message.pop('timestamp', None) + decoded_message.pop('iter_index', None) + decoded_message.pop('message_type', None) + decoded_message.pop('message-size', None) + + # Now the remaining key is the pv name + if len(decoded_message) == 1: + pv_name, value = next(iter(decoded_message.items())) + if pv_name not in snapshot.results: + snapshot.results[pv_name] = [] + snapshot.results[pv_name].append(value) + logging.debug(f"recurring snapshot {from_topic} data received [ state {snapshot.state}] messages {sum(len(v) for v in snapshot.results.values())} and iteration {snapshot.interation}") + else: + logging.debug(f"Ignoring data message from iteration {message_iteration}, current iteration is {snapshot.interation}") + elif message_type == 2 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING): - # we got the completion message on snapshot that we are managing - # renew the snapshot - snapshot.state = SnapshotState.TAIL_RECEIVED - logging.debug(f"recurring snapshot {from_topic} tail received [ state {snapshot.state}] messages {len(snapshot.results)} and iteration {snapshot.interation}") - # Prepare a dictionary with only PV values, inter_index, and timestamp - handler_data = {} - # Add 'iter_index' and 'timestamp' if present and not already set - handler_data["iteration"] = snapshot.interation - # timetamp of the header - handler_data["header_timestamp"] = snapshot.timestamp - # timstamp of the tail - handler_data["tail_timestamp"] = decoded_message.get('timestamp', snapshot.timestamp) - # add key and value from snapshot.results - # to the handler data - for pv_name, values in snapshot.results.items(): - handler_data[pv_name] = values - - - # and call async handler in another thread - executor.submit( - snapshot.handler, - msg_id, - handler_data - ) - # clear the dictionary - snapshot.clear() + # Only process tail messages that match the current iteration + if message_iteration == snapshot.interation: + snapshot.state = SnapshotState.TAIL_RECEIVED + logging.debug(f"recurring snapshot {from_topic} tail received [ state {snapshot.state}] messages {sum(len(v) for v in snapshot.results.values())} and iteration {snapshot.interation}") + # Build handler data with metadata + tail_ts = decoded_message.get('timestamp', None) + handler_data = { + "iteration": snapshot.interation, + "header_timestamp": snapshot.timestamp, + "tail_timestamp": tail_ts, + "timestamp": tail_ts, + } + # Add PV data + for pv_name in snapshot.pv_list: + if pv_name in snapshot.results: + handler_data[pv_name] = snapshot.results[pv_name] + + # Call handler asynchronously + executor.submit( + snapshot.handler, + from_topic, + handler_data + ) + else: + logging.debug(f"Ignoring tail message from iteration {message_iteration}, current iteration is {snapshot.interation}") else: - #log the error - logging.error( - f"Error during snapshot {msg_id} with message type {message_type} and state {snapshot.state} and iteration {snapshot.interation} and timestamp {snapshot.timestamp}" - ) + logging.error(f"Error during snapshot {from_topic} with message type {message_type} and state {snapshot.state} and iteration {snapshot.interation} and timestamp {snapshot.timestamp}") diff --git a/tests/test_dml.py b/tests/test_dml.py index fc28460..7837407 100644 --- a/tests/test_dml.py +++ b/tests/test_dml.py @@ -480,6 +480,7 @@ def test_recurring_snapshot_time_buffered_with_sub_push(): def snapshot_handler(id, snapshot_data:Dict[str, Any]): nonlocal snapshot_name nonlocal received_snapshot + print(f"snapshot_handler called with id: {id}") if snapshot_name == id: received_snapshot.append(snapshot_data) @@ -490,10 +491,10 @@ def snapshot_handler(id, snapshot_data:Dict[str, Any]): result = k.snapshot_recurring( SnapshotProperties( snapshot_name = snapshot_name, - time_window = 4000, + time_window = 1000, repeat_delay = 0, - sub_push_delay_msec = 1000, - pv_uri_list = ['pva://channel:ramp:ramp'], + sub_push_delay_msec = 500, + pv_uri_list = ['ca://channel:ramp:ramp'], triggered=False, type=SnapshotType.TIMED_BUFFERED, pv_field_filter_list = ['value'],