-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
137 lines (114 loc) · 4.64 KB
/
Copy pathmain.py
File metadata and controls
137 lines (114 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""
Perception service.
Subscribes to the RGB and depth streams from the Pi-side sensor publisher,
throttles to `perception.tick_hz`, runs a detector on the latest
synchronized frameset, and publishes a SceneHeader on
`services.perception_scene_pub`. Provides a cheap, always-available scene
graph at a stable tick rate; the orchestrator may still reason on pixels
directly via its own VLM.
"""
from __future__ import annotations
import argparse
import dataclasses
import signal
import time
import zmq
from lexaire import logs, transport
from lexaire.config import load_config
from lexaire.messages import SceneHeader, encode_header
from lexaire.subscriber import SensorSubscriber
from .detectors import DetectorInputs, build_detector
def cli() -> int:
    """Run the perception service from the command line.

    Parses ``--config`` and ``--once``, loads the config, builds the detector,
    subscribes to the RGB/depth endpoints through ``SensorSubscriber`` and
    publishes one encoded ``SceneHeader`` per tick on
    ``services.perception_scene_pub``. The loop runs until SIGINT/SIGTERM, or
    until a single scene has been published when ``--once`` is given.

    Returns:
        0 after a normal shutdown (signal received, or one scene published
        under ``--once``); 2 when ``--once`` passes its 10 s deadline without
        having published a scene.
    """
    p = argparse.ArgumentParser(description="Lexaire perception service")
    p.add_argument("--config", help="path to config.yaml")
    p.add_argument("--once", action="store_true", help="produce one scene then exit (for tests)")
    args = p.parse_args()

    cfg = load_config(args.config)
    log = logs.configure("perception", cfg.get("logging.level", "INFO"))

    # Clamp tick_hz: 0 would divide-by-zero below; negative is meaningless.
    # Written as a comparison instead of max() so that NaN (which compares
    # False against everything) also falls back to the floor rather than
    # producing a NaN interval that never sleeps.
    raw_tick = float(cfg.get("perception.tick_hz", 2.0))
    tick_hz = raw_tick if raw_tick >= 0.1 else 0.1
    if tick_hz != raw_tick:
        log.warning("perception.tick_hz=%.2f clamped to %.2f", raw_tick, tick_hz)

    # Resolve every required key up front so a missing one fails before the
    # ZMQ context is created.
    scene_pub_ep = cfg.require("services.perception_scene_pub")
    rgb_ep = cfg.require("sensor.channels.rgb")
    depth_ep = cfg.require("sensor.channels.depth")

    detector = build_detector(cfg)

    ctx = zmq.Context()
    sub = SensorSubscriber(
        ctx,
        rgb_endpoint=rgb_ep,
        depth_endpoint=depth_ep,
    )
    pub = transport.pub(ctx, scene_pub_ep)

    stop = False

    def _stop(*_):
        nonlocal stop
        stop = True

    signal.signal(signal.SIGINT, _stop)
    signal.signal(signal.SIGTERM, _stop)

    sub.start()
    log.info("perception up tick=%.1fHz scene_pub=%s", tick_hz, scene_pub_ep)

    interval_s = 1.0 / tick_hz
    last_tick = 0.0
    latest_fs = None
    latest_fs_ts = 0.0  # monotonic at write

    # Without this gate, scene detections keep flowing with a frozen
    # frame_seq after the publisher dies (mirrors the orchestrator's
    # frame_max_age_s on its own RGB path).
    frame_max_age_s = float(cfg.get("perception.frame_max_age_s", 2.0))
    # True while ticks are being skipped for staleness, so the condition is
    # logged once per episode instead of once per tick.
    stale_reported = False

    # --once bails after 10s without a published scene so a missing
    # publisher, a publisher that died, or a detector that keeps failing
    # doesn't hang CI runs.
    once_deadline = time.monotonic() + 10.0 if args.once else None

    try:
        while not stop:
            # Drain to newest — one-per-tick against a faster publisher
            # saturates the matcher's queue and lags YOLO behind real-time.
            fs = sub.get_nowait()
            while fs is not None:
                latest_fs = fs
                latest_fs_ts = time.monotonic()
                fs = sub.get_nowait()

            now = time.monotonic()
            once_expired = once_deadline is not None and now >= once_deadline
            if once_expired and latest_fs is None:
                log.error("--once: no frame from publisher within 10s; exiting")
                return 2

            if now - last_tick < interval_s:
                # Sleep in short slices so a stop signal is noticed promptly.
                time.sleep(min(interval_s - (now - last_tick), 0.05))
                continue
            last_tick = now

            if latest_fs is None:
                continue

            if now - latest_fs_ts > frame_max_age_s:
                if not stale_reported:
                    log.warning(
                        "latest frame seq=%s is older than %.1fs; skipping scene output",
                        latest_fs.seq,
                        frame_max_age_s,
                    )
                    stale_reported = True
                if once_expired:
                    # A frame did arrive, but it aged out before any scene
                    # could be published; waiting longer cannot succeed
                    # unless the publisher comes back.
                    log.error("--once: frame went stale and no scene was published within 10s; exiting")
                    return 2
                continue
            if stale_reported:
                log.info("fresh frame received; resuming scene output")
                stale_reported = False

            inp = DetectorInputs(
                rgb=latest_fs.rgb,
                depth=latest_fs.depth,
                depth_scale_m=latest_fs.depth_scale_m,
                intrinsics=latest_fs.intrinsics,
            )
            try:
                detections = detector.detect(inp)
            except Exception as e:
                log.exception("detector error: %s", e)
                # Re-read the clock: detect() may have run for a while.
                if once_deadline is not None and time.monotonic() >= once_deadline:
                    log.error("--once: detector produced no scene within 10s; exiting")
                    return 2
                continue

            header = SceneHeader(
                ts_ns=latest_fs.ts_ns,
                frame_seq=latest_fs.seq,
                detections=[dataclasses.asdict(d) for d in detections],
            )
            pub.send(encode_header(header))
            log.debug("scene ts_ns=%d seq=%d n=%d", header.ts_ns, header.frame_seq, len(header.detections))

            if args.once:
                # transport.pub() sets LINGER=0 by default — fine for the
                # streaming case where missing one frame doesn't matter,
                # but for --once mode we need a graceful drain or the
                # one-and-only frame can vanish on close.
                pub.setsockopt(zmq.LINGER, 500)
                break
    finally:
        sub.stop()
        pub.close()
        ctx.term()
        log.info("perception shutdown")
    return 0
if __name__ == "__main__":
    # Script entry point: hand cli()'s return value to the interpreter as
    # the process exit status.
    exit_status = cli()
    raise SystemExit(exit_status)