diff --git a/README.md b/README.md index b956d64..21bf1b3 100644 --- a/README.md +++ b/README.md @@ -14,15 +14,50 @@ # progressive-automations-python -> Python package for programmatically controlling progressive automations tools (e.g., LG-07 modular lifting column via FLTCON). +> Python package for programmatically controlling Progressive Automations desk lifters (e.g., LG-07 modular lifting column via FLTCON) with Prefect workflow orchestration. -A longer description of your project goes here... +Control your desk lifter remotely via Prefect Cloud deployments with asynchronous execution and position polling. Perfect for laboratory automation where the desk position needs to be coordinated with other equipment. -## Setup +## Quick Start -For Raspberry Pi 5 setup instructions, see [docs/raspberry-pi-setup.md](docs/raspberry-pi-setup.md). +```bash +# 1. Install +pip install progressive-automations-python -For bill of materials, see [docs/bill_of_materials.md](docs/bill_of_materials.md). +# 2. Configure Prefect Cloud +prefect cloud login -k +prefect work-pool create desk-lifter-pool --type process + +# 3. Deploy flows +python -c "from progressive_automations_python.deployment import create_deployments; create_deployments()" + +# 4. Start worker on Raspberry Pi +prefect worker start --pool desk-lifter-pool +``` + +## Usage + +Trigger movements asynchronously from any Python environment: + +```python +from prefect.deployments import run_deployment + +# Trigger movement (returns immediately) +flow_run = run_deployment( + name="simple-movement-flow/move-to-position", + parameters={"target_height": 35.5}, + timeout=0 # Don't wait, return immediately +) + +# Poll status later +# ... (see documentation for polling examples) +``` + +## Documentation + +- [Installation and Usage Guide](docs/installation-and-usage.md) - Complete setup and API reference +- [Raspberry Pi Setup](docs/raspberry-pi-setup.md) - Hardware configuration +- [Bill of Materials](docs/bill_of_materials.md) - Required components diff --git a/docs/index.md b/docs/index.md index 5118389..8a52c08 100644 --- a/docs/index.md +++ b/docs/index.md @@ -20,11 +20,13 @@ Python package for programmatically controlling progressive automations tools (e :maxdepth: 2 Overview +Installation and Usage +Raspberry Pi Setup +Bill of Materials Contributions & Help License Authors Changelog -Bill of Materials Module Reference ``` diff --git a/docs/installation-and-usage.md b/docs/installation-and-usage.md new file mode 100644 index 0000000..8508339 --- /dev/null +++ b/docs/installation-and-usage.md @@ -0,0 +1,453 @@ +# Installation and Usage Guide + +This guide covers the complete installation and usage workflow for the Progressive Automations desk lifter control system. + +## Prerequisites + +1. **Hardware Setup**: Follow the [Raspberry Pi Setup Guide](raspberry-pi-setup.md) for hardware configuration +2. **Bill of Materials**: See [Bill of Materials](bill_of_materials.md) for required components +3. **Raspberry Pi**: Raspberry Pi 5 with Debian Trixie and Python 3.11+ + +## Configuration + +After installation, you can customize duty cycle limits and calibration values by editing the configuration file: + +```python +# Location: site-packages/progressive_automations_python/config.py +# Or find it with: python -c "import progressive_automations_python.config as c; print(c.__file__)" + +# Key settings to adjust during initial setup: +# - DUTY_CYCLE_PERCENTAGE: Motor duty cycle (default: 0.10 = 10%) +# - MAX_CONTINUOUS_RUNTIME: Maximum single movement time (default: 30s) +# - LOWEST_HEIGHT / HIGHEST_HEIGHT: Your desk's physical range +# - UP_RATE / DOWN_RATE: Measured movement rates (inches/second) +``` + +The configuration includes validation to prevent invalid values. + +## Installation + +### Step 1: Install the Package + +```bash +pip install progressive-automations-python +``` + +This installs the package with all dependencies, including Prefect for workflow orchestration. + +### Step 2: Configure Prefect Cloud + +You need a Prefect Cloud account for remote workflow orchestration. + +1. Sign up at [https://www.prefect.io/](https://www.prefect.io/) +2. Get your API key from the Prefect Cloud dashboard +3. Login from your Raspberry Pi using Prefect's CLI: + +```bash +prefect cloud login -k +``` + +### Step 3: Create Work Pool + +Create a work pool for the desk lifter using Prefect's CLI: + +```bash +prefect work-pool create desk-lifter-pool --type process +``` + +### Step 4: Deploy Flows to Prefect Cloud + +Deploy all desk control flows using Python: + +```python +from progressive_automations_python.deployment import create_deployments + +create_deployments("desk-lifter-pool") +``` + +This creates the following deployments: +- `simple-movement-flow/move-to-position` - Move to a specific height +- `custom-movements-flow/custom-movements` - Execute multiple configured movements +- `test-sequence-flow/test-sequence` - Test sequence (up, wait, down) +- `duty-cycle-monitoring-flow/duty-cycle-monitor` - On-demand duty cycle check + +### Step 5: Start a Prefect Worker + +On your Raspberry Pi, start a worker to execute flows using Prefect's CLI: + +```bash +prefect worker start --pool desk-lifter-pool +``` + +Keep this running in a terminal or set up as a systemd service for automatic startup. + +## Testing During Initial Setup + +> **Note**: The `progressive_automations_python` CLI is provided for initial hardware testing and troubleshooting only. For production use, trigger flows via `run_deployment()` or Prefect's CLI. + +### Test Hardware Connections + +Test UP or DOWN movement for 2 seconds to verify GPIO connections: + +```bash +progressive_automations_python --test UP +progressive_automations_python --test DOWN +``` + +### Check Duty Cycle Status + +View current duty cycle usage during debugging: + +```bash +progressive_automations_python --status +``` + +## Production Usage: Async Deployment and Position Polling + +**This is the primary way to use the desk lifter system.** Trigger movements asynchronously from external systems and poll their status later. + +### Triggering Movements Asynchronously + +From any Python environment with network access to Prefect Cloud: + +```python +from prefect.deployments import run_deployment + +# Trigger a movement (returns immediately with timeout=0) +flow_run = run_deployment( + name="simple-movement-flow/move-to-position", + parameters={"target_height": 35.5, "current_height": 24.0}, + timeout=0 # Return immediately without waiting +) + +print(f"Movement started with flow run ID: {flow_run.id}") +# Continue with other work while the desk moves... +``` + +### Polling Position Status + +Check if the movement has completed: + +```python +from prefect import get_client +import asyncio + +async def check_movement_status(flow_run_id): + """Check if the movement has completed""" + async with get_client() as client: + flow_run = await client.read_flow_run(flow_run_id) + + print(f"Status: {flow_run.state.type}") + + if flow_run.state.type == "COMPLETED": + # Movement completed successfully + result = await flow_run.state.result() + print(f"✅ Movement completed!") + print(f" Final position: {result['movement_result']['end_height']}\"") + print(f" At target: {abs(result['movement_result']['end_height'] - result['movement_result']['start_height']) < 0.1}") + print(f" Duty cycle remaining: {result['final_duty_status']['remaining_capacity']:.1f}s") + return result + elif flow_run.state.type == "FAILED": + print(f"❌ Movement failed: {flow_run.state.message}") + return None + else: + print(f"⏳ Still running... (state: {flow_run.state.type})") + return None + +# Check status +result = asyncio.run(check_movement_status(flow_run.id)) +``` + +### Complete Polling Example + +Wait for a movement to complete with periodic polling: + +```python +import asyncio +import time + +async def wait_for_movement_completion(flow_run_id, check_interval=5, max_wait=300): + """ + Poll until movement completes or timeout. + + This is similar to preheating an oven - you set the temperature and check later, + not sit in front of it the whole time. + + Args: + flow_run_id: The flow run ID from run_deployment + check_interval: Seconds between status checks (default: 5) + max_wait: Maximum time to wait in seconds (default: 300) + + Returns: + dict with completion status and result + """ + from prefect import get_client + + start_time = time.time() + + async with get_client() as client: + while time.time() - start_time < max_wait: + flow_run = await client.read_flow_run(flow_run_id) + + if flow_run.state.is_final(): + if flow_run.state.type == "COMPLETED": + result = await flow_run.state.result() + return { + "completed": True, + "success": True, + "result": result + } + else: + return { + "completed": True, + "success": False, + "error": flow_run.state.message + } + + print(f"⏳ Still moving... ({time.time() - start_time:.1f}s elapsed)") + await asyncio.sleep(check_interval) + + return { + "completed": False, + "success": False, + "error": "Timeout waiting for movement" + } + +# Use it +result = asyncio.run(wait_for_movement_completion(flow_run.id)) + +if result["completed"] and result["success"]: + print(f"✅ Desk reached target position!") + print(f"Details: {result['result']}") +else: + print(f"❌ Movement did not complete: {result.get('error', 'Unknown error')}") +``` + +### Checking Duty Cycle Before Triggering + +Check if there's enough duty cycle capacity before triggering a movement: + +```python +from prefect.deployments import run_deployment + +# Check current duty cycle status +status_run = run_deployment( + name="duty-cycle-monitoring-flow/duty-cycle-monitor", + timeout=30 # Wait for result +) + +remaining = status_run["status"]["remaining_capacity"] + +if remaining > 10: # Need at least 10 seconds + # Safe to trigger movement + flow_run = run_deployment( + name="simple-movement-flow/move-to-position", + parameters={"target_height": 35.5}, + timeout=0 + ) + print(f"Movement triggered: {flow_run.id}") +else: + print(f"⚠️ Insufficient duty cycle capacity ({remaining:.1f}s remaining)") + print("Wait for duty cycle window to reset") +``` + +### Using Prefect CLI for Manual Triggers + +You can also trigger flows directly using Prefect's CLI: + +```bash +# Trigger a movement to 30 inches +prefect deployment run 'simple-movement-flow/move-to-position' --param target_height=30.0 + +# Run a test sequence +prefect deployment run 'test-sequence-flow/test-sequence' --param movement_distance=0.5 --param rest_time=10.0 + +# Check duty cycle +prefect deployment run 'duty-cycle-monitoring-flow/duty-cycle-monitor' +``` + +## Integration with Other Equipment + +When integrating with other equipment that depends on the desk position: + +```python +async def orchestrate_equipment_workflow(): + """ + Example: Move desk, wait for completion, then trigger dependent equipment + """ + from prefect.deployments import run_deployment + from prefect import get_client + import asyncio + + # Step 1: Trigger desk movement + print("Step 1: Moving desk to position...") + desk_run = run_deployment( + name="simple-movement-flow/move-to-position", + parameters={"target_height": 30.0}, + timeout=0 + ) + + # Step 2: Poll until desk reaches position + print("Step 2: Waiting for desk to reach position...") + result = await wait_for_movement_completion(desk_run.id) + + if not (result["completed"] and result["success"]): + raise RuntimeError("Desk movement failed") + + print(f"✅ Desk at position: {result['result']['movement_result']['end_height']}\"") + + # Step 3: Now safe to trigger dependent equipment + print("Step 3: Triggering dependent equipment...") + # ... trigger your other equipment here ... + + return {"desk_movement": result, "equipment_triggered": True} + +# Run the orchestration +result = asyncio.run(orchestrate_equipment_workflow()) +``` + +## Duty Cycle Management + +The system enforces a 10% duty cycle (2 minutes on, 18 minutes off) to protect the motor: + +- **Maximum continuous runtime**: 30 seconds +- **Maximum usage in 20-minute window**: 120 seconds (2 minutes) +- **Automatic tracking**: All movements are tracked automatically +- **Safety enforcement**: Movements exceeding limits are rejected + +View current usage: + +```bash +progressive_automations_python --status +``` + +Output example: +``` +=== DUTY CYCLE STATUS === +Current usage: 15.2s / 120.0s (12.7%) +Remaining capacity: 104.8s +Percentage used: 12.7% +Window period: 1200s (20 minutes) +Current position: 24.0" +Last movement: 2.1s ago + +✅ GOOD CAPACITY - Normal operations possible +``` + +## Generating Movement Configurations + +Generate optimized movement sequences based on current duty cycle: + +```bash +progressive_automations_python --generate-movements +``` + +This creates `movement_configs.json` with movements that: +1. Respect the 30-second continuous runtime limit +2. Use available capacity efficiently +3. Demonstrate successful movements within limits +4. Show duty cycle protection when limits would be exceeded + +## Troubleshooting + +### Movement Rejected: Insufficient Duty Cycle + +**Error**: `Movement would exceed 10% duty cycle limit` + +**Solution**: Wait for the duty cycle window to reset. Check status with: +```bash +progressive_automations_python --status +``` + +### GPIO Permission Denied + +**Error**: `Permission denied` when accessing GPIO + +**Solution**: Ensure your user is in the `gpio` group: +```bash +sudo usermod -a -G gpio $USER +# Then reboot +``` + +### Prefect Worker Not Running + +**Error**: Flow triggered but never executes + +**Solution**: Ensure a Prefect worker is running: +```bash +prefect worker start --pool default-process-pool +``` + +Consider setting up a systemd service to keep the worker running. + +### Position Unknown + +**Error**: `No current height provided and no last known position` + +**Solution**: Provide the current height explicitly: +```bash +progressive_automations_python --move 30.0 --current 24.0 +``` + +The system will remember the position for future movements. + +## Command Reference + +### Prefect CLI (Primary Interface) + +```bash +# Login to Prefect Cloud +prefect cloud login -k + +# Create work pool +prefect work-pool create desk-lifter-pool --type process + +# Start worker (keep running) +prefect worker start --pool desk-lifter-pool + +# Trigger deployments manually +prefect deployment run 'simple-movement-flow/move-to-position' --param target_height=30.0 +prefect deployment run 'test-sequence-flow/test-sequence' +prefect deployment run 'duty-cycle-monitoring-flow/duty-cycle-monitor' +``` + +### Package CLI (Testing/Debugging Only) + +```bash +# Hardware testing (initial setup) +progressive_automations_python --test UP|DOWN + +# Status check (debugging) +progressive_automations_python --status +``` + +## Python API Examples + +For viewing complete Python examples for async deployment and polling: + +```bash +progressive_automations_python --examples +``` + +This displays comprehensive code examples for: +- Async movement triggering +- Status polling +- Polling loops with timeout +- Duty cycle checking +- Equipment workflow orchestration + +## Next Steps + +1. ✅ Complete hardware setup per [Raspberry Pi Setup](raspberry-pi-setup.md) +2. ✅ Install package: `pip install progressive-automations-python` +3. ✅ Configure Prefect Cloud: `prefect cloud login -k ` +4. ✅ Create work pool: `prefect work-pool create desk-lifter-pool --type process` +5. ✅ Deploy flows via Python: `create_deployments("desk-lifter-pool")` +6. ✅ Start worker: `prefect worker start --pool desk-lifter-pool` +7. ✅ Test hardware (optional): `progressive_automations_python --test UP` +8. ✅ **Trigger flows via `run_deployment()` from your automation code!** + +For more information, see: +- [Raspberry Pi Setup](raspberry-pi-setup.md) +- [Bill of Materials](bill_of_materials.md) +- [Prefect Documentation](https://docs.prefect.io/) diff --git a/docs/quick-start.md b/docs/quick-start.md new file mode 100644 index 0000000..1b3b64b --- /dev/null +++ b/docs/quick-start.md @@ -0,0 +1,134 @@ +# Quick Start Guide + +This guide shows the complete end-to-end workflow for using the desk lifter control system. + +## Complete Setup (One Time) + +```bash +# 1. Install the package +pip install progressive-automations-python + +# 2. Login to Prefect Cloud +prefect cloud login -k + +# 3. Create the work pool +prefect work-pool create desk-lifter-pool --type process + +# 4. Deploy the flows +python -c "from progressive_automations_python.deployment import create_deployments; create_deployments()" + +# 5. Start the worker (keep this running) +prefect worker start --pool desk-lifter-pool +``` + +## Testing Hardware (Optional) + +```bash +# Test GPIO connections +progressive_automations_python --test UP +progressive_automations_python --test DOWN + +# Check duty cycle status +progressive_automations_python --status +``` + +## Production Usage + +### From External Python Code + +```python +from prefect.deployments import run_deployment +from prefect import get_client +import asyncio + +# Trigger movement asynchronously (returns immediately) +flow_run = run_deployment( + name="simple-movement-flow/move-to-position", + parameters={"target_height": 35.5}, + timeout=0 # Don't wait, return immediately +) + +print(f"Movement triggered: {flow_run.id}") + +# Poll status later +async def check_status(flow_run_id): + async with get_client() as client: + flow_run = await client.read_flow_run(flow_run_id) + + if flow_run.state.type == "COMPLETED": + result = await flow_run.state.result() + print(f"✅ At position: {result['movement_result']['end_height']}\"") + return True + elif flow_run.state.type == "FAILED": + print(f"❌ Failed: {flow_run.state.message}") + return False + else: + print(f"⏳ Still moving...") + return None + +# Check if complete +complete = asyncio.run(check_status(flow_run.id)) +``` + +### From Prefect CLI + +```bash +# Trigger a movement +prefect deployment run 'simple-movement-flow/move-to-position' --param target_height=30.0 + +# Run test sequence +prefect deployment run 'test-sequence-flow/test-sequence' + +# Check duty cycle +prefect deployment run 'duty-cycle-monitoring-flow/duty-cycle-monitor' +``` + +## Equipment Orchestration Pattern + +```python +async def run_experiment(): + """Example: coordinate desk with other equipment""" + from prefect.deployments import run_deployment + from prefect import get_client + import asyncio + + # Step 1: Move desk to position + desk_run = run_deployment( + name="simple-movement-flow/move-to-position", + parameters={"target_height": 30.0}, + timeout=0 + ) + + # Step 2: Wait for desk to reach position + async with get_client() as client: + while True: + flow_run = await client.read_flow_run(desk_run.id) + if flow_run.state.is_final(): + if flow_run.state.type == "COMPLETED": + print("✅ Desk in position") + break + else: + raise RuntimeError("Desk movement failed") + await asyncio.sleep(2) + + # Step 3: Now trigger dependent equipment + print("Triggering other equipment...") + # ... your other equipment code here ... + +# Run it +asyncio.run(run_experiment()) +``` + +## Key Concepts + +1. **Async Execution**: Use `timeout=0` in `run_deployment()` to return immediately +2. **Position Polling**: Use Prefect's `get_client()` to check `flow_run.state` +3. **Duty Cycle**: System enforces 10% duty cycle (2min on / 18min off) automatically +4. **Work Pool**: `desk-lifter-pool` must be running on the Raspberry Pi +5. **Testing CLI**: Only use `progressive_automations_python` for initial hardware testing + +## Next Steps + +- Read [Installation and Usage Guide](installation-and-usage.md) for complete details +- View [Raspberry Pi Setup](raspberry-pi-setup.md) for hardware configuration +- Check [Bill of Materials](bill_of_materials.md) for required components diff --git a/setup.cfg b/setup.cfg index 4b2188a..1dd1729 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,6 +49,8 @@ package_dir = # For more information, check out https://semver.org/. install_requires = importlib-metadata; python_version<"3.8" + prefect>=3.0.0 + rpi-lgpio; platform_machine=="aarch64" or platform_machine=="armv7l" [options.packages.find] @@ -77,6 +79,8 @@ testing = # And any other entry points, for example: # pyscaffold.cli = # awesome = pyscaffoldext.awesome.extension:AwesomeExtension +console_scripts = + progressive_automations_python = progressive_automations_python.cli:main [tool:pytest] # Specify command line options as you would do when invoking pytest directly. diff --git a/src/progressive_automations_python/__init__.py b/src/progressive_automations_python/__init__.py index 35cfe37..5a31c7c 100644 --- a/src/progressive_automations_python/__init__.py +++ b/src/progressive_automations_python/__init__.py @@ -14,3 +14,56 @@ __version__ = "unknown" finally: del version, PackageNotFoundError + +# Export main API +from progressive_automations_python.config import ( + LOWEST_HEIGHT, + HIGHEST_HEIGHT, + UP_RATE, + DOWN_RATE, + UP_PIN, + DOWN_PIN +) + +from progressive_automations_python.desk_controller import ( + move_to_height, + check_duty_cycle_status_before_execution +) + +from progressive_automations_python.duty_cycle import ( + load_state, + save_state, + get_duty_cycle_status, + show_duty_cycle_status +) + +from progressive_automations_python.prefect_flows import ( + simple_movement_flow, + custom_movements_flow, + test_sequence_flow, + duty_cycle_monitoring_flow, + scheduled_duty_cycle_check +) + +from progressive_automations_python.deployment import ( + create_deployments +) + +__all__ = [ + "__version__", + "LOWEST_HEIGHT", + "HIGHEST_HEIGHT", + "UP_RATE", + "DOWN_RATE", + "UP_PIN", + "DOWN_PIN", + "move_to_height", + "check_duty_cycle_status_before_execution", + "load_state", + "save_state", + "get_duty_cycle_status", + "show_duty_cycle_status", + "simple_movement_flow", + "simple_movement_flow", + "create_deployments" +] diff --git a/src/progressive_automations_python/cli.py b/src/progressive_automations_python/cli.py new file mode 100644 index 0000000..5ba57c6 --- /dev/null +++ b/src/progressive_automations_python/cli.py @@ -0,0 +1,106 @@ +""" +Command-line interface for progressive automations desk lifter control. +""" + +import argparse +import sys + + +def test_movement(direction: str): + """Test UP or DOWN movement for 2 seconds""" + try: + # Import GPIO control functions + from progressive_automations_python.movement_control import ( + setup_gpio, cleanup_gpio, press_up, release_up, press_down, release_down + ) + import time + + setup_gpio() + + print(f"Testing {direction} movement for 2 seconds...") + + if direction.upper() == "UP": + press_up() + time.sleep(2.0) + release_up() + elif direction.upper() == "DOWN": + press_down() + time.sleep(2.0) + release_down() + else: + print(f"Invalid direction: {direction}. Use UP or DOWN.") + return 1 + + cleanup_gpio() + print(f"{direction} test complete!") + return 0 + + except ImportError as e: + print(f"Error: GPIO library not available. This command must be run on a Raspberry Pi.") + print(f"Details: {e}") + return 1 + except Exception as e: + print(f"Error during test: {e}") + return 1 + + +def show_status(): + """Show current duty cycle status""" + try: + from progressive_automations_python.duty_cycle import show_duty_cycle_status + show_duty_cycle_status() + return 0 + except Exception as e: + print(f"Error showing status: {e}") + return 1 + + + + + + + + + + + +def main(): + """Main CLI entry point""" + parser = argparse.ArgumentParser( + description="Progressive Automations Desk Lifter Control", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples (for testing/debugging only): + progressive_automations_python --test UP + progressive_automations_python --test DOWN + progressive_automations_python --status + """ + ) + + parser.add_argument( + "--test", + type=str, + choices=["UP", "DOWN", "up", "down"], + help="Test UP or DOWN movement for 2 seconds" + ) + + parser.add_argument( + "--status", + action="store_true", + help="Show current duty cycle status" + ) + + args = parser.parse_args() + + # Handle commands + if args.test: + return test_movement(args.test) + elif args.status: + return show_status() + else: + parser.print_help() + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/progressive_automations_python/config.py b/src/progressive_automations_python/config.py new file mode 100644 index 0000000..96bd4ac --- /dev/null +++ b/src/progressive_automations_python/config.py @@ -0,0 +1,118 @@ +""" +User-editable configuration for desk lifter control. + +Edit these values during initial setup to match your hardware and requirements. +""" + +# ============================================================================= +# GPIO PIN CONFIGURATION +# ============================================================================= +# BCM pin numbering +UP_PIN = 18 # BCM numbering, physical pin 12 +DOWN_PIN = 17 # BCM numbering, physical pin 11 + + +# ============================================================================= +# CALIBRATION DATA +# ============================================================================= +# Measure these values for your specific desk lifter setup +LOWEST_HEIGHT = 23.7 # inches - minimum height position +HIGHEST_HEIGHT = 54.5 # inches - maximum height position +UP_RATE = 0.54 # inches per second - measured movement rate going up +DOWN_RATE = 0.55 # inches per second - measured movement rate going down + + +# ============================================================================= +# DUTY CYCLE LIMITS (MOTOR PROTECTION) +# ============================================================================= +# These values protect the motor from overheating +# Adjust only if your motor specs differ from standard PA desk lifters + +# Duty cycle percentage (e.g., 0.10 = 10% = 2 min on, 18 min off) +DUTY_CYCLE_PERCENTAGE = 0.10 + +# Sliding window period in seconds (default: 1200s = 20 minutes) +DUTY_CYCLE_PERIOD = 1200 + +# Maximum total on-time within the window (default: 120s = 2 minutes) +# Calculated as: DUTY_CYCLE_PERIOD * DUTY_CYCLE_PERCENTAGE +DUTY_CYCLE_MAX_ON_TIME = 120 + +# Maximum continuous runtime in seconds (default: 45s) +# This prevents single long movements from damaging the motor +MAX_CONTINUOUS_RUNTIME = 45 + + +# ============================================================================= +# STATE MANAGEMENT +# ============================================================================= +# File to store persistent state (position, duty cycle usage, etc.) +STATE_FILE = "lifter_state.json" + + +# ============================================================================= +# VALIDATION +# ============================================================================= +def validate_config(): + """Validate configuration values""" + errors = [] + + # Validate height range + if LOWEST_HEIGHT >= HIGHEST_HEIGHT: + errors.append(f"LOWEST_HEIGHT ({LOWEST_HEIGHT}) must be less than HIGHEST_HEIGHT ({HIGHEST_HEIGHT})") + + # Validate movement rates + if UP_RATE <= 0: + errors.append(f"UP_RATE ({UP_RATE}) must be positive") + if DOWN_RATE <= 0: + errors.append(f"DOWN_RATE ({DOWN_RATE}) must be positive") + + # Validate duty cycle + if not (0 < DUTY_CYCLE_PERCENTAGE <= 1.0): + errors.append(f"DUTY_CYCLE_PERCENTAGE ({DUTY_CYCLE_PERCENTAGE}) must be between 0 and 1") + + if DUTY_CYCLE_MAX_ON_TIME > DUTY_CYCLE_PERIOD: + errors.append(f"DUTY_CYCLE_MAX_ON_TIME ({DUTY_CYCLE_MAX_ON_TIME}) cannot exceed DUTY_CYCLE_PERIOD ({DUTY_CYCLE_PERIOD})") + + expected_max_on = DUTY_CYCLE_PERIOD * DUTY_CYCLE_PERCENTAGE + if abs(DUTY_CYCLE_MAX_ON_TIME - expected_max_on) > 1: + errors.append( + f"DUTY_CYCLE_MAX_ON_TIME ({DUTY_CYCLE_MAX_ON_TIME}) should equal " + f"DUTY_CYCLE_PERIOD * DUTY_CYCLE_PERCENTAGE ({expected_max_on:.1f})" + ) + + if MAX_CONTINUOUS_RUNTIME > DUTY_CYCLE_MAX_ON_TIME: + errors.append( + f"MAX_CONTINUOUS_RUNTIME ({MAX_CONTINUOUS_RUNTIME}) cannot exceed " + f"DUTY_CYCLE_MAX_ON_TIME ({DUTY_CYCLE_MAX_ON_TIME})" + ) + + if errors: + raise ValueError("Configuration validation failed:\n" + "\n".join(f" - {e}" for e in errors)) + + return True + + +# Validate on import +validate_config() + + +# ============================================================================= +# SMOKE TEST PARAMETERS +# ============================================================================= +# Quick test parameters for initial validation and debugging +# These are NOT used in production - only for manual testing + +SMOKE_TEST = { + # Small movement for quick testing (inches) + "movement_distance": 0.5, + + # Short wait time for test sequences (seconds) + "rest_time": 5.0, + + # Test target heights within safe range + "test_heights": [24.0, 25.0, 24.5], + + # Quick duty cycle test (uses minimal capacity) + "quick_cycle_test": True +} diff --git a/src/progressive_automations_python/deployment.py b/src/progressive_automations_python/deployment.py new file mode 100644 index 0000000..febefb9 --- /dev/null +++ b/src/progressive_automations_python/deployment.py @@ -0,0 +1,61 @@ +""" +Prefect deployment configuration for desk lifter control. + +This module provides utilities for creating and managing Prefect deployments +that can be triggered externally and run asynchronously. +""" + +from pathlib import Path + + +def create_deployments(work_pool_name: str = "desk-lifter-pool"): + """ + Create Prefect deployment for desk control. + + This should be run once during setup to register the flow with Prefect Cloud. + + Args: + work_pool_name: Name of the work pool to use (default: "desk-lifter-pool") + + Usage: + from progressive_automations_python.deployment import create_deployments + create_deployments("my-work-pool") + """ + from progressive_automations_python.prefect_flows import simple_movement_flow + + # Get the source directory (where the package is installed) + source_dir = Path(__file__).parent + + print(f"Creating deployment with work pool: {work_pool_name}") + print("=== DEPLOYING DESK CONTROL FLOW ===") + + # Deploy simple movement flow + simple_movement_flow.from_source( + source=str(source_dir.parent.parent), + entrypoint="progressive_automations_python/prefect_flows.py:simple_movement_flow", + ).deploy( + name="move-to-position", + work_pool_name=work_pool_name, + description="Move desk to a specific height position with duty cycle management" + ) + print(f"✓ Deployed 'simple-movement-flow/move-to-position'") + + print(f"\n🎉 Deployment created successfully!") + print(f"\nNext steps:") + print(f"1. Start a worker: prefect worker start --pool {work_pool_name}") + print(f"2. Trigger a flow from Python:") + print(f" from prefect.deployments import run_deployment") + print(f" run_deployment('simple-movement-flow/move-to-position', parameters={{'target_height': 30.0}}, timeout=0)") + print(f"3. Or from CLI:") + print(f" prefect deployment run 'simple-movement-flow/move-to-position' --param target_height=30.0") + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + work_pool = sys.argv[1] + else: + work_pool = "desk-lifter-pool" + + create_deployments(work_pool) diff --git a/src/progressive_automations_python/desk_controller.py b/src/progressive_automations_python/desk_controller.py new file mode 100644 index 0000000..002bb18 --- /dev/null +++ b/src/progressive_automations_python/desk_controller.py @@ -0,0 +1,168 @@ +""" +High-level desk controller with height management and safety checks. + +Combines movement control and duty cycle management to provide safe desk operations. +Handles height calculations, movement planning, and state management. +""" + +from typing import Optional +from progressive_automations_python.config import ( + DUTY_CYCLE_MAX_ON_TIME, + DUTY_CYCLE_PERIOD, + LOWEST_HEIGHT, + HIGHEST_HEIGHT, + UP_RATE, + DOWN_RATE +) +from progressive_automations_python.duty_cycle import ( + check_movement_against_duty_cycle, + record_usage_period, + get_duty_cycle_status, + get_current_duty_cycle_usage, + show_duty_cycle_status, + load_state, + save_state +) +from progressive_automations_python.movement_control import setup_gpio, cleanup_gpio, move_up, move_down + + +def check_duty_cycle_status_before_execution() -> dict: + """ + Check current duty cycle status before executing movements. + Returns comprehensive status information for decision making. + + Returns: + dict: { + "current_usage": float, # Seconds used in current window + "remaining_capacity": float, # Seconds remaining + "percentage_used": float, # Percentage of duty cycle used + "max_single_movement": float, # Max movement time within height limits + "movements_possible": int, # Est. number of max movements possible + "current_position": float, # Current desk position + "window_period": int, # Duty cycle window (1200s) + "recommendations": list # Usage recommendations + } + """ + print("=== PRE-EXECUTION DUTY CYCLE STATUS CHECK ===") + + state = load_state() + current_usage = get_current_duty_cycle_usage(state) + remaining_capacity = DUTY_CYCLE_MAX_ON_TIME - current_usage + percentage_used = (current_usage / DUTY_CYCLE_MAX_ON_TIME) * 100 + current_position = state.get("last_position", 24.0) + + # Calculate max possible movement within height range + height_range_max = HIGHEST_HEIGHT - LOWEST_HEIGHT + # Average rate for max movement estimation + avg_rate = (UP_RATE + DOWN_RATE) / 2 + max_single_movement = height_range_max / avg_rate + + # Estimate how many max movements are possible + movements_possible = int(remaining_capacity / max_single_movement) if remaining_capacity > 0 else 0 + + # Display status + print(f"Current usage: {current_usage:.1f}s / {DUTY_CYCLE_MAX_ON_TIME}s ({percentage_used:.1f}%)") + print(f"Remaining capacity: {remaining_capacity:.1f}s") + print(f"Current position: {current_position}\"") + print(f"Max single movement: {max_single_movement:.1f}s (within height range)") + print(f"Estimated large movements possible: {movements_possible}") + print() + + return { + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "percentage_used": percentage_used, + "max_single_movement": max_single_movement, + "movements_possible": movements_possible, + "current_position": current_position, + "window_period": DUTY_CYCLE_PERIOD + } + + +def move_to_height(target_height: float) -> dict: + """ + Move desk to target height with safety checks and duty cycle enforcement + + Args: + target_height: Desired height in inches + + Returns: + dict with movement results and status information + """ + # Validate height range + if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): + raise ValueError(f"Target height {target_height}'' is out of range [{LOWEST_HEIGHT}-{HIGHEST_HEIGHT}].") + + # Setup GPIO + setup_gpio() + + # Load current state + state = load_state() + + # Get current height from state + if state["last_position"] is None: + cleanup_gpio() + raise ValueError("No last known position in state file. Initialize position first.") + + current_height = state["last_position"] + + # Calculate movement requirements + delta = target_height - current_height + if abs(delta) < 0.01: + cleanup_gpio() + print(f"Already at {target_height}'' (within tolerance). No movement needed.") + return { + "success": True, + "movement": "none", + "message": f"Already at target height {target_height}''", + "duty_cycle": get_duty_cycle_status(state) + } + + # Determine direction and calculate time + direction = "up" if delta > 0 else "down" + rate = UP_RATE if delta > 0 else DOWN_RATE + required_time = abs(delta) / rate + + # Check duty cycle limits + check_result = check_movement_against_duty_cycle(target_height, current_height, UP_RATE, DOWN_RATE) + + if not check_result["allowed"]: + cleanup_gpio() + raise ValueError(check_result["error"]) + + print(f"Duty cycle OK: {check_result['current_usage']:.1f}s + {check_result['estimated_duration']:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s") + + # Execute movement + move_func = move_up if delta > 0 else move_down + start_time, end_time, actual_duration = move_func(required_time) + + # Record the usage period and update state + state = record_usage_period(state, start_time, end_time, actual_duration) + if delta > 0: + state["total_up_time"] += actual_duration + + # Update position and save state + state["last_position"] = target_height + save_state(state) + + # Get final duty cycle info + duty_status = get_duty_cycle_status(state) + + print(f"Arrived at {target_height}'' (approximate). State saved.") + print(f"Duty cycle usage: {duty_status['current_usage']:.1f}s / {duty_status['max_usage']}s ({duty_status['percentage_used']:.1f}%)") + print(f"Remaining duty time: {duty_status['remaining_time']:.1f}s") + print(f"Total up time: {state['total_up_time']:.1f}s") + + # Always clean up GPIO + cleanup_gpio() + + return { + "success": True, + "movement": direction, + "start_height": current_height, + "end_height": target_height, + "distance": abs(delta), + "duration": actual_duration, + "duty_cycle": duty_status, + "total_up_time": state["total_up_time"] + } diff --git a/src/progressive_automations_python/duty_cycle.py b/src/progressive_automations_python/duty_cycle.py new file mode 100644 index 0000000..9a21b5a --- /dev/null +++ b/src/progressive_automations_python/duty_cycle.py @@ -0,0 +1,205 @@ +""" +Duty cycle management for motor protection. + +Implements a 10% duty cycle (2 minutes on, 18 minutes off) using a sliding window approach. +Tracks individual usage periods and enforces both continuous runtime and total usage limits. +""" + +import time +import json +import os +from datetime import datetime +from typing import List, Tuple, Dict, Any, Optional + +from progressive_automations_python.config import ( + LOWEST_HEIGHT, + DUTY_CYCLE_PERIOD, + DUTY_CYCLE_MAX_ON_TIME, + DUTY_CYCLE_PERCENTAGE, + MAX_CONTINUOUS_RUNTIME, + STATE_FILE +) + + +def load_state(): + """Load the current state from file""" + try: + with open(STATE_FILE, "r") as f: + state = json.load(f) + + # Ensure all required keys exist with proper defaults + if "usage_periods" not in state: + state["usage_periods"] = [] + if "last_position" not in state: + state["last_position"] = LOWEST_HEIGHT # Default to minimum height + if "total_up_time" not in state: + state["total_up_time"] = 0.0 + + return state + except FileNotFoundError: + # Return default state if file doesn't exist + return { + "usage_periods": [], + "last_position": LOWEST_HEIGHT, + "total_up_time": 0.0 + } + + +def save_state(state: Dict[str, Any]) -> None: + """Save state to JSON file""" + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + + +def clean_old_usage_periods(state: Dict[str, Any]) -> Dict[str, Any]: + """Remove usage periods older than the duty cycle period""" + current_time = time.time() + cutoff_time = current_time - DUTY_CYCLE_PERIOD + + # Keep only periods that end after the cutoff time + state["usage_periods"] = [ + period for period in state["usage_periods"] + if period[1] > cutoff_time # period[1] is end_timestamp + ] + return state + + +def get_current_duty_cycle_usage(state: Dict[str, Any]) -> float: + """Calculate current duty cycle usage in the sliding window""" + clean_old_usage_periods(state) + current_time = time.time() + + total_usage = 0.0 + for start_time, end_time, duration in state["usage_periods"]: + # Only count usage that's within the duty cycle period + window_start = current_time - DUTY_CYCLE_PERIOD + + # Adjust start and end times to the current window + effective_start = max(start_time, window_start) + effective_end = min(end_time, current_time) + + if effective_end > effective_start: + total_usage += effective_end - effective_start + + return total_usage + + +def record_usage_period(state: Dict[str, Any], start_time: float, end_time: float, duration: float) -> Dict[str, Any]: + """Record a usage period in the duty cycle tracking""" + state["usage_periods"].append([start_time, end_time, duration]) + return state + + +def check_movement_against_duty_cycle(target_height: float, current_height: Optional[float] = None, up_rate: float = 4.8, down_rate: float = 4.8) -> dict: + """ + Check if a movement to a target height would exceed duty cycle limits. + + Args: + target_height: Target height in mm/inches + current_height: Current height (if None, loads from state) + up_rate: Movement rate upward (mm/s or inches/s) + down_rate: Movement rate downward (mm/s or inches/s) + + Returns: + dict: { + "allowed": bool, + "error": str or None, + "estimated_duration": float, + "current_usage": float, + "remaining_capacity": float, + "movement_type": "UP" or "DOWN", + "distance": float + } + """ + # Load current state + state = load_state() + + if current_height is None: + current_height = state.get("last_position", LOWEST_HEIGHT) + + # Calculate movement requirements + distance = abs(target_height - current_height) + movement_type = "UP" if target_height > current_height else "DOWN" + rate = up_rate if movement_type == "UP" else down_rate + + if distance == 0: + return { + "allowed": True, + "error": None, + "estimated_duration": 0.0, + "current_usage": get_current_duty_cycle_usage(state), + "remaining_capacity": DUTY_CYCLE_MAX_ON_TIME - get_current_duty_cycle_usage(state), + "movement_type": movement_type, + "distance": distance + } + + estimated_duration = distance / rate + + # Check continuous runtime limit + if estimated_duration > MAX_CONTINUOUS_RUNTIME: + return { + "allowed": False, + "error": f"Movement would take {estimated_duration:.1f}s, exceeding {MAX_CONTINUOUS_RUNTIME}s continuous runtime limit", + "estimated_duration": estimated_duration, + "current_usage": get_current_duty_cycle_usage(state), + "remaining_capacity": DUTY_CYCLE_MAX_ON_TIME - get_current_duty_cycle_usage(state), + "movement_type": movement_type, + "distance": distance + } + + # Check duty cycle limits + current_usage = get_current_duty_cycle_usage(state) + remaining_capacity = DUTY_CYCLE_MAX_ON_TIME - current_usage + + if estimated_duration > remaining_capacity: + return { + "allowed": False, + "error": f"Movement would exceed 10% duty cycle limit. Current usage: {current_usage:.1f}s, Remaining: {remaining_capacity:.1f}s in {DUTY_CYCLE_PERIOD:.0f}s window", + "estimated_duration": estimated_duration, + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "movement_type": movement_type, + "distance": distance + } + + return { + "allowed": True, + "error": None, + "estimated_duration": estimated_duration, + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "movement_type": movement_type, + "distance": distance + } + + +def get_duty_cycle_status(state: Dict[str, Any]) -> Dict[str, float]: + """Get current duty cycle status information""" + current_usage = get_current_duty_cycle_usage(state) + remaining_time = max(0, DUTY_CYCLE_MAX_ON_TIME - current_usage) + percentage_used = current_usage / DUTY_CYCLE_MAX_ON_TIME * 100 + + return { + "current_usage": current_usage, + "max_usage": DUTY_CYCLE_MAX_ON_TIME, + "remaining_time": remaining_time, + "percentage_used": percentage_used, + "window_period": DUTY_CYCLE_PERIOD + } + + +def show_duty_cycle_status(): + """Display current duty cycle status in a user-friendly format""" + state = load_state() + status = get_duty_cycle_status(state) + current_usage = get_current_duty_cycle_usage(state) + + print("Current Duty Cycle Status:") + print(f" Current usage: {current_usage:.2f}s / {status['max_usage']}s") + print(f" Percentage used: {status['percentage_used']:.2f}%") + print(f" Remaining time: {status['remaining_time']:.2f}s") + print(f" Window period: {status['window_period']}s ({status['window_period']/60:.0f} minutes)") + + if len(state.get("usage_periods", [])) > 0: + print(f" Recent usage periods: {len(state['usage_periods'])}") + print(f" Total up time (all time): {state.get('total_up_time', 0):.1f}s") \ No newline at end of file diff --git a/src/progressive_automations_python/movement_control.py b/src/progressive_automations_python/movement_control.py new file mode 100644 index 0000000..a85f123 --- /dev/null +++ b/src/progressive_automations_python/movement_control.py @@ -0,0 +1,86 @@ +""" +GPIO-based movement control for the desk lifter. + +Handles low-level GPIO operations, pin management, and movement execution. +Provides safe pin control with proper initialization and cleanup. +""" + +import time +from typing import Tuple + +import RPi.GPIO as GPIO +from progressive_automations_python.config import UP_PIN, DOWN_PIN + + +def setup_gpio() -> None: + """Initialize GPIO settings""" + GPIO.setmode(GPIO.BCM) + + +def release_up() -> None: + """Set UP pin to high-impedance state""" + GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + + +def press_up() -> None: + """Set UP pin to drive low (button pressed)""" + GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) + + +def release_down() -> None: + """Set DOWN pin to high-impedance state""" + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + + +def press_down() -> None: + """Set DOWN pin to drive low (button pressed)""" + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + + +def cleanup_gpio() -> None: + """Clean up GPIO resources""" + release_up() + release_down() + GPIO.cleanup() + + +def move_up(up_time: float) -> Tuple[float, float, float]: + """ + Execute upward movement for specified time + + Returns: + (start_time, end_time, actual_duration) + """ + print(f"Moving UP for {up_time:.1f} seconds...") + + release_up() + start_time = time.time() + press_up() + time.sleep(up_time) + release_up() + end_time = time.time() + actual_duration = end_time - start_time + + print(f"UP movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration + + +def move_down(down_time: float) -> Tuple[float, float, float]: + """ + Execute downward movement for specified time + + Returns: + (start_time, end_time, actual_duration) + """ + print(f"Moving DOWN for {down_time:.1f} seconds...") + + release_down() + start_time = time.time() + press_down() + time.sleep(down_time) + release_down() + end_time = time.time() + actual_duration = end_time - start_time + + print(f"DOWN movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration \ No newline at end of file diff --git a/src/progressive_automations_python/prefect_flows.py b/src/progressive_automations_python/prefect_flows.py new file mode 100644 index 0000000..793061f --- /dev/null +++ b/src/progressive_automations_python/prefect_flows.py @@ -0,0 +1,175 @@ +""" +Prefect flow for desk lifter control. + +Provides workflow orchestration using Prefect for remote desk control. +""" + +from prefect import flow, task +from prefect.logging import get_run_logger + +from progressive_automations_python.desk_controller import ( + move_to_height, + check_duty_cycle_status_before_execution +) +from progressive_automations_python.duty_cycle import check_movement_against_duty_cycle, load_state +from progressive_automations_python.config import UP_RATE, DOWN_RATE + +# Decorate core functions as tasks +move_to_height_task = task(move_to_height) +check_duty_cycle_status_task = task(check_duty_cycle_status_before_execution) + + +@flow +def simple_movement_flow(target_height: float): + """Prefect flow for moving desk to a specific height with duty cycle management + + Movement will execute even if duty cycle capacity is full - it will wait as needed. + """ + logger = get_run_logger() + logger.info(f"=== SIMPLE MOVEMENT FLOW ===") + logger.info(f"Target: {target_height}\"") + + # Check duty cycle status + initial_status = check_duty_cycle_status_task() + logger.info(f"Initial duty cycle: {initial_status['current_usage']:.1f}s / {initial_status['window_period']}s used") + + # Get current position + state = load_state() + current_height = state.get("last_position") + + if current_height is None: + logger.error("❌ MOVEMENT ABORTED: No last known position") + raise ValueError("No last known position in state file") + + # Check movement requirements + check_result = check_movement_against_duty_cycle(target_height, current_height, UP_RATE, DOWN_RATE) + + if not check_result["allowed"]: + # Movement not immediately possible - will wait + wait_time = check_result.get("wait_time_needed", 0) + logger.warning(f"⏳ Duty cycle capacity insufficient - will wait {wait_time:.1f}s before movement") + + # Wait for duty cycle to free up + import time + time.sleep(wait_time) + logger.info(f"✅ Wait complete - proceeding with movement") + + # Execute the movement + result = move_to_height_task(target_height) + + # Check final duty cycle status + final_status = check_duty_cycle_status_task() + + # Log usage + capacity_used = initial_status["remaining_capacity"] - final_status["remaining_capacity"] + logger.info(f"Movement completed - Duty cycle used: {capacity_used:.1f}s") + + return { + **result, + "initial_duty_status": initial_status, + "final_duty_status": final_status, + "capacity_used": capacity_used, + "wait_time": check_result.get("wait_time_needed", 0) if not check_result["allowed"] else 0 + } + + +# ============================================================================= +# DEPLOYMENT FUNCTIONS +# ============================================================================= + +def deploy_custom_movements_flow(deployment_name: str = "custom-movements"): + """Deploy the main custom movements flow""" + + deployment = custom_movements_flow.from_source( + source=".", + entrypoint="prefect_flows.py:custom_movements_flow", + ).deploy( + name=deployment_name, + work_pool_name="default-process-pool", + ) + + print(f"✅ Deployment '{deployment_name}' created!") + print(f"To run: prefect deployment run 'custom-movements-flow/{deployment_name}'") + return deployment_name + + +def deploy_duty_cycle_monitoring(deployment_name: str = "duty-cycle-monitor", schedule_cron: str = None): + """Deploy duty cycle monitoring flow with optional scheduling""" + + deploy_kwargs = { + "name": deployment_name, + "work_pool_name": "default-process-pool", + } + + if schedule_cron: + from prefect.client.schemas.schedules import CronSchedule + deploy_kwargs["schedule"] = CronSchedule(cron=schedule_cron) + print(f"Deploying with cron schedule: {schedule_cron}") + + deployment = scheduled_duty_cycle_check.from_source( + source=".", + entrypoint="prefect_flows.py:scheduled_duty_cycle_check", + ).deploy(**deploy_kwargs) + + print(f"✅ Deployment '{deployment_name}' created!") + if schedule_cron: + print(f"Scheduled to run: {schedule_cron}") + else: + print(f"To run: prefect deployment run 'scheduled-duty-cycle-check/{deployment_name}'") + return deployment_name + + +def deploy_test_sequence(deployment_name: str = "test-sequence"): + """Deploy test sequence flow""" + + deployment = test_sequence_flow.from_source( + source=".", + entrypoint="prefect_flows.py:test_sequence_flow", + ).deploy( + name=deployment_name, + work_pool_name="default-process-pool", + ) + + print(f"✅ Deployment '{deployment_name}' created!") + print(f"To run: prefect deployment run 'test-sequence-flow/{deployment_name}'") + return deployment_name + + +def deploy_all_flows(): + """Deploy all desk control flows""" + print("=== DEPLOYING ALL SIMPLIFIED DESK CONTROL FLOWS ===") + + # Deploy main flows + deploy_custom_movements_flow() + deploy_test_sequence() + + # Deploy monitoring flows + deploy_duty_cycle_monitoring("duty-cycle-monitor-scheduled", "*/10 * * * *") + deploy_duty_cycle_monitoring("duty-cycle-monitor-immediate") + + print("\n🎉 All deployments created!") + print("\nAvailable flows:") + print(" 1. custom-movements - Main movement execution") + print(" 2. test-sequence - Automated test sequence") + print(" 3. duty-cycle-monitor-scheduled - Auto monitoring (every 10min)") + print(" 4. duty-cycle-monitor-immediate - On-demand monitoring") + + return True + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + if sys.argv[1] == "test": + test_sequence_flow() + elif sys.argv[1] == "movements": + custom_movements_flow() + elif sys.argv[1] == "monitor": + duty_cycle_monitoring_flow() + elif sys.argv[1] == "deploy": + deploy_all_flows() + else: + print("Usage: python prefect_flows.py [test|movements|monitor|deploy]") + else: + custom_movements_flow() \ No newline at end of file diff --git a/tests/testing.py b/tests/testing.py new file mode 100644 index 0000000..f3f7b79 --- /dev/null +++ b/tests/testing.py @@ -0,0 +1,197 @@ +""" +Testing utilities for desk lifter control. + +These functions are for testing and troubleshooting only, not production use. +""" + +import json +import os +from typing import Optional +from progressive_automations_python.config import LOWEST_HEIGHT +from progressive_automations_python.desk_controller import move_to_height, check_duty_cycle_status_before_execution +from progressive_automations_python.duty_cycle import check_movement_against_duty_cycle + + +def test_sequence(movement_distance: float = 0.5, rest_time: float = 10.0) -> dict: + """ + Execute a test sequence: move up, rest, move down + + Args: + movement_distance: Distance to move in inches + rest_time: Time to rest between movements in seconds + + Returns: + dict with test results + """ + start_height = LOWEST_HEIGHT + up_target = start_height + movement_distance + + print("Starting test sequence...") + print(f"Starting at: {start_height}\"") + print(f"Will move to: {up_target}\"") + print(f"Then rest for {rest_time} seconds") + print(f"Then return to: {start_height}\"") + + results = [] + + # Phase 1: Move up + print(f"\n--- Phase 1: Moving UP {movement_distance} inches ---") + result1 = move_to_height(up_target) + results.append(result1) + + if not result1["success"]: + return {"success": False, "phase": 1, "error": result1.get("error", "Unknown error")} + + # Phase 2: Rest + print(f"\n--- Phase 2: Resting for {rest_time} seconds ---") + import time + time.sleep(rest_time) + print("Rest complete.") + + # Phase 3: Move down + print(f"\n--- Phase 3: Moving DOWN {movement_distance} inches ---") + result2 = move_to_height(start_height) + results.append(result2) + + if not result2["success"]: + return {"success": False, "phase": 3, "error": result2.get("error", "Unknown error")} + + print("\nTest sequence complete!") + + return { + "success": True, + "results": results, + "total_duration": sum(r.get("duration", 0) for r in results if r["success"]), + "final_duty_cycle": results[-1]["duty_cycle"] if results else None + } + + +def load_movement_configs(config_file: str = "movement_configs.json") -> list: + """Load movement configurations from JSON file""" + print(f"Loading movement configurations from {config_file}") + + if not os.path.exists(config_file): + raise FileNotFoundError(f"Configuration file {config_file} not found") + + with open(config_file, 'r') as f: + config = json.load(f) + + # Filter for enabled movements only + enabled_movements = [m for m in config.get("movements", []) if m.get("enabled", True)] + + print(f"Found {len(enabled_movements)} enabled movements") + return enabled_movements + + +def validate_movement_config(movement: dict) -> dict: + """Validate a movement configuration before execution""" + movement_id = movement.get("id", "unknown") + target_height = movement["target_height"] + current_height = movement.get("current_height") + + print(f"Validating movement {movement_id}: {current_height}\" → {target_height}\"") + + # Check duty cycle limits + check_result = check_movement_against_duty_cycle(target_height, current_height) + + if not check_result["allowed"]: + error_msg = f"Movement {movement_id} rejected: {check_result['error']}" + print(f"❌ {error_msg}") + raise ValueError(error_msg) + + print(f"✅ Movement {movement_id} validated: {check_result['estimated_duration']:.1f}s, {check_result['movement_type']}") + return check_result + + +def execute_movement_config(movement: dict) -> dict: + """Execute a movement from configuration""" + movement_id = movement.get("id", "unknown") + target_height = movement["target_height"] + + print(f"Executing configured movement {movement_id}: {movement.get('description', '')}") + + result = move_to_height(target_height) + + if result["success"]: + print(f"✅ Movement {movement_id} completed: {result['duration']:.1f}s, final height: {result['end_height']}\"") + else: + print(f"❌ Movement {movement_id} failed") + raise ValueError("Movement failed") + + return result + + +def execute_custom_movements(config_file: str = "movement_configs.json") -> dict: + """Execute custom movements from configuration file""" + print("=== CUSTOM MOVEMENTS EXECUTION ===") + + # ALWAYS check duty cycle status before execution + duty_status = check_duty_cycle_status_before_execution() + + # If very low capacity, warn and potentially abort + if duty_status["remaining_capacity"] < 1.0: + print("❌ EXECUTION ABORTED: Insufficient duty cycle capacity remaining") + return { + "success": False, + "error": "Insufficient duty cycle capacity", + "duty_status": duty_status + } + + # Load movement configurations + print("Loading movement configurations from movement_configs.json") + movements = load_movement_configs(config_file) + + if not movements: + print("⚠️ No enabled movements found in configuration") + return {"success": False, "error": "No movements to execute"} + + results = [] + + for movement in movements: + movement_id = movement.get("id", "unknown") + print(f"\nProcessing movement: {movement_id}") + + try: + # Validate movement first + validation_result = validate_movement_config(movement) + + # Execute the movement if validation passed + execution_result = execute_movement_config(movement) + + results.append({ + "movement_id": movement_id, + "success": True, + "validation": validation_result, + "execution": execution_result + }) + + except Exception as e: + print(f"❌ Movement {movement_id} failed: {str(e)}") + results.append({ + "movement_id": movement_id, + "success": False, + "error": str(e) + }) + # Continue with remaining movements + + successful_movements = [r for r in results if r["success"]] + failed_movements = [r for r in results if not r["success"]] + + print(f"\n=== EXECUTION SUMMARY ===") + print(f"Total movements: {len(results)}") + print(f"Successful: {len(successful_movements)}") + print(f"Failed: {len(failed_movements)}") + + # Show final duty cycle status + print(f"\n=== FINAL DUTY CYCLE STATUS ===") + final_status = check_duty_cycle_status_before_execution() + + return { + "success": len(failed_movements) == 0, + "total_movements": len(results), + "successful": len(successful_movements), + "failed": len(failed_movements), + "results": results, + "initial_duty_status": duty_status, + "final_duty_status": final_status + }