Skip to content

Add native WebSocket support by vendoring httpx-ws#1042

Open
Kludex wants to merge 103 commits into
mainfrom
websockets-native
Open

Add native WebSocket support by vendoring httpx-ws#1042
Kludex wants to merge 103 commits into
mainfrom
websockets-native

Conversation

@Kludex

@Kludex Kludex commented Jun 24, 2026

Copy link
Copy Markdown
Member

Ports httpx-ws (MIT, by @frankie567) into httpx2 as a native httpx2._websockets package, preserving the full upstream commit history via a subtree-style merge.

What you get

  • httpx2.websocket("ws://…"), Client().websocket(…), AsyncClient().websocket(…)
  • httpx2.ASGIWebSocketTransport(app=…) for testing WebSocket endpoints against ASGI apps
  • WebSocketSession / AsyncWebSocketSession and the exception hierarchy (HTTPXWSException, WebSocketDisconnect, WebSocketUpgradeError, …) exported from the top level

Design

  • wsproto is an optional extra httpx2[ws], matching the brotli/http2/socks pattern. import httpx2 and constructing a Client work without it; WebSocket features raise a clear "install httpx2[ws]" error. The public names resolve lazily through __getattr__, with a TYPE_CHECKING block re-importing them from the typed submodules so type checkers still see real types.
  • httpcore2 stays lazily imported inside the methods that need it, preserving httpx2's existing lazy-loading.
  • The vendored code is adapted to httpx2's namespaces and typing standards (passes ruff and mypy --strict).

History

The vendor commit merges httpx-ws's complete history (98 commits) under src/httpx2/httpx2/_websockets/, so upstream authorship and provenance are preserved. Three follow-up commits adapt the code, wire the public API, and port the tests.

Tests

Vendored httpx-ws tests live under tests/httpx2/websockets/ (122 passing across asyncio and trio). Dev deps added: starlette, websockets, flaky. scripts/check is green.

AI Disclaimer

This PR was developed with the assistance of either Claude or Codex. I've reviewed and verified the changes.

Review in cubic

frankie567 and others added 30 commits November 22, 2022 09:17
> ⚠️ This is a very young project. Expect bugs 🐛

Features
--------

* `connect_ws` helper to talk to WebSockets synchronously.
* `aconnect_ws` helper to talk to WebSockets asynchronously.
* `ASGIWebSocketTransport` to test WebSockets in ASGI apps directly.
* Add ping method (#2)

* add optional arguement paylod to ping
Features
--------

* Add a `.ping` method. Thanks @kousikmitra 🎉

Improvements
------------

* Pin lower bound version of `httpx` and `httpcore` dependencies
    * `httpx>=0.23.1`
    * `httpcore>=0.16.1`
WSH032 and others added 22 commits April 3, 2024 13:21
- Add test for attribute `response`
- Add missing type annotations
BREAKING CHANGE: the `subprotocol` parameter of `WebSocketSession` has been removed.
Breaking changes
----------------

* [`AsyncWebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.AsyncWebSocketSession) and [`WebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.WebSocketSession) no longer accept the `subprotocol` parameter. It's automatically set from the `response` headers (see below).

> [!NOTE]
> If you only use the `connect_ws` and `aconnect_ws` functions, you don't need to change anything.

Improvements
------------

* [`AsyncWebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.AsyncWebSocketSession) and [`WebSocketSession`](https://frankie567.github.io/httpx-ws/reference/httpx_ws/#httpx_ws.WebSocketSession) now accepts the original HTTPX handshake response in parameter. Thanks @WSH032 🎉
Doing so without re-raising them is harmful from the asyncio uncancellation PoV, and is completely unnecessary with task groups anyway.
Don't catch cancellation exceptions
This avoids the ResourceWarning about unclosed memory object streams, introduced in AnyIO 4.4.0.
Add unit test to reproduce #70
Previously the threads responsible for watching the stop event set
signal were not being properly terminated, leading then to pile
up and becoming a memory leak when using `connect_ws`. This commit
fix this issue (#76).
Prevent threads to hang and pile up in WebSocketSession
Bug fixes
---------

* Fix (#73) anyio misusages. Thanks @agronholm 🎉
* Fix (#74) unclosed anyio streams. Thanks @agronholm 🎉
* Fix (#76) memory leak with non-async WebSocketSession. Thanks @ro-oliveira95 🎉
* Improve _wait_until_closed()

* Fix type annotation for Python<3.9
Bug fixes and improvements
--------------------------

* Improve efficiency of `WebSocketSession` by reusing a single thread pool when waiting for messages. Thank you @davidbrochart 🎉
Merge the httpx-ws package (MIT, by @frankie567) into httpx2, preserving its
upstream commit history under src/httpx2/httpx2/_websockets/ and the tests
under tests/httpx2/websockets/. The code is unmodified at this point and still
imports httpx/httpcore/wsproto; subsequent commits adapt it to httpx2.

Source: https://github.com/Kludex/httpx-ws @ c3dcf21 (v0.6.2 era)
Rewrite the vendored `_websockets` package to httpx2's namespaces and
conventions while keeping httpcore2 lazily imported:

- Import the public types from httpx2's own modules (`.._models`,
  `.._transports.asgi`, `.._types`) instead of `httpx`.
- Defer `import httpcore2` into the methods that catch its errors, and drop
  the `AsyncNetworkStream` base class from the ASGI stream, so importing the
  package no longer eagerly pulls in httpcore2.
- Rename `transport.py` to `_transport.py` to match httpx2's private-module
  layout.
- Move the numeric defaults into a wsproto-free `_defaults` module and add a
  `require_wsproto()` guard so the package can be imported without `wsproto`
  installed; `httpx2[ws]` is now an optional extra.
- Apply httpx2's typing standards (`from __future__ import annotations`,
  modern unions, full annotations) so the code passes ruff and mypy --strict.
Add `httpx2.websocket()`, `Client.websocket()` and `AsyncClient.websocket()`
context managers, and re-export the WebSocket session classes, the
`ASGIWebSocketTransport` and the exception hierarchy from the top-level
`httpx2` namespace.

These names resolve lazily through `__getattr__` so `import httpx2` keeps
working without `wsproto`; a missing dependency raises a clear error pointing
to the `httpx2[ws]` extra. A `TYPE_CHECKING` block re-imports them from the
typed submodules so static type checkers still see the real types.
Rewrite the vendored WebSocket tests to httpx2's namespaces, point the
server fixture at uvicorn's `wsproto`/`websockets-sansio` implementations
(the legacy implementation is incompatible with `filterwarnings=error`), and
close the mock memory streams so the trio backend doesn't trip an unraisable
ResourceWarning.

Add `starlette`, `websockets` and `flaky` to the dev group, the `ws` extra to
the dev `httpx2[...]` install, and update `test_exported_members` to account
for the lazily-exported WebSocket names.
@github-actions

Copy link
Copy Markdown

Docs preview:

@codspeed-hq

codspeed-hq Bot commented Jun 24, 2026

Copy link
Copy Markdown

Merging this PR will not alter performance

✅ 15 untouched benchmarks
⏩ 7 skipped benchmarks1


Comparing websockets-native (6022321) with main (82b9e2d)

Open in CodSpeed

Footnotes

  1. 7 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 29150edd36

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +1052 to +1053
headers = kwargs.pop("headers", {})
headers.update(_get_headers(subprotocols))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Normalize missing WebSocket headers before updating

When the advertised top-level httpx2.websocket(...) is used without headers, _api.websocket() forwards headers=None, so kwargs.pop("headers", {}) returns None here and the next line raises AttributeError before opening a connection. This makes the default public helper unusable; coerce None and other supported HeaderTypes to a mutable headers object before adding the handshake headers.

Useful? React with 👍 / 👎.

if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
subprotocols: list[str] = []
if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
subprotocols = subprotocols_header.split(",")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Strip spaces when parsing requested subprotocols

When multiple subprotocols are requested, the client-side helper serializes them as a comma-space header such as chat, superchat, but splitting only on , leaves the second value as " superchat" in scope["subprotocols"]. ASGI apps that compare exact protocol names will fail to negotiate any protocol after the first, so trim whitespace and ignore empty tokens when building the scope.

Useful? React with 👍 / 👎.

send = self._asgi_send
try:
await self.app(scope, receive, send)
except Exception as e:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor raise_app_exceptions for websocket ASGI apps

With ASGIWebSocketTransport(..., raise_app_exceptions=True) (the inherited default), exceptions raised by a websocket endpoint are caught here and converted into a close frame instead of being surfaced to the test client. That makes the default behave the same as raise_app_exceptions=False for websocket routes and can let crashing ASGI tests pass as ordinary disconnects; pass the flag into this stream and re-raise when it is enabled.

Useful? React with 👍 / 👎.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 issues found across 19 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="src/httpx2/httpx2/_websockets/_api.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_api.py:601">
P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</violation>

<violation number="2" location="src/httpx2/httpx2/_websockets/_api.py:1052">
P1: `kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</violation>
</file>

<file name="src/httpx2/httpx2/_websockets/_transport.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_transport.py:115">
P2: `timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</violation>

<violation number="2" location="src/httpx2/httpx2/_websockets/_transport.py:128">
P2: `raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</violation>

<violation number="3" location="src/httpx2/httpx2/_websockets/_transport.py:173">
P2: Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</violation>

<violation number="4" location="src/httpx2/httpx2/_websockets/_transport.py:178">
P2: WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</violation>
</file>

<file name="src/httpx2/httpx2/_websockets/_ping.py">

<violation number="1" location="src/httpx2/httpx2/_websockets/_ping.py:25">
P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</violation>
</file>

Reply with feedback, questions, or to request a fix.

Re-trigger cubic

subprotocols: list[str] | None = None,
**kwargs: typing.Any,
) -> typing.Generator[WebSocketSession, None, None]:
headers = kwargs.pop("headers", {})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: kwargs.pop("headers", {}) returns None when headers=None is explicitly passed (e.g., from httpx2.websocket(...) which defaults headers to None). The default {} only applies when the key is absent from kwargs, not when the value is None. The subsequent headers.update(...) will raise AttributeError: 'NoneType' object has no attribute 'update'. Coerce the result: headers = kwargs.pop("headers", None) or {}.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 1052:

<comment>`kwargs.pop("headers", {})` returns `None` when `headers=None` is explicitly passed (e.g., from `httpx2.websocket(...)` which defaults `headers` to `None`). The default `{}` only applies when the key is absent from `kwargs`, not when the value is `None`. The subsequent `headers.update(...)` will raise `AttributeError: 'NoneType' object has no attribute 'update'`. Coerce the result: `headers = kwargs.pop("headers", None) or {}`.</comment>

<file context>
@@ -0,0 +1,1291 @@
+    subprotocols: list[str] | None = None,
+    **kwargs: typing.Any,
+) -> typing.Generator[WebSocketSession, None, None]:
+    headers = kwargs.pop("headers", {})
+    headers.update(_get_headers(subprotocols))
+
</file context>
Suggested change
headers = kwargs.pop("headers", {})
headers = kwargs.pop("headers", None) or {}

@@ -0,0 +1,41 @@
from __future__ import annotations

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_ping.py, line 25:

<comment>ack() crashes on unmatched Pong payloads because dict.pop has no default. Background receive paths call ack() directly, so this can kill websocket receive processing instead of safely ignoring unexpected/late pongs.</comment>

<file context>
@@ -0,0 +1,41 @@
+        return ping_id, event
+
+    def ack(self, ping_id: bytes | bytearray) -> None:
+        event = self._pings.pop(bytes(ping_id))
+        event.set()
+
</file context>

if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
subprotocols: list[str] = []
if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
subprotocols = subprotocols_header.split(",")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Splitting on "," without stripping whitespace leaves leading spaces in protocol names (e.g., "chat, superchat".split(",")["chat", " superchat"]). Since _get_headers() serializes subprotocols as ", ".join(subprotocols), any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use [s.strip() for s in subprotocols_header.split(",")] to trim whitespace.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 173:

<comment>Splitting on `","` without stripping whitespace leaves leading spaces in protocol names (e.g., `"chat, superchat".split(",")` → `["chat", " superchat"]`). Since `_get_headers()` serializes subprotocols as `", ".join(subprotocols)`, any ASGI app comparing exact protocol names will fail to negotiate protocols after the first. Use `[s.strip() for s in subprotocols_header.split(",")]` to trim whitespace.</comment>

<file context>
@@ -0,0 +1,219 @@
+        if scheme in {"ws", "wss"} or headers.get("upgrade") == "websocket":
+            subprotocols: list[str] = []
+            if (subprotocols_header := headers.get("sec-websocket-protocol")) is not None:
+                subprotocols = subprotocols_header.split(",")
+
+            scope = {
</file context>

scope = {
"type": "websocket",
"path": request.url.path,
"raw_path": request.url.raw_path,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: WebSocket ASGI scope passes raw_path including query string. This can break ASGI apps that expect raw_path to be path-only and use query_string separately.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 178:

<comment>WebSocket ASGI scope passes `raw_path` including query string. This can break ASGI apps that expect `raw_path` to be path-only and use `query_string` separately.</comment>

<file context>
@@ -0,0 +1,219 @@
+            scope = {
+                "type": "websocket",
+                "path": request.url.path,
+                "raw_path": request.url.raw_path,
+                "root_path": self.root_path,
+                "scheme": scheme,
</file context>
Suggested change
"raw_path": request.url.raw_path,
"raw_path": request.url.raw_path.split(b"?")[0],

send = self._asgi_send
try:
await self.app(scope, receive, send)
except Exception as e:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: raise_app_exceptions is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 128:

<comment>`raise_app_exceptions` is ignored for websocket requests. App errors are always swallowed into close code 1011, preventing expected exception propagation when the flag is true.</comment>

<file context>
@@ -0,0 +1,219 @@
+        send = self._asgi_send
+        try:
+            await self.app(scope, receive, send)
+        except Exception as e:
+            message = {
+                "type": "websocket.close",
</file context>

self._receive_queue.put(message)

async def receive(self, timeout: float | None = None) -> Message:
while self._send_queue.empty():

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: timeout is ineffective in receive(), so read(timeout=...) can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_transport.py, line 115:

<comment>`timeout` is ineffective in `receive()`, so `read(timeout=...)` can wait forever instead of timing out. This breaks timeout behavior expected by the stream API.</comment>

<file context>
@@ -0,0 +1,219 @@
+        self._receive_queue.put(message)
+
+    async def receive(self, timeout: float | None = None) -> Message:
+        while self._send_queue.empty():
+            await anyio.sleep(0)
+        return self._send_queue.get(timeout=timeout)
</file context>

Comment on lines +601 to +603
self._send_event, self._receive_event = anyio.create_memory_object_stream[
wsproto.events.Event | HTTPXWSException
]()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At src/httpx2/httpx2/_websockets/_api.py, line 601:

<comment>Async WebSocket session ignores queue_size; memory stream created with default max_buffer_size=0 so caller-specified queue buffering never applies. Pass max_buffer_size=self._queue_size to keep async behavior consistent with the sync session and avoid unintended blocking.</comment>

<file context>
@@ -0,0 +1,1291 @@
+
+    async def __aenter__(self) -> AsyncWebSocketSession:
+        async with contextlib.AsyncExitStack() as exit_stack:
+            self._send_event, self._receive_event = anyio.create_memory_object_stream[
+                wsproto.events.Event | HTTPXWSException
+            ]()
</file context>
Suggested change
self._send_event, self._receive_event = anyio.create_memory_object_stream[
wsproto.events.Event | HTTPXWSException
]()
self._send_event, self._receive_event = anyio.create_memory_object_stream[
wsproto.events.Event | HTTPXWSException
](max_buffer_size=self._queue_size)

Replace the `require_wsproto()` helper with a direct `try`/`import` guard in
each WebSocket entrypoint (`Client.websocket`, `AsyncClient.websocket` and the
top-level `__getattr__` resolution). When `wsproto` is missing the entrypoints
raise a clear ImportError pointing to the `httpx2[ws]` extra, instead of
surfacing a raw `ModuleNotFoundError`.
@Graeme22

Copy link
Copy Markdown

I like this approach better than #1034's!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants