feat: add IngestTraces client with GALILEO_INGEST_URL routing#520
feat: add IngestTraces client with GALILEO_INGEST_URL routing#520pratduv wants to merge 2 commits into
Conversation
When GALILEO_INGEST_URL env var is set, the SDK routes trace/span ingestion to the dedicated Go ingest service instead of the main API. This enables multimodal content processing (base64 upload, file ref rewriting) handled by the ingest service.
|
|
||
| _logger = logging.getLogger("galileo.logger") | ||
| _traces_client: Optional["Traces"] = None | ||
| _traces_client: Optional[Union["Traces", "IngestTraces"]] = None |
There was a problem hiding this comment.
Should we add from __future__ import annotations as the first import in logger.py and change _traces_client: Optional[Union["Traces", "IngestTraces"]] = None to _traces_client: Optional[Union[Traces, IngestTraces]] = None?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
Before applying, verify this suggestion against the current code. In
src/galileo/logger/logger.py around line 164, the attribute _traces_client is declared
with quoted forward references. Add `from __future__ import annotations` as the very
first import in this file (before all other imports), then update the class attribute
declaration at line 164 to remove the quotes and use a normal annotation:
`_traces_client: Optional[Union[Traces, IngestTraces]] = None`. Keep all other logic the
same and run a quick type-check to ensure no other forward references need updating.
| def _create_traces_client(self) -> Union[Traces, IngestTraces]: | ||
| """Create the appropriate traces client. | ||
|
|
||
| If GALILEO_INGEST_URL is set, routes ingestion to the dedicated Go | ||
| ingest service via IngestTraces. Otherwise uses the standard API client. | ||
| """ | ||
| if not self.project_id: | ||
| self._init_project() | ||
| if not (self.log_stream_id or self.experiment_id): | ||
| self._init_log_stream() | ||
|
|
||
| ingest_url = os.environ.get(GALILEO_INGEST_URL_ENV) |
There was a problem hiding this comment.
_create_traces_client returns IngestTraces when GALILEO_INGEST_URL is set but IngestTraces doesn't include update_trace/update_span/get_sessions/create_session — should we return full Traces for distributed/session ops or split into separate _traces_client and _ingest_client attributes?
Finding types: Type Inconsistency Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
Before applying, verify this suggestion against the current code. In
src/galileo/logger/logger.py around lines 357-380, the _create_traces_client method
currently returns IngestTraces whenever GALILEO_INGEST_URL is set, but IngestTraces does
not implement methods like update_trace/update_span/get_sessions/create_session used by
distributed-mode and session helpers. Choose one of two approaches: (1) Change
_create_traces_client to return a full Traces client (Traces) when the logger is in
distributed mode (self.mode == "distributed") or when session/update methods are
required, otherwise keep returning IngestTraces for ingestion-only batch flows; or (2)
Refactor the class to maintain two separate clients—a full Traces client for general
operations (updates/sessions) and a separate IngestTraces client used only for
ingest_traces/ingest_spans—by adding a new _ingest_client attribute populated only
when GALILEO_INGEST_URL is set and updating call sites to route through the appropriate
client. Ensure the function's return type annotation remains Union[Traces, IngestTraces]
and add a short comment explaining the routing logic.
| async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]: | ||
| if self.experiment_id: | ||
| traces_ingest_request.experiment_id = UUID(self.experiment_id) | ||
| elif self.log_stream_id: | ||
| traces_ingest_request.log_stream_id = UUID(self.log_stream_id) |
There was a problem hiding this comment.
IngestTraces.ingest_traces duplicates the scope-assignment used by Traces methods, should we extract a _apply_scope(request, log_stream_id, experiment_id) helper and call it before posting?
Finding type: Code Dedup and Conventions | Severity: 🟢 Low
Want Baz to fix this for you? Activate Fixer
The IngestTraces client shared a single httpx.AsyncClient across all EventLoopThreadPool threads. When consecutive flushes landed on different threads, the client's internal asyncio primitives were bound to the wrong event loop, causing RuntimeError and silently dropping traces. Switch to a per-thread client via threading.local(), matching the pattern already used in galileo-core's ApiClient. Made-with: Cursor
| self._thread_local = local() | ||
|
|
||
| @property | ||
| def _client(self) -> httpx.AsyncClient: | ||
| """Per-thread AsyncClient to avoid cross-event-loop errors.""" | ||
| if not hasattr(self._thread_local, "client"): | ||
| self._thread_local.client = httpx.AsyncClient(timeout=60) | ||
| return self._thread_local.client |
There was a problem hiding this comment.
IngestTraces._client caches a single httpx.AsyncClient per thread but repeated asyncio.run calls create fresh event loops and trigger 'AsyncClient is bound to a different event loop' — should we key the client cache by asyncio.get_running_loop() so each loop gets its own AsyncClient?
Finding type: Logical Bugs | Severity: 🔴 High
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents:
Before applying, verify this suggestion against the current code. In
src/galileo/traces.py around lines 193-200, the _client property currently stores a
single httpx.AsyncClient on self._thread_local and reuses it across different asyncio
event loops on the same thread, which causes 'AsyncClient is bound to a different event
loop' errors. Change the implementation to key the cache by the current running event
loop (use asyncio.get_running_loop()) — e.g., store a dict on self._thread_local
mapping loop -> AsyncClient, create a new AsyncClient when the current loop has no
entry, and return the client for the current loop. Ensure the code handles the case
where no running loop exists (raise or fall back) and avoid reusing an AsyncClient bound
to another loop.
|
No activity for 30 days — this PR will be closed in 5 days unless updated. |
User description
Summary
IngestTracesclient that sends traces/spans directly to the dedicated Go ingest service viahttpxGALILEO_INGEST_URLenv var is set, otherwise uses the existing API client/ingest/traces/{project_id}and/ingest/spans/{project_id}route definitionsContext
The Go ingest service handles multimodal content processing (base64 decoding, S3 upload, file ref rewriting). This PR adds the SDK-side routing so that when
GALILEO_INGEST_URLis set (e.g.http://localhost:8081), traces go directly to the ingest service instead of the Python API.This is primarily for internal Galileo testing of the ingest pipeline. No tests added intentionally, since the routing is validated via E2E tests against the local stack.
Related: #501
Changes
src/galileo/constants/routes.py: Addedingest_tracesandingest_spansroutessrc/galileo/traces.py: AddedIngestTracesclass that:httpx.AsyncClientdirectly withGalileo-API-Keyheader authlog_stream_id/experiment_idon requests (same asTraces)ingest_traces()andingest_spans()methods posting to/ingest/routessrc/galileo/logger/logger.py:_create_traces_client()(was duplicated between init and lazy path)_create_traces_client()checksGALILEO_INGEST_URLenv var and returnsIngestTracesif setMade with Cursor
Generated description
Below is a concise technical summary of the changes proposed in this PR:
Introduce
IngestTracesto post traces and spans through the/ingest/...endpoints usinghttpx.AsyncClient, SDK metadata, andGalileo-API-Keyheaders. RouteGalileoLoggerto instantiateIngestTraceswhenGALILEO_INGEST_URLis set and fall back toTracesotherwise.IngestTracesto send trace and span payloads directly to/ingest/traces/{project_id}and/ingest/spans/{project_id}with per-threadhttpx.AsyncClient, headers, and SDK metadata such aslog_stream_idorexperiment_id.Modified files (2)
Latest Contributors(2)
GalileoLogger's_create_traces_clientto returnIngestTraceswhenGALILEO_INGEST_URLis configured and log that endpoint usage, while otherwise creating the existingTracesclient.Modified files (1)
Latest Contributors(2)