From 331701285f8809eff49a9e38326fc30933de99a1 Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Sun, 24 May 2026 09:33:12 -0400 Subject: [PATCH] fix: add drain timeout margin --- docs/core-system/wp-cli.md | 3 +- inc/Cli/Commands/DrainCommand.php | 49 ++++++++++++++++++++++-------- tests/flow-run-cli-drain-smoke.php | 4 +++ 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/docs/core-system/wp-cli.md b/docs/core-system/wp-cli.md index 0583ad17e..2e4e0f7f3 100644 --- a/docs/core-system/wp-cli.md +++ b/docs/core-system/wp-cli.md @@ -399,10 +399,11 @@ wp datamachine drain # Bound work for cron/supervisors wp datamachine drain --limit=500 --batch-size=25 --time-limit=300 +wp datamachine drain --time-limit=600 --stop-before-timeout=30 --format=json wp datamachine drain --limit=500 --time-limit=300 --format=json ``` -**Options**: `--limit`, `--batch-size`, `--time-limit`, `--format=table|json` +**Options**: `--limit`, `--batch-size`, `--time-limit`, `--stop-before-timeout`, `--job-id`, `--format=table|json` ### datamachine cycle / cycles diff --git a/inc/Cli/Commands/DrainCommand.php b/inc/Cli/Commands/DrainCommand.php index ba9882fa3..91749eccd 100644 --- a/inc/Cli/Commands/DrainCommand.php +++ b/inc/Cli/Commands/DrainCommand.php @@ -46,6 +46,14 @@ class DrainCommand extends BaseCommand { * default: 0 * --- * + * [--stop-before-timeout=] + * : Stop this many seconds before the wall-clock limit so the drain exits + * cleanly before an external supervisor timeout. Only applies when + * --time-limit is greater than 0. + * --- + * default: 0 + * --- + * * [--job-id=] * : Optional comma-separated Data Machine job IDs to drain. Useful when * unrelated due work is blocked ahead of a known cleanup or retry run. @@ -72,10 +80,11 @@ public function __invoke( array $args, array $assoc_args ): void { $stats = self::drain( array( - 'limit' => isset( $assoc_args['limit'] ) ? (int) $assoc_args['limit'] : 0, - 'batch_size' => isset( $assoc_args['batch-size'] ) ? (int) $assoc_args['batch-size'] : 25, - 'time_limit' => isset( $assoc_args['time-limit'] ) ? (int) $assoc_args['time-limit'] : 0, - 'job_ids' => isset( $assoc_args['job-id'] ) ? (string) $assoc_args['job-id'] : '', + 'limit' => isset( $assoc_args['limit'] ) ? (int) $assoc_args['limit'] : 0, + 'batch_size' => isset( $assoc_args['batch-size'] ) ? (int) $assoc_args['batch-size'] : 25, + 'time_limit' => isset( $assoc_args['time-limit'] ) ? (int) $assoc_args['time-limit'] : 0, + 'stop_before_timeout' => isset( $assoc_args['stop-before-timeout'] ) ? (int) $assoc_args['stop-before-timeout'] : 0, + 'job_ids' => isset( $assoc_args['job-id'] ) ? (string) $assoc_args['job-id'] : '', ) ); @@ -94,28 +103,37 @@ public function __invoke( array $args, array $assoc_args ): void { * so this drain runs concrete due action IDs from that same group instead of * a hand-maintained hook allow-list. * - * @param array{limit?:int,batch_size?:int,time_limit?:int,hooks?:string[],job_ids?:string|int[]} $options Drain options. + * @param array{limit?:int,batch_size?:int,time_limit?:int,stop_before_timeout?:int,hooks?:string[],job_ids?:string|int[]} $options Drain options. * @return array Drain stats. */ public static function drain( array $options = array() ): array { - $limit = max( 0, (int) ( $options['limit'] ?? 0 ) ); - $batch_size = max( 1, (int) ( $options['batch_size'] ?? 25 ) ); - $time_limit = max( 0, (int) ( $options['time_limit'] ?? 0 ) ); - $hooks = self::normalizeHooks( $options['hooks'] ?? null ); - $job_ids = self::normalizeJobIds( $options['job_ids'] ?? null ); + $limit = max( 0, (int) ( $options['limit'] ?? 0 ) ); + $batch_size = max( 1, (int) ( $options['batch_size'] ?? 25 ) ); + $time_limit = max( 0, (int) ( $options['time_limit'] ?? 0 ) ); + $stop_before_timeout = max( 0, (int) ( $options['stop_before_timeout'] ?? 0 ) ); + $hooks = self::normalizeHooks( $options['hooks'] ?? null ); + $job_ids = self::normalizeJobIds( $options['job_ids'] ?? null ); $started_at = time(); $before_counts = self::getStatusCounts( $hooks, $job_ids ); $batches = 0; $warnings = 0; + $stop_reason = 'empty'; while ( self::getDuePendingCount( $hooks, $job_ids ) > 0 ) { - $stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids ); + $stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids, $stop_reason ); if ( $limit > 0 && (int) $stats['actions_processed'] >= $limit ) { + $stop_reason = 'limit'; break; } if ( $time_limit > 0 && ( time() - $started_at ) >= $time_limit ) { + $stop_reason = 'time_limit'; + break; + } + + if ( $time_limit > 0 && ( $time_limit - ( time() - $started_at ) ) <= $stop_before_timeout ) { + $stop_reason = 'timeout_margin'; break; } @@ -124,11 +142,13 @@ public static function drain( array $options = array() ): array { $current_batch_size = min( $batch_size, $limit - (int) $stats['actions_processed'] ); } if ( $current_batch_size <= 0 ) { + $stop_reason = 'limit'; break; } $action_ids = self::getDuePendingActionIds( $current_batch_size, $hooks, $job_ids ); if ( empty( $action_ids ) ) { + $stop_reason = 'empty'; break; } @@ -141,6 +161,7 @@ public static function drain( array $options = array() ): array { ++$warnings; $message = trim( (string) ( $result->stderr ?? '' ) ); WP_CLI::warning( '' === $message ? 'Action Scheduler CLI drain failed.' : $message ); + $stop_reason = 'warning'; break; } @@ -149,11 +170,12 @@ public static function drain( array $options = array() ): array { if ( 0 === $progress && self::getDuePendingCount( $hooks, $job_ids ) >= $due_before ) { ++$warnings; WP_CLI::warning( 'Drain stopped because Action Scheduler made no observable progress.' ); + $stop_reason = 'no_progress'; break; } } - return self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids ); + return self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids, $stop_reason ); } /** @@ -213,7 +235,7 @@ private static function runActionSchedulerActions( array $action_ids ): object { * @param int $warnings Warning count. * @return array Stats. */ - private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null, array $job_ids = array() ): array { + private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null, array $job_ids = array(), string $stop_reason = 'empty' ): array { $batch_completed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'complete' ); $batch_failed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'failed' ); $step_completed = self::delta( $before_counts, $after_counts, self::HOOK_EXECUTE_STEP, 'complete' ); @@ -238,6 +260,7 @@ private static function buildStats( array $before_counts, array $after_counts, i 'remaining_pending' => self::getDuePendingCount( $hooks, $job_ids ), 'total_pending' => self::getPendingCount( $hooks, $job_ids ), 'warnings' => $warnings, + 'stop_reason' => $stop_reason, 'hooks' => implode( ',', self::processedHooks( $before_counts, $after_counts ) ), 'job_ids' => implode( ',', $job_ids ), ); diff --git a/tests/flow-run-cli-drain-smoke.php b/tests/flow-run-cli-drain-smoke.php index 2e923dadf..0268d4887 100644 --- a/tests/flow-run-cli-drain-smoke.php +++ b/tests/flow-run-cli-drain-smoke.php @@ -40,6 +40,8 @@ function assert_drain_contains( string $needle, string $haystack, string $messag assert_drain_contains( "datamachine_pipeline_batch_chunk'", $drain_src, 'drain includes pipeline batch chunk hook' ); assert_drain_contains( "datamachine_execute_step'", $drain_src, 'drain includes execute step hook' ); assert_drain_contains( '[--job-id=]', $drain_src, 'drain documents optional job-id scope' ); +assert_drain_contains( '[--stop-before-timeout=]', $drain_src, 'drain documents optional timeout safety margin' ); +assert_drain_contains( "'stop_before_timeout'", $drain_src, 'drain accepts worker-style timeout safety margin option' ); assert_drain_contains( 'normalizeJobIds', $drain_src, 'drain normalizes optional job-id scope' ); assert_drain_contains( 'hookWhereSql( $hooks, $job_ids )', $drain_src, 'drain supports optional hook and job-id scopes' ); assert_drain_contains( 'a.args LIKE %s', $drain_src, 'drain can filter pending actions by serialized job_id args' ); @@ -56,6 +58,8 @@ function assert_drain_contains( string $needle, string $haystack, string $messag assert_drain_contains( "'other_actions'", $drain_src, 'drain reports non-pipeline action counts' ); assert_drain_contains( "'completions'", $drain_src, 'drain reports completions' ); assert_drain_contains( "'failures'", $drain_src, 'drain reports failures' ); +assert_drain_contains( "'stop_reason'", $drain_src, 'drain reports why it stopped' ); +assert_drain_contains( "'timeout_margin'", $drain_src, 'drain reports timeout margin stops' ); $run_flow_start = strpos( $src, 'private function runFlow' ); assert_true( false !== $run_flow_start, 'runFlow method found' );