|
24 | 24 | instrument_tags, |
25 | 25 | ) |
26 | 26 |
|
27 | | - |
28 | | -from workflows.utils import _nanoid as nanoid |
29 | 27 | from workflows.errors import WorkflowRuntimeError |
30 | 28 | from workflows.events import ( |
31 | 29 | Event, |
32 | 30 | StartEvent, |
33 | 31 | ) |
| 32 | +from workflows.handler import WorkflowHandler |
34 | 33 | from workflows.runtime.control_loop import control_loop, rebuild_state_from_ticks |
35 | 34 | from workflows.runtime.types.internal_state import BrokerState |
36 | 35 | from workflows.runtime.types.plugin import Plugin, WorkflowRuntime, as_snapshottable |
|
49 | 48 | ) |
50 | 49 | from workflows.runtime.types.ticks import TickAddEvent, TickCancelRun, WorkflowTick |
51 | 50 | from workflows.runtime.workflow_registry import workflow_registry |
| 51 | +from workflows.utils import _nanoid as nanoid |
52 | 52 |
|
53 | 53 | from ..context.state_store import MODEL_T |
54 | 54 |
|
55 | | -from workflows.handler import WorkflowHandler |
56 | | - |
57 | 55 | if TYPE_CHECKING: |
58 | 56 | from workflows import Workflow |
59 | 57 | from workflows.context.context import Context |
@@ -105,7 +103,15 @@ def __init__( |
105 | 103 | def _execute_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]: |
106 | 104 | task = asyncio.create_task(coro) |
107 | 105 | self._workers.append(task) |
108 | | - task.add_done_callback(lambda _: self._workers.remove(task)) |
| 106 | + |
| 107 | + def _remove_task(_: asyncio.Task[Any]) -> None: |
| 108 | + try: |
| 109 | + self._workers.remove(task) |
| 110 | + except ValueError: |
| 111 | + # Handle Task was already cleared during shutdown or cleanup. |
| 112 | + pass |
| 113 | + |
| 114 | + task.add_done_callback(_remove_task) |
109 | 115 | return task |
110 | 116 |
|
111 | 117 | # context API only |
|
0 commit comments