From 35b2c8631970aed1ee777db288a6cbf16922de01 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Tue, 18 Nov 2025 16:16:32 -0700 Subject: [PATCH 01/15] Update: timeout default --- imdclient/IMDClient.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index a0f71dd..972ea07 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -54,7 +54,7 @@ class IMDClient: buffer_size : int (optional) :class:`IMDFrameBuffer` will be filled with as many :class:`IMDFrame` fit in `buffer_size` bytes [``10MB``] timeout : int, optional - Timeout for the socket in seconds [``5``] + Timeout for the socket in seconds [``600``] continue_after_disconnect : bool, optional [``None``] If True, the client will attempt to change the simulation engine's waiting behavior to non-blocking after the client disconnects. If False, the client will attempt to change it @@ -362,7 +362,7 @@ class BaseIMDProducer(threading.Thread): error_queue: queue.Queue Queue to hold errors produced by the producer thread timeout : int, optional - Timeout for the socket in seconds [``5``] + Timeout for the socket in seconds [``600``] """ def __init__( @@ -373,7 +373,7 @@ def __init__( n_atoms, multithreaded, error_queue, - timeout=5, + timeout=600, **kwargs, ): super(BaseIMDProducer, self).__init__(daemon=True) From c4703222b73da8595dffa34c8bd53dc178d29ecb Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Tue, 18 Nov 2025 18:26:45 -0700 Subject: [PATCH 02/15] Update: Error propogation and exception chaining improvements --- imdclient/IMDClient.py | 46 ++++++++++++++++++++++++++---------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index 972ea07..45ca437 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -169,21 +169,24 @@ def get_imdframe(self): if self._multithreaded: try: return self._buf.pop_full_imdframe() - except EOFError: + except EOFError as e: # in this case, consumer is already finished # and doesn't need to be notified + logger.debug(f"IMDClient: Multithreaded connection ended") self._disconnect() self._stopped = True if self._error_queue.qsize(): - raise EOFError(f"{self._error_queue.get()}") - raise EOFError + error = self._error_queue.get() + raise EOFError(str(error)) from error + raise EOFError from e else: try: return self._producer._get_imdframe() - except EOFError: + except EOFError as e: + logger.debug(f"IMDClient: Single-threaded connection ended") self._disconnect() - raise EOFError + raise EOFError from e def get_imdsessioninfo(self): """ @@ -241,7 +244,7 @@ def _await_IMD_handshake(self) -> IMDSessionInfo: read_into_buf(self._conn, h_buf) except (ConnectionError, TimeoutError, Exception) as e: logger.debug("IMDClient: No handshake packet received: %s", e) - raise ConnectionError("IMDClient: No handshake packet received") + raise ConnectionError("IMDClient: No handshake packet received") from e header = IMDHeader(h_buf) @@ -424,7 +427,8 @@ def _get_imdframe(self): try: self._parse_imdframe() except EOFError as e: - raise EOFError + logger.debug(f"IMDProducer: No more frames to read: {e}") + raise EOFError from e except Exception as e: raise RuntimeError("An unexpected error occurred") from e @@ -468,11 +472,12 @@ def run(self): self._frame, t.elapsed, ) - except EOFError: + except EOFError as e: # simulation ended in a way # that we expected # i.e. consumer stopped or read_into_buf didn't find # full token of data + logger.debug("IMDProducer: %s", e) logger.debug("IMDProducer: Simulation ended normally, cleaning up") except Exception as e: logger.debug("IMDProducer: An unexpected error occurred: %s", e) @@ -513,13 +518,19 @@ def _read(self, buf): """Wraps `read_into_buf` call to give uniform error handling which indicates end of stream""" try: read_into_buf(self._conn, buf) - except (ConnectionError, TimeoutError, BlockingIOError, Exception): # ConnectionError: Server is definitely done sending frames, socket is closed # TimeoutError: Server is *likely* done sending frames. # BlockingIOError: Occurs when timeout is 0 in place of a TimeoutError. Server is *likely* done sending frames # OSError: Occurs when main thread disconnects from the server and closes the socket, but producer thread attempts to read another frame # Exception: Something unexpected happened - raise EOFError + except ConnectionError as e: + raise EOFError("Server is definitely done sending frames") from e + except TimeoutError as e: + raise EOFError("Server is likely done sending frames") from e + except BlockingIOError as e: + raise EOFError("Server is likely done sending frames") from e + except Exception as e: + raise EOFError("Something unexpected happened") from e class IMDProducerV2(BaseIMDProducer): @@ -597,7 +608,7 @@ def _pause(self): self._conn.sendall(pause) except ConnectionResetError as e: # Simulation has already ended by the time we paused - raise EOFError + raise EOFError("Simulation has already ended by the time we paused") from e # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation is not actually paused but is over, but we still want to read remaining data @@ -612,7 +623,7 @@ def _unpause(self): # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation was never actually paused in this case and is now over - raise EOFError + raise EOFError("Simulation was never actually paused as pause was sent after the last frame; simulation is now over") from e # Edge case: pause & unpause occured in the time between server sends its last frame and closing socket # in this case, the simulation isn't actually unpaused but over @@ -662,7 +673,7 @@ def _pause(self): self._conn.sendall(pause) except ConnectionResetError as e: # Simulation has already ended by the time we paused - raise EOFError + raise EOFError("Simulation has already ended by the time we paused") from e # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation is not actually paused but is over, but we still want to read remaining data @@ -677,7 +688,7 @@ def _unpause(self): # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation was never actually paused in this case and is now over - raise EOFError + raise EOFError("Simulation was never actually paused as pause was sent after the last frame; simulation is now over") from e # Edge case: pause & unpause occured in the time between server sends its last frame and closing socket # in this case, the simulation isn't actually unpaused but over @@ -852,9 +863,10 @@ def wait_for_space(self): if self._consumer_finished: logger.debug("IMDProducer: Noticing consumer finished") - raise EOFError + raise EOFError("Consumer has finished") except Exception as e: logger.debug(f"IMDProducer: Error waiting for space in buffer: {e}") + raise RuntimeError("Error waiting for space in buffer") from e def pop_empty_imdframe(self): logger.debug("IMDProducer: Getting empty frame") @@ -870,7 +882,7 @@ def pop_empty_imdframe(self): if self._consumer_finished: logger.debug("IMDProducer: Noticing consumer finished") - raise EOFError + raise EOFError("Consumer has finished") return self._empty_q.get() @@ -905,7 +917,7 @@ def pop_full_imdframe(self): if self._producer_finished and self._full_q.qsize() == 0: logger.debug("IMDFrameBuffer(Consumer): Producer finished") - raise EOFError + raise EOFError("Producer has finished") imdf = self._full_q.get() From f51a86a3c3644e1f91a469bed3a4f309af96efac Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Tue, 18 Nov 2025 22:31:15 -0700 Subject: [PATCH 03/15] Add: small `timeout` warning message and Black formatting --- imdclient/IMDClient.py | 28 +++++++++++++++++++++++----- imdclient/tests/server.py | 6 ++++-- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index 45ca437..ae06d4c 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -73,6 +73,14 @@ def __init__( continue_after_disconnect=None, **kwargs, ): + + # Warn if timeout is overly optimistic + if "timeout" in kwargs and kwargs["timeout"] <= 1: + logger.warning( + f"IMDClient: timeout value of {kwargs['timeout']} second(s) is very low and may lead to " + "premature disconnection by the client. Consider using a higher value (default is 600 seconds)." + ) + self._stopped = False self._conn = self._connect_to_server(host, port, socket_bufsize) self._imdsinfo = self._await_IMD_handshake() @@ -244,7 +252,9 @@ def _await_IMD_handshake(self) -> IMDSessionInfo: read_into_buf(self._conn, h_buf) except (ConnectionError, TimeoutError, Exception) as e: logger.debug("IMDClient: No handshake packet received: %s", e) - raise ConnectionError("IMDClient: No handshake packet received") from e + raise ConnectionError( + "IMDClient: No handshake packet received" + ) from e header = IMDHeader(h_buf) @@ -608,7 +618,9 @@ def _pause(self): self._conn.sendall(pause) except ConnectionResetError as e: # Simulation has already ended by the time we paused - raise EOFError("Simulation has already ended by the time we paused") from e + raise EOFError( + "Simulation has already ended by the time we paused" + ) from e # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation is not actually paused but is over, but we still want to read remaining data @@ -623,7 +635,9 @@ def _unpause(self): # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation was never actually paused in this case and is now over - raise EOFError("Simulation was never actually paused as pause was sent after the last frame; simulation is now over") from e + raise EOFError( + "Simulation was never actually paused as pause was sent after the last frame; simulation is now over" + ) from e # Edge case: pause & unpause occured in the time between server sends its last frame and closing socket # in this case, the simulation isn't actually unpaused but over @@ -673,7 +687,9 @@ def _pause(self): self._conn.sendall(pause) except ConnectionResetError as e: # Simulation has already ended by the time we paused - raise EOFError("Simulation has already ended by the time we paused") from e + raise EOFError( + "Simulation has already ended by the time we paused" + ) from e # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation is not actually paused but is over, but we still want to read remaining data @@ -688,7 +704,9 @@ def _unpause(self): # Edge case: pause occured in the time between server sends its last frame # and closing socket # Simulation was never actually paused in this case and is now over - raise EOFError("Simulation was never actually paused as pause was sent after the last frame; simulation is now over") from e + raise EOFError( + "Simulation was never actually paused as pause was sent after the last frame; simulation is now over" + ) from e # Edge case: pause & unpause occured in the time between server sends its last frame and closing socket # in this case, the simulation isn't actually unpaused but over diff --git a/imdclient/tests/server.py b/imdclient/tests/server.py index 6c467c3..b515da5 100644 --- a/imdclient/tests/server.py +++ b/imdclient/tests/server.py @@ -37,7 +37,7 @@ def set_imdsessioninfo(self, imdsinfo): @property def port(self): """Get the port the server is bound to. - + Returns: int: The port number, or None if not bound yet. """ @@ -47,7 +47,9 @@ def handshake_sequence(self, host, first_frame=True): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((host, 0)) # Bind to port 0 to get a free port self._bound_port = s.getsockname()[1] # Store the actual bound port - logger.debug(f"InThreadIMDServer: Listening on {host}:{self._bound_port}") + logger.debug( + f"InThreadIMDServer: Listening on {host}:{self._bound_port}" + ) s.listen(60) self.listen_socket = s From 3fa4551d3664c897a6077da16eb55286d0e94484 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Wed, 19 Nov 2025 00:33:33 -0700 Subject: [PATCH 04/15] tests for `timeout` --- imdclient/tests/test_imdclient.py | 100 ++++++++++++++++++++++++++++-- 1 file changed, 95 insertions(+), 5 deletions(-) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index 656d08c..1e130bb 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -1,6 +1,7 @@ """Test for IMDClient functionality""" import logging +import time import pytest from numpy.testing import ( @@ -178,16 +179,105 @@ def test_continue_after_disconnect(self, universe, imdsinfo, cont): IMDHeaderType.IMD_WAIT, expected_length=(int)(not cont) ) - def test_incorrect_atom_count(self, server_client_incorrect_atoms, universe): + def test_timeout_warning_low_value(self, universe, imdsinfo, caplog): + """Test that warning is issued for timeout values <= 1 second""" + server = InThreadIMDServer(universe.trajectory) + server.set_imdsessioninfo(imdsinfo) + server.handshake_sequence("localhost", first_frame=False) + + with caplog.at_level(logging.WARNING): + client = IMDClient( + f"localhost", + server.port, + universe.trajectory.n_atoms, + timeout=1, + ) + + server.join_accept_thread() + + # Check that warning was logged + assert any( + "timeout value of 1 second(s) is very low" in record.message + for record in caplog.records + ) + + client.stop() + server.cleanup() + + @pytest.mark.parametrize("timeout_val", [2, 10]) + def test_timeout_within_limit(self, universe, imdsinfo, timeout_val): + """Test that timeout does not trigger when server responds within timeout period""" + server = InThreadIMDServer(universe.trajectory) + server.set_imdsessioninfo(imdsinfo) + server.handshake_sequence("localhost", first_frame=False) + client = IMDClient( + f"localhost", + server.port, + universe.trajectory.n_atoms, + timeout=timeout_val, + ) + server.join_accept_thread() + + # Sleep for less than timeout before sending frames + time.sleep(timeout_val - 1) + server.send_frames(0) + + # Should successfully receive frame without timeout + imdf = client.get_imdframe() + assert_allclose(universe.trajectory[0].positions, imdf.positions) + + yield server, client + client.stop() + server.cleanup() + + @pytest.mark.parametrize("timeout_val", [2, 10]) + def test_timeout_when_exceeded(self, universe, imdsinfo, timeout_val): + """Test that timeout triggers EOFError when server doesn't respond within timeout period""" + server = InThreadIMDServer(universe.trajectory) + server.set_imdsessioninfo(imdsinfo) + server.handshake_sequence("localhost", first_frame=False) + client = IMDClient( + f"localhost", + server.port, + universe.trajectory.n_atoms, + timeout=timeout_val, + ) + server.join_accept_thread() + + # Sleep for longer than timeout without sending any frames + time.sleep(timeout_val + 1) + + # Client should timeout and raise EOFError when trying to get first frame + with pytest.raises(EOFError) as exc_info: + client.get_imdframe() + + # Verify TimeoutError is somewhere in the exception chain + exception_chain = [] + current = exc_info.value + while current is not None: + exception_chain.append(type(current)) + current = current.__cause__ + + assert TimeoutError in exception_chain + + yield server, client + client.stop() + server.cleanup() + + def test_incorrect_atom_count( + self, server_client_incorrect_atoms, universe + ): server, client = server_client_incorrect_atoms - + server.send_frame(0) - + with pytest.raises(EOFError) as exc_info: client.get_imdframe() - + error_msg = str(exc_info.value) - assert f"Expected n_atoms value {universe.atoms.n_atoms + 1}" in error_msg + assert ( + f"Expected n_atoms value {universe.atoms.n_atoms + 1}" in error_msg + ) assert f"got {universe.atoms.n_atoms}" in error_msg assert "Ensure you are using the correct topology file" in error_msg From 1b810c8cd8ea317f1cc26bb92fc71b1b07a5582d Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Wed, 19 Nov 2025 11:43:36 -0700 Subject: [PATCH 05/15] Remove: unnecessary `yield`s --- imdclient/tests/test_imdclient.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index 1e130bb..154074d 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -226,7 +226,6 @@ def test_timeout_within_limit(self, universe, imdsinfo, timeout_val): imdf = client.get_imdframe() assert_allclose(universe.trajectory[0].positions, imdf.positions) - yield server, client client.stop() server.cleanup() @@ -260,7 +259,6 @@ def test_timeout_when_exceeded(self, universe, imdsinfo, timeout_val): assert TimeoutError in exception_chain - yield server, client client.stop() server.cleanup() From 5068f24abf5bcb555479bd9a23c77266cb686f76 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Fri, 21 Nov 2025 01:29:49 -0700 Subject: [PATCH 06/15] Add: more error logging in queue --- imdclient/IMDClient.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index ae06d4c..1efa567 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -184,10 +184,12 @@ def get_imdframe(self): self._disconnect() self._stopped = True - if self._error_queue.qsize(): - error = self._error_queue.get() + try: + error = self._error_queue.get_nowait() + except queue.Empty: + raise EOFError from e + else: raise EOFError(str(error)) from error - raise EOFError from e else: try: return self._producer._get_imdframe() @@ -488,6 +490,7 @@ def run(self): # i.e. consumer stopped or read_into_buf didn't find # full token of data logger.debug("IMDProducer: %s", e) + self.error_queue.put(e) logger.debug("IMDProducer: Simulation ended normally, cleaning up") except Exception as e: logger.debug("IMDProducer: An unexpected error occurred: %s", e) From 51dd4265f06e8094c694596251968abfaef15a17 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Fri, 21 Nov 2025 01:30:22 -0700 Subject: [PATCH 07/15] Overhaul: tests with fixture factory for `server_client` --- imdclient/tests/test_imdclient.py | 154 +++++++++++------------------- 1 file changed, 57 insertions(+), 97 deletions(-) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index 154074d..6a93d8c 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -55,55 +55,62 @@ def imdsinfo(self): return create_default_imdsinfo_v3() @pytest.fixture - def server_client_two_frame_buf(self, universe, imdsinfo): - server = InThreadIMDServer(universe.trajectory) - server.set_imdsessioninfo(imdsinfo) - server.handshake_sequence("localhost", first_frame=False) - client = IMDClient( - f"localhost", - server.port, - universe.trajectory.n_atoms, - buffer_size=imdframe_memsize(universe.trajectory.n_atoms, imdsinfo) - * 2, + def server_client(self, universe, imdsinfo): + created = [] + + def _server_client(endianness=None, **client_kwargs): + server = InThreadIMDServer(universe.trajectory) + if endianness is not None: + imdsinfo.endianness = endianness + server.set_imdsessioninfo(imdsinfo) + + n_atoms = client_kwargs.pop("n_atoms", universe.atoms.n_atoms) + server.handshake_sequence("localhost", first_frame=False) + client = IMDClient( + "localhost", + server.port, + n_atoms, + **client_kwargs, + ) + server.join_accept_thread() + created.append((server, client)) + return server, client + + yield _server_client + + # Teardown: stop clients and cleanup servers + for server, client in created: + try: + client.stop() + except Exception: + pass + try: + server.cleanup() + except Exception: + pass + + @pytest.fixture + def server_client_two_frame_buf(self, server_client, universe, imdsinfo): + # Calculate the buffer size + buffer_size = ( + imdframe_memsize(universe.trajectory.n_atoms, imdsinfo) * 2 ) - server.join_accept_thread() + timeout = 5 # to speed up no disconnect test + server, client = server_client(buffer_size=buffer_size, timeout=timeout) yield server, client - client.stop() - server.cleanup() - @pytest.fixture(params=[">", "<"]) - def server_client(self, universe, imdsinfo, request): - server = InThreadIMDServer(universe.trajectory) - imdsinfo.endianness = request.param - server.set_imdsessioninfo(imdsinfo) - server.handshake_sequence("localhost", first_frame=False) - client = IMDClient( - f"localhost", - server.port, - universe.atoms.n_atoms, - ) - server.join_accept_thread() + @pytest.fixture(params=["<", ">"]) + def server_client_endianness(self, server_client, request): + server, client = server_client(endianness=request.param) yield server, client - client.stop() - server.cleanup() @pytest.fixture - def server_client_incorrect_atoms(self, universe, imdsinfo): - server = InThreadIMDServer(universe.trajectory) - server.set_imdsessioninfo(imdsinfo) - server.handshake_sequence("localhost", first_frame=False) - client = IMDClient( - f"localhost", - server.port, - universe.atoms.n_atoms + 1, - ) - server.join_accept_thread() + def server_client_incorrect_atoms(self, server_client, universe): + server, client = server_client(n_atoms=universe.trajectory.n_atoms + 1) yield server, client - client.stop() - server.cleanup() - def test_traj_unchanged(self, server_client, universe): - server, client = server_client + def test_traj_unchanged(self, server_client_endianness, universe): + server, client = server_client_endianness server.send_frames(0, 5) for i in range(5): imdf = client.get_imdframe() @@ -164,36 +171,16 @@ def test_pause_resume_no_disconnect(self, server_client_two_frame_buf): server.expect_packet(IMDHeaderType.IMD_DISCONNECT) @pytest.mark.parametrize("cont", [True, False]) - def test_continue_after_disconnect(self, universe, imdsinfo, cont): - server = InThreadIMDServer(universe.trajectory) - server.set_imdsessioninfo(imdsinfo) - server.handshake_sequence("localhost", first_frame=False) - client = IMDClient( - f"localhost", - server.port, - universe.trajectory.n_atoms, - continue_after_disconnect=cont, - ) - server.join_accept_thread() + def test_continue_after_disconnect(self, server_client, cont): + server, client = server_client(continue_after_disconnect=cont) server.expect_packet( IMDHeaderType.IMD_WAIT, expected_length=(int)(not cont) ) - def test_timeout_warning_low_value(self, universe, imdsinfo, caplog): + def test_timeout_warning_low_value(self, server_client, caplog): """Test that warning is issued for timeout values <= 1 second""" - server = InThreadIMDServer(universe.trajectory) - server.set_imdsessioninfo(imdsinfo) - server.handshake_sequence("localhost", first_frame=False) - with caplog.at_level(logging.WARNING): - client = IMDClient( - f"localhost", - server.port, - universe.trajectory.n_atoms, - timeout=1, - ) - - server.join_accept_thread() + server, client = server_client(timeout=1) # Check that warning was logged assert any( @@ -201,47 +188,23 @@ def test_timeout_warning_low_value(self, universe, imdsinfo, caplog): for record in caplog.records ) - client.stop() - server.cleanup() - @pytest.mark.parametrize("timeout_val", [2, 10]) - def test_timeout_within_limit(self, universe, imdsinfo, timeout_val): + def test_timeout_within_limit(self, server_client, universe, timeout_val): """Test that timeout does not trigger when server responds within timeout period""" - server = InThreadIMDServer(universe.trajectory) - server.set_imdsessioninfo(imdsinfo) - server.handshake_sequence("localhost", first_frame=False) - client = IMDClient( - f"localhost", - server.port, - universe.trajectory.n_atoms, - timeout=timeout_val, - ) - server.join_accept_thread() + server, client = server_client(timeout=timeout_val) # Sleep for less than timeout before sending frames time.sleep(timeout_val - 1) - server.send_frames(0) + server.send_frame(0) # Should successfully receive frame without timeout imdf = client.get_imdframe() assert_allclose(universe.trajectory[0].positions, imdf.positions) - client.stop() - server.cleanup() - @pytest.mark.parametrize("timeout_val", [2, 10]) - def test_timeout_when_exceeded(self, universe, imdsinfo, timeout_val): + def test_timeout_when_exceeded(self, server_client, timeout_val): """Test that timeout triggers EOFError when server doesn't respond within timeout period""" - server = InThreadIMDServer(universe.trajectory) - server.set_imdsessioninfo(imdsinfo) - server.handshake_sequence("localhost", first_frame=False) - client = IMDClient( - f"localhost", - server.port, - universe.trajectory.n_atoms, - timeout=timeout_val, - ) - server.join_accept_thread() + server, client = server_client(timeout=timeout_val) # Sleep for longer than timeout without sending any frames time.sleep(timeout_val + 1) @@ -259,9 +222,6 @@ def test_timeout_when_exceeded(self, universe, imdsinfo, timeout_val): assert TimeoutError in exception_chain - client.stop() - server.cleanup() - def test_incorrect_atom_count( self, server_client_incorrect_atoms, universe ): From 518e8d230f0c0f3e4f6b6189bc9f34cf28842105 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Tue, 26 May 2026 12:41:35 -0700 Subject: [PATCH 08/15] Test single-threaded client --- imdclient/tests/test_imdclient.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index 6a93d8c..a9f8ce3 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -239,6 +239,23 @@ def test_incorrect_atom_count( assert f"got {universe.atoms.n_atoms}" in error_msg assert "Ensure you are using the correct topology file" in error_msg + def test_single_threaded_client_reads_frame_and_eof( + self, server_client, universe + ): + server, client = server_client(multithreaded=False) + + server.send_frame(0) + + # Check one frame data by comparing positions + imdf = client.get_imdframe() + assert_allclose(universe.trajectory[0].positions, imdf.positions) + + # diconnect to check EOF handling in singlethreaded code + server.disconnect() + + with pytest.raises(EOFError): + client.get_imdframe() + class TestIMDClientV3ContextManager: @pytest.fixture From 64c84c6fddb834fcb30bb2d32a3941527a1e4aea Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Tue, 26 May 2026 13:18:22 -0700 Subject: [PATCH 09/15] Test pop_empty_imdframe() when _consumer_finished --- imdclient/tests/test_imdclient.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index a9f8ce3..c9da872 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -13,7 +13,11 @@ COORDINATES_H5MD, ) -from imdclient.IMDClient import imdframe_memsize, IMDClient +from imdclient.IMDClient import ( + IMDFrameBuffer, + imdframe_memsize, + IMDClient, +) from imdclient.IMDProtocol import IMDHeaderType from .utils import ( create_default_imdsinfo_v3, @@ -302,3 +306,21 @@ def test_context_manager_traj_unchanged(self, server, universe): i += 1 server.expect_packet(IMDHeaderType.IMD_DISCONNECT) assert i == 5 + + +class TestIMDFrameBuffer: + @pytest.fixture + def imdsinfo(self): + return create_default_imdsinfo_v3() + + def test_pop_empty_imdframe_raises_when_consumer_finished( + self, imdsinfo + ): + buffer_size = imdframe_memsize(1, imdsinfo) + buffer = IMDFrameBuffer(imdsinfo, n_atoms=1, buffer_size=buffer_size) + + buffer.pop_empty_imdframe() + buffer.notify_consumer_finished() + + with pytest.raises(EOFError, match="Consumer has finished"): + buffer.pop_empty_imdframe() From 869a90bd3069cc9f3099b2816d29ea0950801324 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Wed, 27 May 2026 09:46:56 -0700 Subject: [PATCH 10/15] Test _get_imdframe() scenarios --- imdclient/tests/test_imdclient.py | 74 +++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index c9da872..85e65ba 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -324,3 +324,77 @@ def test_pop_empty_imdframe_raises_when_consumer_finished( with pytest.raises(EOFError, match="Consumer has finished"): buffer.pop_empty_imdframe() + + +class TestBaseIMDProducer: + @pytest.fixture + def universe(self): + return mda.Universe(COORDINATES_TOPOLOGY, COORDINATES_H5MD) + + @pytest.fixture + def imdsinfo(self): + return create_default_imdsinfo_v3() + + @pytest.fixture + def server_client(self, universe, imdsinfo): + created = [] + + def _server_client(endianness=None, **client_kwargs): + server = InThreadIMDServer(universe.trajectory) + if endianness is not None: + imdsinfo.endianness = endianness + server.set_imdsessioninfo(imdsinfo) + + n_atoms = client_kwargs.pop("n_atoms", universe.atoms.n_atoms) + server.handshake_sequence("localhost", first_frame=False) + client = IMDClient( + "localhost", + server.port, + n_atoms, + **client_kwargs, + ) + server.join_accept_thread() + created.append((server, client)) + return server, client + + yield _server_client + + for server, client in created: + try: + client.stop() + except Exception: + pass + try: + server.cleanup() + except Exception: + pass + + def test_get_imdframe_reraises_eoferror(self, server_client, monkeypatch): + server, client = server_client(multithreaded=False) + + def _raise_eoferror(): + raise EOFError("test EOF") + + monkeypatch.setattr(client._producer, "_parse_imdframe", _raise_eoferror) + + with pytest.raises(EOFError): + client._producer._get_imdframe() + + def test_get_imdframe_wraps_unexpected_errors( + self, server_client, monkeypatch + ): + server, client = server_client(multithreaded=False) + + def _raise_valueerror(): + raise ValueError("test ValueError") + + monkeypatch.setattr( + client._producer, + "_parse_imdframe", + _raise_valueerror, + ) + + with pytest.raises(RuntimeError, match="An unexpected error occurred"): + client._producer._get_imdframe() + + From 82a43a8ca7cc8c4bc86d0c5957fd5cb45b0c53fb Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Wed, 27 May 2026 11:36:22 -0700 Subject: [PATCH 11/15] black reformat --- imdclient/tests/test_imdclient.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index 85e65ba..8971b29 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -99,7 +99,7 @@ def server_client_two_frame_buf(self, server_client, universe, imdsinfo): buffer_size = ( imdframe_memsize(universe.trajectory.n_atoms, imdsinfo) * 2 ) - timeout = 5 # to speed up no disconnect test + timeout = 5 # to speed up no disconnect test server, client = server_client(buffer_size=buffer_size, timeout=timeout) yield server, client @@ -313,9 +313,7 @@ class TestIMDFrameBuffer: def imdsinfo(self): return create_default_imdsinfo_v3() - def test_pop_empty_imdframe_raises_when_consumer_finished( - self, imdsinfo - ): + def test_pop_empty_imdframe_raises_when_consumer_finished(self, imdsinfo): buffer_size = imdframe_memsize(1, imdsinfo) buffer = IMDFrameBuffer(imdsinfo, n_atoms=1, buffer_size=buffer_size) @@ -375,7 +373,9 @@ def test_get_imdframe_reraises_eoferror(self, server_client, monkeypatch): def _raise_eoferror(): raise EOFError("test EOF") - monkeypatch.setattr(client._producer, "_parse_imdframe", _raise_eoferror) + monkeypatch.setattr( + client._producer, "_parse_imdframe", _raise_eoferror + ) with pytest.raises(EOFError): client._producer._get_imdframe() @@ -396,5 +396,3 @@ def _raise_valueerror(): with pytest.raises(RuntimeError, match="An unexpected error occurred"): client._producer._get_imdframe() - - From 821d682cc0f10ea568c90e5b4908119aa2619b2e Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Sun, 21 Jun 2026 16:12:50 -0700 Subject: [PATCH 12/15] Doc: EOF handling note for `get_imdframe()` --- imdclient/IMDClient.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index 1efa567..3474c11 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -172,7 +172,14 @@ def get_imdframe(self): Raises ------ EOFError - If there are no more frames to read from the stream + If there are no more frames to read from the stream, or if the + producer thread stopped due to an error. + + For API stability, errors raised in the producer thread (including + :class:`RuntimeError` for mismatches such as incorrect + atom counts) are raised eventually as ``EOFError``s. The original + exception message in the case fo a multithreaded client is available + in the error queue. """ if self._multithreaded: try: From 4a920a2fe5e57ea18d9412ab33b63a0256f22df1 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Sun, 21 Jun 2026 16:22:48 -0700 Subject: [PATCH 13/15] Add: test for `pop_full_imdframe` with porducer finished --- imdclient/tests/test_imdclient.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index 8971b29..2bc04f4 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -323,6 +323,15 @@ def test_pop_empty_imdframe_raises_when_consumer_finished(self, imdsinfo): with pytest.raises(EOFError, match="Consumer has finished"): buffer.pop_empty_imdframe() + def test_pop_full_imdframe_raises_when_producer_finished(self, imdsinfo): + buffer_size = imdframe_memsize(1, imdsinfo) + buffer = IMDFrameBuffer(imdsinfo, n_atoms=1, buffer_size=buffer_size) + + buffer.notify_producer_finished() + + with pytest.raises(EOFError, match="Producer has finished"): + buffer.pop_full_imdframe() + class TestBaseIMDProducer: @pytest.fixture From 855625a4ac4592238d9f948b29680312dc729c67 Mon Sep 17 00:00:00 2001 From: Amruthesh Thirumalaiswamy Date: Sun, 21 Jun 2026 20:46:07 -0700 Subject: [PATCH 14/15] Tests: new and some restructure - added new tests for `wait_for_space` and `_await_IMD_handshake` - restrcuture TestBaseIMDProducer tests into TestIMDClientV3 --- imdclient/tests/test_imdclient.py | 141 +++++++++++++++--------------- 1 file changed, 71 insertions(+), 70 deletions(-) diff --git a/imdclient/tests/test_imdclient.py b/imdclient/tests/test_imdclient.py index 2bc04f4..c58dc93 100644 --- a/imdclient/tests/test_imdclient.py +++ b/imdclient/tests/test_imdclient.py @@ -1,6 +1,7 @@ """Test for IMDClient functionality""" import logging +import sys import time import pytest @@ -181,6 +182,28 @@ def test_continue_after_disconnect(self, server_client, cont): IMDHeaderType.IMD_WAIT, expected_length=(int)(not cont) ) + @pytest.mark.parametrize("read_error", [TimeoutError, ConnectionError]) + def test_await_IMD_handshake_error_on_read_buf_failure( + self, server_client, monkeypatch, read_error + ): + _, client = server_client() + + def _raise_read_error(sock, buf): + raise read_error() + + monkeypatch.setattr( + sys.modules[client.__class__.__module__], + "read_into_buf", + _raise_read_error, + ) + + with pytest.raises( + ConnectionError, match="No handshake packet received" + ) as exc_info: + client._await_IMD_handshake() + + assert isinstance(exc_info.value.__cause__, read_error) + def test_timeout_warning_low_value(self, server_client, caplog): """Test that warning is issued for timeout values <= 1 second""" with caplog.at_level(logging.WARNING): @@ -260,6 +283,38 @@ def test_single_threaded_client_reads_frame_and_eof( with pytest.raises(EOFError): client.get_imdframe() + def test_single_threaded_get_imdframe_reraises_eoferror( + self, server_client, monkeypatch + ): + _server, client = server_client(multithreaded=False) + + def _raise_eoferror(): + raise EOFError("test EOF") + + monkeypatch.setattr( + client._producer, "_parse_imdframe", _raise_eoferror + ) + + with pytest.raises(EOFError): + client._producer._get_imdframe() + + def test_single_threaded_get_imdframe_wraps_unexpected_errors( + self, server_client, monkeypatch + ): + _server, client = server_client(multithreaded=False) + + def _raise_valueerror(): + raise ValueError("test ValueError") + + monkeypatch.setattr( + client._producer, + "_parse_imdframe", + _raise_valueerror, + ) + + with pytest.raises(RuntimeError, match="An unexpected error occurred"): + client._producer._get_imdframe() + class TestIMDClientV3ContextManager: @pytest.fixture @@ -332,76 +387,22 @@ def test_pop_full_imdframe_raises_when_producer_finished(self, imdsinfo): with pytest.raises(EOFError, match="Producer has finished"): buffer.pop_full_imdframe() + # simple test below to cover Error raising but doesn't test if the Error is raised + # when consumer finishes while the `wait_for_space()` is running + # TODO: add threaded test to properly test this scenario + def test_wait_for_space_raises_when_consumer_finished(self, imdsinfo): + buffer_size = imdframe_memsize(1, imdsinfo) * 2 + buffer = IMDFrameBuffer(imdsinfo, n_atoms=1, buffer_size=buffer_size) -class TestBaseIMDProducer: - @pytest.fixture - def universe(self): - return mda.Universe(COORDINATES_TOPOLOGY, COORDINATES_H5MD) - - @pytest.fixture - def imdsinfo(self): - return create_default_imdsinfo_v3() - - @pytest.fixture - def server_client(self, universe, imdsinfo): - created = [] - - def _server_client(endianness=None, **client_kwargs): - server = InThreadIMDServer(universe.trajectory) - if endianness is not None: - imdsinfo.endianness = endianness - server.set_imdsessioninfo(imdsinfo) - - n_atoms = client_kwargs.pop("n_atoms", universe.atoms.n_atoms) - server.handshake_sequence("localhost", first_frame=False) - client = IMDClient( - "localhost", - server.port, - n_atoms, - **client_kwargs, - ) - server.join_accept_thread() - created.append((server, client)) - return server, client - - yield _server_client - - for server, client in created: - try: - client.stop() - except Exception: - pass - try: - server.cleanup() - except Exception: - pass - - def test_get_imdframe_reraises_eoferror(self, server_client, monkeypatch): - server, client = server_client(multithreaded=False) - - def _raise_eoferror(): - raise EOFError("test EOF") - - monkeypatch.setattr( - client._producer, "_parse_imdframe", _raise_eoferror - ) - - with pytest.raises(EOFError): - client._producer._get_imdframe() - - def test_get_imdframe_wraps_unexpected_errors( - self, server_client, monkeypatch - ): - server, client = server_client(multithreaded=False) - - def _raise_valueerror(): - raise ValueError("test ValueError") + # Drain empty slots so buffer is below unpause threshold + buffer.pop_empty_imdframe() + buffer.pop_empty_imdframe() + buffer.notify_consumer_finished() - monkeypatch.setattr( - client._producer, - "_parse_imdframe", - _raise_valueerror, - ) + with pytest.raises( + RuntimeError, match="Error waiting for space in buffer" + ) as exc_info: + buffer.wait_for_space() - with pytest.raises(RuntimeError, match="An unexpected error occurred"): - client._producer._get_imdframe() + assert isinstance(exc_info.value.__cause__, EOFError) + assert "Consumer has finished" in str(exc_info.value.__cause__) From 71ccddbe21d34efe6091354737da46873242ccb7 Mon Sep 17 00:00:00 2001 From: Oliver Beckstein Date: Mon, 22 Jun 2026 18:00:29 -0700 Subject: [PATCH 15/15] Update imdclient/IMDClient.py --- imdclient/IMDClient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/imdclient/IMDClient.py b/imdclient/IMDClient.py index 3474c11..d158a7f 100644 --- a/imdclient/IMDClient.py +++ b/imdclient/IMDClient.py @@ -178,7 +178,7 @@ def get_imdframe(self): For API stability, errors raised in the producer thread (including :class:`RuntimeError` for mismatches such as incorrect atom counts) are raised eventually as ``EOFError``s. The original - exception message in the case fo a multithreaded client is available + exception message in the case for a multithreaded client is available in the error queue. """ if self._multithreaded: