diff --git a/README.rst b/README.rst index 5745700..d064d90 100644 --- a/README.rst +++ b/README.rst @@ -45,8 +45,8 @@ The CLI utility ``telnetlib3-client`` is provided for connecting to servers and ``telnetlib3-server`` for hosting a server. Both tools accept the argument ``--shell=my_module.fn_shell`` describing a python module path to a -function of signature ``async def shell(reader, writer)``. The server also provides ``--pty-exec`` -argument to host stand-alone programs. +function of signature ``async def shell(reader, writer)``. The server also provides a +``--pty-exec`` argument allowing it to act as a telnet server for any CLI/TUI programs. :: @@ -59,7 +59,7 @@ argument to host stand-alone programs. # automatic script communicates with a server telnetlib3-client --shell bin.client_wargame.shell 1984.ws 666 - # run a server bound with the default shell bound to 127.0.0.1 6023 + # run a default shell server bound to 127.0.0.1 6023 telnetlib3-server # or custom ip, port and shell @@ -112,6 +112,48 @@ connected to a TCP socket without any telnet negotiation may require "raw" mode telnetlib3-client --raw-mode area52.tk 5200 --encoding=atascii +Go-Ahead (GA) +~~~~~~~~~~~~~ + +When a client does not negotiate Suppress Go-Ahead (SGA), the server sends +``IAC GA`` after output to signal that the client may transmit. This is +correct behavior for MUD clients like Mudlet that expect prompt detection +via GA. + +If GA causes unwanted output for your use case, disable it:: + + telnetlib3-server --never-send-ga + +For PTY shells, GA is sent after 500ms of output idle time to avoid +injecting GA in the middle of streaming output. + +Compression (MCCP) +~~~~~~~~~~~~~~~~~~ + +MCCP2 (server-to-client) and MCCP3 (client-to-server) zlib compression are +supported, widely used by MUD servers to reduce bandwidth:: + + # connect to a MUD that offers MCCP compression + telnetlib3-client dunemud.net 6789 + + # or with TLS (compression auto-disabled over TLS, CRIME/BREACH mitigation) + telnetlib3-client --ssl dunemud.net 6788 + + # actively request compression from a server + telnetlib3-client --compression dunemud.net 6789 + + # reject compression even if the server offers it + telnetlib3-client --no-compression dunemud.net 6789 + + # host a MUD server that advertises MCCP2/MCCP3 + telnetlib3-server --compression --shell=my_mud.shell + +By default (without ``--compression`` or ``--no-compression``), the client +passively accepts compression when offered by the server, and the server does +not advertise compression. Compression is automatically disabled over TLS +connections to avoid CRIME/BREACH attacks. + + Asyncio Protocol ---------------- @@ -139,8 +181,10 @@ To migrate code, change import statements: # NEW imports: import telnetlib3 -This library *also* provides an additional client (and server) API through a similar interface but -offering more advanced negotiation features and options. See `sync API documentation`_ for more. +``telnetlib3`` did not provide server support, while this library also provides +both client and server support through a similar Blocking API interface. + +See `sync API documentation`_ for details. Quick Example ============= diff --git a/docs/history.rst b/docs/history.rst index 3862be7..0d577b7 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,5 +1,9 @@ History ======= +3.0.1 (unreleased) + * new: MCCP2 and MCCP3. Both client and server ends passively support if requested, and request + support by --compression or deny support by --no-compression. + 3.0.0 * change: :attr:`~telnetlib3.client_base.BaseClient.connect_minwait` default now 0 (was 1.0 seconds in library API). diff --git a/docs/rfcs.rst b/docs/rfcs.rst index 22188fc..71eccc7 100644 --- a/docs/rfcs.rst +++ b/docs/rfcs.rst @@ -81,16 +81,23 @@ Dungeon) servers and clients. * `MSSP`_ (MUD Server Status Protocol, option 70). Server metadata protocol for MUD crawlers and directories, providing server name, player count, codebase, and other listing information. +* `MCCP2`_ (MUD Client Compression Protocol v2, option 86). Server-to-client + zlib compression, reducing bandwidth for output-heavy sessions. Activated + via ``IAC SB MCCP2 IAC SE``; all subsequent server output is compressed. +* `MCCP3`_ (MUD Client Compression Protocol v3, option 87). Client-to-server + zlib compression, the reverse direction of MCCP2. .. _GMCP: https://www.gammon.com.au/gmcp .. _MSDP: https://tintin.mudhalla.net/protocols/msdp/ .. _MSSP: https://tintin.mudhalla.net/protocols/mssp/ +.. _MCCP2: https://tintin.mudhalla.net/protocols/mccp/ +.. _MCCP3: https://tintin.mudhalla.net/protocols/mccp/ MUDs Not Implemented -------------------- Constants are also defined for the following MUD options, though their handlers -are not implemented: MCCP/MCCP2 (85/86, compression), MXP (91, markup), ZMP +are not implemented: MCCP (85, legacy compression), MXP (91, markup), ZMP (93, messaging), MSP (90, sound), and ATCP (200, Achaea-specific). Additional Resources diff --git a/telnetlib3/_base.py b/telnetlib3/_base.py index 39cc58f..a845a79 100644 --- a/telnetlib3/_base.py +++ b/telnetlib3/_base.py @@ -45,6 +45,9 @@ def _process_data_chunk( or ``None`` when only IAC (255) is special. :param log_fn: Callable for logging exceptions (e.g. ``logger.warning``). :returns: ``True`` if any IAC/SB command was observed. + + When MCCP2 is activated mid-chunk, the remaining compressed bytes are + stored in ``writer._compressed_remainder`` for the caller to consume. """ cmd_received = False n = len(data) @@ -84,6 +87,12 @@ def _process_data_chunk( out_start = i feeding_oob = bool(writer.is_oob) + if writer._mccp2_activated: + writer._mccp2_activated = False + writer.mccp2_active = True + writer._compressed_remainder = data[i:] if i < n else b"" + return True + return cmd_received diff --git a/telnetlib3/client.py b/telnetlib3/client.py index d0aa84c..d8540c6 100755 --- a/telnetlib3/client.py +++ b/telnetlib3/client.py @@ -65,6 +65,7 @@ def __init__( force_binary: bool = False, connect_minwait: float = 0, connect_maxwait: float = 4.0, + compression: Optional[bool] = None, limit: Optional[int] = None, waiter_closed: Optional[asyncio.Future[None]] = None, _waiter_connected: Optional[asyncio.Future[None]] = None, @@ -72,6 +73,7 @@ def __init__( gmcp_log: bool = False, ) -> None: """Initialize TelnetClient with terminal parameters.""" + self._compression = compression super().__init__( shell=shell, encoding=encoding, @@ -118,6 +120,9 @@ def connection_made(self, transport: asyncio.BaseTransport) -> None: super().connection_made(transport) + # Set compression policy on writer + self.writer.compression = self._compression + # Wire extended rfc callbacks for requests of # terminal attributes, environment values, etc. for opt, func in ( @@ -472,6 +477,7 @@ async def open_connection( connect_minwait: float = 0, connect_maxwait: float = 3.0, connect_timeout: Optional[float] = None, + compression: Optional[bool] = None, waiter_closed: Optional[asyncio.Future[None]] = None, _waiter_connected: Optional[asyncio.Future[None]] = None, limit: Optional[int] = None, @@ -531,6 +537,9 @@ async def open_connection( connection attempt may block indefinitely. When specified, a :exc:`ConnectionError` is raised if the connection is not established within the given time. + :param compression: MCCP compression policy. ``None`` (default) passively + accepts compression when offered by the server. ``True`` actively + requests MCCP2/MCCP3. ``False`` rejects all compression offers. :param force_binary: When ``True``, the encoding is used regardless of BINARY mode negotiation. @@ -565,6 +574,7 @@ def connection_factory() -> client_base.BaseClient: shell=shell, connect_minwait=connect_minwait, connect_maxwait=connect_maxwait, + compression=compression, waiter_closed=waiter_closed, _waiter_connected=_waiter_connected, limit=limit, @@ -897,6 +907,13 @@ def _get_argument_parser() -> argparse.ArgumentParser: "keys instead of encoding-specific control codes. Use for " "BBSes that expect ANSI cursor sequences.", ) + parser.add_argument( + "--compression", + action=argparse.BooleanOptionalAction, + default=None, + help="MCCP compression: --compression to request, --no-compression to reject, " + "omit to passively accept (default)", + ) parser.add_argument( "--ssl", action="store_true", default=False, help="connect using TLS (TELNETS)" ) @@ -1029,6 +1046,7 @@ def _transform_args(args: argparse.Namespace) -> Dict[str, Any]: if args.gmcp_modules else None ), + "compression": args.compression, "gmcp_log": args.gmcp_log, "typescript": args.typescript, } diff --git a/telnetlib3/client_base.py b/telnetlib3/client_base.py index fc7a357..5618583 100644 --- a/telnetlib3/client_base.py +++ b/telnetlib3/client_base.py @@ -3,6 +3,7 @@ from __future__ import annotations # std imports +import zlib import asyncio import logging import weakref @@ -66,6 +67,12 @@ def __init__( self.writer: Optional[Union[TelnetWriter, TelnetWriterUnicode]] = None self._limit = limit + # MCCP2: server→client decompression + self._mccp2_decompressor: Optional[zlib.Decompress] = None + # MCCP3: client→server compression + self._mccp3_compressor: Optional[zlib.Compress] = None + self._mccp3_orig_write: Any = None + # High-throughput receive pipeline self._rx_queue: collections.deque[bytes] = collections.deque() self._rx_bytes = 0 @@ -93,6 +100,11 @@ def connection_lost(self, exc: Optional[Exception]) -> None: return self._closing = True + # Clean up MCCP compressors/decompressors + self._mccp2_decompressor = None + self._mccp3_compressor = None + self._mccp3_orig_write = None + # Drain any pending rx data before signalling EOF to prevent # _process_rx from calling feed_data() after feed_eof(). self._rx_queue.clear() @@ -343,6 +355,26 @@ def _process_chunk(self, data: bytes) -> bool: """Process a chunk of received bytes; return True if any IAC/SB cmd observed.""" self._last_received = datetime.datetime.now() + # MCCP2: decompress server→client data when active + if self._mccp2_decompressor is not None: + try: + data = self._mccp2_decompressor.decompress(data) + except zlib.error: + self.log.warning("MCCP2 decompression error, disabling") + self._mccp2_end() + return False + if self._mccp2_decompressor.eof: + unused = self._mccp2_decompressor.unused_data + self._mccp2_end() + cmd = self._process_chunk_inner(data) + if unused: + cmd = self._process_chunk(unused) or cmd + return cmd + + return self._process_chunk_inner(data) + + def _process_chunk_inner(self, data: bytes) -> bool: + """Inner chunk processing with IAC interpretation and mid-chunk MCCP2 detection.""" try: mode = self.writer.mode except Exception: @@ -355,7 +387,22 @@ def _process_chunk(self, data: bytes) -> bool: else: slc_special = None - return _process_data_chunk(data, self.writer, self.reader, slc_special, self.log.warning) + cmd_received = _process_data_chunk( + data, self.writer, self.reader, slc_special, self.log.warning + ) + + if self.writer._compressed_remainder is not None: + remainder = self.writer._compressed_remainder + self.writer._compressed_remainder = None + self._mccp2_start() + if remainder: + cmd_received = self._process_chunk(remainder) or cmd_received + + # MCCP3: start compressor when writer signals activation + if self.writer.mccp3_active and self._mccp3_compressor is None: + self._mccp3_start() + + return cmd_received async def _process_rx(self) -> None: """Async processor for receive queue that yields control and applies backpressure.""" @@ -395,6 +442,51 @@ async def _process_rx(self) -> None: if any_cmd and not self._waiter_connected.done(): self._check_negotiation_timer() + def _mccp2_start(self) -> None: + """Start MCCP2 decompression of server→client data.""" + self._mccp2_decompressor = zlib.decompressobj() + self.log.debug("MCCP2 decompression started (server→client)") + + def _mccp2_end(self) -> None: + """Stop MCCP2 decompression.""" + self._mccp2_decompressor = None + self.writer.mccp2_active = False + self.log.debug("MCCP2 decompression ended (server→client)") + + def _mccp3_start(self) -> None: + """Start MCCP3 compression of client→server data.""" + self._mccp3_compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + # Wrap transport.write so all outbound bytes are compressed + transport = self.writer._transport + orig_write = transport.write + + def compressed_write(data: bytes) -> None: + if self._mccp3_compressor is not None: + compressed = self._mccp3_compressor.compress(data) + compressed += self._mccp3_compressor.flush(zlib.Z_SYNC_FLUSH) + orig_write(compressed) + else: + orig_write(data) + + transport.write = compressed_write # type: ignore[assignment] + self._mccp3_orig_write = orig_write + self.log.debug("MCCP3 compression started (client→server)") + + def _mccp3_end(self) -> None: + """Stop MCCP3 compression, flush Z_FINISH.""" + if self._mccp3_compressor is not None: + if not self.writer.is_closing(): + self._mccp3_orig_write( + self._mccp3_compressor.flush(zlib.Z_FINISH) + ) + self._mccp3_compressor = None + # Restore original transport.write + self.writer._transport.write = self._mccp3_orig_write # type: ignore[method-assign] + self.writer.mccp3_active = False + self.log.debug("MCCP3 compression ended (client→server)") + def _check_negotiation_timer(self) -> None: self._check_later.cancel() self._tasks.remove(self._check_later) diff --git a/telnetlib3/fingerprinting.py b/telnetlib3/fingerprinting.py index 8a201df..adcc0b1 100644 --- a/telnetlib3/fingerprinting.py +++ b/telnetlib3/fingerprinting.py @@ -83,6 +83,8 @@ SUPDUPOUTPUT, VT3270REGIME, AUTHENTICATION, + MCCP2_COMPRESS, + MCCP3_COMPRESS, COM_PORT_OPTION, PRAGMA_HEARTBEAT, SUPPRESS_LOCAL_ECHO, @@ -260,6 +262,8 @@ class FingerprintingServer(FingerprintingTelnetServer, TelnetServer): # returning a hard error for anything else. GMCP-capable MUD clients # typically self-announce via IAC WILL GMCP, so probing is unnecessary. EXTENDED_OPTIONS = [ + (MCCP2_COMPRESS, "MCCP2", "MUD Client Compression Protocol v2"), + (MCCP3_COMPRESS, "MCCP3", "MUD Client Compression Protocol v3"), (GMCP, "GMCP", "Generic MUD Communication Protocol"), (MSDP, "MSDP", "MUD Server Data Protocol"), (MSSP, "MSSP", "MUD Server Status Protocol"), diff --git a/telnetlib3/server.py b/telnetlib3/server.py index 8f881ce..0dab03d 100755 --- a/telnetlib3/server.py +++ b/telnetlib3/server.py @@ -16,6 +16,7 @@ # std imports import ssl as ssl_module import sys +import zlib import codecs import signal import socket @@ -27,7 +28,7 @@ # local from . import accessories, server_base from ._types import ShellCallback -from .telopt import name_commands +from .telopt import SB, SE, IAC, MCCP2_COMPRESS, name_commands from .stream_reader import TelnetReader, TelnetReaderUnicode from .stream_writer import TelnetWriter, TelnetWriterUnicode @@ -97,6 +98,7 @@ def __init__( never_send_ga: bool = False, line_mode: bool = False, connect_maxwait: float = 4.0, + compression: Optional[bool] = None, limit: Optional[int] = None, reader_factory: type = TelnetReader, reader_factory_encoding: type = TelnetReaderUnicode, @@ -121,6 +123,11 @@ def __init__( ) self._environ_requested = False self._echo_negotiated = False + self._mccp2_compressor: Optional[Any] = None + self._mccp2_pending: bool = False + self._compression: Optional[bool] = compression + self._mccp2_enabled: bool = compression is True + self._mccp2_orig_write: Optional[Any] = None self.waiter_encoding: asyncio.Future[bool] = asyncio.Future() self._tasks.append(self.waiter_encoding) self._ttype_count = 1 @@ -141,6 +148,9 @@ def connection_made(self, transport: asyncio.BaseTransport) -> None: super().connection_made(transport) + # Set compression policy on writer + self.writer.compression = self._compression + # begin timeout timer self.set_timeout() @@ -168,6 +178,55 @@ def data_received(self, data: bytes) -> None: """Process received data and reset timeout timer.""" self.set_timeout() super().data_received(data) + # MCCP2: start compression once client confirms DO MCCP2 + if ( + self._mccp2_enabled + and not self._mccp2_pending + and self._mccp2_compressor is None + and self.writer.local_option.enabled(MCCP2_COMPRESS) + ): + self._mccp2_start() + + def _mccp2_start(self) -> None: + """Send SB MCCP2 SE and start compressing server→client output.""" + self._mccp2_pending = True + # All bytes after this SE are compressed. + self.writer.send_iac(IAC + SB + MCCP2_COMPRESS + IAC + SE) + + self._mccp2_compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + # Wrap transport.write so all subsequent output is compressed + transport = self.writer._transport + orig_write = transport.write + + def compressed_write(data: bytes) -> None: + if self._mccp2_compressor is not None: + compressed = self._mccp2_compressor.compress(data) + compressed += self._mccp2_compressor.flush(zlib.Z_SYNC_FLUSH) + orig_write(compressed) + else: + orig_write(data) + + transport.write = compressed_write # type: ignore[assignment] + self._mccp2_orig_write = orig_write + self.writer.mccp2_active = True + logger.debug("MCCP2 compression started (server→client)") + + def _mccp2_end(self) -> None: + """Stop MCCP2 compression, flush Z_FINISH.""" + if self._mccp2_compressor is not None: + try: + self._mccp2_orig_write( + self._mccp2_compressor.flush(zlib.Z_FINISH) + ) + except zlib.error as exc: + logger.debug("MCCP2 Z_FINISH flush error: %s", exc) + self._mccp2_compressor = None + self.writer._transport.write = self._mccp2_orig_write # type: ignore[method-assign] + self._mccp2_pending = False + self.writer.mccp2_active = False + logger.debug("MCCP2 compression ended (server→client)") def begin_negotiation(self) -> None: """Begin telnet negotiation by requesting terminal type.""" @@ -188,7 +247,7 @@ def begin_advanced_negotiation(self) -> None: MUD clients (Mudlet, TinTin++, etc.) interpret ``WILL ECHO`` as "password mode" and mask input. See ``_negotiate_echo()``. """ - from .telopt import DO, SGA, NAWS, WILL, BINARY, CHARSET + from .telopt import DO, SGA, NAWS, WILL, BINARY, CHARSET, MCCP2_COMPRESS, MCCP3_COMPRESS super().begin_advanced_negotiation() if not self.line_mode: @@ -199,6 +258,15 @@ def begin_advanced_negotiation(self) -> None: self.writer.iac(DO, NAWS) if self.default_encoding: self.writer.iac(DO, CHARSET) + # MCCP2/MCCP3: opt-in via compression=True, disabled over TLS + # (compress-then-encrypt is vulnerable to CRIME/BREACH attacks). + if self._mccp2_enabled: + ssl_obj = self.writer.get_extra_info("ssl_object") + if ssl_obj is None: + self.writer.iac(WILL, MCCP2_COMPRESS) + self.writer.iac(WILL, MCCP3_COMPRESS) + else: + logger.debug("MCCP disabled: TLS active (CRIME/BREACH mitigation)") def check_negotiation(self, final: bool = False) -> bool: """Check if negotiation is complete including encoding.""" @@ -894,6 +962,7 @@ async def create_server( never_send_ga: bool = False, line_mode: bool = False, connect_maxwait: float = 4.0, + compression: Optional[bool] = None, limit: Optional[int] = None, term: str = "unknown", cols: int = 80, @@ -961,6 +1030,10 @@ async def create_server( otherwise confused by our demands, the shell continues anyway after the greater of this value has elapsed. A client that is not answering option negotiation will delay the start of the shell by this amount. + :param compression: MCCP compression policy. ``None`` (default) + passively accepts compression if requested by the client. ``True`` + advertises MCCP2/MCCP3 during advanced negotiation. ``False`` + rejects all compression offers. :param limit: The buffer limit for the reader stream. :param ssl: An :class:`ssl.SSLContext` for TLS-encrypted connections (TELNETS, :rfc:`855` over TLS). When provided, the server performs a @@ -994,6 +1067,7 @@ def _make_telnet_protocol() -> asyncio.Protocol: never_send_ga=never_send_ga, line_mode=line_mode, connect_maxwait=connect_maxwait, + compression=compression, limit=limit, term=term, cols=cols, @@ -1146,6 +1220,13 @@ def parse_server_args() -> Dict[str, Any]: default=False, help="accept both TLS and plain telnet on the same port (requires --ssl-certfile)", ) + parser.add_argument( + "--compression", + action=argparse.BooleanOptionalAction, + default=None, + help="MCCP compression: --compression to advertise, --no-compression to reject, " + "omit to passively accept (default)", + ) result = vars(parser.parse_args(argv)) result["pty_args"] = pty_args if PTY_SUPPORT else None # --pty-raw is a hidden no-op (raw is now the default); @@ -1196,6 +1277,7 @@ async def run_server( status_interval: int = _config.status_interval, never_send_ga: bool = _config.never_send_ga, line_mode: bool = _config.line_mode, + compression: Optional[bool] = None, protocol_factory: Optional[Type[asyncio.Protocol]] = None, ssl: Optional[ssl_module.SSLContext] = None, tls_auto: bool = False, @@ -1280,8 +1362,9 @@ async def guarded_shell( force_binary=force_binary, never_send_ga=never_send_ga, line_mode=line_mode, - timeout=timeout, connect_maxwait=connect_maxwait, + compression=compression, + timeout=timeout, ssl=ssl, tls_auto=tls_auto, ) diff --git a/telnetlib3/server_base.py b/telnetlib3/server_base.py index e511626..d12acbe 100644 --- a/telnetlib3/server_base.py +++ b/telnetlib3/server_base.py @@ -3,6 +3,7 @@ from __future__ import annotations # std imports +import zlib import asyncio import logging import datetime @@ -29,6 +30,7 @@ class BaseServer(TelnetProtocolBase, asyncio.streams.FlowControlMixin, asyncio.P _check_later = None _rx_bytes = 0 _tx_bytes = 0 + _mccp3_decompressor: Optional[zlib.Decompress] = None def __init__( self, @@ -196,6 +198,21 @@ def data_received(self, data: bytes) -> None: self._last_received = datetime.datetime.now() self._rx_bytes += len(data) + # MCCP3: decompress client→server data when active + if self._mccp3_decompressor is not None: + try: + data = self._mccp3_decompressor.decompress(data) + except zlib.error: + logger.warning("MCCP3 decompression error, disabling") + self._mccp3_end() + return + if self._mccp3_decompressor.eof: + unused = self._mccp3_decompressor.unused_data + self._mccp3_end() + if unused: + self.data_received(unused) + return + if self.writer.slc_simulated: slc_vals = {defn.val[0] for defn in self.writer.slctab.values() if defn.val != theNULL} slc_special: frozenset[int] | None = frozenset({255} | slc_vals) @@ -206,6 +223,10 @@ def data_received(self, data: bytes) -> None: data, self.writer, self.reader, slc_special, logger.warning ) + # Check if MCCP3 SB was just received (client→server compression start) + if self.writer.mccp3_active and self._mccp3_decompressor is None: + self._mccp3_start() + if not self._waiter_connected.done() and cmd_received: self._check_negotiation_timer() @@ -320,4 +341,15 @@ def _check_negotiation_timer(self) -> None: ) self._tasks.append(self._check_later) + def _mccp3_start(self) -> None: + """Start MCCP3 decompression of client→server data.""" + self._mccp3_decompressor = zlib.decompressobj() + logger.debug("MCCP3 decompression started (client→server)") + + def _mccp3_end(self) -> None: + """Stop MCCP3 decompression.""" + self._mccp3_decompressor = None + self.writer.mccp3_active = False + logger.debug("MCCP3 decompression ended (client→server)") + _log_exception = staticmethod(_log_exception) diff --git a/telnetlib3/stream_writer.py b/telnetlib3/stream_writer.py index f85072a..d380eff 100644 --- a/telnetlib3/stream_writer.py +++ b/telnetlib3/stream_writer.py @@ -85,6 +85,8 @@ TTABLE_ACK, TTABLE_NAK, NEW_ENVIRON, + MCCP2_COMPRESS, + MCCP3_COMPRESS, COM_PORT_OPTION, TTABLE_REJECTED, LFLOW_RESTART_ANY, @@ -101,7 +103,7 @@ __all__ = ("TelnetWriter", "TelnetWriterUnicode") #: MUD options that allow empty SB payloads (e.g. ``IAC SB MXP IAC SE``). -_EMPTY_SB_OK = frozenset({MXP, MSP, ZMP, AARDWOLF, ATCP}) +_EMPTY_SB_OK = frozenset({MXP, MSP, ZMP, AARDWOLF, ATCP, MCCP2_COMPRESS, MCCP3_COMPRESS}) #: MUD protocol options that a plain telnet client should decline by default. _MUD_PROTOCOL_OPTIONS = frozenset({GMCP, MSDP, MSSP, MSP, MXP, ZMP, AARDWOLF, ATCP}) @@ -283,6 +285,26 @@ def __init__( #: ``None`` until an ``SB COM-PORT-OPTION`` payload is received. self.comport_data: Optional[dict[str, Any]] = None + #: Compression policy: ``None`` = passively accept (default), + #: ``True`` = actively request, ``False`` = reject. + self.compression: Optional[bool] = None + + #: One-shot flag: set True by ``_handle_sb_mccp2()`` when + #: ``IAC SB MCCP2 IAC SE`` is received. Consumed by + #: ``_process_data_chunk()`` to detect mid-chunk compression start. + self._mccp2_activated: bool = False + + #: Compressed remainder bytes after MCCP2 activation mid-chunk. + #: Set by ``_process_data_chunk()`` when ``_mccp2_activated`` fires, + #: consumed and cleared by the caller. + self._compressed_remainder: Optional[bytes] = None + + #: Whether MCCP2 compression is currently active (server→client). + self.mccp2_active: bool = False + + #: Whether MCCP3 compression is currently active (client→server). + self.mccp3_active: bool = False + #: Sub-negotiation buffer self._sb_buffer: collections.deque[bytes] = collections.deque() @@ -359,6 +381,7 @@ def __init__( (ZMP, "zmp"), (AARDWOLF, "aardwolf"), (ATCP, "atcp"), + (MCCP2_COMPRESS, "mccp2"), ): self.set_ext_callback(cmd=ext_cmd, func=getattr(self, f"handle_{key}")) @@ -1778,6 +1801,7 @@ def handle_do(self, opt: bytes) -> bool: CHARSET, NAWS, STATUS, + MCCP2_COMPRESS, GMCP, MSDP, MSSP, @@ -1797,6 +1821,12 @@ def handle_do(self, opt: bytes) -> bool: if not self.local_option.enabled(opt): self.iac(WONT, opt) return False + # Reject MCCP when compression is disabled or TLS is active + # (compress-then-encrypt is vulnerable to CRIME/BREACH attacks). + if opt in (MCCP2_COMPRESS, MCCP3_COMPRESS): + if self.compression is False or self.get_extra_info("ssl_object") is not None: + self.iac(WONT, opt) + return False # first time we've agreed, respond accordingly. if not self.local_option.enabled(opt): @@ -1879,6 +1909,8 @@ def handle_will(self, opt: bytes) -> None: EOR, SNDLOC, COM_PORT_OPTION, + MCCP2_COMPRESS, + MCCP3_COMPRESS, GMCP, MSDP, MSSP, @@ -1903,6 +1935,12 @@ def handle_will(self, opt: bytes) -> None: return self.iac(DONT, opt) return + # Reject MCCP when compression is disabled or TLS is active + # (compress-then-encrypt is vulnerable to CRIME/BREACH attacks). + if opt in (MCCP2_COMPRESS, MCCP3_COMPRESS): + if self.compression is False or self.get_extra_info("ssl_object") is not None: + self.iac(DONT, opt) + return if not self.remote_option.enabled(opt): self.iac(DO, opt) self.remote_option[opt] = True @@ -1914,6 +1952,12 @@ def handle_will(self, opt: bytes) -> None: self.send_linemode(self.default_linemode) if opt == COM_PORT_OPTION and self.client: self.request_comport_signature() + if opt == MCCP3_COMPRESS and self.client: + # MCCP3: client sends SB 87 SE to signal compression start; + # all bytes after SE are compressed by client. + self.send_iac(IAC + SB + MCCP3_COMPRESS + IAC + SE) + self.mccp3_active = True + self.log.debug("MCCP3: client compression activated") elif opt == TM: if opt == TM and not self.pending_option.enabled(DO + TM): @@ -2063,6 +2107,8 @@ def handle_subnegotiation(self, buf: collections.deque[bytes]) -> None: ZMP: self._handle_sb_zmp, AARDWOLF: self._handle_sb_aardwolf, ATCP: self._handle_sb_atcp, + MCCP2_COMPRESS: self._handle_sb_mccp2, + MCCP3_COMPRESS: self._handle_sb_mccp3, }.get(cmd) if fn_call is None: raise ValueError(f"SB unhandled: cmd={name_command(cmd)}, buf={buf!r}") @@ -2984,6 +3030,36 @@ def _handle_sb_atcp(self, buf: collections.deque[bytes]) -> None: package, value = atcp_decode(payload, encoding=encoding) self._ext_callback[ATCP](package, value) + def _handle_sb_mccp2(self, buf: collections.deque[bytes]) -> None: + """ + Handle MCCP2 subnegotiation (``IAC SB MCCP2 IAC SE``). + + Sets :attr:`_mccp2_activated` so ``_process_data_chunk()`` can detect + that remaining bytes in the current TCP chunk are zlib-compressed. + + :param buf: bytes following IAC SB MCCP2_COMPRESS. + """ + buf.popleft() + self._mccp2_activated = True + self._ext_callback[MCCP2_COMPRESS](True) + + def _handle_sb_mccp3(self, buf: collections.deque[bytes]) -> None: + """ + Handle MCCP3 subnegotiation (``IAC SB MCCP3 IAC SE``). + + On server side, this signals that subsequent client→server data is + zlib-compressed. + + :param buf: bytes following IAC SB MCCP3_COMPRESS. + """ + buf.popleft() + self.mccp3_active = True + self.log.debug("MCCP3: server received SB, client→server compression active") + + def handle_mccp2(self, activated: bool) -> None: + """Default ext_callback for MCCP2 activation.""" + self.log.debug("MCCP2 %s", "activated" if activated else "deactivated") + def _handle_do_forwardmask(self, buf: collections.deque[bytes]) -> None: """ Callback handles request for LINEMODE DO FORWARDMASK. diff --git a/telnetlib3/telopt.py b/telnetlib3/telopt.py index 30d2aaf..6edaec1 100644 --- a/telnetlib3/telopt.py +++ b/telnetlib3/telopt.py @@ -116,6 +116,7 @@ "LINEMODE", "LOGOUT", "MCCP2_COMPRESS", + "MCCP3_COMPRESS", "MCCP_COMPRESS", "MSDP", "MSDP_ARRAY_CLOSE", @@ -197,7 +198,7 @@ REQUEST, ACCEPTED, REJECTED, TTABLE_IS, TTABLE_REJECTED, TTABLE_ACK, TTABLE_NAK = ( bytes([const]) for const in range(1, 8) ) -MCCP_COMPRESS, MCCP2_COMPRESS = (bytes([85]), bytes([86])) +MCCP_COMPRESS, MCCP2_COMPRESS, MCCP3_COMPRESS = (bytes([85]), bytes([86]), bytes([87])) GMCP = bytes([201]) MSDP = bytes([69]) MSSP = bytes([70]) @@ -268,6 +269,7 @@ "SNDLOC", "MCCP_COMPRESS", "MCCP2_COMPRESS", + "MCCP3_COMPRESS", "GMCP", "MSDP", "MSSP", diff --git a/telnetlib3/tests/test_mccp.py b/telnetlib3/tests/test_mccp.py new file mode 100644 index 0000000..0c8b6c9 --- /dev/null +++ b/telnetlib3/tests/test_mccp.py @@ -0,0 +1,704 @@ +"""Tests for MCCP (MUD Client Compression Protocol) v2 and v3.""" + +# std imports +import zlib +import asyncio +import collections + +# 3rd party +import pytest + +# local +from telnetlib3.telopt import ( + DO, + SB, + SE, + IAC, + DONT, + WILL, + WONT, + MCCP2_COMPRESS, + MCCP3_COMPRESS, +) +from telnetlib3.stream_writer import TelnetWriter +from telnetlib3.tests.accessories import MockProtocol, MockTransport + + +def new_writer(server=True, client=False, reader=None): + t = MockTransport() + p = MockProtocol() + w = TelnetWriter(t, p, server=server, client=client, reader=reader) + return w, t, p + + +class TestMCCP2Negotiation: + def test_handle_will_mccp2_client(self): + w, t, _p = new_writer(server=False, client=True) + w.handle_will(MCCP2_COMPRESS) + assert IAC + DO + MCCP2_COMPRESS in t.writes + assert w.remote_option.get(MCCP2_COMPRESS) is True + + def test_handle_will_mccp2_server(self): + w, t, _p = new_writer(server=True) + w.handle_will(MCCP2_COMPRESS) + assert IAC + DO + MCCP2_COMPRESS in t.writes + + def test_handle_do_mccp2_server(self): + w, t, _p = new_writer(server=True) + result = w.handle_do(MCCP2_COMPRESS) + assert result is True + assert IAC + WILL + MCCP2_COMPRESS in t.writes + + def test_handle_do_mccp2_client(self): + w, t, _p = new_writer(server=False, client=True) + result = w.handle_do(MCCP2_COMPRESS) + assert result is True + assert IAC + WILL + MCCP2_COMPRESS in t.writes + + +class TestMCCP3Negotiation: + def test_handle_will_mccp3_client(self): + w, t, _p = new_writer(server=False, client=True) + w.handle_will(MCCP3_COMPRESS) + assert IAC + DO + MCCP3_COMPRESS in t.writes + assert IAC + SB + MCCP3_COMPRESS + IAC + SE in t.writes + assert w.mccp3_active is True + + def test_handle_will_mccp3_server(self): + w, t, _p = new_writer(server=True) + w.handle_will(MCCP3_COMPRESS) + assert IAC + DO + MCCP3_COMPRESS in t.writes + assert w.remote_option.get(MCCP3_COMPRESS) is True + + +class TestMCCPCompressionRejection: + @pytest.mark.parametrize("opt", [MCCP2_COMPRESS, MCCP3_COMPRESS], ids=["MCCP2", "MCCP3"]) + def test_handle_will_rejected_when_disabled(self, opt): + w, t, _p = new_writer(server=False, client=True) + w.compression = False + w.handle_will(opt) + assert IAC + DONT + opt in t.writes + assert w.remote_option.get(opt) is not True + + @pytest.mark.parametrize("opt", [MCCP2_COMPRESS, MCCP3_COMPRESS], ids=["MCCP2", "MCCP3"]) + def test_handle_do_rejected_when_disabled(self, opt): + w, t, _p = new_writer(server=True) + w.compression = False + result = w.handle_do(opt) + assert result is False + assert IAC + WONT + opt in t.writes + + +@pytest.mark.asyncio +class TestMCCPRejectedOverTLS: + @pytest.mark.parametrize("opt", [MCCP2_COMPRESS, MCCP3_COMPRESS], ids=["MCCP2", "MCCP3"]) + async def test_server_does_not_offer_mccp_when_tls_active(self, opt): + """Server skips WILL MCCP2/MCCP3 when TLS is active.""" + from telnetlib3.server import TelnetServer + + server = TelnetServer(encoding=False, connect_maxwait=0.1, compression=True) + transport = MockTransport() + transport.extra["ssl_object"] = object() + server.connection_made(transport) + + server.begin_advanced_negotiation() + + written = b"".join(transport.writes) + assert IAC + WILL + MCCP2_COMPRESS not in written + assert IAC + WILL + MCCP3_COMPRESS not in written + + async def test_server_offers_mccp_when_no_tls(self): + """Server sends WILL MCCP2/MCCP3 when no TLS.""" + from telnetlib3.server import TelnetServer + + server = TelnetServer(encoding=False, connect_maxwait=0.1, compression=True) + transport = MockTransport() + server.connection_made(transport) + + server.begin_advanced_negotiation() + + written = b"".join(transport.writes) + assert IAC + WILL + MCCP2_COMPRESS in written + assert IAC + WILL + MCCP3_COMPRESS in written + + @pytest.mark.parametrize("opt", [MCCP2_COMPRESS, MCCP3_COMPRESS], ids=["MCCP2", "MCCP3"]) + def test_client_rejects_will_mccp_over_tls(self, opt): + """Client sends DONT when server offers WILL MCCP over TLS.""" + w, t, _p = new_writer(server=False, client=True) + t.extra["ssl_object"] = object() + w.handle_will(opt) + assert IAC + DONT + opt in t.writes + assert w.remote_option.get(opt) is not True + + @pytest.mark.parametrize("opt", [MCCP2_COMPRESS, MCCP3_COMPRESS], ids=["MCCP2", "MCCP3"]) + def test_client_rejects_do_mccp_over_tls(self, opt): + """Client sends WONT when server sends DO MCCP over TLS.""" + w, t, _p = new_writer(server=True) + t.extra["ssl_object"] = object() + result = w.handle_do(opt) + assert result is False + assert IAC + WONT + opt in t.writes + + +class TestMCCP2SBHandler: + def test_sb_mccp2_sets_activated_flag(self): + w, _t, _p = new_writer(server=False, client=True) + w.pending_option[SB + MCCP2_COMPRESS] = True + buf = collections.deque([MCCP2_COMPRESS]) + w.handle_subnegotiation(buf) + assert w._mccp2_activated is True + + def test_sb_mccp2_calls_ext_callback(self): + w, _t, _p = new_writer(server=False, client=True) + received = [] + w.set_ext_callback(MCCP2_COMPRESS, lambda val: received.append(val)) + w.pending_option[SB + MCCP2_COMPRESS] = True + buf = collections.deque([MCCP2_COMPRESS]) + w.handle_subnegotiation(buf) + assert received == [True] + + +class TestMCCP3SBHandler: + def test_sb_mccp3_activates_on_server(self): + w, _t, _p = new_writer(server=True) + w.pending_option[SB + MCCP3_COMPRESS] = True + buf = collections.deque([MCCP3_COMPRESS]) + w.handle_subnegotiation(buf) + assert w.mccp3_active is True + + +class TestMCCP2MidChunk: + def test_mid_chunk_split(self): + """SB MCCP2 SE followed by compressed bytes in a single chunk.""" + from telnetlib3._base import _process_data_chunk + from telnetlib3.stream_reader import TelnetReader + + reader = TelnetReader() + w, _t, _p = new_writer(server=False, client=True, reader=reader) + + plaintext = b"Hello, compressed world!" + compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + compressed = compressor.compress(plaintext) + compressed += compressor.flush(zlib.Z_SYNC_FLUSH) + + chunk = IAC + SB + MCCP2_COMPRESS + IAC + SE + compressed + cmd_received = _process_data_chunk( + chunk, w, reader, None, lambda *a: None + ) + assert cmd_received is True + assert w.mccp2_active is True + assert w._compressed_remainder == compressed + w._compressed_remainder = None + + def test_no_remainder_without_mccp2(self): + from telnetlib3._base import _process_data_chunk + from telnetlib3.stream_reader import TelnetReader + + reader = TelnetReader() + w, _t, _p = new_writer(server=False, client=True, reader=reader) + + chunk = b"plain text data" + cmd_received = _process_data_chunk( + chunk, w, reader, None, lambda *a: None + ) + assert cmd_received is False + assert w._compressed_remainder is None + + +class TestMCCP2Decompression: + def test_stream_end_detection(self): + """Z_STREAM_END triggers decompressor cleanup.""" + compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + data = b"test data for z_stream_end" + compressed = compressor.compress(data) + compressed += compressor.flush(zlib.Z_FINISH) + + trailing_plaintext = b"after-compression" + full = compressed + trailing_plaintext + + decompressor = zlib.decompressobj() + decompressed = decompressor.decompress(full) + assert decompressed == data + assert decompressor.eof is True + assert decompressor.unused_data == trailing_plaintext + + +class TestMCCP2CompressionRoundTrip: + def test_iac_survives_roundtrip(self): + """IAC sequences survive compression and decompression.""" + compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + data_with_iac = IAC + WILL + MCCP2_COMPRESS + b"normal text" + compressed = compressor.compress(data_with_iac) + compressed += compressor.flush(zlib.Z_SYNC_FLUSH) + + decompressor = zlib.decompressobj() + decompressed = decompressor.decompress(compressed) + assert decompressed == data_with_iac + + +class TestMCCPAttributes: + def test_initial_attributes(self): + w, _t, _p = new_writer(server=True) + assert w._mccp2_activated is False + assert w.mccp2_active is False + assert w.mccp3_active is False + + @pytest.mark.parametrize( + "opt", + [MCCP2_COMPRESS, MCCP3_COMPRESS], + ids=["MCCP2", "MCCP3"], + ) + def test_empty_sb_allowed(self, opt): + w, _t, _p = new_writer(server=False, client=True) + w.pending_option[SB + opt] = True + buf = collections.deque([opt]) + w.handle_subnegotiation(buf) + + +def _make_compressed(plaintext: bytes, finish: bool = False) -> bytes: + compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + data = compressor.compress(plaintext) + data += compressor.flush(zlib.Z_FINISH if finish else zlib.Z_SYNC_FLUSH) + return data + + +_BOUNDARY_PLAINTEXT = b"The quick brown fox jumps over the lazy dog. " * 3 +_BOUNDARY_COMPRESSED = _make_compressed(_BOUNDARY_PLAINTEXT) +_BOUNDARY_SB = IAC + SB + MCCP2_COMPRESS + IAC + SE +_BOUNDARY_FULL = _BOUNDARY_SB + _BOUNDARY_COMPRESSED +_BOUNDARY_SPLITS = [ + 1, 2, 3, 4, + len(_BOUNDARY_SB) - 1, + len(_BOUNDARY_SB), + len(_BOUNDARY_SB) + 1, + len(_BOUNDARY_SB) + len(_BOUNDARY_COMPRESSED) // 2, + len(_BOUNDARY_FULL) - 1, +] +_BOUNDARY_IDS = [f"split_at_{s}" for s in _BOUNDARY_SPLITS] + + +def _make_client_with_capture(): + """Create a BaseClient with captured reader output, for packet boundary tests.""" + from telnetlib3.client_base import BaseClient + + received: list[bytes] = [] + client = BaseClient(encoding=False, connect_minwait=0, connect_maxwait=0.1) + transport = MockTransport() + client.connection_made(transport) + + orig_feed = client.reader.feed_data + + def capture_feed(data: bytes) -> None: + received.append(data) + orig_feed(data) + + client.reader.feed_data = capture_feed + return client, received + + +@pytest.mark.asyncio +class TestMCCP2PacketBoundary: + @pytest.mark.parametrize("split_at", _BOUNDARY_SPLITS, ids=_BOUNDARY_IDS) + async def test_two_chunk_delivery(self, split_at): + """Compressed data split across two TCP chunks at various offsets.""" + client, received = _make_client_with_capture() + + client._process_chunk(_BOUNDARY_FULL[:split_at]) + client._process_chunk(_BOUNDARY_FULL[split_at:]) + + joined = b"".join(received) + assert joined == _BOUNDARY_PLAINTEXT + + @pytest.mark.parametrize( + "n_chunks", + [3, 5, 10], + ids=["3_chunks", "5_chunks", "10_chunks"], + ) + async def test_multi_chunk_delivery(self, n_chunks): + """Compressed data delivered in many small chunks.""" + client, received = _make_client_with_capture() + + chunk_size = max(1, len(_BOUNDARY_FULL) // n_chunks) + for i in range(0, len(_BOUNDARY_FULL), chunk_size): + client._process_chunk(_BOUNDARY_FULL[i : i + chunk_size]) + + joined = b"".join(received) + assert joined == _BOUNDARY_PLAINTEXT + + async def test_z_finish_with_trailing_plaintext(self): + """Z_FINISH boundary: compressed stream ends, plaintext follows.""" + client, received = _make_client_with_capture() + + plaintext = b"compressed content here" + trailing = b"plaintext after compression ends" + compressed = _make_compressed(plaintext, finish=True) + full = _BOUNDARY_SB + compressed + trailing + + client._process_chunk(full) + joined = b"".join(received) + assert joined == plaintext + trailing + + @pytest.mark.parametrize( + "split_at", + [1, 4, 8, 16], + ids=["byte_1", "byte_4", "byte_8", "byte_16"], + ) + async def test_compressed_only_boundary(self, split_at): + """Split within compressed data only (SB already processed).""" + client, received = _make_client_with_capture() + + client._process_chunk(_BOUNDARY_SB) + + actual_split = min(split_at, len(_BOUNDARY_COMPRESSED) - 1) + client._process_chunk(_BOUNDARY_COMPRESSED[:actual_split]) + client._process_chunk(_BOUNDARY_COMPRESSED[actual_split:]) + + joined = b"".join(received) + assert joined == _BOUNDARY_PLAINTEXT + + +@pytest.mark.asyncio +class TestMCCPDecompressionError: + async def test_client_corrupt_mccp2_drops_data(self): + """Corrupt compressed data is discarded, not fed to IAC parser.""" + client, received = _make_client_with_capture() + + # Activate MCCP2 via SB + client._process_chunk(_BOUNDARY_SB) + + # Send garbage that is not valid zlib + client._process_chunk(b"\x00\x01\x02\x03\xff\xfe\xfd") + + # Decompressor should be disabled, corrupt data not fed to reader + assert client._mccp2_decompressor is None + assert received == [] + + async def test_server_corrupt_mccp3_drops_data(self): + """Corrupt MCCP3 data is discarded, not fed to IAC parser.""" + from telnetlib3.server_base import BaseServer + + server = BaseServer(encoding=False, connect_maxwait=0.1) + transport = MockTransport() + server.connection_made(transport) + + received: list[bytes] = [] + orig_feed = server.reader.feed_data + server.reader.feed_data = lambda d: (received.append(d), orig_feed(d)) + + # Manually activate MCCP3 decompression + server._mccp3_decompressor = zlib.decompressobj() + + # Send garbage + server.data_received(b"\x00\x01\x02\x03\xff\xfe\xfd") + + assert server._mccp3_decompressor is None + assert received == [] + + +@pytest.mark.asyncio +class TestMCCP2ServerEnd: + async def test_mccp2_end_flushes_and_restores(self): + """_mccp2_end flushes Z_FINISH and restores transport.write.""" + from telnetlib3.server import TelnetServer + + server = TelnetServer(encoding=False, connect_maxwait=0.1, compression=True) + transport = MockTransport() + server.connection_made(transport) + + # Manually start MCCP2 compression + server._mccp2_compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + orig_write = transport.write + server._mccp2_orig_write = orig_write + server._mccp2_pending = True + server.writer.mccp2_active = True + + # Wrap transport like _mccp2_start does + def compressed_write(data: bytes) -> None: + if server._mccp2_compressor is not None: + c = server._mccp2_compressor.compress(data) + c += server._mccp2_compressor.flush(zlib.Z_SYNC_FLUSH) + orig_write(c) + else: + orig_write(data) + + transport.write = compressed_write + + server._mccp2_end() + + assert server._mccp2_compressor is None + assert server._mccp2_pending is False + assert server.writer.mccp2_active is False + # transport.write should be restored to original + assert transport.write is orig_write + + async def test_mccp2_end_handles_zlib_error(self): + """_mccp2_end catches zlib.error from double-flush.""" + from telnetlib3.server import TelnetServer + + server = TelnetServer(encoding=False, connect_maxwait=0.1, compression=True) + transport = MockTransport() + server.connection_made(transport) + + compressor = zlib.compressobj( + zlib.Z_BEST_COMPRESSION, zlib.DEFLATED, 12, 5, zlib.Z_DEFAULT_STRATEGY + ) + # Exhaust the compressor so flush(Z_FINISH) raises + compressor.flush(zlib.Z_FINISH) + + server._mccp2_compressor = compressor + server._mccp2_orig_write = transport.write + server._mccp2_pending = True + server.writer.mccp2_active = True + + server._mccp2_end() + + assert server._mccp2_compressor is None + assert server.writer.mccp2_active is False + + async def test_compressed_write_fallback_after_end(self): + """compressed_write uses orig_write when compressor is None.""" + from telnetlib3.server import TelnetServer + + server = TelnetServer(encoding=False, connect_maxwait=0.1, compression=True) + transport = MockTransport() + server.connection_made(transport) + + server._mccp2_start() + compressed_write = transport.write + + # End compression — restores transport.write + server._mccp2_end() + + # The closure should fallback to orig_write when compressor is None + transport.writes.clear() + compressed_write(b"plaintext after end") + assert b"plaintext after end" in transport.writes + + +@pytest.mark.asyncio +class TestMCCP3ClientEnd: + async def test_mccp3_end_flushes_and_restores(self): + """_mccp3_end flushes Z_FINISH and restores transport.write.""" + from telnetlib3.client_base import BaseClient + + client = BaseClient(encoding=False, connect_minwait=0, connect_maxwait=0.1) + transport = MockTransport() + client.connection_made(transport) + + client._mccp3_start() + assert client._mccp3_compressor is not None + + client._mccp3_end() + + assert client._mccp3_compressor is None + assert client.writer.mccp3_active is False + # Final flush bytes should have been written + assert len(transport.writes) > 0 + + async def test_mccp3_end_skips_write_when_closing(self): + """_mccp3_end skips final write when transport is closing.""" + from telnetlib3.client_base import BaseClient + + client = BaseClient(encoding=False, connect_minwait=0, connect_maxwait=0.1) + transport = MockTransport() + client.connection_made(transport) + + client._mccp3_start() + transport.writes.clear() + transport._closing = True + + client._mccp3_end() + + assert client._mccp3_compressor is None + # No final flush written because transport is closing + assert transport.writes == [] + + async def test_mccp3_end_noop_when_inactive(self): + """_mccp3_end is safe to call when compression is not active.""" + from telnetlib3.client_base import BaseClient + + client = BaseClient(encoding=False, connect_minwait=0, connect_maxwait=0.1) + transport = MockTransport() + client.connection_made(transport) + + client._mccp3_end() + assert client._mccp3_compressor is None + assert client.writer.mccp3_active is False + + async def test_compressed_write_fallback_after_end(self): + """Client compressed_write uses orig_write when compressor is None.""" + from telnetlib3.client_base import BaseClient + + client = BaseClient(encoding=False, connect_minwait=0, connect_maxwait=0.1) + transport = MockTransport() + client.connection_made(transport) + + client._mccp3_start() + compressed_write = transport.write + + client._mccp3_end() + + transport.writes.clear() + compressed_write(b"plain after mccp3 end") + assert b"plain after mccp3 end" in transport.writes + + +@pytest.mark.asyncio +class TestMCCP2ServerEndNoop: + async def test_mccp2_end_noop_when_inactive(self): + """_mccp2_end is safe to call when compression is not active.""" + from telnetlib3.server import TelnetServer + + server = TelnetServer(encoding=False, connect_maxwait=0.1, compression=True) + transport = MockTransport() + server.connection_made(transport) + + server._mccp2_end() + assert server._mccp2_compressor is None + assert server.writer.mccp2_active is False + + +@pytest.mark.asyncio +class TestMCCP2Integration: + async def test_server_client_mccp2(self): + """Full MCCP2 round-trip: server compresses, client decompresses.""" + from telnetlib3.client import open_connection + from telnetlib3.server import create_server + + received_data: list[str] = [] + test_text = "Hello from MCCP2 compressed server!" + + async def server_shell(reader, writer): + writer.write(test_text) + writer.close() + + async def client_shell(reader, writer): + data = await asyncio.wait_for(reader.read(4096), timeout=5) + received_data.append(data) + + server = await create_server( + host="127.0.0.1", + port=0, + compression=True, + shell=server_shell, + encoding="utf-8", + connect_maxwait=1.0, + ) + port = server._server.sockets[0].getsockname()[1] + + try: + reader, writer = await open_connection( + host="127.0.0.1", + port=port, + shell=client_shell, + encoding="utf-8", + connect_minwait=0, + connect_maxwait=2.0, + ) + await asyncio.wait_for(writer.protocol.waiter_closed, timeout=10) + finally: + server.close() + + combined = "".join(received_data) + assert test_text in combined + + async def test_server_client_mccp2_bidirectional(self): + """MCCP2 server→client: server writes compressed, client reads plaintext.""" + from telnetlib3.client import open_connection + from telnetlib3.server import create_server + + server_received: list[str] = [] + client_received: list[str] = [] + client_msg = "hello from client" + server_msg = "hello from server" + + async def server_shell(reader, writer): + data = await asyncio.wait_for(reader.read(4096), timeout=5) + server_received.append(data) + writer.write(server_msg) + writer.close() + + async def client_shell(reader, writer): + writer.write(client_msg) + data = await asyncio.wait_for(reader.read(4096), timeout=5) + client_received.append(data) + + server = await create_server( + host="127.0.0.1", + port=0, + compression=True, + shell=server_shell, + encoding="utf-8", + connect_maxwait=1.0, + ) + port = server._server.sockets[0].getsockname()[1] + + try: + _reader, writer = await open_connection( + host="127.0.0.1", + port=port, + shell=client_shell, + encoding="utf-8", + connect_minwait=0, + connect_maxwait=2.0, + ) + await asyncio.wait_for(writer.protocol.waiter_closed, timeout=10) + finally: + server.close() + + assert client_msg in "".join(server_received) + assert server_msg in "".join(client_received) + + async def test_server_client_mccp3(self): + """MCCP3 client→server: client compresses, server reads plaintext.""" + from telnetlib3.client import open_connection + from telnetlib3.server import create_server + + server_received: list[str] = [] + client_msg = "hello compressed from client via MCCP3" + + async def server_shell(reader, writer): + data = await asyncio.wait_for(reader.read(4096), timeout=5) + server_received.append(data) + writer.write("ack") + writer.close() + + async def client_shell(reader, writer): + await asyncio.sleep(0.2) + writer.write(client_msg) + await asyncio.wait_for(reader.read(4096), timeout=5) + + server = await create_server( + host="127.0.0.1", + port=0, + compression=True, + shell=server_shell, + encoding="utf-8", + connect_maxwait=1.0, + ) + port = server._server.sockets[0].getsockname()[1] + + try: + _reader, writer = await open_connection( + host="127.0.0.1", + port=port, + shell=client_shell, + encoding="utf-8", + connect_minwait=0, + connect_maxwait=2.0, + ) + await asyncio.wait_for(writer.protocol.waiter_closed, timeout=10) + finally: + server.close() + + assert client_msg in "".join(server_received)