Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0b47e2d
added state of health (not working, currently converting old state of…
JamesDCowley Apr 4, 2025
ac8c555
remove unused function parameter and update microcontroller temperatu…
JamesDCowley Apr 12, 2025
3a15ef7
remove deprecated state of health methods from functions class
JamesDCowley Apr 12, 2025
d5811af
Add null checks for battery and solar power monitors in state of heal…
JamesDCowley Apr 16, 2025
874923a
Add unit tests for StateOfHealth class methods
JamesDCowley Apr 17, 2025
581230c
all tests passing
JamesDCowley Apr 17, 2025
a5973f3
remove unused Satellite parameter and replace it with Flag parameters…
JamesDCowley Apr 18, 2025
fc99a88
boot_count uses Counter and update state retrieval methods
JamesDCowley Apr 18, 2025
020033d
Remove uptime from soh
JamesDCowley Apr 18, 2025
89af0bc
Add fsk_flag
JamesDCowley Apr 18, 2025
91c75e5
all tests pass + fmt
JamesDCowley Apr 18, 2025
2e101fc
boot_count flag -> counter
JamesDCowley Apr 19, 2025
00e7ccd
Merge branch 'main' of github.com:proveskit/circuitpy_flight_software…
nateinaction Apr 20, 2025
6f912d3
New new state of health (#261)
nateinaction Jun 10, 2025
44aeddb
Merge branch 'main' into new-state-of-health
nateinaction Jun 10, 2025
249cd34
Cursor Corrected Unit Tests
Mikefly123 Jun 19, 2025
37e6431
Appeased the Linter
Mikefly123 Jun 20, 2025
d7145f7
Reduced Function Complexity
Mikefly123 Jun 20, 2025
d2861c5
Updated to Use Config Schema for Bound Checking
Mikefly123 Jun 20, 2025
94ed561
Update tests/unit/test_state_of_health.py
Mikefly123 Jun 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 0 additions & 46 deletions pysquared/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import random

import microcontroller

from .cdh import CommandDataHandler
from .config.config import Config
from .logger import Logger
Expand All @@ -21,11 +19,6 @@
from .sleep_helper import SleepHelper
from .watchdog import Watchdog

try:
from typing import OrderedDict
except Exception:
pass


class functions:
def __init__(
Expand Down Expand Up @@ -108,45 +101,6 @@ def beacon(self) -> None:
def joke(self) -> None:
self.radio.send(random.choice(self.jokes))

def format_state_of_health(self, hardware: OrderedDict[str, bool]) -> str:
to_return: str = ""
for key, value in hardware.items():
to_return = to_return + key + "="
if value:
to_return += "1"
else:
to_return += "0"

if len(to_return) > 245:
return to_return

return to_return

def state_of_health(self) -> None:
self.state_list: list = []
# list of state information
try:
self.state_list: list[str] = [
f"PM:{self.cubesat.power_mode}",
# f"VB:{self.cubesat.battery_voltage}",
# f"ID:{self.cubesat.current_draw}",
f"IC:{self.cubesat.charge_current}",
f"UT:{self.cubesat.get_system_uptime}",
f"BN:{self.cubesat.boot_count.get()}",
f"MT:{microcontroller.cpu.temperature}",
f"RT:{self.radio.get_temperature()}",
f"AT:{self.imu.get_temperature()}",
# f"BT:{self.last_battery_temp}",
f"EC:{self.logger.get_error_count()}",
f"AB:{int(self.cubesat.f_burned.get())}",
f"BO:{int(self.cubesat.f_brownout.get())}",
f"FK:{self.radio.get_modulation()}",
]
except Exception as e:
self.logger.error("Couldn't aquire data for the state of health: ", e)

self.radio.send("State of Health " + str(self.state_list))

def listen(self) -> bool:
# This just passes the message through. Maybe add more functionality later.
try:
Expand Down
204 changes: 204 additions & 0 deletions pysquared/state_of_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
from typing import Any

from pysquared.config.config import Config
from pysquared.logger import Logger
from pysquared.protos.power_monitor import PowerMonitorProto
from pysquared.protos.temperature_sensor import TemperatureSensorProto

try:
from typing import Callable, List

except Exception:
pass


class State:
pass


class NOMINAL(State):
pass


class DEGRADED(State):
pass


class StateOfHealth:
def __init__(
self,
logger: Logger,
config: Config,
*args: PowerMonitorProto | TemperatureSensorProto | Any,
) -> None:
self.logger: Logger = logger
self.config: Config = config
self._sensors: tuple[PowerMonitorProto | TemperatureSensorProto | Any, ...] = (
args
)

def get(self) -> NOMINAL | DEGRADED:
"""
Get the state of health by checking all sensors
"""
errors: List[str] = []

for sensor in self._sensors:
self.logger.debug("Sensor: ", sensor=sensor)
sensor_errors = self._check_sensor(sensor)
errors.extend(sensor_errors)

return self._determine_state(errors)

def _check_sensor(self, sensor) -> List[str]:
"""
Check a single sensor and return any errors found
"""
if isinstance(sensor, PowerMonitorProto):
return self._check_power_monitor(sensor)
elif isinstance(sensor, TemperatureSensorProto):
return self._check_temperature_sensor(sensor)
elif hasattr(sensor, "temperature"): # CPU-like object
return self._check_cpu_sensor(sensor)
else:
return []

def _check_power_monitor(self, sensor: PowerMonitorProto) -> List[str]:
"""
Check power monitor sensor readings against CONFIG_SCHEMA bounds
"""
errors = []

# Get average readings
bus_voltage = self._avg_reading(sensor.get_bus_voltage)
shunt_voltage = self._avg_reading(sensor.get_shunt_voltage)
current = self._avg_reading(sensor.get_current)

# Check current reading against normal_charge_current bounds (0.0-2000.0)
if current is not None:
current_errors = self._check_against_schema_bounds(
current, "normal_charge_current", "Current reading"
)
errors.extend(current_errors)

# Check bus voltage reading against normal_battery_voltage bounds (6.0-8.4)
if bus_voltage is not None:
bus_voltage_errors = self._check_against_schema_bounds(
bus_voltage, "normal_battery_voltage", "Bus voltage reading"
)
errors.extend(bus_voltage_errors)

# Check shunt voltage reading against normal_battery_voltage bounds (6.0-8.4)
if shunt_voltage is not None:
shunt_voltage_errors = self._check_against_schema_bounds(
shunt_voltage, "normal_battery_voltage", "Shunt voltage reading"
)
errors.extend(shunt_voltage_errors)

return errors

def _check_temperature_sensor(self, sensor: TemperatureSensorProto) -> List[str]:
"""
Check temperature sensor readings against CONFIG_SCHEMA bounds
"""
temperature = self._avg_reading(sensor.get_temperature)
self.logger.debug("Temp: ", temperature=temperature)

if temperature is not None:
return self._check_against_schema_bounds(
temperature, "normal_temp", "Temperature reading"
)

return []

def _check_cpu_sensor(self, sensor) -> List[str]:
"""
Check CPU-like sensor readings against CONFIG_SCHEMA bounds
"""
temperature = sensor.temperature
self.logger.debug("Temp: ", temperature=temperature)

if temperature is not None:
return self._check_against_schema_bounds(
temperature, "normal_micro_temp", "Processor temperature reading"
)

return []

def _check_against_schema_bounds(
self, reading: float, config_key: str, reading_name: str
) -> List[str]:
"""
Check if a reading is within the bounds defined in CONFIG_SCHEMA

:param reading: The sensor reading to check
:param config_key: The config key to check against (e.g., 'normal_temp')
:param reading_name: Human-readable name for the reading type
:return: List of error messages if reading is out of bounds
"""
errors = []
if reading is None:
return errors
if config_key not in self.config.CONFIG_SCHEMA:
self.logger.warning(f"Config key '{config_key}' not found in CONFIG_SCHEMA")
return errors
schema = self.config.CONFIG_SCHEMA[config_key]
config_value = getattr(self.config, config_key, None)
if config_value is None:
self.logger.warning(f"Config value for '{config_key}' is None")
return errors
# Check against min bound
if "min" in schema and reading < schema["min"]:
errors.append(
f"{reading_name} {reading} is below minimum bound {schema['min']} for {config_key}"
)
# Check against max bound
if "max" in schema and reading > schema["max"]:
errors.append(
f"{reading_name} {reading} is above maximum bound {schema['max']} for {config_key}"
)
return errors

def _is_out_of_range(
self, reading: float, normal_value: float, tolerance: float = 10
) -> bool:
"""
Check if a reading is outside the acceptable range
DEPRECATED: Use _check_against_schema_bounds instead

:param reading: The sensor reading to check
:param normal_value: The expected normal value
:param tolerance: The acceptable deviation from normal (default: 10)
:return: True if reading is out of range, False otherwise
"""
return abs(reading - normal_value) > tolerance

def _determine_state(self, errors: List[str]) -> NOMINAL | DEGRADED:
"""
Determine the final state based on collected errors
"""
if errors:
self.logger.warning("State of health is DEGRADED", errors=errors)
return DEGRADED()
else:
self.logger.info("State of health is NOMINAL")
return NOMINAL()

def _avg_reading(
self, func: Callable[..., float | None], num_readings: int = 50
) -> float | None:
"""
Get average reading from a sensor

:param func: function to call
:param num_readings: number of readings to take
:return: average of the readings
"""
readings: float = 0.0
for _ in range(num_readings):
reading = func()
if reading is None:
self.logger.warning(f"Couldn't get reading from {func.__name__}")
return None
readings += reading
return readings / num_readings
Loading
Loading