
feat: add IngestTraces client with GALILEO_INGEST_URL routing #520

Open
pratduv wants to merge 2 commits into main from prat/ingest-client-routing

@pratduv pratduv commented Mar 26, 2026

User description

Summary

  • Adds IngestTraces client that sends traces/spans directly to the dedicated Go ingest service via httpx
  • Routes ingestion to the ingest service when GALILEO_INGEST_URL env var is set, otherwise uses the existing API client
  • Adds /ingest/traces/{project_id} and /ingest/spans/{project_id} route definitions

Context

The Go ingest service handles multimodal content processing (base64 decoding, S3 upload, file ref rewriting). This PR adds the SDK-side routing so that when GALILEO_INGEST_URL is set (e.g. http://localhost:8081), traces go directly to the ingest service instead of the Python API.

This is primarily for internal Galileo testing of the ingest pipeline. No tests added intentionally, since the routing is validated via E2E tests against the local stack.

Related: #501

Changes

src/galileo/constants/routes.py: Added ingest_traces and ingest_spans routes

src/galileo/traces.py: Added IngestTraces class that:

  • Uses httpx.AsyncClient directly with Galileo-API-Key header auth
  • Sets log_stream_id/experiment_id on requests (same as Traces)
  • Has ingest_traces() and ingest_spans() methods posting to /ingest/ routes
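
As a rough illustration of the request shape described in the bullets above, the sketch below builds the URL and headers for one ingest call. It is not the PR's code: `build_ingest_request` and its parameters are hypothetical, and only the two route templates and the `Galileo-API-Key` header name come from this description.

```python
# Route templates as listed in the PR summary.
INGEST_TRACES_ROUTE = "/ingest/traces/{project_id}"
INGEST_SPANS_ROUTE = "/ingest/spans/{project_id}"


def build_ingest_request(
    base_url: str, route: str, project_id: str, api_key: str
) -> tuple[str, dict[str, str]]:
    """Hypothetical helper: return the (url, headers) pair for one ingest POST."""
    path = route.format(project_id=project_id)
    # Strip a trailing slash so the base URL and the path join with exactly one "/".
    url = base_url.rstrip("/") + path
    headers = {"Galileo-API-Key": api_key}
    return url, headers


# Example values only; the base URL matches the local example in the Context section.
url, headers = build_ingest_request(
    "http://localhost:8081", INGEST_TRACES_ROUTE, "my-project-id", "example-key"
)
```

In the PR itself the POST is issued through httpx.AsyncClient; the sketch stops short of the network call so it can run anywhere.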

src/galileo/logger/logger.py:

  • Unified client creation into _create_traces_client() (was duplicated between init and lazy path)
  • _create_traces_client() checks GALILEO_INGEST_URL env var and returns IngestTraces if set
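
The selection logic described above could look roughly like the following. This is a minimal sketch under stated assumptions: `Traces` and `IngestTraces` are empty stand-ins rather than the real classes, `create_traces_client` is a free function instead of the logger method, the `env` parameter exists only to make the example testable, and treating an empty value as unset is a guess.

```python
import os
from typing import Mapping, Optional, Union

# Assumed value of the constant referenced in the diff.
GALILEO_INGEST_URL_ENV = "GALILEO_INGEST_URL"


class Traces:
    """Stand-in for the existing API client (not the real class)."""


class IngestTraces:
    """Stand-in for the ingest client (not the real class)."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url


def create_traces_client(
    env: Optional[Mapping[str, str]] = None,
) -> Union[Traces, IngestTraces]:
    """Return IngestTraces when the env var is set, otherwise Traces."""
    env = os.environ if env is None else env
    ingest_url = env.get(GALILEO_INGEST_URL_ENV)
    # Assumption: an empty string counts as "not set".
    if ingest_url:
        return IngestTraces(base_url=ingest_url)
    return Traces()
```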

Made with Cursor


Generated description

Below is a concise technical summary of the changes proposed in this PR:
Introduce IngestTraces to post traces and spans through the /ingest/... endpoints using httpx.AsyncClient, SDK metadata, and Galileo-API-Key headers. Route GalileoLogger to instantiate IngestTraces when GALILEO_INGEST_URL is set and fall back to Traces otherwise.

Topic: Ingest client
Details: Introduce IngestTraces to send trace and span payloads directly to /ingest/traces/{project_id} and /ingest/spans/{project_id} with per-thread httpx.AsyncClient, headers, and SDK metadata such as log_stream_id or experiment_id.
Modified files (2)
  • src/galileo/constants/routes.py
  • src/galileo/traces.py
Latest Contributors (2)
User | Commit | Date
vamaq@users.noreply.gi... | fix: Define explicit e... | February 03, 2026
nachiket@galileo.ai | feat: Upgrade linters ... | September 24, 2025

Topic: Client selection
Details: Route GalileoLogger's _create_traces_client to return IngestTraces when GALILEO_INGEST_URL is configured and log that endpoint usage, while otherwise creating the existing Traces client.
Modified files (1)
  • src/galileo/logger/logger.py
Latest Contributors (2)
User | Commit | Date
pratyusha@galileo.ai | feat: serialize trace ... | March 24, 2026
calebe.sep@hotmail.com | fix: auto-convert non-... | March 02, 2026
This pull request is reviewed by Baz.

When GALILEO_INGEST_URL env var is set, the SDK routes trace/span
ingestion to the dedicated Go ingest service instead of the main API.
This enables multimodal content processing (base64 upload, file ref
rewriting) handled by the ingest service.
@pratduv pratduv requested a review from a team as a code owner March 26, 2026 15:29
@pratduv pratduv requested a review from jasmine-ab-tea March 26, 2026 15:29

  _logger = logging.getLogger("galileo.logger")
- _traces_client: Optional["Traces"] = None
+ _traces_client: Optional[Union["Traces", "IngestTraces"]] = None

Should we add from __future__ import annotations as the first import in logger.py and change _traces_client: Optional[Union["Traces", "IngestTraces"]] = None to _traces_client: Optional[Union[Traces, IngestTraces]] = None?

Finding type: AI Coding Guidelines | Severity: 🟢 Low


Prompt for AI Agents:

Before applying, verify this suggestion against the current code. In
src/galileo/logger/logger.py around line 164, the attribute _traces_client is declared
with quoted forward references. Add `from __future__ import annotations` as the very
first import in this file (before all other imports), then update the class attribute
declaration at line 164 to remove the quotes and use a normal annotation:
`_traces_client: Optional[Union[Traces, IngestTraces]] = None`. Keep all other logic the
same and run a quick type-check to ensure no other forward references need updating.

Comment on lines +357 to +368
    def _create_traces_client(self) -> Union[Traces, IngestTraces]:
        """Create the appropriate traces client.

        If GALILEO_INGEST_URL is set, routes ingestion to the dedicated Go
        ingest service via IngestTraces. Otherwise uses the standard API client.
        """
        if not self.project_id:
            self._init_project()
        if not (self.log_stream_id or self.experiment_id):
            self._init_log_stream()

        ingest_url = os.environ.get(GALILEO_INGEST_URL_ENV)

_create_traces_client returns IngestTraces when GALILEO_INGEST_URL is set, but IngestTraces doesn't implement update_trace, update_span, get_sessions, or create_session. Should we return a full Traces client for distributed/session operations, or split this into separate _traces_client and _ingest_client attributes?

Finding types: Type Inconsistency, Logical Bugs | Severity: 🔴 High


Prompt for AI Agents:

Before applying, verify this suggestion against the current code. In
src/galileo/logger/logger.py around lines 357-380, the _create_traces_client method
currently returns IngestTraces whenever GALILEO_INGEST_URL is set, but IngestTraces does
not implement methods like update_trace/update_span/get_sessions/create_session used by
distributed-mode and session helpers. Choose one of two approaches: (1) Change
_create_traces_client to return a full Traces client (Traces) when the logger is in
distributed mode (self.mode == "distributed") or when session/update methods are
required, otherwise keep returning IngestTraces for ingestion-only batch flows; or (2)
Refactor the class to maintain two separate clients—a full Traces client for general
operations (updates/sessions) and a separate IngestTraces client used only for
ingest_traces/ingest_spans—by adding a new _ingest_client attribute populated only
when GALILEO_INGEST_URL is set and updating call sites to route through the appropriate
client. Ensure the function's return type annotation remains Union[Traces, IngestTraces]
and add a short comment explaining the routing logic.
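
Approach (2) from the prompt above might be sketched as follows. Everything here is illustrative: `LoggerSketch` is a hypothetical stand-in for the logger, both client classes are stubs, and the methods are synchronous for brevity even though the real ingest methods are async.

```python
from typing import Optional


class Traces:
    """Stand-in for the full API client (updates, sessions, ingestion)."""

    def ingest_traces(self, payload: dict) -> str:
        return "api"

    def update_trace(self, payload: dict) -> str:
        return "api"


class IngestTraces:
    """Stand-in for the ingest-only client."""

    def ingest_traces(self, payload: dict) -> str:
        return "ingest"


class LoggerSketch:
    """Hypothetical logger that keeps the two clients in separate attributes."""

    def __init__(self, ingest_url: Optional[str] = None) -> None:
        # The full client always exists, so update and session calls keep working.
        self._traces_client = Traces()
        # The ingest client exists only when an ingest URL is configured.
        self._ingest_client = IngestTraces() if ingest_url else None

    def ingest_traces(self, payload: dict) -> str:
        # Ingestion goes to the ingest service when available, else to the API.
        if self._ingest_client is not None:
            return self._ingest_client.ingest_traces(payload)
        return self._traces_client.ingest_traces(payload)

    def update_trace(self, payload: dict) -> str:
        # Updates always use the full client, which implements them.
        return self._traces_client.update_trace(payload)
```

With this split, update_trace never reaches a client that lacks the method, regardless of whether the ingest URL is set.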

Comment thread src/galileo/traces.py
Comment on lines +195 to +199
    async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]:
        if self.experiment_id:
            traces_ingest_request.experiment_id = UUID(self.experiment_id)
        elif self.log_stream_id:
            traces_ingest_request.log_stream_id = UUID(self.log_stream_id)

IngestTraces.ingest_traces duplicates the scope assignment used by the Traces methods. Should we extract an _apply_scope(request, log_stream_id, experiment_id) helper and call it before posting?

Finding type: Code Dedup and Conventions | Severity: 🟢 Low


The IngestTraces client shared a single httpx.AsyncClient across
all EventLoopThreadPool threads. When consecutive flushes landed
on different threads, the client's internal asyncio primitives
were bound to the wrong event loop, causing RuntimeError and
silently dropping traces.

Switch to a per-thread client via threading.local(), matching the
pattern already used in galileo-core's ApiClient.

Made-with: Cursor
Comment thread src/galileo/traces.py
Comment on lines +193 to +200
        self._thread_local = local()

    @property
    def _client(self) -> httpx.AsyncClient:
        """Per-thread AsyncClient to avoid cross-event-loop errors."""
        if not hasattr(self._thread_local, "client"):
            self._thread_local.client = httpx.AsyncClient(timeout=60)
        return self._thread_local.client

IngestTraces._client caches a single httpx.AsyncClient per thread, but repeated asyncio.run calls create fresh event loops on that thread and trigger 'AsyncClient is bound to a different event loop' errors. Should we key the client cache by asyncio.get_running_loop() so each loop gets its own AsyncClient?

Finding type: Logical Bugs | Severity: 🔴 High


Prompt for AI Agents:

Before applying, verify this suggestion against the current code. In
src/galileo/traces.py around lines 193-200, the _client property currently stores a
single httpx.AsyncClient on self._thread_local and reuses it across different asyncio
event loops on the same thread, which causes 'AsyncClient is bound to a different event
loop' errors. Change the implementation to key the cache by the current running event
loop (use asyncio.get_running_loop()) — e.g., store a dict on self._thread_local
mapping loop -> AsyncClient, create a new AsyncClient when the current loop has no
entry, and return the client for the current loop. Ensure the code handles the case
where no running loop exists (raise or fall back) and avoid reusing an AsyncClient bound
to another loop.
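
A cache keyed by the running loop, as the prompt above describes, could be sketched like this. The names `LoopKeyedClients` and `FakeAsyncClient` are hypothetical; the fake stands in for httpx.AsyncClient so the example runs without the dependency, and using weak keys to drop entries for collected loops is a design choice of this sketch, not something the review asks for.

```python
import asyncio
import threading
import weakref


class FakeAsyncClient:
    """Stand-in for httpx.AsyncClient so the sketch needs no network or httpx."""


class LoopKeyedClients:
    """Hypothetical cache: one client per (thread, running event loop)."""

    def __init__(self) -> None:
        self._thread_local = threading.local()

    def get(self) -> FakeAsyncClient:
        # Raises RuntimeError when called outside a running event loop.
        loop = asyncio.get_running_loop()
        cache = getattr(self._thread_local, "clients", None)
        if cache is None:
            # Weak keys let an entry vanish once its loop is garbage collected.
            cache = weakref.WeakKeyDictionary()
            self._thread_local.clients = cache
        client = cache.get(loop)
        if client is None:
            client = FakeAsyncClient()
            cache[loop] = client
        return client


clients = LoopKeyedClients()


async def fetch_twice():
    # Two lookups inside the same loop should return the same client.
    return clients.get(), clients.get()


first_a, first_b = asyncio.run(fetch_twice())
second_a, second_b = asyncio.run(fetch_twice())
```

Each asyncio.run call creates a new loop, so the second pair is a different client from the first. A real implementation would presumably also need to close clients whose loops have finished, which this sketch does not attempt.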

@galileo-automation

No activity for 30 days — this PR will be closed in 5 days unless updated.
