Skip to content

Commit 61d38b5

Browse files
author
Harrison Ngo
committed
[APO-2194] Raise error on json serialization errors for outputs
1 parent e092529 commit 61d38b5

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

src/vellum/workflows/nodes/bases/tests/test_base_node.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from vellum.client.types.string_vellum_value_request import StringVellumValueRequest
77
from vellum.workflows.constants import undefined
88
from vellum.workflows.descriptors.tests.test_utils import FixtureState
9+
from vellum.workflows.errors.types import WorkflowErrorCode
910
from vellum.workflows.inputs.base import BaseInputs
1011
from vellum.workflows.nodes import FinalOutputNode
1112
from vellum.workflows.nodes.bases.base import BaseNode
@@ -15,6 +16,7 @@
1516
from vellum.workflows.references.node import NodeReference
1617
from vellum.workflows.references.output import OutputReference
1718
from vellum.workflows.state.base import BaseState, StateMeta
19+
from vellum.workflows.workflows.base import BaseWorkflow
1820

1921

2022
def test_base_node__node_resolution__unset_pydantic_fields():
@@ -379,3 +381,28 @@ class Ports(MyNode.Ports):
379381
# Potentially in the future, we support inheriting ports from multiple parents.
380382
# For now, we take only the declared ports, so that not all nodes have the default port.
381383
assert ports == ["bar"]
384+
385+
386+
def test_base_node__bytes_output_raises_serialization_error():
387+
"""Test that returning bytes in node outputs rejects the workflow execution."""
388+
389+
class BytesOutputNode(BaseNode):
390+
class Outputs(BaseNode.Outputs):
391+
result: str
392+
393+
def run(self) -> "BytesOutputNode.Outputs":
394+
b = b"hello"
395+
return self.Outputs(result=b)
396+
397+
class BytesWorkflow(BaseWorkflow):
398+
graph = BytesOutputNode
399+
400+
workflow = BytesWorkflow()
401+
402+
# WHEN we run the workflow
403+
result = workflow.run()
404+
405+
# THEN the execution is rejected with a helpful error
406+
assert result.name == "workflow.execution.rejected"
407+
assert result.error.code == WorkflowErrorCode.INVALID_OUTPUTS
408+
assert "bytes" in result.error.message.lower()

src/vellum/workflows/runner/runner.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,14 @@
4949
NodeExecutionRejectedBody,
5050
NodeExecutionStreamingBody,
5151
)
52-
from vellum.workflows.events.types import BaseEvent, NodeParentContext, ParentContext, SpanLink, WorkflowParentContext
52+
from vellum.workflows.events.types import (
53+
BaseEvent,
54+
NodeParentContext,
55+
ParentContext,
56+
SpanLink,
57+
WorkflowParentContext,
58+
default_serializer,
59+
)
5360
from vellum.workflows.events.workflow import (
5461
WorkflowEventStream,
5562
WorkflowExecutionFulfilledBody,
@@ -565,6 +572,16 @@ def initiate_node_streaming_output(
565572

566573
with execution_context(parent_context=updated_parent_context, trace_id=execution.trace_id):
567574
for output in node_run_response:
575+
try:
576+
default_serializer(output)
577+
except (TypeError, ValueError) as exc:
578+
raise NodeException(
579+
message=(
580+
f"Node {node.__class__.__name__} produced output: "
581+
f"'{output.name}' that could not be serialized to JSON: {exc}"
582+
),
583+
code=WorkflowErrorCode.INVALID_OUTPUTS,
584+
) from exc
568585
invoked_ports = output > ports
569586
if output.is_initiated:
570587
yield from initiate_node_streaming_output(output)
@@ -599,6 +616,20 @@ def initiate_node_streaming_output(
599616
parent=execution.parent_context,
600617
)
601618

619+
for descriptor, output_value in outputs:
620+
if output_value is undefined:
621+
continue
622+
try:
623+
default_serializer(output_value)
624+
except (TypeError, ValueError) as exc:
625+
raise NodeException(
626+
message=(
627+
f"Node {node.__class__.__name__} produced output '{descriptor.name}' "
628+
f"that could not be serialized to JSON: {exc}"
629+
),
630+
code=WorkflowErrorCode.INVALID_OUTPUTS,
631+
) from exc
632+
602633
node.state.meta.node_execution_cache.fulfill_node_execution(node.__class__, span_id)
603634

604635
with execution_context(parent_context=updated_parent_context, trace_id=execution.trace_id):

0 commit comments

Comments
 (0)