Add native WebSocket support by vendoring httpx-ws#1042
Conversation
>⚠️ This is a very young project. Expect bugs 🐛 Features -------- * `connect_ws` helper to talk to WebSockets synchronously. * `aconnect_ws` helper to talk to WebSockets asynchronously. * `ASGIWebSocketTransport` to test WebSockets in ASGI apps directly.
Features -------- * Add a `.ping` method. Thanks @kousikmitra 🎉 Improvements ------------ * Pin lower bound version of `httpx` and `httpcore` dependencies * `httpx>=0.23.1` * `httpcore>=0.16.1`
- Add test for attribute `response` - Add missing type annotations
BREAKING CHANGE: the `subprotocol` parameter of `WebSocketSession` has been removed.
Breaking changes ---------------- * [`AsyncWebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.AsyncWebSocketSession) and [`WebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.WebSocketSession) no longer accept the `subprotocol` parameter. It's automatically set from the `response` headers (see below). > [!NOTE] > If you only use the `connect_ws` and `aconnect_ws` functions, you don't need to change anything. Improvements ------------ * [`AsyncWebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.AsyncWebSocketSession) and [`WebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.WebSocketSession) now accepts the original HTTPX handshake response in parameter. Thanks @WSH032 🎉
Doing so without re-raising them is harmful from the asyncio uncancellation PoV, and is completely unnecessary with task groups anyway.
Don't catch cancellation exceptions
This avoids the ResourceWarning about unclosed memory object streams, introduced in AnyIO 4.4.0.
Close memory object streams at exit
Add unit test to reproduce #70
Previously the threads responsible for watching the stop event set signal were not being properly terminated, leading then to pile up and becoming a memory leak when using `connect_ws`. This commit fix this issue (#76).
Prevent threads to hang and pile up in WebSocketSession
Bug fixes --------- * Fix (#73) anyio misusages. Thanks @agronholm 🎉 * Fix (#74) unclosed anyio streams. Thanks @agronholm 🎉 * Fix (#76) memory leak with non-async WebSocketSession. Thanks @ro-oliveira95 🎉
* Improve _wait_until_closed() * Fix type annotation for Python<3.9
Bug fixes and improvements -------------------------- * Improve efficiency of `WebSocketSession` by reusing a single thread pool when waiting for messages. Thank you @davidbrochart 🎉
Merge the httpx-ws package (MIT, by @frankie567) into httpx2, preserving its upstream commit history under src/httpx2/httpx2/_websockets/ and the tests under tests/httpx2/websockets/. The code is unmodified at this point and still imports httpx/httpcore/wsproto; subsequent commits adapt it to httpx2. Source: https://github.com/Kludex/httpx-ws @ c3dcf21 (v0.6.2 era)
Rewrite the vendored `_websockets` package to httpx2's namespaces and conventions while keeping httpcore2 lazily imported: - Import the public types from httpx2's own modules (`.._models`, `.._transports.asgi`, `.._types`) instead of `httpx`. - Defer `import httpcore2` into the methods that catch its errors, and drop the `AsyncNetworkStream` base class from the ASGI stream, so importing the package no longer eagerly pulls in httpcore2. - Rename `transport.py` to `_transport.py` to match httpx2's private-module layout. - Move the numeric defaults into a wsproto-free `_defaults` module and add a `require_wsproto()` guard so the package can be imported without `wsproto` installed; `httpx2[ws]` is now an optional extra. - Apply httpx2's typing standards (`from __future__ import annotations`, modern unions, full annotations) so the code passes ruff and mypy --strict.
Add `httpx2.websocket()`, `Client.websocket()` and `AsyncClient.websocket()` context managers, and re-export the WebSocket session classes, the `ASGIWebSocketTransport` and the exception hierarchy from the top-level `httpx2` namespace. These names resolve lazily through `__getattr__` so `import httpx2` keeps working without `wsproto`; a missing dependency raises a clear error pointing to the `httpx2[ws]` extra. A `TYPE_CHECKING` block re-imports them from the typed submodules so static type checkers still see the real types.
Rewrite the vendored WebSocket tests to httpx2's namespaces, point the server fixture at uvicorn's `wsproto`/`websockets-sansio` implementations (the legacy implementation is incompatible with `filterwarnings=error`), and close the mock memory streams so the trio backend doesn't trip an unraisable ResourceWarning. Add `starlette`, `websockets` and `flaky` to the dev group, the `ws` extra to the dev `httpx2[...]` install, and update `test_exported_members` to account for the lazily-exported WebSocket names.
|
Docs preview: |
Merging this PR will not alter performance
Comparing Footnotes
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 29150edd36
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| headers = kwargs.pop("headers", {}) | ||
| headers.update(_get_headers(subprotocols)) |
There was a problem hiding this comment.
Normalize missing WebSocket headers before updating
When the advertised top-level httpx2.websocket(...) is used without headers, _api.websocket() forwards headers=None, so kwargs.pop("headers", {}) returns None here and the next line raises AttributeError before opening a connection. This makes the default public helper unusable; coerce None and other supported HeaderTypes to a mutable headers object before adding the handshake headers.
Useful? React with 👍 / 👎.
| if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket": | ||
| subprotocols: list[str] = [] | ||
| if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None: | ||
| subprotocols = subprotocols_header.split(",") |
There was a problem hiding this comment.
Strip spaces when parsing requested subprotocols
When multiple subprotocols are requested, the client-side helper serializes them as a comma-space header such as chat, superchat, but splitting only on , leaves the second value as " superchat" in scope["subprotocols"]. ASGI apps that compare exact protocol names will fail to negotiate any protocol after the first, so trim whitespace and ignore empty tokens when building the scope.
Useful? React with 👍 / 👎.
| send = self._asgi_send | ||
| try: | ||
| await self.app(scope, receive, send) | ||
| except Exception as e: |
There was a problem hiding this comment.
Honor raise_app_exceptions for websocket ASGI apps
With ASGIWebSocketTransport(..., raise_app_exceptions=True) (the inherited default), exceptions raised by a websocket endpoint are caught here and converted into a close frame instead of being surfaced to the test client. That makes the default behave the same as raise_app_exceptions=False for websocket routes and can let crashing ASGI tests pass as ordinary disconnects; pass the flag into this stream and re-raise when it is enabled.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
7 issues found across 19 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="src/httpx2/httpx2/_websockets/_api.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_api.py:601">
P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</violation>
<violation number="2" location="src/httpx2/httpx2/_websockets/_api.py:1052">
P1: `kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</violation>
</file>
<file name="src/httpx2/httpx2/_websockets/_transport.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_transport.py:115">
P2: `timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</violation>
<violation number="2" location="src/httpx2/httpx2/_websockets/_transport.py:128">
P2: `raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</violation>
<violation number="3" location="src/httpx2/httpx2/_websockets/_transport.py:173">
P2: Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</violation>
<violation number="4" location="src/httpx2/httpx2/_websockets/_transport.py:178">
P2: WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</violation>
</file>
<file name="src/httpx2/httpx2/_websockets/_ping.py">
<violation number="1" location="src/httpx2/httpx2/_websockets/_ping.py:25">
P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</violation>
</file>
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
| subprotocols: list[str] | None = None, | ||
| **kwargs: typing.Any, | ||
| ) -> typing.Generator[WebSocketSession, None, None]: | ||
| headers = kwargs.pop("headers", {}) |
There was a problem hiding this comment.
P1: kwargs.pop("headers", {}) returns None when headers=None is explicitly passed (e.g., from httpx2.websocket(...) which defaults headers to None). The default {} only applies when the key is absent from kwargs, not when the value is None. The subsequent headers.update(...) will raise AttributeError: 'NoneType' object has no attribute 'update'. Coerce the result: headers = kwargs.pop("headers", None) or {}.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 1052:
<comment>`kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</comment>
<file context>
@@ -0,0 +1,1291 @@
+ subprotocols: list[str] | None = None,
+ **kwargs: typing.Any,
+) -> typing.Generator[WebSocketSession, None, None]:
+ headers = kwargs.pop("headers", {})
+ headers.update(_get_headers(subprotocols))
+
</file context>
| headers = kwargs.pop("headers", {}) | |
| headers = kwargs.pop("headers", None) or {} |
| @@ -0,0 +1,41 @@ | |||
| from __future__ import annotations | |||
There was a problem hiding this comment.
P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_ping.py, line 25:
<comment>ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</comment>
<file context>
@@ -0,0 +1,41 @@
+ return ping_id, event
+
+ def ack(self, ping_id: bytes | bytearray) -> None:
+ event = self._pings.pop(bytes(ping_id))
+ event.set()
+
</file context>
| if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket": | ||
| subprotocols: list[str] = [] | ||
| if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None: | ||
| subprotocols = subprotocols_header.split(",") |
There was a problem hiding this comment.
P2: Splitting on "," without stripping whitespace leaves leading spaces in protocol names (e.g., "chat, superchat".split(",") → ["chat", " superchat"]). Since _get_headers() serializes subprotocols as ", ".join(subprotocols), any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use [s.strip() for s in subprotocols_header.split(",")] to trim whitespace.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 173:
<comment>Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</comment>
<file context>
@@ -0,0 +1,219 @@
+ if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
+ subprotocols: list[str] = []
+ if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
+ subprotocols = subprotocols_header.split(",")
+
+ scope = {
</file context>
| scope = { | ||
| "type": "websocket", | ||
| "path": request.url.path, | ||
| "raw_path": request.url.raw_path, |
There was a problem hiding this comment.
P2: WebSocket ASGI scope passes raw_path including query string. This can break ASGI apps that expect raw_path to be path-only and use query_string separately.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 178:
<comment>WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</comment>
<file context>
@@ -0,0 +1,219 @@
+ scope = {
+ "type": "websocket",
+ "path": request.url.path,
+ "raw_path": request.url.raw_path,
+ "root_path": self.root_path,
+ "scheme": scheme,
</file context>
| "raw_path": request.url.raw_path, | |
| "raw_path": request.url.raw_path.split(b"?")[0], |
| send = self._asgi_send | ||
| try: | ||
| await self.app(scope, receive, send) | ||
| except Exception as e: |
There was a problem hiding this comment.
P2: raise_app_exceptions is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 128:
<comment>`raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</comment>
<file context>
@@ -0,0 +1,219 @@
+ send = self._asgi_send
+ try:
+ await self.app(scope, receive, send)
+ except Exception as e:
+ message = {
+ "type": "websocket.close",
</file context>
| self._receive_queue.put(message) | ||
|
|
||
| async def receive(self, timeout: float | None = None) -> Message: | ||
| while self._send_queue.empty(): |
There was a problem hiding this comment.
P2: timeout is ineffective in receive(), so read(timeout=...) can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 115:
<comment>`timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</comment>
<file context>
@@ -0,0 +1,219 @@
+ self._receive_queue.put(message)
+
+ async def receive(self, timeout: float | None = None) -> Message:
+ while self._send_queue.empty():
+ await anyio.sleep(0)
+ return self._send_queue.get(timeout=timeout)
</file context>
| self._send_event, self._receive_event = anyio.create_memory_object_stream[ | ||
| wsproto.events.Event | HTTPXWSException | ||
| ]() |
There was a problem hiding this comment.
P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 601:
<comment>Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</comment>
<file context>
@@ -0,0 +1,1291 @@
+
+ async def __aenter__(self) -> AsyncWebSocketSession:
+ async with contextlib.AsyncExitStack() as exit_stack:
+ self._send_event, self._receive_event = anyio.create_memory_object_stream[
+ wsproto.events.Event | HTTPXWSException
+ ]()
</file context>
| self._send_event, self._receive_event = anyio.create_memory_object_stream[ | |
| wsproto.events.Event | HTTPXWSException | |
| ]() | |
| self._send_event, self._receive_event = anyio.create_memory_object_stream[ | |
| wsproto.events.Event | HTTPXWSException | |
| ](max_buffer_size=self._queue_size) |
Replace the `require_wsproto()` helper with a direct `try`/`import` guard in each WebSocket entrypoint (`Client.websocket`, `AsyncClient.websocket` and the top-level `__getattr__` resolution). When `wsproto` is missing the entrypoints raise a clear ImportError pointing to the `httpx2[ws]` extra, instead of surfacing a raw `ModuleNotFoundError`.
|
I like this approach better than #1034's! |
Ports httpx-ws (MIT, by @frankie567) into httpx2 as a native
httpx2._websocketspackage, preserving the full upstream commit history via a subtree-style merge.What you get
httpx2.websocket("ws://…"),Client().websocket(…),AsyncClient().websocket(…)httpx2.ASGIWebSocketTransport(app=…)for testing WebSocket endpoints against ASGI appsWebSocketSession/AsyncWebSocketSessionand the exception hierarchy (HTTPXWSException,WebSocketDisconnect,WebSocketUpgradeError, …) exported from the top levelDesign
wsprotois an optional extrahttpx2[ws], matching thebrotli/http2/sockspattern.import httpx2and constructing aClientwork without it; WebSocket features raise a clear "installhttpx2[ws]" error. The public names resolve lazily through__getattr__, with aTYPE_CHECKINGblock re-importing them from the typed submodules so type checkers still see real types.httpcore2stays lazily imported inside the methods that need it, preserving httpx2's existing lazy-loading.ruffandmypy --strict).History
The vendor commit merges httpx-ws's complete history (98 commits) under
src/httpx2/httpx2/_websockets/, so upstream authorship and provenance are preserved. Three follow-up commits adapt the code, wire the public API, and port the tests.Tests
Vendored httpx-ws tests live under
tests/httpx2/websockets/(122 passing across asyncio and trio). Dev deps added:starlette,websockets,flaky.scripts/checkis green.AI Disclaimer
This PR was developed with the assistance of either Claude or Codex. I've reviewed and verified the changes.