
Langchain adapter updates and x402 client update#181

Open
dixitaniket wants to merge 17 commits into main from ani/langchain-adapater

Conversation

@dixitaniket
Collaborator

@dixitaniket dixitaniket commented Mar 12, 2026

  • langchain adapter to support x402
  • ssl cert refresh on errors (tee registry update)
  • x402 client update to support preinstalled permit2 address

Contributor

Copilot AI left a comment


Pull request overview

This PR updates the LangChain adapter and LLM client behavior to better support async/streaming usage and to automatically recover from a known x402 “Invalid payment required response” failure mode, along with dependency lockfile updates.

Changes:

  • Add “retry once after x402 stack reset” behavior for non-streaming and streaming LLM requests when the known invalid-payment error occurs.
  • Expand the LangChain adapter to support client injection, async methods, streaming chunk conversion, and improved tool-call parsing/serialization.
  • Update pinned dependency versions (including og-test-v2-x402==0.0.12.dev3) and refresh uv.lock.

Reviewed changes

Copilot reviewed 6 out of 8 changed files in this pull request and generated 3 comments.

Summary per file:

src/opengradient/client/llm.py: Adds x402 stack reset + retry-once logic for completion/chat, including streaming retry.
src/opengradient/agents/og_langchain.py: Refactors LangChain adapter for async paths, streaming support, tool handling, and client lifecycle.
src/opengradient/agents/__init__.py: Extends langchain_adapter factory to accept client + connection overrides and new params.
tests/llm_test.py: Adds tests asserting a single retry + reset on the invalid-payment error (streaming and non-streaming).
tests/langchain_adapter_test.py: Adds coverage for injected client init, missing-key validation, identifying params, and async/stream paths.
pyproject.toml: Bumps pinned og-test-v2-x402 dependency.
requirements.txt: Bumps pinned og-test-v2-x402 dependency.
uv.lock: Updates locked versions (including opengradient metadata and og-test-v2-x402).


Comment on lines +368 to +393
def _stream(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[CallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
    sdk_messages = self._convert_messages_to_sdk(messages)
    chat_kwargs = self._build_chat_kwargs(sdk_messages, stop, stream=True, **kwargs)
    queue: Queue[Any] = Queue()

    def _runner() -> None:
        async def _run() -> None:
            stream = await self._llm.chat(**chat_kwargs)
            async for chunk in cast(AsyncIterator[StreamChunk], stream):
                queue.put(self._stream_chunk_to_generation(chunk))

        try:
            asyncio.run(_run())
        except BaseException as exc:  # noqa: BLE001
            queue.put(exc)
        finally:
            queue.put(_STREAM_END)

    thread = Thread(target=_runner, daemon=True)
    thread.start()

Copilot AI Mar 12, 2026


_stream always spawns a new thread and runs self._llm.chat() inside asyncio.run(). Since the SDK client’s underlying async HTTP client is created on the main thread, using it from another thread/event loop is not safe. Consider implementing sync streaming by driving an event loop in the current thread (e.g., create a dedicated loop and iterate the async generator via run_until_complete on __anext__()), or otherwise ensure the HTTP client is created/used within the same thread+loop.
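The suggested alternative can be sketched as follows. This is a minimal illustration of the reviewer's idea, not the SDK implementation: a dedicated event loop in the current thread pulls chunks from an async generator via `__anext__()`, so no second thread or loop ever touches the stream. The `chunks()` generator is a hypothetical stand-in for the SDK's async stream.

```python
import asyncio
from typing import AsyncIterator, Iterator


async def chunks() -> AsyncIterator[str]:
    # Stand-in for the SDK's async chat stream; not the real API.
    for part in ("hel", "lo"):
        yield part


def iterate_sync(agen: AsyncIterator[str]) -> Iterator[str]:
    # Drive the async generator from the CURRENT thread with a
    # dedicated loop, so loop-bound resources stay on one thread.
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                # Pull one chunk at a time; no extra thread involved.
                yield loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()


result = list(iterate_sync(chunks()))
```

In the adapter, `_stream` could yield each converted chunk from such a loop instead of shuttling them through a `Queue` from a daemon thread.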

):
    resolved_model_cid = model_cid or model
    if resolved_model_cid is None:
        raise ValueError("model_cid (or model) is required.")

Copilot AI Mar 12, 2026


OpenGradientChatModel now accepts model_cid / model as str, but the underlying SDK client expects models in the provider/model form (LLM.chat does model.split("/")[1]). Passing a plain model name like "gpt-5" would raise IndexError at runtime. Consider validating the string format here (or normalizing to the required format) and raising a clear ValueError when it’s not supported.

Suggested change:

        raise ValueError("model_cid (or model) is required.")
    # When a plain string is provided, ensure it matches the expected "provider/model" format
    if isinstance(resolved_model_cid, str) and "/" not in resolved_model_cid:
        raise ValueError(
            f"Invalid model identifier '{resolved_model_cid}'. "
            "Expected format 'provider/model', e.g. 'openai/gpt-4o'."
        )

Comment on lines +126 to +148
try:
    asyncio.get_running_loop()
except RuntimeError:
    return asyncio.run(coro)

queue: Queue[Any] = Queue(maxsize=1)

def _runner() -> None:
    try:
        queue.put(asyncio.run(coro))
    except BaseException as exc:  # noqa: BLE001
        queue.put(exc)

thread = Thread(target=_runner, daemon=True)
thread.start()
outcome = queue.get()
thread.join()

if isinstance(outcome, BaseException):
    raise outcome
return outcome



Copilot AI Mar 12, 2026


_run_coro_sync runs the coroutine in a new thread when a loop is already running. Because LLM holds an httpx.AsyncClient (via x402HttpxClient), running its async methods in a different thread/event loop is not thread-safe and can fail with “attached to a different loop” style errors. A safer approach is to either (a) raise a clear error instructing callers to use the async methods when a loop is running, or (b) schedule work onto the existing loop (without creating a second event loop) rather than using asyncio.run in another thread.

Suggested change (replacing the thread-based fallback quoted above):

"""Run a coroutine synchronously when no event loop is running.

If an event loop is already running in this thread, synchronous execution
is not supported because it may conflict with resources (such as HTTP
clients) that are bound to the existing loop. In that case, callers must
use the async APIs directly.
"""
try:
    # Raises RuntimeError if no loop is running in this thread.
    asyncio.get_running_loop()
except RuntimeError:
    # Safe to create and run a new event loop.
    return asyncio.run(coro)
# An event loop is already running; do not create a second loop in
# another thread, as this is not safe for loop-bound resources.
raise RuntimeError(
    "Cannot run coroutine synchronously while an event loop is running. "
    "Use the async methods of OpenGradientChatModel instead."
)

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 7 out of 9 changed files in this pull request and generated 4 comments.



pyproject.toml (Outdated)

    "openai>=1.58.1",
    "pydantic>=2.9.2",
    "og-test-v2-x402==0.0.11"
    "og-x402==0.0.1.dev2"
Comment on lines +307 to +324
def _build_chat_kwargs(self, sdk_messages: List[Dict[str, Any]], stop: Optional[List[str]], stream: bool, **kwargs: Any) -> Dict[str, Any]:
    x402_settlement_mode = kwargs.get("x402_settlement_mode", self.x402_settlement_mode)
    if isinstance(x402_settlement_mode, str):
        x402_settlement_mode = x402SettlementMode(x402_settlement_mode)
    model = kwargs.get("model", self.model_cid)
    model = _validate_model_string(model)

    return {
        "model": model,
        "messages": sdk_messages,
        "stop_sequence": stop,
        "max_tokens": kwargs.get("max_tokens", self.max_tokens),
        "temperature": kwargs.get("temperature", self.temperature),
        "tools": kwargs.get("tools", self._tools),
        "tool_choice": kwargs.get("tool_choice", self._tool_choice),
        "x402_settlement_mode": x402_settlement_mode,
        "stream": stream,
    }
@dixitaniket dixitaniket changed the title from "Langchain adapter updates" to "Langchain adapter updates and x402 client update" on Mar 19, 2026
dixitaniket and others added 4 commits March 23, 2026 20:48
# Conflicts:
#	pyproject.toml
#	requirements.txt
#	src/opengradient/client/llm.py
#	uv.lock
Signed-off-by: kukac <adambalogh@users.noreply.github.com>

async def _parse_sse_response(self, response) -> AsyncGenerator[StreamChunk, None]:
    retried = False
    while True:
Collaborator

@adambalogh adambalogh Mar 24, 2026


why do we need an infinite loop here?

msg = str(current).lower()
if any(
    keyword in msg
    for keyword in (
Collaborator


this is quite brittle, i'd prefer not to have this kind of logic in the SDK, what is the goal here?

Collaborator Author


yeah, i will fix and add the max retry logic
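The bounded-retry shape under discussion could look like the sketch below. It is a hypothetical stand-in, not the SDK code: `parse_with_retry`, `MAX_RETRIES`, and the error-message check are all assumptions used to illustrate replacing `while True` with a capped loop that re-raises once attempts are exhausted.

```python
# Hypothetical sketch: cap retries instead of looping forever.
MAX_RETRIES = 2


def parse_with_retry(fetch):
    last_exc = None
    for attempt in range(MAX_RETRIES + 1):
        try:
            return fetch()
        except ValueError as exc:
            # Only the known recoverable error is retried.
            if "invalid payment required response" not in str(exc).lower():
                raise
            last_exc = exc
            # An x402 stack reset would happen here before retrying.
    # Attempts exhausted: surface the last recoverable error.
    raise last_exc


calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("Invalid payment required response")
    return "ok"


result = parse_with_retry(flaky)
```

With `MAX_RETRIES = 2` the third attempt succeeds; a fourth failure would propagate instead of spinning.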

    except Exception:
        logger.debug("Failed to close previous HTTP client during TEE refresh.", exc_info=True)

async def _retry_once_on_recoverable_error(
Collaborator


i'd rather just retry X times like normal and if it still fails at the end then re-evaluate if we should use a different TEE

Collaborator Author


sure, i will add that in
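The escalation the reviewer describes (retry normally first, re-evaluate the TEE only if every attempt fails) could be sketched like this. Everything here is a labeled assumption: `request_with_tee_fallback`, `send`, and `refresh_tee` are illustrative stand-ins for the SDK's request path and TEE-refresh internals.

```python
# Hypothetical sketch: plain retries first, TEE re-evaluation last.
def request_with_tee_fallback(send, refresh_tee, retries=3):
    for _ in range(retries):
        try:
            return send()
        except ConnectionError:
            continue
    # All normal retries failed: refresh the TEE, then try once more.
    refresh_tee()
    return send()


events = []


def send():
    events.append("send")
    if "refresh" not in events:
        # Fails until the TEE has been refreshed.
        raise ConnectionError("tee unreachable")
    return "ok"


result = request_with_tee_fallback(send, lambda: events.append("refresh"))
```

This keeps the TEE switch out of the hot path: transient failures are absorbed by the plain retries, and the more expensive refresh happens at most once per request.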

tee_signature=result.get("tee_signature"),
tee_timestamp=result.get("tee_timestamp"),
**self._tee_metadata(),
tee_signature=result.get("tee_signature") or tee_headers.get("tee_signature"),
Collaborator


what is the reasoning here? why isn't there 1 source of this only?

claude-3-5-haiku was retired by Anthropic on Feb 19 2026.
grok-2 is discontinued by xAI — the gateway now rejects both with 404.
Removing them from the SDK enum so callers get a clear error at the
SDK level rather than a 500 from the gateway.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
google/gemini-3-pro-preview was shut down by Google on March 9, 2026.
Mirrors the same removal made in tee-gateway model_registry.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
