Skip to content
61 changes: 59 additions & 2 deletions app/common/camera_preview_backend/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,27 +569,84 @@ def create_onnx_face_detector(


def detect_faces_onnx(frame_bgr, *, detector_state) -> list[Rect]:
def _prepare_detection_frame(src, *, max_side: int = 640):
h0, w0 = src.shape[:2]
if h0 <= 0 or w0 <= 0:
return src, 1.0, 1.0
side = max(int(w0), int(h0))
if side <= int(max_side):
return src, 1.0, 1.0

ratio = float(max_side) / float(side)
w1 = max(1, int(round(float(w0) * ratio)))
h1 = max(1, int(round(float(h0) * ratio)))

with _CV2_IMPORT_LOCK:
import cv2

resized = cv2.resize(src, (w1, h1), interpolation=cv2.INTER_LINEAR)
scale_x = float(w0) / float(w1)
scale_y = float(h0) / float(h1)
return resized, scale_x, scale_y

def _rescale_rects(
rects: list[Rect],
*,
scale_x: float,
scale_y: float,
frame_size: tuple[int, int],
) -> list[Rect]:
h, w = int(frame_size[0]), int(frame_size[1])
if not rects:
return []
if abs(float(scale_x) - 1.0) < 1e-6 and abs(float(scale_y) - 1.0) < 1e-6:
return list(rects)
scaled: list[Rect] = []
for x, y, bw, bh in rects:
try:
x1 = int(round(float(x) * float(scale_x)))
y1 = int(round(float(y) * float(scale_y)))
w1 = int(round(float(bw) * float(scale_x)))
h1 = int(round(float(bh) * float(scale_y)))
except Exception:
continue
if w1 <= 0 or h1 <= 0:
continue
x1 = max(0, min(x1, w - 1))
y1 = max(0, min(y1, h - 1))
w1 = max(1, min(w1, w - x1))
h1 = max(1, min(h1, h - y1))
scaled.append((x1, y1, w1, h1))
return scaled

kind = ""
try:
kind = str(detector_state.get("kind", "")).lower()
except Exception:
kind = ""

frame_size = frame_bgr.shape[:2]
detect_frame, scale_x, scale_y = _prepare_detection_frame(frame_bgr, max_side=640)
if kind == "yunet":
rects = detect_faces_yunet(
frame_bgr,
detect_frame,
detector=detector_state["detector"],
input_size=detector_state.get("input_size"),
)
rects = _rescale_rects(
rects, scale_x=scale_x, scale_y=scale_y, frame_size=frame_size
)
return merge_face_rects(frame_size, rects)
if kind == "ultralight":
rects = detect_faces_ultralight(
frame_bgr,
detect_frame,
net=detector_state["net"],
input_size=detector_state["input_size"],
priors=detector_state.get("priors"),
)
rects = _rescale_rects(
rects, scale_x=scale_x, scale_y=scale_y, frame_size=frame_size
)
return merge_face_rects(frame_size, rects)
raise RuntimeError("Invalid detector state")

Expand Down
97 changes: 93 additions & 4 deletions app/common/camera_preview_backend/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,31 @@ def __init__(self, parent: Optional[QObject] = None) -> None:
self._input_size = None
self._last_detect = 0.0
self._last_load_attempt = 0.0
self._pending_frame = None
self._detect_timer: Optional[QTimer] = None
self._detect_scheduled = False
self._detect_inflight = False
self._target_interval_s = 0.08
self._min_interval_s = 0.08
self._max_interval_s = 0.14
self._detect_interval_s = self._target_interval_s
self._recent_detect_cost_s = self._target_interval_s

self._detector_state = None
self._cv2 = None

@Slot(bool)
def set_enabled(self, enabled: bool) -> None:
self._enabled = bool(enabled)
if not self._enabled:
self._pending_frame = None
self._detect_scheduled = False
timer = self._detect_timer
if timer is not None and timer.isActive():
timer.stop()
return
if self._pending_frame is not None:
self._schedule_detect(0)

@Slot(object)
def set_model_filename(self, model_filename: object) -> None:
Expand Down Expand Up @@ -560,23 +578,72 @@ def ensure_loaded(self) -> None:

@Slot(object)
def process_frame(self, frame_bgr) -> None:
if frame_bgr is None:
return
self._pending_frame = frame_bgr
if self._enabled:
self._schedule_detect(0)

def _ensure_detect_timer(self) -> QTimer:
timer = self._detect_timer
if timer is not None:
return timer
timer = QTimer(self)
timer.setSingleShot(True)
timer.timeout.connect(self._consume_pending_frame)
self._detect_timer = timer
return timer

def _schedule_detect(self, delay_ms: int) -> None:
if not self._enabled:
return
if self._cv2 is None:
timer = self._ensure_detect_timer()
delay = max(0, int(delay_ms))
if timer.isActive():
try:
remaining = int(timer.remainingTime())
except Exception:
remaining = delay
if remaining <= delay:
return
timer.stop()
self._detect_scheduled = True
timer.start(delay)

@Slot()
def _consume_pending_frame(self) -> None:
self._detect_scheduled = False
if not self._enabled or self._detect_inflight:
return

frame = self._pending_frame
if frame is None:
return

now = time.monotonic()
if now - self._last_detect < 0.05:
elapsed = now - self._last_detect
if self._last_detect > 0.0 and elapsed < self._detect_interval_s:
wait_ms = int(max(1.0, (self._detect_interval_s - elapsed) * 1000.0))
self._schedule_detect(wait_ms)
return

self._pending_frame = None
self._detect_inflight = True
started_at = time.perf_counter()
emitted_error = False
if not self._enabled:
self._detect_inflight = False
return
self._last_detect = now

self.ensure_loaded()

try:
if self._cv2 is None:
return
state = self._detector_state
if state is None:
return
results = detect_faces_onnx(frame_bgr, detector_state=state)
results = detect_faces_onnx(frame, detector_state=state)
except Exception as exc:
logger.exception("人脸检测失败: {}", exc)
key = "detect_failed"
Expand All @@ -587,6 +654,28 @@ def process_frame(self, frame_bgr) -> None:
):
key = "model_incompatible"
self.error_occurred.emit(key, "Face detection failed", msg)
emitted_error = True
return
finally:
cost_s = max(0.0, time.perf_counter() - started_at)
self._recent_detect_cost_s = self._recent_detect_cost_s * 0.7 + cost_s * 0.3
if self._recent_detect_cost_s > self._detect_interval_s * 1.05:
desired = min(
self._max_interval_s,
max(0.10, self._recent_detect_cost_s * 1.2),
)
else:
desired = max(
self._target_interval_s,
self._detect_interval_s * 0.92,
)
self._detect_interval_s = min(
self._max_interval_s,
max(self._min_interval_s, desired),
)
self._last_detect = time.monotonic()
self._detect_inflight = False
if self._pending_frame is not None and not emitted_error:
self._schedule_detect(0)

self.faces_ready.emit(results)
13 changes: 10 additions & 3 deletions app/common/history/background_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from app.Language.obtain_language import get_content_name_async
from app.common.history.history_reader import (
check_class_has_gender_or_group,
check_roll_call_students_have_gender_or_group,
filter_roll_call_history_by_subject,
get_lottery_history_data,
get_lottery_pool_list,
Expand Down Expand Up @@ -79,7 +79,9 @@ def build_roll_call_history_payload(
all_names = [name for _, name, _, _ in cleaned_students if name]
base_history_data = get_roll_call_history_data(class_name)
available_subjects = _collect_roll_call_subjects(base_history_data)
has_gender, has_group = check_class_has_gender_or_group(class_name)
has_gender, has_group = check_roll_call_students_have_gender_or_group(
cleaned_students
)
reverse_order = bool(sort_order_desc)

if mode_index == 0:
Expand All @@ -91,7 +93,12 @@ def build_roll_call_history_payload(
students_data = get_roll_call_students_data(
cleaned_students, history_data, subject_name
)
students_weight_data = calculate_weight(students_data, class_name, subject_name)
students_weight_data = calculate_weight(
students_data,
class_name,
subject_name,
history_data=history_data,
)
format_weight, _, _ = format_weight_for_display(
students_weight_data, "next_weight"
)
Expand Down
17 changes: 11 additions & 6 deletions app/common/history/history_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from loguru import logger

from app.tools.path_utils import get_data_path, open_file, file_exists
from app.common.data.list import get_gender_list, get_group_list


# ==================================================
Expand Down Expand Up @@ -46,6 +45,15 @@ def get_roll_call_student_list(
return []


def check_roll_call_students_have_gender_or_group(
cleaned_students: List[Tuple[str, str, str, str]],
) -> Tuple[bool, bool]:
"""根据已加载的学生快照判断是否包含性别和小组信息"""
has_gender = any(str(gender or "").strip() for _, _, gender, _ in cleaned_students)
has_group = any(str(group or "").strip() for _, _, _, group in cleaned_students)
return has_gender, has_group


def get_roll_call_history_data(
class_name: str,
) -> Dict[str, Any]:
Expand Down Expand Up @@ -325,11 +333,8 @@ def check_class_has_gender_or_group(
Returns:
Tuple[bool, bool]: (has_gender, has_group)
"""
gender_list = get_gender_list(class_name)
group_list = get_group_list(class_name)
has_gender = bool(gender_list) and gender_list != [""]
has_group = bool(group_list) and group_list != [""]
return has_gender, has_group
cleaned_students = get_roll_call_student_list(class_name)
return check_roll_call_students_have_gender_or_group(cleaned_students)


# ==================================================
Expand Down
Loading
Loading