Skip to content

Commit 34ec40b

Browse files
authored
[OPIK-2787] [P SDK] GeneratorExit Error with LangGraph Integration (#3986)
* [OPIK-2787] [P SDK] Add skip error callback functionality in OpikTracer for LangChain integration - Introduced `skip_error_callback` to allow selective skipping of error outputs during trace handling. - Updated `OpikTracer` to incorporate conditional error handling based on the callback's result. - Enhanced error processing logic to handle skipped errors with placeholder outputs. - Added integration test `test_langchain_callback__skip_error_callback__error_output_skipped`. * [OPIK-2787] [P SDK] Refine skip error logic and enhance placeholder output handling - Updated `OpikTracer` to improve error processing flow and conditional skipping logic using `skip_error_callback`. - Introduced explicit handling for placeholder outputs when errors are skipped. - Corrected typo in test prompt template for clarity. * [OPIK-2787] [P SDK] Update `OpikTracer` docstring format for initialization parameters - Revised and expanded `OpikTracer` constructor docstring to provide clearer descriptions for initialization parameters. - Removed duplicated docstring content for improved clarity and reduced redundancy.
1 parent f0d550d commit 34ec40b

File tree

2 files changed

+178
-29
lines changed

2 files changed

+178
-29
lines changed

sdks/python/src/opik/integrations/langchain/opik_tracer.py

Lines changed: 79 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
11
import logging
22
import datetime
3-
from typing import Any, Dict, List, Literal, Optional, Set, TYPE_CHECKING, cast, Tuple
3+
from typing import (
4+
Any,
5+
Dict,
6+
List,
7+
Literal,
8+
Optional,
9+
Set,
10+
TYPE_CHECKING,
11+
cast,
12+
Tuple,
13+
Callable,
14+
)
415
import contextvars
516
from uuid import UUID
617

@@ -36,6 +47,15 @@
3647

3748
SpanType = Literal["llm", "tool", "general"]
3849

50+
# A callable that receives an error string and returns True if the error should be skipped,
51+
# or False otherwise.
52+
SkipErrorCallback = Callable[[str], bool]
53+
54+
# Placeholder output dictionary used when errors are intentionally skipped
55+
# via the skip_error_callback. This signals that the output was not produced
56+
# due to a handled/ignored error during execution.
57+
ERROR_SKIPPED_OUTPUTS = {"warning": "Error output skipped by skip_error_callback."}
58+
3959

4060
def _get_span_type(run: Dict[str, Any]) -> SpanType:
4161
if run.get("run_type") in ["llm", "tool"]:
@@ -48,14 +68,7 @@ def _get_span_type(run: Dict[str, Any]) -> SpanType:
4868

4969

5070
class OpikTracer(BaseTracer):
51-
"""Langchain Opik Tracer.
52-
53-
Args:
54-
tags: List of tags to be applied to each trace logged by the tracer.
55-
metadata: Additional metadata for each trace logged by the tracer.
56-
graph: A LangGraph Graph object to track the Graph Definition in Opik.
57-
project_name: The name of the project to log data.
58-
"""
71+
"""Langchain Opik Tracer."""
5972

6073
def __init__(
6174
self,
@@ -65,8 +78,28 @@ def __init__(
6578
project_name: Optional[str] = None,
6679
distributed_headers: Optional[DistributedTraceHeadersDict] = None,
6780
thread_id: Optional[str] = None,
81+
skip_error_callback: Optional[SkipErrorCallback] = None,
6882
**kwargs: Any,
6983
) -> None:
84+
"""
85+
Initializes an instance of the class with various parameters for traces, metadata, and project configuration.
86+
87+
Args:
88+
tags: List of tags associated with logged traces.
89+
metadata: Dictionary containing metadata information for logged traces.
90+
graph: A LangGraph Graph object for representing dependencies or flow
91+
to track the Graph Definition in Opik.
92+
project_name: Name of the project associated with the traces.
93+
distributed_headers: Headers for distributed tracing context.
94+
thread_id: Unique identifier for the conversational thread
95+
to be associated with traces.
96+
skip_error_callback : Callback function to handle skip errors logic.
97+
Allows defining custom logic for handling errors that are intentionally skipped.
98+
Please note that in traces/spans where errors are intentionally skipped,
99+
the output will be replaced with `ERROR_SKIPPED_OUTPUTS`. You can provide
100+
the output manually using `opik_context.get_current_span_data().update(output=...)`.
101+
**kwargs: Additional arguments passed to the parent class constructor.
102+
"""
70103
validator = parameters_validator.create_validator(
71104
method_name="__init__", class_name=self.__class__.__name__
72105
)
@@ -113,6 +146,8 @@ def __init__(
113146
Optional[str]
114147
] = contextvars.ContextVar("root_run_external_parent_span_id", default=None)
115148

149+
self._skip_error_callback = skip_error_callback
150+
116151
def _is_opik_span_created_by_this_tracer(self, span_id: str) -> bool:
117152
return any(span_.id == span_id for span_ in self._span_data_map.values())
118153

@@ -135,17 +170,22 @@ def _persist_run(self, run: Run) -> None:
135170
error_info: Optional[ErrorInfoDict]
136171
trace_additional_metadata: Dict[str, Any] = {}
137172

138-
if run_dict["error"] is not None:
139-
output = None
140-
error_info = ErrorInfoDict(
141-
exception_type="Exception",
142-
traceback=run_dict["error"],
143-
)
144-
else:
145-
output, trace_additional_metadata = (
146-
langchain_helpers.split_big_langgraph_outputs(run_dict["outputs"])
173+
error_str = run_dict.get("error")
174+
outputs = None
175+
error_info = None
176+
177+
if error_str is not None:
178+
if not self._should_skip_error(error_str):
179+
error_info = ErrorInfoDict(
180+
exception_type="Exception",
181+
traceback=error_str,
182+
)
183+
else:
184+
outputs = ERROR_SKIPPED_OUTPUTS
185+
elif (outputs := run_dict.get("outputs")) is not None:
186+
outputs, trace_additional_metadata = (
187+
langchain_helpers.split_big_langgraph_outputs(outputs)
147188
)
148-
error_info = None
149189

150190
if (
151191
span_data.parent_span_id is not None
@@ -169,7 +209,7 @@ def _persist_run(self, run: Run) -> None:
169209
if trace_additional_metadata:
170210
trace_data.update(metadata=trace_additional_metadata)
171211

172-
trace_data.init_end_time().update(output=output, error_info=error_info)
212+
trace_data.init_end_time().update(output=outputs, error_info=error_info)
173213
trace_ = self._opik_client.trace(**trace_data.as_parameters)
174214

175215
assert trace_ is not None
@@ -446,6 +486,12 @@ def _process_end_span(self, run: Run) -> None:
446486
)
447487
self._opik_context_storage.pop_span_data(ensure_id=span_data.id)
448488

489+
def _should_skip_error(self, error_str: str) -> bool:
490+
if self._skip_error_callback is None:
491+
return False
492+
493+
return self._skip_error_callback(error_str)
494+
449495
def _process_end_span_with_error(self, run: Run) -> None:
450496
if run.id not in self._span_data_map:
451497
LOGGER.warning(
@@ -457,15 +503,20 @@ def _process_end_span_with_error(self, run: Run) -> None:
457503
try:
458504
run_dict: Dict[str, Any] = run.dict()
459505
span_data = self._span_data_map[run.id]
460-
error_info: ErrorInfoDict = {
461-
"exception_type": "Exception",
462-
"traceback": run_dict["error"],
463-
}
506+
error_str = run_dict["error"]
507+
508+
if self._should_skip_error(error_str):
509+
span_data.init_end_time().update(output=ERROR_SKIPPED_OUTPUTS)
510+
else:
511+
error_info = ErrorInfoDict(
512+
exception_type="Exception",
513+
traceback=error_str,
514+
)
515+
span_data.init_end_time().update(
516+
output=None,
517+
error_info=error_info,
518+
)
464519

465-
span_data.init_end_time().update(
466-
output=None,
467-
error_info=error_info,
468-
)
469520
if tracing_runtime_config.is_tracing_active():
470521
self._opik_client.span(**span_data.as_parameters)
471522
except Exception as e:

sdks/python/tests/library_integration/langchain/test_langchain.py

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
import pytest
22
from langchain_core.language_models import fake
3+
from langchain_core.language_models.fake import FakeStreamingListLLM
34
from langchain_core.prompts import PromptTemplate
5+
from langchain_core.runnables import RunnableConfig
46

57
import opik
68
from opik import context_storage
79
from opik.api_objects import opik_client, span, trace
810
from opik.config import OPIK_PROJECT_DEFAULT_NAME
9-
from opik.integrations.langchain.opik_tracer import OpikTracer
11+
from opik.integrations.langchain.opik_tracer import OpikTracer, ERROR_SKIPPED_OUTPUTS
1012
from opik.types import DistributedTraceHeadersDict
13+
1114
from ...testlib import (
1215
ANY_BUT_NONE,
1316
ANY_DICT,
@@ -653,3 +656,98 @@ def test_langchain_callback__disabled_tracking(fake_backend):
653656

654657
assert len(fake_backend.trace_trees) == 0
655658
assert len(callback.created_traces()) == 0
659+
660+
661+
def test_langchain_callback__skip_error_callback__error_output_skipped(
662+
fake_backend,
663+
):
664+
def _should_skip_error(error: str) -> bool:
665+
if error is not None and error.startswith("FakeListLLMError"):
666+
# skip processing - we are sure that this is OK
667+
return True
668+
else:
669+
return False
670+
671+
callback = OpikTracer(
672+
skip_error_callback=_should_skip_error,
673+
)
674+
675+
llm = FakeStreamingListLLM(
676+
error_on_chunk_number=0, # throw error on the first chunk
677+
responses=["I'm sorry, I don't think I'm talented enough to write a synopsis"],
678+
)
679+
680+
template = "Given the title of play, write a synopsis for that. Title: {title}."
681+
prompt_template = PromptTemplate(input_variables=["title"], template=template)
682+
683+
synopsis_chain = prompt_template | llm
684+
test_prompts = {"title": "Documentary about Bigfoot in Paris"}
685+
686+
stream = synopsis_chain.stream(
687+
input=test_prompts, config=RunnableConfig(callbacks=[callback])
688+
)
689+
try:
690+
for p in stream:
691+
print(p)
692+
except Exception:
693+
# ignoring exception
694+
pass
695+
696+
opik.flush_tracker()
697+
698+
assert len(fake_backend.trace_trees) == 1
699+
700+
EXPECTED_TRACE_TREE = TraceModel(
701+
id=ANY_BUT_NONE,
702+
start_time=ANY_BUT_NONE,
703+
name="RunnableSequence",
704+
project_name="Default Project",
705+
input={"title": "Documentary about Bigfoot in Paris"},
706+
output=ERROR_SKIPPED_OUTPUTS,
707+
metadata={"created_from": "langchain"},
708+
end_time=ANY_BUT_NONE,
709+
spans=[
710+
SpanModel(
711+
id=ANY_BUT_NONE,
712+
start_time=ANY_BUT_NONE,
713+
name="RunnableSequence",
714+
input={"input": ""},
715+
output=ERROR_SKIPPED_OUTPUTS,
716+
metadata={"created_from": "langchain"},
717+
type="general",
718+
end_time=ANY_BUT_NONE,
719+
project_name="Default Project",
720+
spans=[
721+
SpanModel(
722+
id=ANY_BUT_NONE,
723+
start_time=ANY_BUT_NONE,
724+
name="PromptTemplate",
725+
input={"title": "Documentary about Bigfoot in Paris"},
726+
output={"output": ANY_DICT},
727+
metadata={"created_from": "langchain"},
728+
type="tool",
729+
end_time=ANY_BUT_NONE,
730+
project_name="Default Project",
731+
last_updated_at=ANY_BUT_NONE,
732+
),
733+
SpanModel(
734+
id=ANY_BUT_NONE,
735+
start_time=ANY_BUT_NONE,
736+
name="FakeStreamingListLLM",
737+
input={"prompts": ANY_BUT_NONE},
738+
output=ANY_DICT,
739+
tags=None,
740+
metadata=ANY_DICT,
741+
type="llm",
742+
end_time=ANY_BUT_NONE,
743+
project_name="Default Project",
744+
last_updated_at=ANY_BUT_NONE,
745+
),
746+
],
747+
last_updated_at=ANY_BUT_NONE,
748+
)
749+
],
750+
last_updated_at=ANY_BUT_NONE,
751+
)
752+
753+
assert_equal(expected=EXPECTED_TRACE_TREE, actual=fake_backend.trace_trees[0])

0 commit comments

Comments
 (0)