Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 53 additions & 48 deletions k2eg/dml.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,66 +228,71 @@ def __consumer_handler(self):
snapshot.results
)
elif from_topic in self.reply_recurring_snapsthot_message:
# iit should be a recurring snapshot
# it should be a recurring snapshot
snapshot = self.reply_recurring_snapsthot_message[from_topic]
message_type = decoded_message.get('message_type', None)
if message_type is None:
continue

# Get the current iteration from the message
message_iteration = decoded_message.get('iter_index', 0)

if message_type == 0 and (snapshot.state == SnapshotState.INITIALIZED or snapshot.state == SnapshotState.TAIL_RECEIVED):
snapshot.state = SnapshotState.HEADER_RECEVED
#get the timestsamp and iteration
# Get the timestamp and iteration
snapshot.timestamp = decoded_message.get('timestamp', None)
snapshot.interation = decoded_message.get('iter_index', 0)
logging.debug(f"recurring snapshot {from_topic} header received [ state {snapshot.state}] and iteration {snapshot.interation}")
elif message_type == 1 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING):
## we are acquireing data for the snapshtot
snapshot.state = SnapshotState.DATA_ACQUIRING
# remove the timestamp, iter_index, message_type and message_size from the message
decoded_message.pop('timestamp', None)
decoded_message.pop('iter_index', None)
decoded_message.pop('message_type', None)
decoded_message.pop('message-size', None)
#now the remaining key is the pv name and should be used as key
# add the message to the snapshot results and be added to snapshot.results[key] = key-value
if len(decoded_message) == 1:
pv_name, value = next(iter(decoded_message.items()))
if pv_name not in snapshot.results:
snapshot.results[pv_name] = []
snapshot.results[pv_name].append(value)
logging.debug(f"recurring snapshot {from_topic} data received [ state {snapshot.state}] messages {len(snapshot.results)} and iteration {snapshot.interation}")

elif message_type == 1 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING):
# Only process data messages that match the current iteration
if message_iteration == snapshot.interation:
snapshot.state = SnapshotState.DATA_ACQUIRING
# Remove metadata from the message
decoded_message.pop('timestamp', None)
decoded_message.pop('iter_index', None)
decoded_message.pop('message_type', None)
decoded_message.pop('message-size', None)

# Now the remaining key is the pv name
if len(decoded_message) == 1:
pv_name, value = next(iter(decoded_message.items()))
if pv_name not in snapshot.results:
snapshot.results[pv_name] = []
snapshot.results[pv_name].append(value)
logging.debug(f"recurring snapshot {from_topic} data received [ state {snapshot.state}] messages {sum(len(v) for v in snapshot.results.values())} and iteration {snapshot.interation}")
else:
logging.debug(f"Ignoring data message from iteration {message_iteration}, current iteration is {snapshot.interation}")

elif message_type == 2 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING):
# we got the completion message on snapshot that we are managing
# renew the snapshot
snapshot.state = SnapshotState.TAIL_RECEIVED
logging.debug(f"recurring snapshot {from_topic} tail received [ state {snapshot.state}] messages {len(snapshot.results)} and iteration {snapshot.interation}")
# Prepare a dictionary with only PV values, inter_index, and timestamp
handler_data = {}
# Add 'iter_index' and 'timestamp' if present and not already set
handler_data["iteration"] = snapshot.interation
# timetamp of the header
handler_data["header_timestamp"] = snapshot.timestamp
# timstamp of the tail
handler_data["tail_timestamp"] = decoded_message.get('timestamp', snapshot.timestamp)
# add key and value from snapshot.results
# to the handler data
for pv_name, values in snapshot.results.items():
handler_data[pv_name] = values


# and call async handler in another thread
executor.submit(
snapshot.handler,
msg_id,
handler_data
)
# clear the dictionary
snapshot.clear()
# Only process tail messages that match the current iteration
if message_iteration == snapshot.interation:
snapshot.state = SnapshotState.TAIL_RECEIVED
logging.debug(f"recurring snapshot {from_topic} tail received [ state {snapshot.state}] messages {sum(len(v) for v in snapshot.results.values())} and iteration {snapshot.interation}")

# Build handler data with metadata
tail_ts = decoded_message.get('timestamp', None)
handler_data = {
"iteration": snapshot.interation,
"header_timestamp": snapshot.timestamp,
"tail_timestamp": tail_ts,
"timestamp": tail_ts,
}
# Add PV data
for pv_name in snapshot.pv_list:
if pv_name in snapshot.results:
handler_data[pv_name] = snapshot.results[pv_name]

# Call handler asynchronously
executor.submit(
snapshot.handler,
from_topic,
handler_data
)
else:
logging.debug(f"Ignoring tail message from iteration {message_iteration}, current iteration is {snapshot.interation}")
else:
#log the error
logging.error(
f"Error during snapshot {msg_id} with message type {message_type} and state {snapshot.state} and iteration {snapshot.interation} and timestamp {snapshot.timestamp}"
)
logging.error(f"Error during snapshot {from_topic} with message type {message_type} and state {snapshot.state} and iteration {snapshot.interation} and timestamp {snapshot.timestamp}")



Expand Down
7 changes: 4 additions & 3 deletions tests/test_dml.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ def test_recurring_snapshot_time_buffered_with_sub_push():
def snapshot_handler(id, snapshot_data:Dict[str, Any]):
nonlocal snapshot_name
nonlocal received_snapshot
print(f"snapshot_handler called with id: {id}")
if snapshot_name == id:
received_snapshot.append(snapshot_data)

Expand All @@ -490,10 +491,10 @@ def snapshot_handler(id, snapshot_data:Dict[str, Any]):
result = k.snapshot_recurring(
SnapshotProperties(
snapshot_name = snapshot_name,
time_window = 4000,
time_window = 1000,
repeat_delay = 0,
sub_push_delay_msec = 1000,
pv_uri_list = ['pva://channel:ramp:ramp'],
sub_push_delay_msec = 500,
pv_uri_list = ['ca://channel:ramp:ramp'],
triggered=False,
type=SnapshotType.TIMED_BUFFERED,
pv_field_filter_list = ['value'],
Expand Down