Skip to content

Commit 1e89f98

Browse files
committed
.brokers._daemon: add notes around needed brokerd respawn tech
1 parent ccced4c commit 1e89f98

File tree

2 files changed

+41
-19
lines changed

2 files changed

+41
-19
lines changed

piker/brokers/_daemon.py

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,22 @@
1919
``brokerd``.
2020
2121
'''
22+
from __future__ import annotations
2223
from contextlib import (
2324
asynccontextmanager as acm,
2425
)
26+
from typing import TYPE_CHECKING
27+
import exceptiongroup as eg
2528

2629
import tractor
2730
import trio
2831

2932
from . import _util
3033
from . import get_brokermod
3134

35+
if TYPE_CHECKING:
36+
from ..data import _FeedsBus
37+
3238
# `brokerd` enabled modules
3339
# TODO: move this def to the `.data` subpkg..
3440
# NOTE: keeping this list as small as possible is part of our caps-sec
@@ -69,24 +75,40 @@ async def _setup_persistent_brokerd(
6975
# set global for this actor to this new process-wide instance B)
7076
_util.log = log
7177

72-
from piker.data.feed import (
73-
_bus,
74-
get_feed_bus,
75-
)
76-
global _bus
77-
assert not _bus
78-
79-
async with trio.open_nursery() as service_nursery:
80-
# assign a nursery to the feeds bus for spawning
81-
# background tasks from clients
82-
get_feed_bus(brokername, service_nursery)
83-
84-
# unblock caller
85-
await ctx.started()
86-
87-
# we pin this task to keep the feeds manager active until the
88-
# parent actor decides to tear it down
89-
await trio.sleep_forever()
78+
from piker.data import feed
79+
assert not feed._bus
80+
81+
# allocate a nursery to the bus for spawning background
82+
# tasks to service client IPC requests, normally
83+
# `tractor.Context` connections to explicitly required
84+
# `brokerd` endpoints such as:
85+
# - `stream_quotes()`,
86+
# - `manage_history()`,
87+
# - `allocate_persistent_feed()`,
88+
# - `open_symbol_search()`
89+
# NOTE: see ep invocation details inside `.data.feed`.
90+
try:
91+
async with trio.open_nursery() as service_nursery:
92+
bus: _FeedsBus = feed.get_feed_bus(
93+
brokername,
94+
service_nursery,
95+
)
96+
assert bus is feed._bus
97+
98+
# unblock caller
99+
await ctx.started()
100+
101+
# we pin this task to keep the feeds manager active until the
102+
# parent actor decides to tear it down
103+
await trio.sleep_forever()
104+
105+
except eg.ExceptionGroup:
106+
# TODO: likely some underlying `brokerd` IPC connection
107+
# broke so here we handle a respawn and re-connect attempt!
108+
# This likely should pair with development of the OCO task
109+
# nusery in dev over @ `tractor` B)
110+
# https://github.com/goodboy/tractor/pull/363
111+
raise
90112

91113

92114
async def spawn_brokerd(

piker/data/feed.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ def remove_subs(
190190

191191
def get_feed_bus(
192192
brokername: str,
193-
nursery: Optional[trio.Nursery] = None,
193+
nursery: trio.Nursery | None = None,
194194

195195
) -> _FeedsBus:
196196
'''

0 commit comments

Comments
 (0)