diff --git a/docs/api/color_filter.rst b/docs/api/color_filter.rst deleted file mode 100644 index 79df2de..0000000 --- a/docs/api/color_filter.rst +++ /dev/null @@ -1,7 +0,0 @@ -color_filter ------------- - -.. automodule:: telnetlib3.color_filter - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/history.rst b/docs/history.rst index 4ad92e1..9868190 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,20 +1,23 @@ History ======= -3.0.3 (unreleased) - * new: :func:`~telnetlib3.accessories.make_logger` accepts a ``filemode`` - parameter (``"a"`` append or ``"w"`` overwrite) so callers can control - whether log files are truncated on each run. The default remains - ``"a"`` (append), matching historical behaviour. - * new: ``--logfile-mode {append,rewrite}`` CLI flag controls whether the - log file is appended to (default) or overwritten on each connection. - * new: ``--typescript-mode {append,rewrite}`` CLI flag controls whether the - typescript file is appended to (default) or overwritten on each connection. - * bugfix: kludge mode not detected when server negotiates WILL ECHO + DO SGA - (requesting client WILL SGA) instead of WILL ECHO + WILL SGA. - :attr:`~telnetlib3.stream_writer.TelnetWriter.mode` and - ``_server_will_sga()`` now check SGA in either direction - (``remote_option`` or ``local_option``), so clients correctly switch from - line mode to character-at-a-time mode with these servers. +4.0.0 + * removed: ``telnetlib3.color_filter``. ``ColorFilter``, ``ColorConfig``, ``PALETTES``, + ``PetsciiColorFilter``, and ``AtasciiControlFilter`` have all been moved to the downstream + `Telix `_ project, recommended for connecting to legacy `BBSs + `_ systems requiring color correction. + + ``telnetlib3-client`` CLI args ``--colormatch``, ``--color-brightness``, ``--color-contrast``, + ``--background-color``, ``--ice-colors`` removed. + +3.0.3 + * bugfix: server and client now correctly complete LINEMODE negotiation when prompted to. + * new: ``--logfile-mode {append,rewrite}`` and ``--typescript-mode`` CLI flags + and :func:`~telnetlib3.accessories.make_logger` ``filemode`` argument control whether the log + file is appended to (default) or overwritten on each connection. + * new: :class:`~telnetlib3.client_shell.LinemodeBuffer` used by ``telnetlib3-client``, a + client-side line buffer for LINEMODE EDIT mode with local erase-char, erase-line, erase-word + editing, forwardmask flushing, and TRAPSIG IAC command generation. The default 'telsh' server + was also updated to support linemode. 3.0.2 * bugfix: :meth:`~telnetlib3.stream_writer.TelnetWriter.request_charset` raised :exc:`TypeError`, @@ -130,7 +133,7 @@ History art) as surrogates instead of replacing them with U+FFFD. 2.4.0 - * new: :mod:`telnetlib3.color_filter` module — translates 16-color ANSI SGR + * new: ``telnetlib3.color_filter`` module — translates 16-color ANSI SGR codes to 24-bit RGB from hardware palettes (EGA, CGA, VGA, xterm). Enabled by default. New client CLI options: ``--colormatch``, ``--color-brightness``, ``--color-contrast``, ``--background-color``, diff --git a/pyproject.toml b/pyproject.toml index 89c7aab..21738c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "telnetlib3" -version = "3.0.2" # Keep in sync with telnetlib3/accessories.py::get_version ! +version = "4.0.0" # Keep in sync with telnetlib3/accessories.py::get_version ! description = " Python Telnet server and client CLI and Protocol library" readme = "README.rst" license = "ISC" diff --git a/telnetlib3/__init__.py b/telnetlib3/__init__.py index 5f2c662..f717ca1 100644 --- a/telnetlib3/__init__.py +++ b/telnetlib3/__init__.py @@ -16,7 +16,6 @@ from . import stream_reader from . import client_base from . import client_shell -from . import color_filter from . import client from . import telopt from . import mud diff --git a/telnetlib3/_session_context.py b/telnetlib3/_session_context.py index cc9ed28..5120f18 100644 --- a/telnetlib3/_session_context.py +++ b/telnetlib3/_session_context.py @@ -31,7 +31,6 @@ def __init__(self) -> None: self.raw_mode: Optional[bool] = None self.ascii_eol: bool = False self.input_filter: Optional[Any] = None - self.color_filter: Optional[Any] = None self.autoreply_engine: Optional[Any] = None self.autoreply_wait_fn: Optional[Callable[..., Awaitable[None]]] = None self.typescript_file: Optional[IO[str]] = None diff --git a/telnetlib3/accessories.py b/telnetlib3/accessories.py index d57b9bf..86236cd 100644 --- a/telnetlib3/accessories.py +++ b/telnetlib3/accessories.py @@ -42,7 +42,7 @@ def get_version() -> str: """Return the current version of telnetlib3.""" - return "3.0.2" # keep in sync with pyproject.toml ! + return "4.0.0" # keep in sync with pyproject.toml ! def encoding_from_lang(lang: str) -> Optional[str]: diff --git a/telnetlib3/client.py b/telnetlib3/client.py index 30860bf..4bd13f6 100755 --- a/telnetlib3/client.py +++ b/telnetlib3/client.py @@ -685,56 +685,7 @@ def _patched_connection_made(transport: asyncio.BaseTransport) -> None: client_factory: Optional[Callable[..., client_base.BaseClient]] = _client_factory - # Wrap the shell callback to inject color filter when enabled - colormatch: str = args["colormatch"] shell_callback = args["shell"] - if colormatch.lower() != "none": - from .color_filter import ( - PALETTES, - ColorConfig, - ColorFilter, - PetsciiColorFilter, - AtasciiControlFilter, - ) - - # Auto-select encoding-specific filters - encoding_name: str = args.get("encoding", "") or "" - is_petscii = encoding_name.lower() in ("petscii", "cbm", "commodore", "c64", "c128") - is_atascii = encoding_name.lower() in ("atascii", "atari8bit", "atari_8bit") - if colormatch == "petscii": - colormatch = "c64" - if is_petscii and colormatch != "c64": - colormatch = "c64" - - if colormatch not in PALETTES: - print( - f"Unknown palette {colormatch!r}," f" available: {', '.join(sorted(PALETTES))}", - file=sys.stderr, - ) - sys.exit(1) - color_config = ColorConfig( - palette_name=colormatch, - brightness=args["color_brightness"], - contrast=args["color_contrast"], - background_color=args["background_color"], - ice_colors=args["ice_colors"], - ) - if is_petscii or colormatch == "c64": - color_filter_obj: object = PetsciiColorFilter(color_config) - elif is_atascii: - color_filter_obj = AtasciiControlFilter() - else: - color_filter_obj = ColorFilter(color_config) - original_shell = shell_callback - - async def _color_shell( - reader: Union[TelnetReader, TelnetReaderUnicode], - writer_arg: Union[TelnetWriter, TelnetWriterUnicode], - ) -> None: - writer_arg.ctx.color_filter = color_filter_obj - await original_shell(reader, writer_arg) - - shell_callback = _color_shell # Wrap shell to inject raw_mode flag and input translation for retro encodings raw_mode_val: Optional[bool] = args.get("raw_mode", False) @@ -894,44 +845,6 @@ def _get_argument_parser() -> argparse.ArgumentParser: metavar="OPT", help="always send DO for this option (name like GMCP or number, repeatable)", ) - parser.add_argument( - "--colormatch", - default="vga", - metavar="PALETTE", - help=( - "translate basic 16-color ANSI codes to exact 24-bit RGB values" - " from a named hardware palette, bypassing the terminal's custom" - " palette to preserve intended MUD/BBS artwork colors" - " (vga, xterm, none)" - ), - ) - parser.add_argument( - "--color-brightness", - default=1.0, - type=float, - metavar="FLOAT", - help="color brightness scale [0.0..1.0], where 1.0 is original", - ) - parser.add_argument( - "--color-contrast", - default=1.0, - type=float, - metavar="FLOAT", - help="color contrast scale [0.0..1.0], where 1.0 is original", - ) - parser.add_argument( - "--background-color", - default="#000000", - metavar="#RRGGBB", - help="forced background color as hex RGB (near-black by default)", - ) - parser.add_argument( - "--ice-colors", - action=argparse.BooleanOptionalAction, - default=True, - help="treat SGR 5 (blink) as bright background (iCE colors)" - " for BBS/ANSI art (default: enabled)", - ) parser.add_argument( "--ascii-eol", action="store_true", @@ -1013,20 +926,6 @@ def _parse_option_arg(value: str) -> bytes: return bytes([int(value)]) -def _parse_background_color(value: str) -> Tuple[int, int, int]: - """ - Parse hex color string to RGB tuple. - - :param value: Color string like ``"#RRGGBB"`` or ``"RRGGBB"``. - :returns: (R, G, B) tuple with values 0-255. - :raises ValueError: When *value* is not a valid hex color. - """ - h = value.lstrip("#") - if len(h) != 6: - raise ValueError(f"invalid hex color: {value!r}") - return (int(h[0:2], 16), int(h[2:4], 16), int(h[4:6], 16)) - - def _transform_args(args: argparse.Namespace) -> Dict[str, Any]: # Auto-enable force_binary for any non-ASCII encoding that uses high-bit bytes. from .encodings import FORCE_BINARY_ENCODINGS @@ -1075,11 +974,6 @@ def _transform_args(args: argparse.Namespace) -> Dict[str, Any]: "send_environ": tuple(v.strip() for v in args.send_environ.split(",") if v.strip()), "always_will": {_parse_option_arg(v) for v in args.always_will}, "always_do": {_parse_option_arg(v) for v in args.always_do}, - "colormatch": args.colormatch, - "color_brightness": args.color_brightness, - "color_contrast": args.color_contrast, - "background_color": _parse_background_color(args.background_color), - "ice_colors": args.ice_colors, "raw_mode": raw_mode, "ascii_eol": args.ascii_eol, "ansi_keys": args.ansi_keys, diff --git a/telnetlib3/client_shell.py b/telnetlib3/client_shell.py index e64e818..f5fdc6c 100644 --- a/telnetlib3/client_shell.py +++ b/telnetlib3/client_shell.py @@ -10,12 +10,14 @@ from dataclasses import dataclass # local +from . import slc as slc_module from . import accessories from ._session_context import TelnetSessionContext log = logging.getLogger(__name__) # local +from .telopt import LINEMODE # noqa: E402 from .accessories import TRACE # noqa: E402 from .stream_reader import TelnetReader, TelnetReaderUnicode # noqa: E402 from .stream_writer import TelnetWriter, TelnetWriterUnicode # noqa: E402 @@ -180,6 +182,93 @@ class _RawLoopState: reactivate_repl: bool = False +class LinemodeBuffer: + """ + Client-side line buffer for LINEMODE EDIT mode (RFC 1184 §3.1). + + Accumulates characters typed by the user, applying local SLC editing functions (erase-char, + erase-line, erase-word) and transmitting complete lines to the server. When TRAPSIG is enabled, + signal characters (^C etc.) are sent as IAC commands instead of buffered. + + :param slctab: The writer's current SLC character table. + :param forwardmask: FORWARDMASK received from server, or None. + :param trapsig: When True, signal characters are sent as IAC commands. + """ + + def __init__( + self, + slctab: Dict[bytes, slc_module.SLC], + forwardmask: Optional[slc_module.Forwardmask] = None, + trapsig: bool = False, + ) -> None: + """Initialize LinemodeBuffer.""" + from .telopt import IP, AYT, BRK, EOF, IAC, SUSP, ABORT + + self._buf: list[str] = [] + self.slctab = slctab + self.forwardmask = forwardmask + self.trapsig = trapsig + self._trapsig_map: Dict[bytes, bytes] = { + slc_module.SLC_IP: IAC + IP, + slc_module.SLC_ABORT: IAC + ABORT, + slc_module.SLC_SUSP: IAC + SUSP, + slc_module.SLC_EOF: IAC + EOF, + slc_module.SLC_BRK: IAC + BRK, + slc_module.SLC_AYT: IAC + AYT, + } + + def _slc_val(self, func: bytes) -> Optional[int]: + """Return the active byte value for SLC function, or None if unsupported.""" + defn = self.slctab.get(func) + if defn is None or defn.nosupport: + return None + v = defn.val + return ord(v) if v and v != slc_module.theNULL else None + + def feed(self, char: str) -> Tuple[str, Optional[bytes]]: + """ + Feed one character into the buffer. + + :returns: ``(echo, data)`` where ``echo`` is text to display locally + (may be empty) and ``data`` is bytes to send to server, or None + if buffering. + """ + b = ord(char) + if self.trapsig: + for func, cmd in self._trapsig_map.items(): + if b == self._slc_val(func): + return ("", cmd) + if b == self._slc_val(slc_module.SLC_EC): + if self._buf: + self._buf.pop() + return ("\b \b", None) + return ("", None) + if b == self._slc_val(slc_module.SLC_EL): + n = len(self._buf) + self._buf.clear() + return ("\b \b" * n, None) + if b == self._slc_val(slc_module.SLC_EW): + popped = 0 + # skip trailing spaces (POSIX VWERASE behaviour) + while self._buf and self._buf[-1] == " ": + self._buf.pop() + popped += 1 + while self._buf and self._buf[-1] != " ": + self._buf.pop() + popped += 1 + return ("\b \b" * popped, None) + if char in ("\r", "\n"): + line = "".join(self._buf) + char + self._buf.clear() + return (char, line.encode()) + if self.forwardmask is not None and b in self.forwardmask: + data = ("".join(self._buf) + char).encode() + self._buf.clear() + return (char, data) + self._buf.append(char) + return (char, None) + + if sys.platform == "win32": async def telnet_client_shell( @@ -314,10 +403,7 @@ def _server_will_sga(self) -> bool: from .telopt import SGA w = self.telnet_writer - return bool( - w.client - and (w.remote_option.enabled(SGA) or w.local_option.enabled(SGA)) - ) + return bool(w.client and (w.remote_option.enabled(SGA) or w.local_option.enabled(SGA))) def check_auto_mode( self, switched_to_raw: bool, last_will_echo: bool @@ -334,6 +420,20 @@ def check_auto_mode( return None wecho = self.telnet_writer.will_echo wsga = self._server_will_sga() + # LINEMODE EDIT: kernel must handle line editing; keep/restore cooked mode. + # This takes priority over the SGA/ECHO raw-mode heuristics below. + if ( + self.telnet_writer.local_option.enabled(LINEMODE) + and self.telnet_writer.linemode.edit + ): + if switched_to_raw: + assert self._save_mode is not None + self.set_mode(self._save_mode) + self.telnet_writer.log.debug( + "auto: LINEMODE EDIT confirmed, restoring cooked mode" + ) + return (False, wecho, False) + return None # WILL ECHO alone = line mode with server echo (suppress local echo) # WILL SGA (with or without ECHO) = raw/character-at-a-time should_go_raw = not switched_to_raw and wsga @@ -366,20 +466,35 @@ def determine_mode(self, mode: "Terminal.ModeDef") -> "Terminal.ModeDef": Auto mode (``_raw_mode is None``): follows the server's negotiation. - ================= ======== ========== ================================ + ================= ======== ========== ======================================== Server negotiates ICANON ECHO Behavior - ================= ======== ========== ================================ + ================= ======== ========== ======================================== Nothing on on Line mode, local echo + LINEMODE EDIT **on** on Cooked mode, kernel handles EC/EL/echo + LINEMODE remote **off** **off** Raw, server echoes WILL SGA only **off** on Character-at-a-time, local echo WILL ECHO only on **off** Line mode, server echoes WILL SGA + ECHO **off** **off** Full kludge mode (most common) - ================= ======== ========== ================================ + ================= ======== ========== ======================================== """ raw_mode = _get_raw_mode(self.telnet_writer) will_echo = self.telnet_writer.will_echo will_sga = self._server_will_sga() # Auto mode (None): follow server negotiation if raw_mode is None: + if self.telnet_writer.local_option.enabled(LINEMODE): + linemode_mode = self.telnet_writer.linemode + if linemode_mode.edit: + # RFC 1184 / NetBSD reference: LINEMODE EDIT means ICANON on. + # The kernel line discipline handles EC (VERASE), EL (VKILL), + # EW (VWERASE), and echo. No software line editing needed. + self.telnet_writer.log.debug( + "auto: LINEMODE EDIT, cooked mode (kernel line editing)" + ) + self.software_echo = False + return mode # keep ICANON on; kernel handles EC/EL/EW and echo + self.telnet_writer.log.debug("auto: LINEMODE remote, raw input server echo") + return self._make_raw(mode, suppress_echo=True) if will_echo and will_sga: self.telnet_writer.log.debug("auto: server echo + SGA, kludge mode") return self._make_raw(mode) @@ -453,17 +568,14 @@ def _transform_output( out: str, writer: Union[TelnetWriter, TelnetWriterUnicode], in_raw_mode: bool ) -> str: r""" - Apply color filter, ASCII EOL substitution, and CRLF normalization. + Apply ASCII EOL substitution and CRLF normalization. :param out: Server output text to transform. - :param writer: Telnet writer (``ctx`` provides color filter and ascii_eol). + :param writer: Telnet writer (``ctx`` provides ascii_eol). :param in_raw_mode: When ``True``, normalize line endings to ``\r\n``. :returns: Transformed output string. """ ctx: TelnetSessionContext = writer.ctx - cf = ctx.color_filter - if cf is not None: - out = cf.filter(out) if ctx.ascii_eol: out = out.replace(_ATASCII_CR_CHAR, "\r").replace(_ATASCII_LF_CHAR, "\n") if in_raw_mode: @@ -519,22 +631,24 @@ def _get_raw_mode(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "bool | N """Return the writer's ``ctx.raw_mode`` (``None``, ``True``, or ``False``).""" return writer.ctx.raw_mode - def _flush_color_filter( - writer: Union[TelnetWriter, TelnetWriterUnicode], stdout: asyncio.StreamWriter - ) -> None: - """Flush any pending color filter output to stdout.""" - cf = writer.ctx.color_filter - if cf is not None: - flush = cf.flush() - if flush: - stdout.write(flush.encode()) - def _ensure_autoreply_engine( telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], ) -> "Optional[Any]": """Return the autoreply engine from the writer's context, if set.""" return telnet_writer.ctx.autoreply_engine + def _get_linemode_buffer(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "LinemodeBuffer": + """Return (or lazily create) the LinemodeBuffer attached to *writer*.""" + buf: Optional[LinemodeBuffer] = getattr(writer, "_linemode_buf", None) + if buf is None: + buf = LinemodeBuffer( + slctab=writer.slctab, + forwardmask=writer.forwardmask, + trapsig=writer.linemode.trapsig, + ) + writer._linemode_buf = buf + return buf + async def _raw_event_loop( telnet_reader: Union[TelnetReader, TelnetReaderUnicode], telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], @@ -593,7 +707,26 @@ async def _raw_event_loop( wait_for.remove(telnet_task) handle_close("Connection closed.") break - new_timer, has_pending = _send_stdin(inp, telnet_writer, stdout, state.local_echo) + linemode_edit = ( + telnet_writer.local_option.enabled(LINEMODE) and telnet_writer.linemode.edit + ) + if linemode_edit and state.switched_to_raw: + # Raw PTY or non-TTY: kernel not doing line editing, use LinemodeBuffer + lmbuf = _get_linemode_buffer(telnet_writer) + for ch in inp.decode(errors="replace"): + echo, data = lmbuf.feed(ch) + if echo: + stdout.write(echo.encode()) + if data: + telnet_writer._write(data) + new_timer, has_pending = None, False + elif linemode_edit: + # Cooked PTY: kernel already handled EC/EL/echo; forward line directly + new_timer, has_pending = _send_stdin(inp, telnet_writer, stdout, False) + else: + new_timer, has_pending = _send_stdin( + inp, telnet_writer, stdout, state.local_echo + ) if has_pending and esc_timer_task not in wait_for: esc_timer_task = new_timer if esc_timer_task is not None: @@ -710,7 +843,6 @@ async def _wait_for_prompt_raw() -> None: stdout.write(f"Escape character is '{escape_name}'.{banner_sep}".encode()) def _handle_close(msg: str) -> None: - _flush_color_filter(telnet_writer, stdout) stdout.write(f"\033[m{linesep}{msg}{linesep}".encode()) tty_shell.cleanup_winch() diff --git a/telnetlib3/color_filter.py b/telnetlib3/color_filter.py deleted file mode 100644 index 6e3accb..0000000 --- a/telnetlib3/color_filter.py +++ /dev/null @@ -1,600 +0,0 @@ -""" -ANSI color palette translation for telnet client output. - -Most modern terminals use custom palette colors for ANSI colors 0-15 (e.g. -Solarized, Dracula, Gruvbox themes). When connecting to MUDs and BBS systems, -the artwork and text colors were designed for specific hardware palettes such as -IBM VGA or Commodore 64. The terminal's custom palette distorts the intended -colors, often ruining ANSI artwork. - -By translating basic 16-color SGR codes into their exact 24-bit RGB equivalents -from named hardware palettes, we bypass the terminal's palette entirely and -display the colors the artist intended. - -This feature is enabled by default using the VGA palette. Use -``--colormatch=none`` on the ``telnetlib3-client`` command line to disable it. - -Example usage:: - - # Default VGA palette with brightness/contrast adjustment - telnetlib3-client mud.example.com 4000 - - # Use xterm palette instead - telnetlib3-client --colormatch=xterm mud.example.com - - # Disable color translation entirely - telnetlib3-client --colormatch=none mud.example.com - - # Custom brightness and contrast - telnetlib3-client --color-brightness=0.7 --color-contrast=0.6 mud.example.com -""" - -from __future__ import annotations - -# std imports -import re -from typing import Dict, List, Match, Tuple, Optional, NamedTuple - -# 3rd party -from wcwidth.sgr_state import _SGR_PATTERN - -__all__ = ("AtasciiControlFilter", "ColorConfig", "ColorFilter", "PetsciiColorFilter", "PALETTES") - -# Type alias for a 16-color palette: 16 (R, G, B) tuples indexed 0-15. -# Index 0-7: normal colors (black, red, green, yellow, blue, magenta, cyan, white) -# Index 8-15: bright variants of the same order. -PaletteRGB = tuple[tuple[int, int, int], ...] - -# Hardware color palettes. Each defines exact RGB values for ANSI colors 0-15. -PALETTES: Dict[str, PaletteRGB] = { - # IBM VGA text-mode palette -- the classic DOS palette used by most - # BBS and MUD ANSI artwork. - "vga": ( - (0, 0, 0), - (170, 0, 0), - (0, 170, 0), - (170, 85, 0), - (0, 0, 170), - (170, 0, 170), - (0, 170, 170), - (170, 170, 170), - (85, 85, 85), - (255, 85, 85), - (85, 255, 85), - (255, 255, 85), - (85, 85, 255), - (255, 85, 255), - (85, 255, 255), - (255, 255, 255), - ), - # xterm default palette -- the standard xterm color table. - "xterm": ( - (0, 0, 0), - (205, 0, 0), - (0, 205, 0), - (205, 205, 0), - (0, 0, 238), - (205, 0, 205), - (0, 205, 205), - (229, 229, 229), - (127, 127, 127), - (255, 0, 0), - (0, 255, 0), - (255, 255, 0), - (92, 92, 255), - (255, 0, 255), - (0, 255, 255), - (255, 255, 255), - ), - # VIC-II C64 palette -- Colodore (Pepto) reference from VICE - # (colodore.vpl, https://www.colodore.com). - # Indexed by VIC-II color register 0-15, NOT ANSI SGR order. - "c64": ( - (0, 0, 0), # 0 black - (255, 255, 255), # 1 white - (150, 40, 46), # 2 red - (91, 214, 206), # 3 cyan - (159, 45, 173), # 4 purple - (65, 185, 54), # 5 green - (39, 36, 196), # 6 blue - (239, 243, 71), # 7 yellow - (159, 72, 21), # 8 orange - (94, 53, 0), # 9 brown - (218, 95, 102), # 10 pink / light red - (71, 71, 71), # 11 dark grey - (120, 120, 120), # 12 grey - (145, 255, 132), # 13 light green - (104, 100, 255), # 14 light blue - (174, 174, 174), # 15 light grey - ), -} - - -# Detect potentially incomplete escape sequence at end of a chunk. -_TRAILING_ESC = re.compile(r"\x1b(\[[\d;:]*)?$") - - -class ColorConfig(NamedTuple): - """ - Configuration for ANSI color palette translation. - - :param palette_name: Name of the hardware palette to use (key in PALETTES). - :param brightness: Brightness scale factor [0.0..1.0], where 1.0 is original. - :param contrast: Contrast scale factor [0.0..1.0], where 1.0 is original. - :param background_color: Forced background RGB as (R, G, B) tuple. - :param ice_colors: When True, treat SGR 5 (blink) as bright background - (iCE colors), promoting background 40-47 to palette 8-15. - """ - - palette_name: str = "vga" - brightness: float = 1.0 - contrast: float = 1.0 - background_color: Tuple[int, int, int] = (0, 0, 0) - ice_colors: bool = True - - -def _sgr_code_to_palette_index(code: int) -> Optional[int]: - """ - Map a basic SGR color code to a palette index (0-15). - - :param code: SGR parameter value (30-37, 40-47, 90-97, or 100-107). - :returns: Palette index 0-15, or None if not a basic color code. - """ - if 30 <= code <= 37: - return code - 30 - if 40 <= code <= 47: - return code - 40 - if 90 <= code <= 97: - return code - 90 + 8 - if 100 <= code <= 107: - return code - 100 + 8 - return None - - -def _is_foreground_code(code: int) -> bool: - """ - Return True if *code* is a foreground color SGR parameter. - - :param code: SGR parameter value. - :returns: True for foreground codes (30-37, 90-97). - """ - return (30 <= code <= 37) or (90 <= code <= 97) - - -def _adjust_color( - r: int, g: int, b: int, brightness: float, contrast: float -) -> Tuple[int, int, int]: - """ - Apply brightness and contrast scaling to an RGB color. - - Brightness scales linearly toward black (0.0 = black, 1.0 = original). - Contrast scales linearly toward mid-gray (0.0 = flat gray, 1.0 = original). - Result is clamped to 0-255. - - :param r: Red channel (0-255). - :param g: Green channel (0-255). - :param b: Blue channel (0-255). - :param brightness: Brightness factor [0.0..1.0]. - :param contrast: Contrast factor [0.0..1.0]. - :returns: Adjusted (R, G, B) tuple. - """ - mid = 127.5 - r_f = mid + (r * brightness - mid) * contrast - g_f = mid + (g * brightness - mid) * contrast - b_f = mid + (b * brightness - mid) * contrast - return ( - max(0, min(255, int(r_f + 0.5))), - max(0, min(255, int(g_f + 0.5))), - max(0, min(255, int(b_f + 0.5))), - ) - - -class ColorFilter: - """ - Stateful ANSI color palette translation filter. - - Translates basic 16-color ANSI SGR codes to 24-bit RGB equivalents from a named hardware - palette, with brightness/contrast adjustment and background color enforcement. - - The filter is designed to process chunked text (as received from a telnet connection) and - correctly handles escape sequences split across chunk boundaries. - - :param config: Color configuration parameters. - """ - - def __init__(self, config: ColorConfig) -> None: - """Initialize with the given color configuration.""" - self._config = config - palette = PALETTES[config.palette_name] - self._adjusted: List[Tuple[int, int, int]] = [ - _adjust_color(r, g, b, config.brightness, config.contrast) for r, g, b in palette - ] - bg = config.background_color - self._bg_sgr = f"\x1b[48;2;{bg[0]};{bg[1]};{bg[2]}m" - fg = self._adjusted[7] # default fg = white (palette index 7) - self._fg_sgr = f"\x1b[38;2;{fg[0]};{fg[1]};{fg[2]}m" - self._reset_bg_parts = ["48", "2", str(bg[0]), str(bg[1]), str(bg[2])] - self._reset_fg_parts = ["38", "2", str(fg[0]), str(fg[1]), str(fg[2])] - self._buffer = "" - self._initial = True - self._bold = False - self._blink = False - self._fg_idx = 7 # current fg palette index (0-15), -1 for extended - - def filter(self, text: str) -> str: - """ - Transform SGR sequences in *text* using the configured palette. - - Handles chunked input by buffering incomplete trailing escape sequences across calls. On - the very first non-empty output, the configured background color is injected. - - :param text: Input text, possibly containing ANSI escape sequences. - :returns: Text with basic colors replaced by 24-bit RGB equivalents. - """ - if self._buffer: - text = self._buffer + text - self._buffer = "" - - match = _TRAILING_ESC.search(text) - if match: - self._buffer = match.group() - text = text[: match.start()] - - if not text: - return "" - - result = _SGR_PATTERN.sub(self._replace_sgr, text) - - if self._initial: - self._initial = False - result = self._bg_sgr + result - return result - - def _replace_sgr(self, match: Match[str]) -> str: # noqa: C901 - r""" - Regex replacement callback for a single SGR sequence. - - Tracks bold state across calls so that ``\x1b[1;30m`` (bold + black) uses the bright palette - entry (index 8) instead of pure black. This preserves the traditional "bold as bright" - rendering that legacy systems rely on, which would otherwise be lost when converting to - 24-bit RGB (terminals do not brighten true-color values for bold). - """ - params_str = match.group(1) - - # Empty params or bare "0" -> reset - if not params_str: - self._bold = False - self._blink = False - self._fg_idx = 7 - return f"\x1b[0m{self._bg_sgr}{self._fg_sgr}" - - # Colon-separated extended colors (ITU T.416) -- pass through unchanged - if ":" in params_str: - return match.group() - - parts = params_str.split(";") - output_parts: List[str] = [] - i = 0 - - # Pre-scan: check if bold (1), blink (5), or explicit foreground - # colors appear in this sequence so that a color code *before* the - # attribute in the same sequence still gets the bright treatment, - # e.g. \x1b[31;1m should brighten red and \x1b[41;5m should - # brighten background. - seq_sets_bold = False - seq_sets_blink = False - seq_has_fg = False - for part in parts: - try: - val = int(part) if part else 0 - except ValueError: - continue - if val == 1: - seq_sets_bold = True - elif val == 5: - seq_sets_blink = True - if (30 <= val <= 37) or (90 <= val <= 97) or val in (38, 39): - seq_has_fg = True - - # Effective bold/blink for color lookups in this sequence - bold = self._bold or seq_sets_bold - ice = self._config.ice_colors - blink = self._blink or (seq_sets_blink and ice) - - while i < len(parts): - try: - p = int(parts[i]) if parts[i] else 0 - except ValueError: - output_parts.append(parts[i]) - i += 1 - continue - - if p == 0: - bold = False - blink = False - output_parts.append("0") - output_parts.extend(self._reset_bg_parts) - output_parts.extend(self._reset_fg_parts) - self._fg_idx = 7 - i += 1 - continue - - # Track bold state - if p == 1: - bold = True - output_parts.append("1") - if not seq_has_fg and 0 <= self._fg_idx <= 7: - bright_idx = self._fg_idx + 8 - r, g, b = self._adjusted[bright_idx] - output_parts.extend(["38", "2", str(r), str(g), str(b)]) - i += 1 - continue - if p == 22: - bold = False - output_parts.append("22") - if not seq_has_fg and 0 <= self._fg_idx <= 7: - r, g, b = self._adjusted[self._fg_idx] - output_parts.extend(["38", "2", str(r), str(g), str(b)]) - i += 1 - continue - - # Track blink state - if p == 5: - if ice: - blink = True - else: - output_parts.append("5") - i += 1 - continue - if p == 25: - blink = False - if not ice: - output_parts.append("25") - i += 1 - continue - - # Extended color -- pass through 38;5;N or 38;2;R;G;B verbatim - if p in (38, 48): - if p == 38: - self._fg_idx = -1 - start_i = i - i += 1 - if i < len(parts): - try: - mode = int(parts[i]) if parts[i] else 0 - except ValueError: - mode = 0 - i += 1 - if mode == 5 and i < len(parts): - i += 1 - elif mode == 2 and i + 2 < len(parts): - i += 3 - output_parts.extend(parts[start_i:i]) - continue - - # Default fg -> palette white; default bg -> configured bg - if p == 39: - self._fg_idx = 7 - r, g, b = self._adjusted[7] - output_parts.extend(["38", "2", str(r), str(g), str(b)]) - i += 1 - continue - if p == 49: - bg = self._config.background_color - output_parts.extend(["48", "2", str(bg[0]), str(bg[1]), str(bg[2])]) - i += 1 - continue - - idx = _sgr_code_to_palette_index(p) - if idx is not None: - is_fg = _is_foreground_code(p) - if is_fg: - self._fg_idx = idx - # Bold-as-bright: promote normal fg 30-37 to bright 8-15 - if is_fg and bold and 30 <= p <= 37: - idx += 8 - # iCE colors: promote normal bg 40-47 to bright 8-15 - if not is_fg and blink and 40 <= p <= 47: - idx += 8 - r, g, b = self._adjusted[idx] - if is_fg: - output_parts.extend(["38", "2", str(r), str(g), str(b)]) - else: - output_parts.extend(["48", "2", str(r), str(g), str(b)]) - else: - output_parts.append(str(p)) - i += 1 - - # Update persistent bold/blink state for subsequent sequences - self._bold = bold - self._blink = blink - - result = f"\x1b[{';'.join(output_parts)}m" if output_parts else "" - return result - - def flush(self) -> str: - """ - Flush any buffered partial escape sequence. - - Call this when the stream closes to emit any remaining buffered bytes. - - :returns: Buffered content (may be an incomplete escape sequence). - """ - result = self._buffer - self._buffer = "" - return result - - -# PETSCII decoded control character -> VIC-II palette index (0-15). -_PETSCII_COLOR_CODES: Dict[str, int] = { - "\x05": 1, # WHT (white) - "\x1c": 2, # RED - "\x1e": 5, # GRN (green) - "\x1f": 6, # BLU (blue) - "\x81": 8, # ORN (orange) - "\x90": 0, # BLK (black) - "\x95": 9, # BRN (brown) - "\x96": 10, # LRD (pink / light red) - "\x97": 11, # GR1 (dark grey) - "\x98": 12, # GR2 (grey) - "\x99": 13, # LGR (light green) - "\x9a": 14, # LBL (light blue) - "\x9b": 15, # GR3 (light grey) - "\x9c": 4, # PUR (purple) - "\x9e": 7, # YEL (yellow) - "\x9f": 3, # CYN (cyan) -} - -# PETSCII cursor/screen control codes -> ANSI escape sequences. -_PETSCII_CURSOR_CODES: Dict[str, str] = { - "\x11": "\x1b[B", # cursor down - "\x91": "\x1b[A", # cursor up - "\x1d": "\x1b[C", # cursor right - "\x9d": "\x1b[D", # cursor left - "\x13": "\x1b[H", # HOME (cursor to top-left) - "\x93": "\x1b[2J", # CLR (clear screen) - "\x14": "\x08\x1b[P", # DEL (destructive backspace) -} - -# All PETSCII control chars handled by the filter. -_PETSCII_FILTER_CHARS = ( - frozenset(_PETSCII_COLOR_CODES) | frozenset(_PETSCII_CURSOR_CODES) | {"\x12", "\x92"} -) - -# Precompiled pattern matching any single PETSCII control character that -# the filter should consume (color codes, cursor codes, RVS ON/OFF). -_PETSCII_CTRL_RE = re.compile("[" + re.escape("".join(sorted(_PETSCII_FILTER_CHARS))) + "]") - - -class PetsciiColorFilter: - r""" - Translate PETSCII control codes to ANSI sequences. - - PETSCII uses single-byte control codes embedded in the text stream for - color changes, cursor movement, and screen control. This filter - translates them to ANSI equivalents: - - - **Colors**: 16 VIC-II palette colors -> ``\x1b[38;2;R;G;Bm`` (24-bit RGB) - - **Reverse video**: RVS ON/OFF -> ``\x1b[7m`` / ``\x1b[27m`` - - **Cursor**: up/down/left/right -> ``\x1b[A/B/C/D`` - - **Screen**: HOME -> ``\x1b[H``, CLR -> ``\x1b[2J`` - - **DEL**: destructive backspace -> ``\x08\x1b[P`` - - :param config: Color configuration (uses ``brightness`` and ``contrast`` - for palette adjustment; ``palette_name`` is ignored -- always C64). - """ - - def __init__(self, config: Optional[ColorConfig] = None) -> None: - """Initialize PETSCII filter with optional color configuration.""" - palette = PALETTES["c64"] - if config is not None: - brightness = config.brightness - contrast = config.contrast - else: - brightness = 1.0 - contrast = 1.0 - self._adjusted: List[Tuple[int, int, int]] = [ - _adjust_color(r, g, b, brightness, contrast) for r, g, b in palette - ] - - def _sgr_for_index(self, idx: int) -> str: - """Return a 24-bit foreground SGR sequence for palette *idx*.""" - r, g, b = self._adjusted[idx] - return f"\x1b[38;2;{r};{g};{b}m" - - def filter(self, text: str) -> str: - """ - Replace PETSCII control codes with ANSI sequences. - - PETSCII control characters (colors, cursor, screen) are replaced with their ANSI - equivalents. All other characters pass through unchanged. - - :param text: Decoded PETSCII text (Unicode string). - :returns: Text with PETSCII controls translated to ANSI. - """ - if not _PETSCII_CTRL_RE.search(text): - return text - return _PETSCII_CTRL_RE.sub(self._replace, text) - - def _replace(self, match: Match[str]) -> str: - """Regex callback for a single PETSCII control character.""" - ch = match.group() - idx = _PETSCII_COLOR_CODES.get(ch) - if idx is not None: - return self._sgr_for_index(idx) - cursor = _PETSCII_CURSOR_CODES.get(ch) - if cursor is not None: - return cursor - if ch == "\x12": - return "\x1b[7m" - if ch == "\x92": - return "\x1b[27m" - return "" - - def flush(self) -> str: - """ - Flush buffered state. - - PETSCII color codes are single-byte, so no buffering is needed. - - :returns: Always ``""``. - """ - return "" - - -# ATASCII decoded control character glyphs -> ANSI terminal sequences. -# The atascii codec decodes control bytes to Unicode glyphs; this map -# translates those glyphs to the terminal actions they represent. -_ATASCII_CONTROL_CODES: Dict[str, str] = { - "\u25c0": "\x08\x1b[P", # ◀ backspace/delete (0x7E / 0xFE) - "\u25b6": "\t", # ▶ tab (0x7F / 0xFF) - "\u21b0": "\x1b[2J\x1b[H", # ↰ clear screen (0x7D / 0xFD) - "\u2191": "\x1b[A", # ↑ cursor up (0x1C / 0x9C) - "\u2193": "\x1b[B", # ↓ cursor down (0x1D / 0x9D) - "\u2190": "\x1b[D", # ← cursor left (0x1E / 0x9E) - "\u2192": "\x1b[C", # -> cursor right (0x1F / 0x9F) -} - -_ATASCII_CTRL_RE = re.compile("[" + re.escape("".join(sorted(_ATASCII_CONTROL_CODES))) + "]") - - -class AtasciiControlFilter: - r""" - Translate decoded ATASCII control character glyphs to ANSI sequences. - - The ``atascii`` codec decodes ATASCII control bytes into Unicode glyphs - (e.g. byte 0x7E -> U+25C0 ◀). This filter replaces those glyphs with - the ANSI terminal sequences that produce the intended effect: - - - **Backspace/delete**: ◀ -> ``\x08\x1b[P`` (destructive backspace) - - **Tab**: ▶ -> ``\t`` - - **Clear screen**: ↰ -> ``\x1b[2J\x1b[H`` - - **Cursor movement**: ↑↓←-> -> ``\x1b[A/B/D/C`` - """ - - def filter(self, text: str) -> str: - """ - Replace ATASCII control glyphs with ANSI sequences. - - :param text: Decoded ATASCII text (Unicode string). - :returns: Text with control glyphs translated to ANSI. - """ - if not _ATASCII_CTRL_RE.search(text): - return text - return _ATASCII_CTRL_RE.sub(self._replace, text) - - @staticmethod - def _replace(match: Match[str]) -> str: - """Regex callback for a single ATASCII control glyph.""" - return _ATASCII_CONTROL_CODES.get(match.group(), "") - - @staticmethod - def flush() -> str: - """ - Flush buffered state. - - ATASCII control glyphs are single characters, so no buffering is needed. - - :returns: Always ``""``. - """ - return "" diff --git a/telnetlib3/server.py b/telnetlib3/server.py index 123f4e5..863677b 100755 --- a/telnetlib3/server.py +++ b/telnetlib3/server.py @@ -42,7 +42,14 @@ except ImportError: PTY_SUPPORT = False -__all__ = ("TelnetServer", "Server", "create_server", "run_server", "parse_server_args") +__all__ = ( + "TelnetServer", + "LinemodeServer", + "Server", + "create_server", + "run_server", + "parse_server_args", +) class CONFIG(NamedTuple): @@ -806,6 +813,42 @@ def connection_lost(self, exc: Optional[Exception]) -> None: _ = exc +class LinemodeServer(TelnetServer): + """ + :class:`TelnetServer` subclass that negotiates LINEMODE EDIT. + + In addition to the standard options negotiated by :class:`TelnetServer`, + this server sends ``DO LINEMODE`` during advanced negotiation, proposes + LINEMODE EDIT (local line editing by the client), and suppresses + ``WILL ECHO`` so the client performs local echoing via its LINEMODE buffer. + + Use with :func:`create_server` to enable RFC 1184 LINEMODE EDIT on a + :func:`~.telnet_server_shell` session or any custom shell. + """ + + from . import slc as _slc_module + + #: Propose LINEMODE EDIT (local line editing) instead of remote mode. + default_linemode = _slc_module.Linemode(_slc_module.LMODE_MODE_LOCAL) + + def begin_advanced_negotiation(self) -> None: + """Negotiate standard options plus ``DO LINEMODE``.""" + from .telopt import DO, LINEMODE + + super().begin_advanced_negotiation() + # Propagate the protocol-level default_linemode to the writer so that + # TelnetWriter.handle_will(LINEMODE) proposes the correct mode (LOCAL/EDIT) + # rather than the TelnetWriter class default (REMOTE). + self.writer.default_linemode = self.default_linemode + self.writer.iac(DO, LINEMODE) + + def _negotiate_echo(self) -> None: + """Skip ``WILL ECHO`` — LINEMODE EDIT client handles local echo.""" + if self._echo_negotiated: + return + self._echo_negotiated = True + + class Server: """ Telnet server that tracks connected clients. diff --git a/telnetlib3/server_fingerprinting.py b/telnetlib3/server_fingerprinting.py index 783df84..64c7386 100644 --- a/telnetlib3/server_fingerprinting.py +++ b/telnetlib3/server_fingerprinting.py @@ -875,9 +875,9 @@ def _format_banner(data: bytes, encoding: str = "utf-8") -> str: Falls back to ``latin-1`` when the requested encoding is unavailable (e.g. a server-advertised charset that Python does not recognise). - When *encoding* is ``petscii``, inline PETSCII color control codes - are translated to ANSI 24-bit RGB SGR sequences using the VIC-II - C64 palette so the saved banner is human-readable with colors. + When *encoding* is ``petscii``, non-printable PETSCII control + characters (color, cursor, RVS) are stripped so the saved banner + contains only readable text. :param data: Raw bytes from the server. :param encoding: Character encoding to use for decoding. @@ -889,9 +889,7 @@ def _format_banner(data: bytes, encoding: str = "utf-8") -> str: text = data.decode("latin-1") if encoding.lower() in ("petscii", "cbm", "commodore", "c64", "c128"): - from .color_filter import PetsciiColorFilter - - text = PetsciiColorFilter().filter(text) + text = re.sub(r"[\x05\x11-\x14\x1c-\x1f\x81\x90-\x9f]", "", text) # PETSCII uses CR (0x0D) as line terminator; normalize to LF. text = text.replace("\r\n", "\n").replace("\r", "\n") diff --git a/telnetlib3/server_shell.py b/telnetlib3/server_shell.py index 85565f2..35a75ed 100644 --- a/telnetlib3/server_shell.py +++ b/telnetlib3/server_shell.py @@ -137,7 +137,14 @@ def feed(self, char: str) -> tuple[str, Optional[str]]: return char, None -__all__ = ("telnet_server_shell", "readline_async", "readline") +__all__ = ( + "telnet_server_shell", + "readline_async", + "readline", + "get_linemode", + "get_slcdata", + "do_toggle", +) async def telnet_server_shell( @@ -179,7 +186,7 @@ async def telnet_server_shell( writer.write("Goodbye." + CR + LF) break if command == "help": - writer.write("quit, writer, slc, toggle [option|all], reader, proto, dump") + writer.write("quit, writer, slc, linemode, toggle [option|all], reader, proto, dump") elif command == "writer": # show 'writer' status writer.write(repr(writer)) @@ -194,6 +201,8 @@ async def telnet_server_shell( elif command == "slc": # show 'slc' support and data tables writer.write(get_slcdata(writer)) + elif command == "linemode": + writer.write(get_linemode(writer)) elif command.startswith("toggle"): # toggle specified options option = command[len("toggle ") :] or None @@ -342,8 +351,25 @@ def get_slcdata(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> str: ) +def get_linemode(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> str: + """Display current LINEMODE negotiation state.""" + active = writer.remote_option.enabled(telopt.LINEMODE) + if not active: + return "LINEMODE not negotiated." + lm = writer.linemode + bits = ( + f"EDIT={'on' if lm.edit else 'off'}" + f" TRAPSIG={'on' if lm.trapsig else 'off'}" + f" SOFT_TAB={'on' if lm.soft_tab else 'off'}" + f" LIT_ECHO={'on' if lm.lit_echo else 'off'}" + f" ACK={'on' if lm.ack else 'off'}" + ) + return f"LINEMODE active. Mode: {writer.mode}\r\n{bits}" + + def do_toggle(writer: Union[TelnetWriter, TelnetWriterUnicode], option: Optional[str]) -> str: """Display or toggle telnet session parameters.""" + linemode_active = writer.remote_option.enabled(telopt.LINEMODE) tbl_opt = { "echo": writer.local_option.enabled(telopt.ECHO), "goahead": not writer.local_option.enabled(telopt.SGA), @@ -352,6 +378,9 @@ def do_toggle(writer: Union[TelnetWriter, TelnetWriterUnicode], option: Optional "binary": writer.outbinary and writer.inbinary, "xon-any": writer.xon_any, "lflow": writer.lflow, + "linemode": linemode_active, + "linemode-edit": writer.linemode.edit if linemode_active else False, + "linemode-trapsig": writer.linemode.trapsig if linemode_active else False, } if not option: @@ -390,6 +419,25 @@ def do_toggle(writer: Union[TelnetWriter, TelnetWriterUnicode], option: Optional writer.send_lineflow_mode() msgs.append(f"lineflow {'en' if writer.lflow else 'dis'}abled.") + if option in ("linemode",): + cmd = telopt.DONT if tbl_opt["linemode"] else telopt.DO + writer.iac(cmd, telopt.LINEMODE) + msgs.append(f"{telopt.name_command(cmd).lower()} linemode.") + + if option in ("linemode-edit",): + if not tbl_opt["linemode"]: + msgs.append("linemode not active.") + else: + writer.request_linemode_change(edit=not tbl_opt["linemode-edit"]) + msgs.append(f"linemode-edit {'dis' if tbl_opt['linemode-edit'] else 'en'}abled.") + + if option in ("linemode-trapsig",): + if not tbl_opt["linemode"]: + msgs.append("linemode not active.") + else: + writer.request_linemode_change(trapsig=not tbl_opt["linemode-trapsig"]) + msgs.append(f"linemode-trapsig {'dis' if tbl_opt['linemode-trapsig'] else 'en'}abled.") + if option not in tbl_opt and option != "all": msgs.append("toggle: not an option.") diff --git a/telnetlib3/slc.py b/telnetlib3/slc.py index 54c7496..69aee01 100644 --- a/telnetlib3/slc.py +++ b/telnetlib3/slc.py @@ -17,6 +17,7 @@ "Linemode", "LMODE_FORWARDMASK", "LMODE_MODE", + "LMODE_MODE_EDIT", "LMODE_MODE_REMOTE", "LMODE_SLC", "name_slc_command", @@ -89,6 +90,9 @@ LMODE_MODE_REMOTE, LMODE_MODE_LOCAL, LMODE_MODE_TRAPSIG = (bytes([const]) for const in range(3)) LMODE_MODE_ACK, LMODE_MODE_SOFT_TAB, LMODE_MODE_LIT_ECHO = (bytes([4]), bytes([8]), bytes([16])) +#: RFC 1184's name for LMODE_MODE_LOCAL (EDIT bit) +LMODE_MODE_EDIT = LMODE_MODE_LOCAL + class SLC: """Defines the willingness to support a Special Linemode Character.""" @@ -315,6 +319,15 @@ def local(self) -> bool: """True if linemode is local.""" return bool(ord(self.mask) & ord(LMODE_MODE_LOCAL)) + @property + def edit(self) -> bool: + """ + True if EDIT bit set (client performs local line editing). + + RFC 1184 name. + """ + return self.local + @property def remote(self) -> bool: """True if linemode is remote.""" diff --git a/telnetlib3/stream_writer.py b/telnetlib3/stream_writer.py index b3b2b18..57307ee 100644 --- a/telnetlib3/stream_writer.py +++ b/telnetlib3/stream_writer.py @@ -318,6 +318,12 @@ def __init__( #: attribute ``ack`` returns True if it is in use. self._linemode = slc.Linemode() + #: LINEMODE FORWARDMASK received from server, or None. + self._forwardmask: Optional[slc.Forwardmask] = None + + #: True once the server has sent its SLC table after MODE ACK. + self._slc_sent: bool = False + self._connection_closed = False # Set default callback handlers to local methods. A base protocol @@ -1330,6 +1336,38 @@ def send_linemode(self, linemode: Optional[slc.Linemode] = None) -> None: self._transport.write(self._linemode.mask) self.send_iac(IAC + SE) + def request_linemode_change( + self, + edit: Optional[bool] = None, + trapsig: Optional[bool] = None, + soft_tab: Optional[bool] = None, + lit_echo: Optional[bool] = None, + ) -> None: + """ + Request a LINEMODE mode change. + + Server-side only. Each keyword arg, if not None, enables or disables the corresponding bit + in the LINEMODE MODE mask. + + :param edit: Set the EDIT (local line-editing) bit. + :param trapsig: Set the TRAPSIG bit. + :param soft_tab: Set the SOFT_TAB bit. + :param lit_echo: Set the LIT_ECHO bit. + """ + mask = ord(self._linemode.mask) + flag_map = ( + (edit, ord(slc.LMODE_MODE_LOCAL)), + (trapsig, ord(slc.LMODE_MODE_TRAPSIG)), + (soft_tab, ord(slc.LMODE_MODE_SOFT_TAB)), + (lit_echo, ord(slc.LMODE_MODE_LIT_ECHO)), + ) + for value, bit in flag_map: + if value is True: + mask |= bit + elif value is False: + mask &= ~bit + self.send_linemode(slc.Linemode(bytes([mask]))) + # Public is-a-command (IAC) callbacks # def set_iac_callback(self, cmd: bytes, func: Callable[..., Any]) -> None: @@ -1876,6 +1914,13 @@ def handle_do(self, opt: bytes) -> bool: # Note that CHARSET is not included -- either side that has sent # WILL and received DO may initiate SB at any time. self.pending_option[SB + opt] = True + if opt == LINEMODE and self.client: + # Client has agreed to LINEMODE; mark as locally active. + self.local_option[LINEMODE] = True + # RFC 1184: client initiates SLC exchange immediately after WILL LINEMODE + self._slc_start() + self._slc_add(theNULL, slc.SLC(slc.SLC_DEFAULT, theNULL)) + self._slc_end() elif opt in self.always_will: if not self.local_option.enabled(opt): @@ -2607,6 +2652,12 @@ def _handle_sb_linemode_mode(self, mode: collections.deque[bytes]) -> None: mask=bytes([ord(suggest_mode.mask) | ord(slc.LMODE_MODE_ACK)]) ) ) + # Record the agreed mode (with ACK bit) so that duplicate proposals + # are suppressed and the client-ACK path comparison succeeds. + # __eq__ masks out the ACK bit, so deduplication still works. + self._linemode = slc.Linemode( + mask=bytes([ord(suggest_mode.mask) | ord(slc.LMODE_MODE_ACK)]) + ) return # " In all cases, a response is never generated to a MODE @@ -2645,23 +2696,31 @@ def _handle_sb_linemode_mode(self, mode: collections.deque[bytes]) -> None: self._linemode = suggest_mode + if self.server and not self._slc_sent: + self._slc_start() + self._slc_send() + self._slc_end() + self._slc_sent = True + def _handle_sb_linemode_slc(self, buf: collections.deque[bytes]) -> None: """ Callback handles IAC-SB-LINEMODE-SLC-. Processes SLC command function triplets found in ``buf`` and replies - accordingly. + with any changes. An empty reply is never sent — that would trigger + an infinite echo loop between client and server. """ - if not len(buf) - 2 % 3: - raise ValueError(f"SLC buffer wrong size: expect multiple of 3: {len(buf) - 2}") - self._slc_start() + if len(buf) % 3 != 0: + raise ValueError(f"SLC buffer wrong size: expect multiple of 3: {len(buf)}") while len(buf): func = buf.popleft() flag = buf.popleft() value = buf.popleft() slc_def = slc.SLC(flag, value) self._slc_process(func, slc_def) - self._slc_end() + if self._slc_buffer: + self._slc_start() + self._slc_end() if self.server: self.request_forwardmask() @@ -2697,10 +2756,11 @@ def _slc_send(self, slctab: Optional[dict[bytes, slc.SLC]] = None) -> None: continue _default = slc.SLC_nosupport() - if self.slctab.get(bytes([func]), _default).nosupport: + if slctab.get(bytes([func]), _default).nosupport: continue - self._slc_add(bytes([func])) + slc_def = slctab.get(bytes([func])) + self._slc_add(bytes([func]), slc_def) send_count += 1 self.log.debug("slc_send: %s functions queued.", send_count) @@ -2740,9 +2800,13 @@ def _slc_process(self, func: bytes, slc_def: slc.SLC) -> None: # process special request if func == theNULL: if slc_def.level == slc.SLC_DEFAULT: - # client requests we send our default tab, + # client requests we send our default tab; reset current to defaults + # (analogous to NetBSD default_slc() before send_slc()) self.log.debug("_slc_process: client request SLC_DEFAULT") + self.slctab = dict(self.default_slc_tab) self._slc_send(self.default_slc_tab) + if self.server: + self._slc_sent = True elif slc_def.level == slc.SLC_VARIABLE: # client requests we send our current tab, self.log.debug("_slc_process: client request SLC_VARIABLE") @@ -2799,8 +2863,8 @@ def _slc_change(self, func: bytes, slc_def: slc.SLC) -> None: default_slc = self.default_slc_tab.get(func) self.slctab[func].set_mask(default_slc.mask) # set current value to value indicated in default tab - self.default_slc_tab.get(func, slc.SLC_nosupport()) - self.slctab[func].set_value(slc_def.val) + default_def = self.default_slc_tab.get(func, slc.SLC_nosupport()) + self.slctab[func].set_value(default_def.val) self._slc_add(func) return @@ -3099,7 +3163,16 @@ def _handle_do_forwardmask(self, buf: collections.deque[bytes]) -> None: :param buf: bytes following IAC SB LINEMODE DO FORWARDMASK. """ mask = b"".join(buf) - self.log.debug("FORWARDMASK received (%d bytes), not applied", len(mask)) + if 1 <= len(mask) <= 32: + self._forwardmask = slc.Forwardmask(mask) + self.log.debug("FORWARDMASK stored (%d bytes)", len(mask)) + else: + self.log.warning("FORWARDMASK invalid length: %d", len(mask)) + + @property + def forwardmask(self) -> Optional[slc.Forwardmask]: + """Received LINEMODE FORWARDMASK, or None.""" + return self._forwardmask class TelnetWriterUnicode(TelnetWriter): diff --git a/telnetlib3/tests/test_client_shell.py b/telnetlib3/tests/test_client_shell.py index 79da644..c22f200 100644 --- a/telnetlib3/tests/test_client_shell.py +++ b/telnetlib3/tests/test_client_shell.py @@ -8,7 +8,6 @@ # 3rd party import pytest -import pexpect # local from telnetlib3._session_context import TelnetSessionContext @@ -364,7 +363,7 @@ def test_filter_without_eol_xlat(data: bytes, expected: bytes) -> None: import subprocess # noqa: E402 # local -from telnetlib3.tests.accessories import asyncio_server +from telnetlib3.tests.accessories import create_server, asyncio_server _IAC = b"\xff" _WILL = b"\xfb" @@ -387,9 +386,10 @@ def _strip_iac(data: bytes) -> bytes: def _client_cmd(host: str, port: int, extra: "list[str] | None" = None) -> "list[str]": - prog = pexpect.which("telnetlib3-client") - assert prog is not None - args = [prog, host, str(port), "--connect-maxwait=0.5", "--colormatch=none"] + # Use sys.executable so the subprocess uses the same Python interpreter and + # telnetlib3 package as the test process, not whatever pyenv shim happens to + # be on PATH (which may point to a different Python version or install). + args = [sys.executable, "-m", "telnetlib3.client", host, str(port), "--connect-maxwait=0.5"] if extra: args.extend(extra) return args @@ -749,7 +749,6 @@ def at_eof(self) -> bool: writer = _make_writer() writer.log = types.SimpleNamespace(debug=lambda *a, **kw: None, log=lambda *a, **kw: None) writer.ctx.raw_mode = None - writer.ctx.color_filter = None writer.ctx.ascii_eol = False writer.ctx.autoreply_engine = None writer.ctx.autoreply_rules = [] @@ -809,7 +808,6 @@ def at_eof(self) -> bool: writer = _make_writer() writer.log = types.SimpleNamespace(debug=lambda *a, **kw: None, log=lambda *a, **kw: None) writer.ctx.raw_mode = True - writer.ctx.color_filter = None writer.ctx.ascii_eol = False writer.ctx.autoreply_engine = None writer.ctx.input_filter = None @@ -979,3 +977,47 @@ def connection_made(self, transport): assert idx != -1 after = text[idx + len(line) :] assert after.startswith("\r\n") + + +async def test_linemode_edit_via_telsh(bind_host: str, unused_tcp_port: int) -> None: + """ + LinemodeBuffer is exercised end-to-end via LinemodeServer + telnet_server_shell. + + Covers _get_linemode_buffer, _raw_event_loop LINEMODE EDIT path, EC (erase-char), EL (erase- + line), and line transmission. + """ + from telnetlib3.server import LinemodeServer + from telnetlib3.server_shell import telnet_server_shell + + async with create_server( + protocol_factory=LinemodeServer, + shell=telnet_server_shell, + host=bind_host, + port=unused_tcp_port, + connect_maxwait=0.5, + ): + cmd = _client_cmd(bind_host, unused_tcp_port) + + def _interact(master_fd: int, proc: "subprocess.Popen[bytes]") -> bytes: + # Wait for the telsh prompt — LINEMODE negotiation has completed by then + buf = _pty_read(master_fd, marker=b"tel:sh>", timeout=12.0) + # EC test: type "helo" + 0x7F (EC = delete-char) + "lo" + CR + # LinemodeBuffer: "helo" → EC deletes 'o' → "hel" + "lo" → "hello", CR sends it + os.write(master_fd, b"helo\x7flo\r") + buf += _pty_read(master_fd, marker=b"no such", timeout=8.0) + # EL test: type "garbage" + 0x15 (EL = erase-line) + "slc" + CR + os.write(master_fd, b"garbage\x15slc\r") + buf += _pty_read(master_fd, marker=b"Special Line", timeout=8.0) + # Tidy up + os.write(master_fd, b"quit\r") + buf += _pty_read(master_fd, proc=proc, timeout=5.0) + return buf + + with _pty_client(cmd) as (proc, master_fd): + output = await asyncio.to_thread(_interact, master_fd, proc) + # Local echo from LinemodeBuffer should include backspace sequence + assert b"\x08 \x08" in output + # "hello" was transmitted intact and shell replied + assert b"no such command" in output + # After EL, "slc" was sent clean and shell replied with SLC table + assert b"Special Line Characters" in output diff --git a/telnetlib3/tests/test_client_unit.py b/telnetlib3/tests/test_client_unit.py index 5fa613a..e1c74dc 100644 --- a/telnetlib3/tests/test_client_unit.py +++ b/telnetlib3/tests/test_client_unit.py @@ -198,13 +198,6 @@ def test_argument_parser_typescript(): assert defaults.typescript is None -def test_argument_parser_ice_colors(): - parser = cl._get_argument_parser() - args = parser.parse_args(["host", "--typescript", "out.log", "--no-ice-colors"]) - assert args.typescript == "out.log" - assert args.ice_colors is False - - def test_transform_args(): parser = cl._get_argument_parser() result = cl._transform_args( @@ -262,12 +255,6 @@ def test_detect_syncterm_font_sets_force_binary(): assert client.force_binary is True -@pytest.mark.parametrize("extra_args,expected", [([], "vga"), (["--colormatch", "xterm"], "xterm")]) -def test_transform_args_colormatch(extra_args, expected): - parser = cl._get_argument_parser() - assert cl._transform_args(parser.parse_args(["myhost"] + extra_args))["colormatch"] == expected - - def test_guard_shells_connection_counter(): from telnetlib3.guard_shells import ConnectionCounter @@ -368,50 +355,6 @@ async def _fake_open_connection(*args, **kwargs): return _fake_open_connection, captured_kwargs, writer_obj -@pytest.mark.asyncio -async def test_run_client_unknown_palette(monkeypatch): - """run_client exits with error on unknown palette.""" - monkeypatch.setattr(sys, "argv", ["telnetlib3-client", "localhost", "--colormatch", "bogus"]) - monkeypatch.setattr(sys.stdin, "isatty", lambda: False) - - with pytest.raises(SystemExit) as exc_info: - await cl.run_client() - assert exc_info.value.code == 1 - - -@pytest.mark.parametrize( - "argv_extra,filter_cls_name", - [ - pytest.param( - ["--encoding", "petscii", "--colormatch", "vga"], - "PetsciiColorFilter", - id="petscii_selects_c64", - ), - pytest.param( - ["--encoding", "atascii", "--colormatch", "vga"], - "AtasciiControlFilter", - id="atascii_filter", - ), - pytest.param(["--colormatch", "vga"], "ColorFilter", id="colormatch_vga"), - pytest.param( - ["--colormatch", "petscii"], "PetsciiColorFilter", id="colormatch_petscii_alias" - ), - ], -) -@pytest.mark.asyncio -async def test_run_client_color_filter(monkeypatch, argv_extra, filter_cls_name): - monkeypatch.setattr(sys, "argv", ["telnetlib3-client", "localhost"] + argv_extra) - monkeypatch.setattr(sys.stdin, "isatty", lambda: False) - monkeypatch.setattr(accessories, "function_lookup", lambda _: _noop_shell) - - loop = asyncio.get_event_loop() - fake_oc, captured, writer_obj = _fake_open_connection_factory(loop) - monkeypatch.setattr(cl, "open_connection", fake_oc) - await cl.run_client() - - assert type(writer_obj.ctx.color_filter).__name__ == filter_cls_name - - @pytest.mark.asyncio async def test_connection_made_reader_set_transport_exception(): client = _make_client(encoding=False) diff --git a/telnetlib3/tests/test_color_filter.py b/telnetlib3/tests/test_color_filter.py deleted file mode 100644 index de8a1d6..0000000 --- a/telnetlib3/tests/test_color_filter.py +++ /dev/null @@ -1,663 +0,0 @@ -"""Tests for telnetlib3.color_filter -- ANSI color palette translation.""" - -# 3rd party -import pytest - -# local -from telnetlib3.color_filter import ( - PALETTES, - ColorConfig, - ColorFilter, - PetsciiColorFilter, - AtasciiControlFilter, - _adjust_color, - _is_foreground_code, - _sgr_code_to_palette_index, -) - - -def _make_filter(**kwargs: object) -> ColorFilter: - cfg = ColorConfig(brightness=1.0, contrast=1.0, **kwargs) # type: ignore[arg-type] - return ColorFilter(cfg) - - -@pytest.mark.parametrize("name", list(PALETTES.keys())) -def test_palette_has_16_entries(name: str) -> None: - assert len(PALETTES[name]) == 16 - - -@pytest.mark.parametrize("name", list(PALETTES.keys())) -def test_palette_rgb_in_range(name: str) -> None: - for r, g, b in PALETTES[name]: - assert 0 <= r <= 255 - assert 0 <= g <= 255 - assert 0 <= b <= 255 - - -def test_all_expected_palettes_exist() -> None: - assert set(PALETTES.keys()) == {"vga", "xterm", "c64"} - - -def test_color_config_defaults() -> None: - cfg = ColorConfig() - assert cfg.palette_name == "vga" - assert cfg.brightness == 1.0 - assert cfg.contrast == 1.0 - assert cfg.background_color == (0, 0, 0) - assert cfg.ice_colors is True - - -@pytest.mark.parametrize( - "code,expected", - [ - (30, 0), - (31, 1), - (32, 2), - (33, 3), - (34, 4), - (35, 5), - (36, 6), - (37, 7), - (40, 0), - (41, 1), - (42, 2), - (43, 3), - (44, 4), - (45, 5), - (46, 6), - (47, 7), - (90, 8), - (91, 9), - (92, 10), - (93, 11), - (94, 12), - (95, 13), - (96, 14), - (97, 15), - (100, 8), - (101, 9), - (102, 10), - (103, 11), - (104, 12), - (105, 13), - (106, 14), - (107, 15), - ], -) -def test_color_code_maps_to_palette_index(code: int, expected: int) -> None: - assert _sgr_code_to_palette_index(code) == expected - - -@pytest.mark.parametrize("code", [0, 1, 4, 7, 22, 38, 39, 48, 49, 128]) -def test_non_color_returns_none(code: int) -> None: - assert _sgr_code_to_palette_index(code) is None - - -@pytest.mark.parametrize("code", list(range(30, 38)) + list(range(90, 98))) -def test_is_foreground_code_true(code: int) -> None: - assert _is_foreground_code(code) is True - - -@pytest.mark.parametrize("code", list(range(40, 48)) + list(range(100, 108))) -def test_is_foreground_code_false(code: int) -> None: - assert _is_foreground_code(code) is False - - -@pytest.mark.parametrize( - "r,g,b,brightness,contrast,expected", - [ - (170, 85, 0, 1.0, 1.0, (170, 85, 0)), - (200, 100, 50, 1.0, 0.0, (128, 128, 128)), - (200, 100, 50, 0.0, 1.0, (0, 0, 0)), - (200, 100, 0, 0.5, 1.0, (100, 50, 0)), - ], -) -def test_adjust_color_values( - r: int, g: int, b: int, brightness: float, contrast: float, expected: tuple[int, int, int] -) -> None: - assert _adjust_color(r, g, b, brightness, contrast) == expected - - -def test_adjust_color_clamp_high() -> None: - r, _, _ = _adjust_color(255, 255, 255, 1.0, 2.0) - assert r == 255 - - -def test_adjust_color_clamp_low() -> None: - r, _, _ = _adjust_color(0, 0, 0, 1.0, 2.0) - assert r == 0 - - -def test_adjust_color_in_range() -> None: - r, g, b = _adjust_color(170, 0, 0, 0.9, 0.8) - assert 0 <= r <= 255 - assert 0 <= g <= 255 - assert 0 <= b <= 255 - - -@pytest.mark.parametrize( - "sgr,palette_idx,prefix", - [("31", 1, "38;2"), ("41", 1, "48;2"), ("91", 9, "38;2"), ("101", 9, "48;2")], -) -def test_color_filter_basic_translation(sgr: str, palette_idx: int, prefix: str) -> None: - f = _make_filter() - result = f.filter(f"\x1b[{sgr}m") - rgb = PALETTES["vga"][palette_idx] - assert f"{prefix};{rgb[0]};{rgb[1]};{rgb[2]}" in result - - -@pytest.mark.parametrize( - "code,idx", [(30, 0), (31, 1), (32, 2), (33, 3), (34, 4), (35, 5), (36, 6), (37, 7)] -) -def test_all_normal_foreground_colors(code: int, idx: int) -> None: - f = _make_filter() - result = f.filter(f"\x1b[{code}m") - rgb = PALETTES["vga"][idx] - assert f"38;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result - - -@pytest.mark.parametrize( - "code,idx", [(40, 0), (41, 1), (42, 2), (43, 3), (44, 4), (45, 5), (46, 6), (47, 7)] -) -def test_all_normal_background_colors(code: int, idx: int) -> None: - f = _make_filter() - result = f.filter(f"\x1b[{code}m") - rgb = PALETTES["vga"][idx] - assert f"48;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result - - -def test_explicit_reset() -> None: - f = _make_filter(background_color=(0, 0, 0)) - result = f.filter("\x1b[0m") - assert "48;2;0;0;0" in result - assert "38;2;170;170;170" in result - - -def test_empty_reset() -> None: - f = _make_filter(background_color=(0, 0, 0)) - result = f.filter("\x1b[m") - assert "\x1b[0m" in result - assert "48;2;0;0;0" in result - assert "38;2;170;170;170" in result - - -def test_reset_in_compound_sequence() -> None: - f = _make_filter(background_color=(0, 0, 0)) - assert "48;2;0;0;0" in f.filter("\x1b[0;31m") - - -def test_reset_with_bg_preserves_explicit_bg() -> None: - """SGR 0;30;42 must not override green bg with configured black bg.""" - f = _make_filter(background_color=(0, 0, 0)) - result = f.filter("\x1b[0;30;42m") - assert "48;2;0;170;0" in result - last_bg = result.rfind("48;2;") - assert result[last_bg:].startswith("48;2;0;170;0") - - -def test_reset_with_fg_preserves_explicit_fg() -> None: - """SGR 0;31 must use red, not the injected default white.""" - f = _make_filter(background_color=(0, 0, 0)) - result = f.filter("\x1b[0;31m") - last_fg = result.rfind("38;2;") - assert result[last_fg:].startswith("38;2;170;0;0") - - -def test_bold_after_reset_emits_bright_white() -> None: - """ESC[0m ESC[1m should produce bright white (palette 15).""" - f = _make_filter(background_color=(0, 0, 0)) - f.filter("\x1b[0m") - assert "38;2;255;255;255" in f.filter("\x1b[1m") - - -def test_bold_after_explicit_fg_emits_bright_color() -> None: - """ESC[31m ESC[1m should produce bright red (palette 9).""" - f = _make_filter(background_color=(0, 0, 0)) - f.filter("\x1b[31m") - assert "38;2;255;85;85" in f.filter("\x1b[1m") - - -def test_unbold_restores_normal_fg() -> None: - """ESC[31m ESC[1m ESC[22m should restore normal red (palette 1).""" - f = _make_filter(background_color=(0, 0, 0)) - f.filter("\x1b[31m") - f.filter("\x1b[1m") - assert "38;2;170;0;0" in f.filter("\x1b[22m") - - -def test_bold_with_explicit_fg_in_same_seq_no_double_inject() -> None: - """ESC[1;31m should not inject default bright fg.""" - f = _make_filter(background_color=(0, 0, 0)) - result = f.filter("\x1b[1;31m") - assert "255;85;85" in result - assert "255;255;255" not in result - - -@pytest.mark.parametrize( - "seq,needle", - [ - ("\x1b[38;5;196m", "38;5;196"), - ("\x1b[38;2;100;200;50m", "38;2;100;200;50"), - ("\x1b[48;5;42m", "48;5;42"), - ("\x1b[48;2;10;20;30m", "48;2;10;20;30"), - ], -) -def test_extended_color_pass_through(seq: str, needle: str) -> None: - f = _make_filter() - assert needle in f.filter(seq) - - -def test_bold_emits_bright_default_fg() -> None: - f = _make_filter() - result = f.filter("\x1b[1m") - assert "1" in result - assert "38;2;255;255;255" in result - - -@pytest.mark.parametrize( - "seq,needle", - [ - ("\x1b[4m", "\x1b[4m"), - ("\x1b[2J", "\x1b[2J"), - ("\x1b[H", "\x1b[H"), - ("\x1b[38:2::255:0:0m", "\x1b[38:2::255:0:0m"), - ], -) -def test_non_sgr_pass_through(seq: str, needle: str) -> None: - f = _make_filter() - assert needle in f.filter(seq) - - -@pytest.mark.parametrize( - "sgr_code,expected_prefix", [("39", "38;2;170;170;170"), ("49", "48;2;0;0;0")] -) -def test_default_color_translated(sgr_code: str, expected_prefix: str) -> None: - f = _make_filter() - assert expected_prefix in f.filter(f"\x1b[{sgr_code}m") - - -def test_bold_plus_red_uses_bright() -> None: - f = _make_filter() - result = f.filter("\x1b[1;31m") - bright_red = PALETTES["vga"][9] - assert f"38;2;{bright_red[0]};{bright_red[1]};{bright_red[2]}" in result - - -def test_red_fg_green_bg() -> None: - f = _make_filter() - result = f.filter("\x1b[31;42m") - fg_rgb = PALETTES["vga"][1] - bg_rgb = PALETTES["vga"][2] - assert f"38;2;{fg_rgb[0]};{fg_rgb[1]};{fg_rgb[2]}" in result - assert f"48;2;{bg_rgb[0]};{bg_rgb[1]};{bg_rgb[2]}" in result - - -@pytest.mark.parametrize("seq", ["\x1b[1;30m", "\x1b[30;1m"]) -def test_bold_single_seq_uses_bright(seq: str) -> None: - f = _make_filter() - result = f.filter(seq) - bright_black = PALETTES["vga"][8] - assert f"38;2;{bright_black[0]};{bright_black[1]};{bright_black[2]}" in result - - -@pytest.mark.parametrize( - "setup_seqs,test_seq,palette_idx", - [ - (["\x1b[1m"], "\x1b[30m", 8), - (["\x1b[1m", "\x1b[22m"], "\x1b[30m", 0), - (["\x1b[1m", "\x1b[0m"], "\x1b[30m", 0), - ], -) -def test_bold_state_across_sequences( - setup_seqs: list[str], test_seq: str, palette_idx: int -) -> None: - f = _make_filter() - for seq in setup_seqs: - f.filter(seq) - result = f.filter(test_seq) - rgb = PALETTES["vga"][palette_idx] - assert f"38;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result - - -def test_bold_does_not_affect_bright_colors() -> None: - f = _make_filter() - result = f.filter("\x1b[1;90m") - bright_black = PALETTES["vga"][8] - assert f"38;2;{bright_black[0]};{bright_black[1]};{bright_black[2]}" in result - - -def test_bold_does_not_affect_background() -> None: - f = _make_filter() - result = f.filter("\x1b[1;40m") - normal_black = PALETTES["vga"][0] - assert f"48;2;{normal_black[0]};{normal_black[1]};{normal_black[2]}" in result - - -@pytest.mark.parametrize( - "code,normal_idx", [(30, 0), (31, 1), (32, 2), (33, 3), (34, 4), (35, 5), (36, 6), (37, 7)] -) -def test_all_bold_fg_use_bright_palette(code: int, normal_idx: int) -> None: - f = _make_filter() - result = f.filter(f"\x1b[1;{code}m") - bright_rgb = PALETTES["vga"][normal_idx + 8] - assert f"38;2;{bright_rgb[0]};{bright_rgb[1]};{bright_rgb[2]}" in result - - -def test_reset_bold_color_in_same_seq() -> None: - f = _make_filter() - result = f.filter("\x1b[0;1;34m") - bright_blue = PALETTES["vga"][12] - assert f"38;2;{bright_blue[0]};{bright_blue[1]};{bright_blue[2]}" in result - - -def _make_ice_filter(ice_colors: bool = True) -> ColorFilter: - return ColorFilter(ColorConfig(brightness=1.0, contrast=1.0, ice_colors=ice_colors)) - - -@pytest.mark.parametrize("seq", ["\x1b[5;40m", "\x1b[40;5m"]) -def test_ice_blink_single_seq_uses_bright(seq: str) -> None: - f = _make_ice_filter() - result = f.filter(seq) - bright_black = PALETTES["vga"][8] - assert f"48;2;{bright_black[0]};{bright_black[1]};{bright_black[2]}" in result - - -@pytest.mark.parametrize( - "setup_seqs,test_seq,palette_idx", - [ - (["\x1b[5m"], "\x1b[40m", 8), - (["\x1b[5m", "\x1b[25m"], "\x1b[40m", 0), - (["\x1b[5m", "\x1b[0m"], "\x1b[40m", 0), - ], -) -def test_ice_blink_state_across_sequences( - setup_seqs: list[str], test_seq: str, palette_idx: int -) -> None: - f = _make_ice_filter() - for seq in setup_seqs: - f.filter(seq) - result = f.filter(test_seq) - rgb = PALETTES["vga"][palette_idx] - assert f"48;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result - - -def test_ice_blink_does_not_affect_foreground() -> None: - f = _make_ice_filter() - result = f.filter("\x1b[5;30m") - normal_black = PALETTES["vga"][0] - assert f"38;2;{normal_black[0]};{normal_black[1]};{normal_black[2]}" in result - - -def test_ice_blink_does_not_affect_bright_bg() -> None: - f = _make_ice_filter() - result = f.filter("\x1b[5;100m") - bright_black = PALETTES["vga"][8] - assert f"48;2;{bright_black[0]};{bright_black[1]};{bright_black[2]}" in result - - -@pytest.mark.parametrize( - "code,normal_idx", [(40, 0), (41, 1), (42, 2), (43, 3), (44, 4), (45, 5), (46, 6), (47, 7)] -) -def test_all_blink_bg_use_bright_palette(code: int, normal_idx: int) -> None: - f = _make_ice_filter() - result = f.filter(f"\x1b[5;{code}m") - bright_rgb = PALETTES["vga"][normal_idx + 8] - assert f"48;2;{bright_rgb[0]};{bright_rgb[1]};{bright_rgb[2]}" in result - - -def test_ice_reset_blink_bg_in_same_seq() -> None: - f = _make_ice_filter() - result = f.filter("\x1b[0;5;41m") - bright_red = PALETTES["vga"][9] - assert f"48;2;{bright_red[0]};{bright_red[1]};{bright_red[2]}" in result - - -def test_ice_colors_disabled() -> None: - f = _make_ice_filter(ice_colors=False) - f.filter("x") - result = f.filter("\x1b[5;40m") - normal_black = PALETTES["vga"][0] - assert f"48;2;{normal_black[0]};{normal_black[1]};{normal_black[2]}" in result - params = result.split("\x1b[")[1].split("m")[0] - assert "5" in params.split(";") - - -def test_chunked_split_at_esc() -> None: - f = _make_filter() - result1 = f.filter("hello\x1b") - assert "hello" in result1 - assert result1.endswith("hello") - result2 = f.filter("[31mworld") - rgb = PALETTES["vga"][1] - assert f"38;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result2 - assert "world" in result2 - - -def test_chunked_split_mid_params() -> None: - f = _make_filter() - result1 = f.filter("hello\x1b[3") - assert "hello" in result1 - result2 = f.filter("1mworld") - rgb = PALETTES["vga"][1] - assert f"38;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result2 - assert "world" in result2 - - -def test_chunked_flush_returns_buffer() -> None: - f = _make_filter() - f.filter("hello\x1b[3") - assert f.flush() == "\x1b[3" - - -def test_chunked_flush_empty_when_no_buffer() -> None: - f = _make_filter() - f.filter("hello") - assert not f.flush() - - -def test_color_filter_initial_background_first_output() -> None: - f = ColorFilter(ColorConfig(brightness=1.0, contrast=1.0, background_color=(0, 0, 0))) - result = f.filter("hello") - assert result.startswith("\x1b[48;2;0;0;0m") - assert result.endswith("hello") - - -def test_color_filter_initial_background_second_output() -> None: - f = ColorFilter(ColorConfig(brightness=1.0, contrast=1.0, background_color=(0, 0, 0))) - f.filter("hello") - result2 = f.filter("world") - assert not result2.startswith("\x1b[48;2;") - assert result2 == "world" - - -def test_color_filter_plain_text_pass_through() -> None: - f = _make_filter() - assert "hello world" in f.filter("hello world") - - -def test_color_filter_empty_string() -> None: - f = _make_filter() - assert not f.filter("") - - -def test_color_filter_reduced_brightness() -> None: - f = ColorFilter(ColorConfig(brightness=0.5, contrast=1.0)) - result = f.filter("\x1b[37m") - ega_white = PALETTES["vga"][7] - adjusted = _adjust_color(*ega_white, 0.5, 1.0) - assert f"38;2;{adjusted[0]};{adjusted[1]};{adjusted[2]}" in result - - -def test_color_filter_reduced_contrast() -> None: - f = ColorFilter(ColorConfig(brightness=1.0, contrast=0.5)) - result = f.filter("\x1b[31m") - ega_red = PALETTES["vga"][1] - adjusted = _adjust_color(*ega_red, 1.0, 0.5) - assert f"38;2;{adjusted[0]};{adjusted[1]};{adjusted[2]}" in result - - -def test_color_filter_custom_background_in_reset() -> None: - f = ColorFilter(ColorConfig(brightness=1.0, contrast=1.0, background_color=(32, 32, 48))) - assert "\x1b[48;2;32;32;48m" in f.filter("\x1b[0m") - - -def test_color_filter_custom_background_on_initial() -> None: - f = ColorFilter(ColorConfig(brightness=1.0, contrast=1.0, background_color=(32, 32, 48))) - assert f.filter("hello").startswith("\x1b[48;2;32;32;48m") - - -@pytest.mark.parametrize("name", [n for n in PALETTES if n != "c64"]) -def test_color_filter_palette_red_foreground(name: str) -> None: - f = ColorFilter(ColorConfig(palette_name=name, brightness=1.0, contrast=1.0)) - result = f.filter("\x1b[31m") - rgb = PALETTES[name][1] - assert f"38;2;{rgb[0]};{rgb[1]};{rgb[2]}" in result - - -def _make_petscii_filter(**kwargs: object) -> PetsciiColorFilter: - cfg = ColorConfig(brightness=1.0, contrast=1.0, **kwargs) # type: ignore[arg-type] - return PetsciiColorFilter(cfg) - - -@pytest.mark.parametrize( - "ctrl_char,palette_idx", - [ - ("\x05", 1), - ("\x1c", 2), - ("\x1e", 5), - ("\x1f", 6), - ("\x81", 8), - ("\x90", 0), - ("\x95", 9), - ("\x96", 10), - ("\x97", 11), - ("\x98", 12), - ("\x99", 13), - ("\x9a", 14), - ("\x9b", 15), - ("\x9c", 4), - ("\x9e", 7), - ("\x9f", 3), - ], -) -def test_petscii_color_code_to_24bit(ctrl_char: str, palette_idx: int) -> None: - f = _make_petscii_filter() - result = f.filter(f"hello{ctrl_char}world") - rgb = PALETTES["c64"][palette_idx] - assert f"\x1b[38;2;{rgb[0]};{rgb[1]};{rgb[2]}m" in result - assert ctrl_char not in result - assert "hello" in result - assert "world" in result - - -def test_petscii_rvs_on() -> None: - f = _make_petscii_filter() - result = f.filter("before\x12after") - assert "\x1b[7m" in result - assert "\x12" not in result - - -def test_petscii_rvs_off() -> None: - f = _make_petscii_filter() - result = f.filter("before\x92after") - assert "\x1b[27m" in result - assert "\x92" not in result - - -def test_petscii_mixed_colors_and_rvs() -> None: - f = _make_petscii_filter() - result = f.filter("\x1c\x12hello\x92\x05world") - red_rgb = PALETTES["c64"][2] - white_rgb = PALETTES["c64"][1] - assert f"\x1b[38;2;{red_rgb[0]};{red_rgb[1]};{red_rgb[2]}m" in result - assert "\x1b[7m" in result - assert "\x1b[27m" in result - assert f"\x1b[38;2;{white_rgb[0]};{white_rgb[1]};{white_rgb[2]}m" in result - assert "hello" in result - assert "world" in result - - -def test_petscii_plain_text_unchanged() -> None: - f = _make_petscii_filter() - assert f.filter("hello world") == "hello world" - - -def test_petscii_non_petscii_control_chars_unchanged() -> None: - f = _make_petscii_filter() - assert f.filter("A\x07B\x0bC") == "A\x07B\x0bC" - - -@pytest.mark.parametrize( - "ctrl_char,expected", - [ - ("\x13", "\x1b[H"), - ("\x93", "\x1b[2J"), - ("\x11", "\x1b[B"), - ("\x91", "\x1b[A"), - ("\x1d", "\x1b[C"), - ("\x9d", "\x1b[D"), - ("\x14", "\x08\x1b[P"), - ], -) -def test_petscii_cursor_controls_translated(ctrl_char: str, expected: str) -> None: - f = _make_petscii_filter() - assert f.filter(f"A{ctrl_char}B") == f"A{expected}B" - - -def test_petscii_flush_returns_empty() -> None: - f = _make_petscii_filter() - assert not f.flush() - - -def test_petscii_brightness_contrast_applied() -> None: - f_full = PetsciiColorFilter(ColorConfig(brightness=1.0, contrast=1.0)) - f_dim = PetsciiColorFilter(ColorConfig(brightness=0.5, contrast=0.5)) - assert f_full.filter("\x1c") != f_dim.filter("\x1c") - - -def test_petscii_default_config() -> None: - f = PetsciiColorFilter() - assert "\x1b[38;2;" in f.filter("\x1c") - - -@pytest.mark.parametrize( - "glyph,expected", - [ - ("\u25c0", "\x08\x1b[P"), - ("\u25b6", "\t"), - ("\u21b0", "\x1b[2J\x1b[H"), - ("\u2191", "\x1b[A"), - ("\u2193", "\x1b[B"), - ("\u2190", "\x1b[D"), - ("\u2192", "\x1b[C"), - ], -) -def test_atascii_control_glyph_translated(glyph: str, expected: str) -> None: - f = AtasciiControlFilter() - assert f.filter(f"before{glyph}after") == f"before{expected}after" - - -def test_atascii_backspace_erases() -> None: - f = AtasciiControlFilter() - assert f.filter("DINGO\u25c0\u25c0\u25c0\u25c0\u25c0") == ("DINGO" + "\x08\x1b[P" * 5) - - -def test_atascii_plain_text_unchanged() -> None: - f = AtasciiControlFilter() - assert f.filter("hello world") == "hello world" - - -def test_atascii_graphics_unchanged() -> None: - f = AtasciiControlFilter() - text = "\u2663\u2665\u2666\u2660" - assert f.filter(text) == text - - -def test_atascii_flush_returns_empty() -> None: - f = AtasciiControlFilter() - assert not f.flush() - - -def test_atascii_multiple_controls_in_one_string() -> None: - f = AtasciiControlFilter() - assert f.filter("\u2191\u2193\u2190\u2192") == "\x1b[A\x1b[B\x1b[D\x1b[C" diff --git a/telnetlib3/tests/test_core.py b/telnetlib3/tests/test_core.py index 2a318b9..7295a4e 100644 --- a/telnetlib3/tests/test_core.py +++ b/telnetlib3/tests/test_core.py @@ -384,14 +384,7 @@ async def test_telnet_client_as_module(): async def test_telnet_client_cmdline(bind_host, unused_tcp_port): """Test executing telnetlib3/client.py as client.""" prog = pexpect.which("telnetlib3-client") - args = [ - prog, - bind_host, - str(unused_tcp_port), - "--loglevel=info", - "--connect-maxwait=0.05", - "--colormatch=none", - ] + args = [prog, bind_host, str(unused_tcp_port), "--loglevel=info", "--connect-maxwait=0.05"] class HelloServer(asyncio.Protocol): def connection_made(self, transport): @@ -432,7 +425,6 @@ async def test_telnet_client_tty_cmdline(bind_host, unused_tcp_port): str(unused_tcp_port), "--loglevel=warning", "--connect-maxwait=0.05", - "--colormatch=none", ] class HelloServer(asyncio.Protocol): @@ -467,7 +459,6 @@ async def test_telnet_client_cmdline_stdin_pipe(bind_host, unused_tcp_port): "--loglevel=info", "--connect-maxwait=0.15", f"--logfile={logfile}", - "--colormatch=none", ] async def shell(reader, writer): diff --git a/telnetlib3/tests/test_linemode.py b/telnetlib3/tests/test_linemode.py index abd4fd3..5c582cb 100644 --- a/telnetlib3/tests/test_linemode.py +++ b/telnetlib3/tests/test_linemode.py @@ -1,14 +1,40 @@ """Test LINEMODE, rfc-1184_.""" # std imports +import sys import asyncio +import collections + +# 3rd party +import pytest # local import telnetlib3 import telnetlib3.stream_writer -from telnetlib3.slc import LMODE_MODE, LMODE_MODE_ACK, LMODE_MODE_LOCAL +from telnetlib3 import slc +from telnetlib3.slc import LMODE_SLC, LMODE_MODE, LMODE_MODE_ACK, LMODE_MODE_LOCAL from telnetlib3.telopt import DO, SB, SE, IAC, WILL, LINEMODE -from telnetlib3.tests.accessories import create_server, asyncio_connection +from telnetlib3.stream_writer import TelnetWriter +from telnetlib3.tests.accessories import ( + MockProtocol, + MockTransport, + create_server, + asyncio_connection, +) + + +def _make_server_writer(): + t = MockTransport() + p = MockProtocol() + w = TelnetWriter(t, p, server=True) + return w, t + + +def _make_client_writer(): + t = MockTransport() + p = MockProtocol() + w = TelnetWriter(t, p, client=True) + return w, t async def test_server_demands_remote_linemode_client_agrees(bind_host, unused_tcp_port): @@ -46,8 +72,9 @@ def begin_negotiation(self): 0.1, ) + # server sends SLC table after MODE ACK; drain remaining bytes to reach EOF result = await client_reader.read() - assert result == b"" + assert result.startswith(IAC + SB + LINEMODE + LMODE_SLC) assert srv_instance.writer.mode == "remote" assert srv_instance.writer.linemode.remote is True @@ -94,8 +121,9 @@ def begin_negotiation(self): 0.1, ) + # server sends SLC table after MODE ACK; drain remaining bytes to reach EOF result = await client_reader.read() - assert result == b"" + assert result.startswith(IAC + SB + LINEMODE + LMODE_SLC) assert srv_instance.writer.mode == "local" assert srv_instance.writer.linemode.remote is False @@ -104,3 +132,328 @@ def begin_negotiation(self): assert srv_instance.writer.linemode.ack is True assert srv_instance.writer.linemode.soft_tab is False assert srv_instance.writer.linemode.lit_echo is False + + +def test_slc_validation_rejects_misaligned(): + """_handle_sb_linemode_slc raises ValueError for non-multiple-of-3 buffer.""" + w, _ = _make_server_writer() + buf = collections.deque([b"\x09", b"\x02", b"\x03", b"\x04"]) + with pytest.raises(ValueError, match="multiple of 3"): + w._handle_sb_linemode_slc(buf) + + +def test_slc_change_default_uses_default_table(): + """_slc_change with SLC_DEFAULT restores value from default table, not incoming.""" + w, _ = _make_server_writer() + default_val = w.default_slc_tab[slc.SLC_EC].val + slc_def = slc.SLC(slc.SLC_DEFAULT, b"\x03") + w._slc_change(slc.SLC_EC, slc_def) + assert w.slctab[slc.SLC_EC].val == default_val + assert w.slctab[slc.SLC_EC].val != b"\x03" + + +def test_forwardmask_stored(): + """_handle_do_forwardmask stores a Forwardmask for valid lengths.""" + w, _ = _make_client_writer() + assert w.forwardmask is None + buf = collections.deque([bytes([b]) for b in b"\x00" * 16]) + w._handle_do_forwardmask(buf) + assert w.forwardmask is not None + assert isinstance(w.forwardmask, slc.Forwardmask) + assert len(w.forwardmask.value) == 16 + + +@pytest.mark.parametrize("length", [0, 33]) +def test_forwardmask_invalid_length(length): + """_handle_do_forwardmask logs warning and stores nothing for invalid lengths.""" + w, _ = _make_client_writer() + buf = collections.deque([bytes([0]) for _ in range(length)]) + w._handle_do_forwardmask(buf) + assert w.forwardmask is None + + +def test_client_sends_slc_request_on_will_linemode(): + """Client emits SLC (0, SLC_DEFAULT, 0) triplet immediately after WILL LINEMODE.""" + w, t = _make_client_writer() + w.handle_do(LINEMODE) + all_writes = b"".join(t.writes) + assert IAC + WILL + LINEMODE in all_writes + assert IAC + SB + LINEMODE + LMODE_SLC in all_writes + from telnetlib3.telopt import theNULL + + assert theNULL + slc.SLC_DEFAULT + theNULL in all_writes + + +def test_server_sends_slc_after_mode_ack(): + """Server proactively sends SLC table after client acknowledges MODE.""" + w, t = _make_server_writer() + w.remote_option[LINEMODE] = True + buf = collections.deque([slc.LMODE_MODE_ACK]) + w._handle_sb_linemode_mode(buf) + all_writes = b"".join(t.writes) + assert IAC + SB + LINEMODE + LMODE_SLC in all_writes + assert w._slc_sent is True + t.writes.clear() + w._handle_sb_linemode_mode(collections.deque([slc.LMODE_MODE_ACK])) + assert IAC + SB + LINEMODE + LMODE_SLC not in b"".join(t.writes) + + +def test_linemode_buffer_ec_el_ew(): + """LinemodeBuffer.feed() handles EC, EL, EW SLC functions correctly.""" + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab(slc.BSD_SLC_TAB) + buf = LinemodeBuffer(slctab=slctab) + for c in "hel": + buf.feed(c) + # EC (^? = 0x7F in BSD_SLC_TAB) + echo, data = buf.feed("\x7f") + assert echo == "\b \b" + assert data is None + assert buf._buf == ["h", "e"] + # EL (^U = 0x15) + echo, data = buf.feed("\x15") + assert echo == "\b \b" * 2 + assert data is None + assert buf._buf == [] + # EW (^W = 0x17) -- add a word first + for c in "hello world": + buf.feed(c) + echo, data = buf.feed("\x17") + assert echo == "\b \b" * 5 + assert data is None + assert buf._buf == list("hello ") + + +def test_linemode_buffer_forwardmask_flush(): + """LinemodeBuffer flushes buffer immediately when a forwardmask character arrives.""" + from telnetlib3.client_shell import LinemodeBuffer + + fm_bytes = bytearray(16) + # byte 0x01: mask=0, flag=2**(7-1)=64=0x40 + fm_bytes[0] = 0x40 + fm = slc.Forwardmask(bytes(fm_bytes)) + slctab = slc.generate_slctab() + buf = LinemodeBuffer(slctab=slctab, forwardmask=fm) + buf.feed("a") + buf.feed("b") + echo, data = buf.feed("\x01") + assert data == b"ab\x01" + assert buf._buf == [] + + +def test_linemode_buffer_trapsig(): + """LinemodeBuffer returns IAC command bytes for signal chars when TRAPSIG is on.""" + from telnetlib3.telopt import IP, IAC + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab(slc.BSD_SLC_TAB) + buf = LinemodeBuffer(slctab=slctab, trapsig=True) + # SLC_IP is ^C = 0x03 in BSD_SLC_TAB + echo, data = buf.feed("\x03") + assert echo == "" + assert data == IAC + IP + + +def test_linemode_buffer_ec_empty_buf(): + """EC on an empty LinemodeBuffer returns empty echo and no data.""" + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab(slc.BSD_SLC_TAB) + buf = LinemodeBuffer(slctab=slctab) + # EC = 0x7F in BSD_SLC_TAB; buffer is empty + echo, data = buf.feed("\x7f") + assert echo == "" + assert data is None + assert buf._buf == [] + + +def test_linemode_buffer_cr_sends_line(): + """CR flushes the buffer and returns the line as bytes for the server.""" + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab(slc.BSD_SLC_TAB) + buf = LinemodeBuffer(slctab=slctab) + for c in "hello": + buf.feed(c) + echo, data = buf.feed("\r") + assert echo == "\r" + assert data == b"hello\r" + assert buf._buf == [] + + +def test_linemode_buffer_lf_sends_line(): + """LF flushes the buffer and returns the line as bytes for the server.""" + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab(slc.BSD_SLC_TAB) + buf = LinemodeBuffer(slctab=slctab) + for c in "hi": + buf.feed(c) + echo, data = buf.feed("\n") + assert echo == "\n" + assert data == b"hi\n" + assert buf._buf == [] + + +def test_linemode_buffer_trapsig_regular_char_buffered(): + """Regular char with trapsig=True is buffered, not sent as IAC.""" + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab(slc.BSD_SLC_TAB) + buf = LinemodeBuffer(slctab=slctab, trapsig=True) + echo, data = buf.feed("a") + assert echo == "a" + assert data is None + assert buf._buf == ["a"] + + +def test_linemode_buffer_slc_val_nosupport(): + """_slc_val returns None when the SLC entry is nosupport.""" + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab() + # Force SLC_EC to nosupport + slctab[slc.SLC_EC] = slc.SLC(slc.SLC_NOSUPPORT, slc.theNULL) + buf = LinemodeBuffer(slctab=slctab) + assert buf._slc_val(slc.SLC_EC) is None + + +def test_linemode_buffer_slc_val_missing(): + """_slc_val returns None when the SLC function is absent from the table.""" + from telnetlib3.client_shell import LinemodeBuffer + + buf = LinemodeBuffer(slctab={}) + assert buf._slc_val(slc.SLC_EC) is None + + +def test_slc_send_uses_parameter_not_self_slctab(): + """_slc_send(slctab) sends values from the given table, not self.slctab.""" + w, t = _make_server_writer() + # Modify slctab so SLC_EC has a different value from the default + modified_val = b"\x42" + default_val = w.default_slc_tab[slc.SLC_EC].val + assert default_val != modified_val, "precondition: values must differ" + w.slctab[slc.SLC_EC] = slc.SLC(slc.SLC_VARIABLE, modified_val) + + # Call _slc_send with the default table; result should contain default_val + w._slc_start() + w._slc_send(w.default_slc_tab) + w._slc_end() + + sent = b"".join(t.writes) + # Locate the SLC_EC triplet (func, flag, val) in the sent bytes + ec_byte = slc.SLC_EC + idx = 0 + found_default = False + found_modified = False + while idx < len(sent) - 2: + if sent[idx : idx + 1] == ec_byte: + val = sent[idx + 2 : idx + 3] + if val == default_val: + found_default = True + if val == modified_val: + found_modified = True + idx += 1 + assert found_default, "expected default value in SLC table" + assert not found_modified, "must not send modified self.slctab value" + + +def test_slc_process_default_resets_slctab(): + """(0, SLC_DEFAULT, 0) resets self.slctab to the default table.""" + from telnetlib3.telopt import theNULL + + w, _ = _make_server_writer() + # Modify an entry so it differs from the default + w.slctab[slc.SLC_EC] = slc.SLC(slc.SLC_VARIABLE, b"\x42") + assert w.slctab[slc.SLC_EC].val != w.default_slc_tab[slc.SLC_EC].val + + # Process the special (0, SLC_DEFAULT, 0) request + w._slc_process(theNULL, slc.SLC(slc.SLC_DEFAULT, theNULL)) + + assert w.slctab[slc.SLC_EC].val == w.default_slc_tab[slc.SLC_EC].val + + +def test_server_sends_slc_table_exactly_once(): + """Server sends SLC table on (0, SLC_DEFAULT, 0) and not again on MODE ACK.""" + from telnetlib3.telopt import theNULL + + w, t = _make_server_writer() + w.remote_option[LINEMODE] = True + + # Simulate client sending (0, SLC_DEFAULT, 0) + buf = collections.deque([theNULL, slc.SLC_DEFAULT, theNULL]) + w._handle_sb_linemode_slc(buf) + slc_table_header = IAC + SB + LINEMODE + LMODE_SLC + first_send = b"".join(t.writes) + assert slc_table_header in first_send + assert w._slc_sent is True + + # Now simulate client sending MODE ACK: server must NOT send SLC table again + t.writes.clear() + mode_ack_buf = collections.deque([slc.LMODE_MODE_ACK]) + w._handle_sb_linemode_mode(mode_ack_buf) + second_send = b"".join(t.writes) + assert slc_table_header not in second_send + + +def test_linemode_buffer_ew_skips_trailing_spaces(): + """EW erases trailing whitespace then the preceding word (POSIX VWERASE).""" + from telnetlib3.client_shell import LinemodeBuffer + + slctab = slc.generate_slctab(slc.BSD_SLC_TAB) + buf = LinemodeBuffer(slctab=slctab) + for c in "hello ": + buf.feed(c) + # EW (^W = 0x17) on "hello " should erase the trailing space AND "hello" + echo, data = buf.feed("\x17") + assert echo == "\b \b" * 6 + assert data is None + assert buf._buf == [] + + +if sys.platform != "win32": + import termios + + def test_determine_mode_linemode_edit(): + """determine_mode() keeps cooked mode with kernel echo when LINEMODE EDIT is set.""" + import types + + from telnetlib3.telopt import LINEMODE + from telnetlib3.client_shell import Terminal + from telnetlib3._session_context import TelnetSessionContext + + class _Opt: + def __init__(self, active): + self._active = active + + def enabled(self, key): + return key in self._active + + linemode = slc.Linemode(slc.LMODE_MODE_LOCAL) + ctx = TelnetSessionContext() + writer = types.SimpleNamespace( + will_echo=False, + client=True, + remote_option=_Opt(set()), + local_option=_Opt({LINEMODE}), + linemode=linemode, + log=types.SimpleNamespace(debug=lambda *a, **kw: None), + ctx=ctx, + ) + term = Terminal.__new__(Terminal) + term.telnet_writer = writer + term.software_echo = False + mode = Terminal.ModeDef( + iflag=termios.BRKINT | termios.ICRNL | termios.IXON, + oflag=termios.OPOST | termios.ONLCR, + cflag=termios.CS8 | termios.CREAD, + lflag=termios.ICANON | termios.ECHO | termios.ISIG | termios.IEXTEN, + ispeed=termios.B38400, + ospeed=termios.B38400, + cc=[b"\x00"] * termios.NCCS, + ) + result = term.determine_mode(mode) + assert result.lflag & termios.ICANON # cooked mode: kernel handles line editing + assert result.lflag & termios.ECHO # kernel handles echo + assert term.software_echo is False diff --git a/telnetlib3/tests/test_server_fingerprinting.py b/telnetlib3/tests/test_server_fingerprinting.py index efbb5cb..11af426 100644 --- a/telnetlib3/tests/test_server_fingerprinting.py +++ b/telnetlib3/tests/test_server_fingerprinting.py @@ -263,10 +263,10 @@ def test_server_fingerprint_hash_consistency(): (b"\xff\xfe\xb1", None, [("not_in", "\ufffd"), ("eq", "\udcff\udcfe\udcb1")]), (b"Hello\xb1World", "x-no-such-codec", [("eq", "Hello\xb1World")]), (b"Hello\x9b", "atascii", [("eq", "Hello\n")]), - (b"\x1c\xc8\xc9", "petscii", [("in", "\x1b[38;2;"), ("in", "HI")]), - (b"\x12\xc8\xc9\x92", "petscii", [("in", "\x1b[7m"), ("in", "\x1b[27m")]), + (b"\x1c\xc8\xc9", "petscii", [("eq", "HI")]), + (b"\x12\xc8\xc9\x92", "petscii", [("eq", "HI")]), (b"\xc8\xc9\x0d\xca\xcb", "petscii", [("eq", "HI\nJK")]), - (b"\x13\xc8\xc9", "petscii", [("in", "\x1b[H"), ("in", "HI")]), + (b"\x13\xc8\xc9", "petscii", [("eq", "HI")]), ], ) def test_format_banner(data, encoding, checks): diff --git a/telnetlib3/tests/test_shell.py b/telnetlib3/tests/test_shell.py index 21425ae..8693a19 100644 --- a/telnetlib3/tests/test_shell.py +++ b/telnetlib3/tests/test_shell.py @@ -134,7 +134,7 @@ async def test_telnet_server_given_shell(bind_host, unused_tcp_port): ( (b"\bhel\blp\r"), ( - b"\r\nquit, writer, slc, toggle [option|all], reader, proto, dump" + b"\r\nquit, writer, slc, linemode, toggle [option|all], reader, proto, dump" b"\r\ntel:sh> " ), ), @@ -195,6 +195,9 @@ async def test_telnet_server_given_shell(bind_host, unused_tcp_port): b"\r\ngoahead ON" b"\r\ninbinary off" b"\r\nlflow ON" + b"\r\nlinemode off" + b"\r\nlinemode-edit off" + b"\r\nlinemode-trapsig off" b"\r\noutbinary off" b"\r\nxon-any off" b"\r\ntel:sh> " @@ -240,6 +243,9 @@ async def test_telnet_server_given_shell(bind_host, unused_tcp_port): b"\r\ngoahead ON" b"\r\ninbinary off" b"\r\nlflow off" # flipped + b"\r\nlinemode off" + b"\r\nlinemode-edit off" + b"\r\nlinemode-trapsig off" b"\r\noutbinary off" b"\r\nxon-any ON" # flipped b"\r\ntel:sh> " diff --git a/telnetlib3/tests/test_tls.py b/telnetlib3/tests/test_tls.py index faf33f9..71f2fc9 100644 --- a/telnetlib3/tests/test_tls.py +++ b/telnetlib3/tests/test_tls.py @@ -537,13 +537,7 @@ def _pty_run_client(bind_host, port, extra_argv, server_ssl_ctx=None): marker = b"pty-marker-ok" def _child(): - sys.argv = [ - "telnetlib3-client", - bind_host, - str(port), - "--connect-maxwait=0.5", - "--colormatch=none", - ] + extra_argv + sys.argv = ["telnetlib3-client", bind_host, str(port), "--connect-maxwait=0.5"] + extra_argv from telnetlib3.client import run_client asyncio.run(run_client())