diff --git a/.github/workflows/BuildAndTest.yml b/.github/workflows/BuildAndTest.yml index 9977599..3e83fcf 100644 --- a/.github/workflows/BuildAndTest.yml +++ b/.github/workflows/BuildAndTest.yml @@ -50,4 +50,51 @@ jobs: if: always() run: | docker compose logs - docker compose down -v --rmi all \ No newline at end of file + docker compose down -v --rmi all + + publish_dev: + name: Publish Dev Package + runs-on: ubuntu-latest + needs: + - execute_test + if: ${{ github.event.pull_request.head.repo.fork == false }} + steps: + - name: Checkout source code + uses: actions/checkout@v3 + with: + fetch-depth: 0 + - name: Set up Python "3.9" + uses: actions/setup-python@v4 + with: + python-version: 3.9 + - name: Install GitVersion + uses: gittools/actions/gitversion/setup@v0 + with: + versionSpec: '5.x' + - name: Find Version + id: gitversion + uses: gittools/actions/gitversion/execute@v0 + with: + useConfigFile: true + - name: Manage Version (PR dev tag) + run: | + PR_NUMBER='${{ github.event.pull_request.number }}' + # Create a PEP 440-compliant dev version that signals PR builds, e.g. 0.2.21.dev50{commits} + VERSION_DEV="$GitVersion_MajorMinorPatch.dev${PR_NUMBER}$GitVersion_CommitsSinceVersionSource" + echo "Setting PR dev version: $VERSION_DEV" + sed -i "s#^version = \"0.0.0\"#version = \"$VERSION_DEV\"#" pyproject.toml + grep '^version =' pyproject.toml + - name: Install build deps + run: | + python -m pip install --upgrade pip + pip install build twine + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Build package + run: | + python -m build . + - name: Upload on TestPyPi + env: + TWINE_USERNAME: ${{ secrets.TWINE_USERNAME }} + TWINE_PASSWORD: ${{ secrets.TWINE_PASSWORD }} + run: | + twine upload -u "$TWINE_USERNAME" -p "$TWINE_PASSWORD" --verbose --skip-existing dist/* diff --git a/.gitignore b/.gitignore index 6c5395d..842f586 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ k2eg/__pycache__* dist k2eg.egg-info k2eg/cli/__pycache__* +kafka.log +pv_list.txt +k2eg/environment/lcls* diff --git a/k2eg/broker.py b/k2eg/broker.py index 3341492..d3133c4 100644 --- a/k2eg/broker.py +++ b/k2eg/broker.py @@ -3,6 +3,7 @@ import json import os import logging +import logging.handlers import re import threading import configparser @@ -11,8 +12,19 @@ from confluent_kafka import Consumer, TopicPartition, Producer, OFFSET_END, OFFSET_BEGINNING from confluent_kafka import KafkaError, KafkaException + logger = logging.getLogger(__name__) +kafka_syslog_logger = logging.getLogger("k2eg-rdkafka") +kafka_syslog_logger.setLevel(logging.DEBUG) +syslog_handler = logging.handlers.RotatingFileHandler("kafka.log", maxBytes=10*1024*1024, backupCount=2) +# syslog_handler = logging.handlers.SysLogHandler(address="/dev/log") +formatter = logging.Formatter('%(name)s: %(levelname)s %(message)s') +syslog_handler.setFormatter(formatter) +kafka_syslog_logger.addHandler(syslog_handler) +# Prevent propagation to root logger to avoid console output +kafka_syslog_logger.propagate = False + class SnapshotType(Enum): """ Enum to define the type of snapshot @@ -163,7 +175,8 @@ def __init__( enable_kafka_debug = os.getenv( 'K2EG_PYTHON_ENABLE_KAFKA_DEBUG_LOG', 'false' - ).lower in ("yes", "true", "t", "1") + ).lower() in ("yes", "true", "t", "1") + self.__enviroment_set = enviroment_set # Create a new ConfigParser object @@ -193,17 +206,26 @@ def __init__( 'session.timeout.ms': 30000, # Session timeout 'heartbeat.interval.ms': 3000, # Heartbeat interval 'max.poll.interval.ms': 300000, # Max poll interval + + # Ensure we capture rdkafka internal errors and warnings + 'log_level': 4, # Capture WARNING and above (errors, critical, etc.) } if enable_kafka_debug is True: config_consumer['debug'] = 'consumer,fetch' + config_consumer['log_level'] = 7 # Capture all debug messages when debug enabled - self.__consumer = Consumer(config_consumer) + self.__consumer = Consumer(config_consumer, logger=kafka_syslog_logger) config_producer = { 'bootstrap.servers': self.__config.get( self.__enviroment_set, 'kafka_broker_url' ), + # Ensure we capture rdkafka internal errors and warnings for producer too + 'log_level': 4, # Capture WARNING and above #'debug': 'consumer,cgrp,topic,fetch', } + if enable_kafka_debug is True: + config_producer['log_level'] = 7 # Capture all debug messages when debug enabled + self.__producer = Producer(config_producer) self.__reply_topic = app_name + '-reply' self.__reply_partition_assigned = threading.Event() @@ -271,7 +293,7 @@ def wait_for_reply_available(self, timeout): end_time = start_time + timeout while self.__reply_partition_assigned.wait(1) is False: if time.time() > end_time: - raise TimeoutError("Function timed out") + raise TimeoutError(f"Timeout waiting to join the reply topic {self.__reply_topic}") logger.debug("waiting for reply topic to join") self.__consumer.poll(1) diff --git a/k2eg/dml.py b/k2eg/dml.py index b453ecf..b423379 100644 --- a/k2eg/dml.py +++ b/k2eg/dml.py @@ -19,6 +19,20 @@ _protocol_regex = r"^(pva?|ca)://((?:[A-Za-z0-9-_:]+(?:\.[A-Za-z0-9-_]+)*))$" def _filter_pv_uri(uri: str): + """Parse and validate a PV URI. + + Accepts URIs in the form: + - `pva://` + - `ca://` + + Returns the protocol and resource if valid; otherwise `(None, None)`. + + Args: + uri: The PV URI to validate. + + Returns: + tuple[str|None, str|None]: `(protocol, resource)` when valid; otherwise `(None, None)`. + """ match = re.match(_protocol_regex, uri) if match: return match.group(1), match.group(2) @@ -39,6 +53,7 @@ def __init__(self, error, message): self.error = error class SnapshotState(Enum): + """State machine for recurring snapshot assembly.""" INITIALIZED = 0 HEADER_RECEVED = 1 DATA_ACQUIRING = 2 @@ -46,6 +61,18 @@ class SnapshotState(Enum): @dataclass class Snapshot: + """Holds state and buffers for snapshot assembly. + + Attributes: + handler: Callback invoked with snapshot results when ready. + properties: Optional `SnapshotProperties` used to configure recurring snapshots. + publishing_topic: Kafka topic used for recurring snapshots. + state: Current `SnapshotState` of a recurring snapshot. + timestamp: Timestamp from the header message (recurring snapshots). + interation: Current iteration index for recurring snapshots. + pv_list: List of PV names expected in recurring snapshots. + results: Aggregated values per PV name. + """ handler: Callable[[str, Dict[str, Any]], None] properties: SnapshotProperties = None publishing_topic: str = None @@ -55,23 +82,41 @@ class Snapshot: pv_list: List[str] = field(default_factory=list) results: Dict[str, List[Any]] = field(default_factory=dict[str, List[Any]]) def init(self): - # fill the results with empty lists for each pv + """Initialize the results dict with empty lists per PV in `pv_list`.""" for pv in self.pv_list: self.results[pv] = [] def clear(self): - """Clear all lists in the results dictionary without removing the keys.""" + """Clear all lists in the results dictionary without removing keys.""" for key in self.results: self.results[key] = [] class dml: - """K2EG client""" + """K2EG client. + + Provides high-level operations to interact with the K2EG gateway via Kafka: + - `get`/`put` + - `monitor`/`monitor_many` and `stop_monitor` + - one-shot `snapshot` and synchronous `snapshot_sync` + - `snapshot_recurring`, `snapshost_trigger`, and `snapshot_stop` + """ def __init__( self, environment_id: str, app_name: str, group_name: str = None, poll_timeout: float = 0.01): + """Create a new client instance. + + Args: + environment_id: Environment identifier used to load Kafka configuration. + app_name: Logical application name; also used to build the reply topic. + group_name: Optional consumer group; defaults to `app_name` if not provided. + poll_timeout: Poll timeout (seconds) for the consumer thread. + + Raises: + ValueError: If `app_name` is not provided. + """ if app_name is None: raise ValueError( "The app name is mandatory" @@ -105,9 +150,25 @@ def __del__(self): self.close() def __from_json(self, j_msg): + """Decode a JSON-encoded payload. + + Not implemented in this client version. Present for future parity with + other serialization formats. + """ print('__from_json') def __from_msgpack(self, m_msg): + """Decode a MessagePack-encoded payload. + + Extracts a message id from well-known keys in the decoded dictionary and + annotates the payload with the raw message size under `message-size`. + + Args: + m_msg: Raw MessagePack bytes. + + Returns: + tuple[str|None, dict|None]: `(msg_id, decoded_payload)` or `(None, None)`. + """ msg_id = None converted_msg = None decodec_msg = msgpack.loads(m_msg) @@ -129,10 +190,22 @@ def __from_msgpack(self, m_msg): return msg_id, converted_msg def __from_msgpack_compack(self, mc_msg): + """Decode a compact-MessagePack payload. + + Not implemented in this client version. Present for format completeness. + """ print('__from_msgpack_compack') def __decode_message(self, msg): - """ Decode single message + """Decode a single Kafka message based on `k2eg-ser-type` header. + + Supports `json`, `msgpack`, and `msgpack-compact` (placeholders for json/compact). + + Args: + msg: The Kafka message object. + + Returns: + tuple[str|None, dict|None]: `(msg_id, decoded_payload)` or `(None, None)`. """ msg_id = None converted_msg = None @@ -160,149 +233,194 @@ def __decode_message(self, msg): return msg_id, converted_msg def process_event(self, topic_name, msg_id, decoded_message): + """Dispatch a decoded monitor event to the registered handler. + + Args: + topic_name: Kafka topic where the event was received. + msg_id: PV name (monitor key). + decoded_message: Decoded payload for the PV. + """ logger.debug(f"received event on topic {topic_name}") self.__monitor_pv_handler[msg_id](msg_id, decoded_message) def __consumer_handler(self): - """ Consume message form kafka consumer - after the message has been consumed the header 'k2eg-ser-type' is checked - for find the serialization: - json, - msgpack, - msgpack-compact + """Background consumer loop. + + Continuously polls Kafka, decodes messages according to `k2eg-ser-type`, + and routes them to one of the following flows: + - Reply handling (condition variable signaling) + - Monitor event dispatch (per-PV handlers) + - One-shot snapshot aggregation + - Recurring snapshot assembly and handler dispatch """ with ThreadPoolExecutor(max_workers=10) as executor: while self.__consume_data: - message = self.__broker.get_next_message(self.poll_timeout) - if message is None: - continue - if message.error(): - if message.error().code() == KafkaError._PARTITION_EOF: - # End of partition event - logger.error( - f"{message.topic()} [{message.partition()}]reached "+ - f"end at offset {message.offset()}" - ) - else: + try: + message = self.__broker.get_next_message(self.poll_timeout) + if message is None: continue - else: - was_a_reply = False + if message.error(): + if message.error().code() == KafkaError._PARTITION_EOF: + # End of partition event; not an error for us + logger.debug( + f"{message.topic()} [{message.partition()}] reached end at offset {message.offset()}" + ) + # Skip errored messages + continue + from_topic = message.topic() - #msg_id could be a reply id or pv name + # msg_id could be a reply id, snapshot name, or pv name msg_id, decoded_message = self.__decode_message(message) if msg_id is None or decoded_message is None: continue + + # 1) Fast path for replies (use the condition only here) with self.reply_wait_condition: - was_a_reply = msg_id in self.reply_message - if was_a_reply is True: - # print(f"message received from topic: {message.topic()} offset: {message.offset()}") - logger.debug(f"received reply on topic {message.topic()}") + if msg_id in self.reply_message: + logger.debug(f"received reply on topic {from_topic}") self.reply_message[msg_id] = decoded_message self.reply_wait_condition.notify_all() - elif msg_id in self.__monitor_pv_handler: - executor.submit( - self.process_event, - message.topic(), - msg_id, - decoded_message[msg_id] - ) - elif msg_id in self.reply_snapsthot_message: - # if the message is not a reply and not a monitor - # it should be a snapshot - snapshot = self.reply_snapsthot_message[msg_id] - # check if the message is a snapshot completion error == 1 - if decoded_message.get('error', 0) == 0: - logger.debug(f"Added message to snapshot {msg_id}]") - decoded_message.pop('error', None) - decoded_message.pop('reply_id', None) - decoded_message.pop('message-size', None) - # the message contains a snapshot value - if len(decoded_message) == 1: - pv_name, value = next(iter(decoded_message.items())) + # handled as reply; move to next message + continue + + # 2) Monitor events (no need to hold the reply condition) + try: + # Read lock to safely check handler presence + with self.__lock.gen_rlock(): + monitor_handler_present = msg_id in self.__monitor_pv_handler + monitor_handler = self.__monitor_pv_handler.get(msg_id) + if monitor_handler_present and monitor_handler is not None: + # Prefer the pv-named payload; fall back defensively if missing + payload = decoded_message.get( + msg_id, + next((v for k, v in decoded_message.items() if k not in ( + 'reply_id', 'message-size', 'error', 'snapshot_name', 'timestamp', 'message_type', 'iter_index' + )), None) + ) + if payload is not None: + executor.submit(self.process_event, from_topic, msg_id, payload) + continue + except Exception as e: + logger.exception(f"Error dispatching monitor event for {msg_id}: {e}") + + # 3) One-shot snapshot aggregation (keyed by snapshot id) + with self.__lock.gen_rlock(): + snapshot = self.reply_snapsthot_message.get(msg_id) + if snapshot is not None: + # check if the message is a snapshot completion error == 1 + if decoded_message.get('error', 0) == 0: + logger.debug(f"Added message to snapshot {msg_id}") + # Remove metadata and keep only pv payload + for k in ('error', 'reply_id', 'message-size'): + decoded_message.pop(k, None) + if len(decoded_message) == 1: + pv_name, value = next(iter(decoded_message.items())) + with self.__lock.gen_wlock(): if pv_name not in snapshot.results: snapshot.results[pv_name] = [] snapshot.results[pv_name].append(value) + else: + logger.debug( + f"Snapshot {msg_id} completed with error {decoded_message.get('error', 0)}" + ) + # Completion: remove snapshot and invoke handler asynchronously + with self.__lock.gen_wlock(): + self.reply_snapsthot_message.pop(msg_id, None) + executor.submit(snapshot.handler, msg_id, snapshot.results) + continue + + # 4) Recurring snapshot stream (keyed by publishing topic) + with self.__lock.gen_rlock(): + r_snapshot = self.reply_recurring_snapsthot_message.get(from_topic) + if r_snapshot is not None: + message_type = decoded_message.get('message_type', None) + if message_type is None: + continue + + # Get the current iteration from the message + message_iteration = decoded_message.get('iter_index', 0) + + if message_type == 0 and (r_snapshot.state == SnapshotState.INITIALIZED or r_snapshot.state == SnapshotState.TAIL_RECEIVED): + with self.__lock.gen_wlock(): + r_snapshot.state = SnapshotState.HEADER_RECEVED + r_snapshot.timestamp = decoded_message.get('timestamp', None) + r_snapshot.interation = decoded_message.get('iter_index', 0) + logger.debug( + f"recurring snapshot {from_topic} header received [state {r_snapshot.state}] iteration {r_snapshot.interation}" + ) + continue + + if message_type == 1 and (r_snapshot.state == SnapshotState.HEADER_RECEVED or r_snapshot.state == SnapshotState.DATA_ACQUIRING): + if message_iteration == r_snapshot.interation: + with self.__lock.gen_wlock(): + r_snapshot.state = SnapshotState.DATA_ACQUIRING + # Remove metadata from the message + for k in ('timestamp', 'iter_index', 'message_type', 'message-size'): + decoded_message.pop(k, None) + # Now the remaining key is the pv name + if len(decoded_message) == 1: + pv_name, value = next(iter(decoded_message.items())) + with self.__lock.gen_wlock(): + if pv_name not in r_snapshot.results: + r_snapshot.results[pv_name] = [] + r_snapshot.results[pv_name].append(value) else: - logger.debug(f"Snapshot {msg_id} compelted with error {decoded_message.get('error', 0)}") - # we got the completion message so - # remove the snapshot from the list - del self.reply_snapsthot_message[msg_id] - # and call async handler in another thread - executor.submit( - snapshot.handler, - msg_id, - snapshot.results + logger.debug( + f"Ignoring data message from iteration {message_iteration}, current iteration is {r_snapshot.interation}" ) - elif from_topic in self.reply_recurring_snapsthot_message: - # it should be a recurring snapshot - snapshot = self.reply_recurring_snapsthot_message[from_topic] - message_type = decoded_message.get('message_type', None) - if message_type is None: - continue - - # Get the current iteration from the message - message_iteration = decoded_message.get('iter_index', 0) - - if message_type == 0 and (snapshot.state == SnapshotState.INITIALIZED or snapshot.state == SnapshotState.TAIL_RECEIVED): - snapshot.state = SnapshotState.HEADER_RECEVED - # Get the timestamp and iteration - snapshot.timestamp = decoded_message.get('timestamp', None) - snapshot.interation = decoded_message.get('iter_index', 0) - logger.debug(f"recurring snapshot {from_topic} header received [ state {snapshot.state}] and iteration {snapshot.interation}") - - elif message_type == 1 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING): - # Only process data messages that match the current iteration - if message_iteration == snapshot.interation: - snapshot.state = SnapshotState.DATA_ACQUIRING - # Remove metadata from the message - decoded_message.pop('timestamp', None) - decoded_message.pop('iter_index', None) - decoded_message.pop('message_type', None) - decoded_message.pop('message-size', None) - - # Now the remaining key is the pv name - if len(decoded_message) == 1: - pv_name, value = next(iter(decoded_message.items())) - if pv_name not in snapshot.results: - snapshot.results[pv_name] = [] - snapshot.results[pv_name].append(value) - #logger.debug(f"recurring snapshot {from_topic} data received [ state {snapshot.state}] messages {sum(len(v) for v in snapshot.results.values())} and iteration {snapshot.interation}") - else: - logger.debug(f"Ignoring data message from iteration {message_iteration}, current iteration is {snapshot.interation}") - - elif message_type == 2 and (snapshot.state == SnapshotState.HEADER_RECEVED or snapshot.state == SnapshotState.DATA_ACQUIRING): - # Only process tail messages that match the current iteration - if message_iteration == snapshot.interation: - snapshot.state = SnapshotState.TAIL_RECEIVED - # Build handler data with metadata + continue + + if message_type == 2 and (r_snapshot.state == SnapshotState.HEADER_RECEVED or r_snapshot.state == SnapshotState.DATA_ACQUIRING): + if message_iteration == r_snapshot.interation: + with self.__lock.gen_wlock(): + r_snapshot.state = SnapshotState.TAIL_RECEIVED tail_ts = decoded_message.get('timestamp', None) handler_data = { - "iteration": snapshot.interation, - "header_timestamp": snapshot.timestamp, + "iteration": r_snapshot.interation, + "header_timestamp": r_snapshot.timestamp, "tail_timestamp": tail_ts, "timestamp": tail_ts, } - # Add PV data - for pv_name in snapshot.pv_list: - if pv_name in snapshot.results: - handler_data[pv_name] = snapshot.results[pv_name] - - logger.debug(f"recurring snapshot {from_topic} tail received [ state {snapshot.state}] fromm {len(handler_data)} PVs with {sum(len(v) for v in snapshot.results.values())} messages on iteration {snapshot.interation}") - # Call handler asynchronously - executor.submit( - snapshot.handler, - from_topic, - handler_data - ) - snapshot.clear() # Clear results for the next iteration - else: - logger.debug(f"Ignoring tail message from iteration {message_iteration}, current iteration is {snapshot.interation}") + for pv_name in r_snapshot.pv_list: + if pv_name in r_snapshot.results: + handler_data[pv_name] = r_snapshot.results[pv_name] + logger.debug( + f"recurring snapshot {from_topic} tail received [state {r_snapshot.state}] from {len(handler_data)} PVs with {sum(len(v) for v in r_snapshot.results.values())} messages on iteration {r_snapshot.interation}" + ) + executor.submit(r_snapshot.handler, from_topic, handler_data) + with self.__lock.gen_wlock(): + r_snapshot.clear() # Prepare for next iteration else: - logger.error(f"Error during snapshot {from_topic} with message type {message_type} and state {snapshot.state} and iteration {snapshot.interation} and timestamp {snapshot.timestamp}") + logger.debug( + f"Ignoring tail message from iteration {message_iteration}, current iteration is {r_snapshot.interation}" + ) + continue + + # Unexpected state/type combination + logger.error( + f"Error during snapshot {from_topic} with message type {message_type} and state {r_snapshot.state} and iteration {r_snapshot.interation} and timestamp {r_snapshot.timestamp}" + ) + continue + + # Nothing matched; drop silently + except Exception as e: + # Ensure the consumer loop is resilient + logger.exception(f"Unhandled exception in consumer handler: {e}") def parse_pv_url(self, pv_url): + """Validate and split a PV URL into protocol and name. + + Args: + pv_url: The PV URL (e.g., `pva://dev:chan` or `ca://PV_NAME`). + + Returns: + tuple[str, str]: `(protocol, pv_name)`. + + Raises: + ValueError: If the URL is not well formed. + """ protocol, pv_name = _filter_pv_uri(pv_url) if protocol is None or pv_name is None: raise ValueError( @@ -311,15 +429,21 @@ def parse_pv_url(self, pv_url): return protocol, pv_name def __check_pv_name(self, pv_url): + """Placeholder for PV name validation (reserved for future use).""" pass def _check_pv_list(self, pv_uri_list: list[str]): + """Validate a list of PV URIs for supported protocol and format. + + Each PV URI must be prefixed with `pva://` or `ca://`. + """ for pv_uri in pv_uri_list: protocol, pv_name = self.parse_pv_url(pv_uri) if protocol.lower() not in ("pva", "ca"): raise ValueError("The protocol need to be one of 'pva' 'ca'") def __normalize_pv_name(self, pv_name): + """Normalize a PV name for topic usage (replace ':' with '_').""" return pv_name.replace(":", "_") def _validate_snapshot_name(self, snapshot_name: str) -> None: @@ -338,6 +462,19 @@ def _validate_snapshot_name(self, snapshot_name: str) -> None: ) def __wait_for_reply(self, new_reply_id, timeout) -> tuple[int, any]: + """Block until a reply for `new_reply_id` is available or timeout occurs. + + Args: + new_reply_id: Correlation id used for matching a reply. + timeout: Maximum time (seconds) to wait; `None` waits indefinitely. + + Returns: + tuple[int, Any]: `(-2, None)` on timeout, `(-1, None)` on spurious wake, + or `(0, reply_dict)` on success. Raises on error in reply. + + Raises: + OperationError: If the reply contains a non-zero error code. + """ #with self.reply_wait_condition: got_it = self.reply_wait_condition.wait_for( lambda: new_reply_id in self.reply_message and self.reply_message[new_reply_id] is not None, @@ -357,12 +494,24 @@ def __wait_for_reply(self, new_reply_id, timeout) -> tuple[int, any]: return 0, reply_msg def wait_for_backends(self): + """Block until the client joins the reply topic.""" logger.debug("Waiting for join kafka reply topic") self.__broker.wait_for_reply_available() def get(self, pv_url: str, timeout: float = None): - """ Perform the get operation - raise OperationTimeout when timeout has expired + """Retrieve the current value of a PV. + + Args: + pv_url: PV URL (e.g., `pva://dev:chan`). + timeout: Optional timeout in seconds. + + Returns: + dict | Any: The decoded PV value structure, or the raw reply dict. + + Raises: + ValueError: If the PV URL or protocol is invalid. + OperationTimeout: If the operation times out. + OperationError: If the server returns an error. """ protocol, pv_name = self.parse_pv_url(pv_url) if protocol.lower() != "pva" and protocol.lower() != "ca": @@ -396,16 +545,20 @@ def get(self, pv_url: str, timeout: float = None): return result def put(self, pv_url: str, value: MessagePackSerializable, timeout: float = None): - """ Set the value for a single pv + """Set the value of a PV. + Args: - pv_name (str): is the name of the pv - value (str): is the new value - protocol (str): the protocol of the pv, the default is pva - timeout (float): the timeout, in second or fraction + pv_url: PV URL to write. + value: A MessagePackSerializable payload to write. + timeout: Optional timeout in seconds. + + Returns: + dict: Reply dictionary on success. + Raises: - ValueError: if some parameter are not valid - - return the error code and a message in case the error code is not 0 + ValueError: If parameters are invalid. + OperationTimeout: If the operation times out. + OperationError: If the server returns an error. """ protocol, pv_name = self.parse_pv_url(pv_url) if protocol.lower() not in ("pva", "ca"): @@ -438,17 +591,20 @@ def put(self, pv_url: str, value: MessagePackSerializable, timeout: float = None def monitor(self, pv_url: str, handler: Callable[[str, dict], None], timeout: float = None): # noqa: E501 - """ Add a new monitor for pv if it is not already activated - Parameters - ---------- - pv_name : str - The name of the PV to monitor - handler: function - The handler to be called when a message is received - Rais: - ---------- - True: the monitor has been activated - False: otherwhise + """Start a monitor for a PV and invoke a handler on updates. + + Args: + pv_url: PV URL to monitor. + handler: Callback `(pv_name, value_dict)` invoked per update. + timeout: Optional timeout waiting for acknowledgment. + + Returns: + dict | None: Reply dictionary on success, or `None` if already active. + + Raises: + ValueError: If the PV URL or protocol is invalid. + OperationTimeout: If the operation times out. + OperationError: If the server returns an error. """ fetched = False protocol, pv_name = self.parse_pv_url(pv_url) @@ -486,17 +642,20 @@ def monitor(self, pv_url: str, handler: Callable[[str, dict], None], timeout: fl return result def monitor_many(self, pv_uri_list: list[str], handler: Callable[[str, dict], None], timeout: float = None): # noqa: E501 - """ Add a new monitor for pv if it is not already activated - Parameters - ---------- - pv_uri_list : list[str] - The name of the PV to monitor - handler: function - The handler to be called when a message is received - Rais: - ---------- - True: the monitor has been activated - False: otherwhise + """Start monitors for multiple PVs and invoke a handler on updates. + + Args: + pv_uri_list: List of PV URLs to monitor. Each entry must be prefixed with `pva://` or `ca://`. + handler: Callback `(pv_name, value_dict)` per PV update. + timeout: Optional timeout waiting for acknowledgment. + + Returns: + dict | None: Reply dictionary on success, or `None` if nothing to add. + + Raises: + ValueError: If a PV URL or protocol is invalid. + OperationTimeout: If the operation times out. + OperationError: If the server returns an error. """ fetched = False self._check_pv_list(pv_uri_list) @@ -540,11 +699,10 @@ def monitor_many(self, pv_uri_list: list[str], handler: Callable[[str, dict], No return result def stop_monitor(self, pv_name: str): # noqa: E501 - """ Remove movitor for a specific pv - Parameters - ---------- - pv_name : str - The name of the PV to monitor + """Stop monitoring a specific PV and unsubscribe from its topic. + + Args: + pv_name: The PV name (without protocol) previously monitored. """ with self.reply_wait_condition: # all is gone ok i can register the handler and subscribe @@ -552,8 +710,14 @@ def stop_monitor(self, pv_name: str): # noqa: E501 self.__broker.remove_topic(self.__normalize_pv_name(pv_name)) def snapshot(self, pv_uri_list: list[str], handler: Callable[[str, dict], None])->str: - """ Perform the snapshot creation - return the id to be used to match the snapthot returned asynchronously in the hanlder + """Request a one-shot snapshot and register a handler for results. + + Args: + pv_uri_list: List of PV URLs to snapshot. Each entry must be prefixed with `pva://` or `ca://`. + handler: Callback `(snapshot_id, data_dict)` invoked asynchronously. + + Returns: + str: The generated `snapshot_id` used to correlate results. """ #check if all the pv are wellformed self._check_pv_list(pv_uri_list) @@ -570,40 +734,24 @@ def snapshot(self, pv_uri_list: list[str], handler: Callable[[str, dict], None] return new_reply_id def snapshot_recurring(self, properties: SnapshotProperties, handler: Callable[[str, Dict[str, Any]], None], timeout: float = None): - """ - Create a new recurring snapshot for a list of process variables (PVs). + """Create a recurring snapshot and register a handler per iteration tail. - This method initiates a recurring snapshot operation for the specified PVs. - It registers a handler to be called asynchronously when snapshot data is available. - The method blocks until the snapshot is created and an acknowledgment is received from the server, - or until the specified timeout is reached. + Blocks until the snapshot is created (ack received) or `timeout` expires, then + starts listening to the provided publishing topic and asynchronously dispatches + assembled results on each iteration tail to `handler`. Args: - snapshot_name (str): The name to assign to the recurring snapshot. - pv_uri_list (list[str]): List of PV URIs to include in the snapshot. - handler (Callable[[str, dict], None]): Callback function to handle snapshot results. - The handler receives the snapshot ID and a dictionary containing the snapshot data. - timeout (float, optional): Maximum time to wait for the server acknowledgment, in seconds. - If None, waits indefinitely. + properties: Snapshot configuration (name, list of PVs, timing, etc.). `properties.pv_uri_list` must contain PV URLs prefixed with `pva://` or `ca://`. + handler: Callback `(publishing_topic, data_dict)` invoked at each tail. + timeout: Optional timeout waiting for the acknowledgment. Returns: - str: "ok" if the snapshot is successfully created and acknowledged. + dict: The acknowledgment reply dictionary. Raises: - ValueError: If any PV URI is not well-formed or uses an unsupported protocol. - OperationTimeout: If the operation times out before receiving an acknowledgment. - OperationError: If the server returns an error during snapshot creation. - - Example: - def my_handler(snapshot_id, data): - print(f"Snapshot {snapshot_id} data: {data}") - - dml_instance.snapshot_recurring( - "my_snapshot", - ["pva://my:pv1", "ca://my:pv2"], - my_handler, - timeout=5.0 - ) + ValueError: If input validation fails. + OperationTimeout: If the operation times out. + OperationError: If the server returns an error. """ #check if all the pv are wellformed self._check_pv_list(properties.pv_uri_list) @@ -650,8 +798,7 @@ def my_handler(snapshot_id, data): return result def snapshost_trigger(self, snapshot_name: str, timeout: float = None): - """ Trigger a new publishing of a specific snapshot - """ + """Trigger an on-demand publish for a recurring snapshot.""" self._validate_snapshot_name(snapshot_name) new_reply_id = str(uuid.uuid1()) with self.reply_wait_condition: @@ -677,8 +824,7 @@ def snapshost_trigger(self, snapshot_name: str, timeout: float = None): return result def snapshot_stop(self, snapshot_name: str, timeout: float = None): - """ Stop the snapshot operation - """ + """Stop a recurring snapshot and unsubscribe its publishing topic.""" self._validate_snapshot_name(snapshot_name) new_reply_id = str(uuid.uuid1()) with self.reply_wait_condition: @@ -713,8 +859,17 @@ def snapshot_stop(self, snapshot_name: str, timeout: float = None): return result def snapshot_sync(self, pv_uri_list: list[str], timeout: float = 10.0)->list[dict[str, Any]]: - """ Perform the snapshot operation - return the snapshot value synchronously + """Perform a one-shot snapshot and return results synchronously. + + Args: + pv_uri_list: List of PV URLs to snapshot. Each entry must be prefixed with `pva://` or `ca://`. + timeout: Overall timeout for receiving the assembled snapshot. + + Returns: + dict[str, Any]: Snapshot data per PV with `error` key set to 0 on success. + + Raises: + OperationTimeout: If the snapshot assembly times out. """ snapshot_id = None received_snapshot = None @@ -740,6 +895,7 @@ def internal_snapshot_handler(id, snapshot_data): return received_snapshot def close(self): + """Close the client, stop the consumer thread, and release resources.""" # signal thread to terminate if self.__thread is not None: self.__consume_data = False