Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/core-system/wp-cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,10 +399,11 @@ wp datamachine drain

# Bound work for cron/supervisors
wp datamachine drain --limit=500 --batch-size=25 --time-limit=300
wp datamachine drain --time-limit=600 --stop-before-timeout=30 --format=json
wp datamachine drain --limit=500 --time-limit=300 --format=json
```

**Options**: `--limit`, `--batch-size`, `--time-limit`, `--format=table|json`
**Options**: `--limit`, `--batch-size`, `--time-limit`, `--stop-before-timeout`, `--job-id`, `--format=table|json`

### datamachine cycle / cycles

Expand Down
49 changes: 36 additions & 13 deletions inc/Cli/Commands/DrainCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ class DrainCommand extends BaseCommand {
* default: 0
* ---
*
* [--stop-before-timeout=<seconds>]
* : Stop this many seconds before the wall-clock limit so the drain exits
* cleanly before an external supervisor timeout. Only applies when
* --time-limit is greater than 0.
* ---
* default: 0
* ---
*
* [--job-id=<ids>]
* : Optional comma-separated Data Machine job IDs to drain. Useful when
* unrelated due work is blocked ahead of a known cleanup or retry run.
Expand All @@ -72,10 +80,11 @@ public function __invoke( array $args, array $assoc_args ): void {

$stats = self::drain(
array(
'limit' => isset( $assoc_args['limit'] ) ? (int) $assoc_args['limit'] : 0,
'batch_size' => isset( $assoc_args['batch-size'] ) ? (int) $assoc_args['batch-size'] : 25,
'time_limit' => isset( $assoc_args['time-limit'] ) ? (int) $assoc_args['time-limit'] : 0,
'job_ids' => isset( $assoc_args['job-id'] ) ? (string) $assoc_args['job-id'] : '',
'limit' => isset( $assoc_args['limit'] ) ? (int) $assoc_args['limit'] : 0,
'batch_size' => isset( $assoc_args['batch-size'] ) ? (int) $assoc_args['batch-size'] : 25,
'time_limit' => isset( $assoc_args['time-limit'] ) ? (int) $assoc_args['time-limit'] : 0,
'stop_before_timeout' => isset( $assoc_args['stop-before-timeout'] ) ? (int) $assoc_args['stop-before-timeout'] : 0,
'job_ids' => isset( $assoc_args['job-id'] ) ? (string) $assoc_args['job-id'] : '',
)
);

Expand All @@ -94,28 +103,37 @@ public function __invoke( array $args, array $assoc_args ): void {
* so this drain runs concrete due action IDs from that same group instead of
* a hand-maintained hook allow-list.
*
* @param array{limit?:int,batch_size?:int,time_limit?:int,hooks?:string[],job_ids?:string|int[]} $options Drain options.
* @param array{limit?:int,batch_size?:int,time_limit?:int,stop_before_timeout?:int,hooks?:string[],job_ids?:string|int[]} $options Drain options.
* @return array<string,int|string> Drain stats.
*/
public static function drain( array $options = array() ): array {
$limit = max( 0, (int) ( $options['limit'] ?? 0 ) );
$batch_size = max( 1, (int) ( $options['batch_size'] ?? 25 ) );
$time_limit = max( 0, (int) ( $options['time_limit'] ?? 0 ) );
$hooks = self::normalizeHooks( $options['hooks'] ?? null );
$job_ids = self::normalizeJobIds( $options['job_ids'] ?? null );
$limit = max( 0, (int) ( $options['limit'] ?? 0 ) );
$batch_size = max( 1, (int) ( $options['batch_size'] ?? 25 ) );
$time_limit = max( 0, (int) ( $options['time_limit'] ?? 0 ) );
$stop_before_timeout = max( 0, (int) ( $options['stop_before_timeout'] ?? 0 ) );
$hooks = self::normalizeHooks( $options['hooks'] ?? null );
$job_ids = self::normalizeJobIds( $options['job_ids'] ?? null );

$started_at = time();
$before_counts = self::getStatusCounts( $hooks, $job_ids );
$batches = 0;
$warnings = 0;
$stop_reason = 'empty';

while ( self::getDuePendingCount( $hooks, $job_ids ) > 0 ) {
$stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids );
$stats = self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids, $stop_reason );
if ( $limit > 0 && (int) $stats['actions_processed'] >= $limit ) {
$stop_reason = 'limit';
break;
}

if ( $time_limit > 0 && ( time() - $started_at ) >= $time_limit ) {
$stop_reason = 'time_limit';
break;
}

if ( $time_limit > 0 && ( $time_limit - ( time() - $started_at ) ) <= $stop_before_timeout ) {
$stop_reason = 'timeout_margin';
break;
}

Expand All @@ -124,11 +142,13 @@ public static function drain( array $options = array() ): array {
$current_batch_size = min( $batch_size, $limit - (int) $stats['actions_processed'] );
}
if ( $current_batch_size <= 0 ) {
$stop_reason = 'limit';
break;
}

$action_ids = self::getDuePendingActionIds( $current_batch_size, $hooks, $job_ids );
if ( empty( $action_ids ) ) {
$stop_reason = 'empty';
break;
}

Expand All @@ -141,6 +161,7 @@ public static function drain( array $options = array() ): array {
++$warnings;
$message = trim( (string) ( $result->stderr ?? '' ) );
WP_CLI::warning( '' === $message ? 'Action Scheduler CLI drain failed.' : $message );
$stop_reason = 'warning';
break;
}

Expand All @@ -149,11 +170,12 @@ public static function drain( array $options = array() ): array {
if ( 0 === $progress && self::getDuePendingCount( $hooks, $job_ids ) >= $due_before ) {
++$warnings;
WP_CLI::warning( 'Drain stopped because Action Scheduler made no observable progress.' );
$stop_reason = 'no_progress';
break;
}
}

return self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids );
return self::buildStats( $before_counts, self::getStatusCounts( $hooks, $job_ids ), $batches, $warnings, $hooks, $job_ids, $stop_reason );
}

/**
Expand Down Expand Up @@ -213,7 +235,7 @@ private static function runActionSchedulerActions( array $action_ids ): object {
* @param int $warnings Warning count.
* @return array<string,int|string> Stats.
*/
private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null, array $job_ids = array() ): array {
private static function buildStats( array $before_counts, array $after_counts, int $batches, int $warnings, ?array $hooks = null, array $job_ids = array(), string $stop_reason = 'empty' ): array {
$batch_completed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'complete' );
$batch_failed = self::delta( $before_counts, $after_counts, self::HOOK_BATCH_CHUNK, 'failed' );
$step_completed = self::delta( $before_counts, $after_counts, self::HOOK_EXECUTE_STEP, 'complete' );
Expand All @@ -238,6 +260,7 @@ private static function buildStats( array $before_counts, array $after_counts, i
'remaining_pending' => self::getDuePendingCount( $hooks, $job_ids ),
'total_pending' => self::getPendingCount( $hooks, $job_ids ),
'warnings' => $warnings,
'stop_reason' => $stop_reason,
'hooks' => implode( ',', self::processedHooks( $before_counts, $after_counts ) ),
'job_ids' => implode( ',', $job_ids ),
);
Expand Down
4 changes: 4 additions & 0 deletions tests/flow-run-cli-drain-smoke.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ function assert_drain_contains( string $needle, string $haystack, string $messag
assert_drain_contains( "datamachine_pipeline_batch_chunk'", $drain_src, 'drain includes pipeline batch chunk hook' );
assert_drain_contains( "datamachine_execute_step'", $drain_src, 'drain includes execute step hook' );
assert_drain_contains( '[--job-id=<ids>]', $drain_src, 'drain documents optional job-id scope' );
assert_drain_contains( '[--stop-before-timeout=<seconds>]', $drain_src, 'drain documents optional timeout safety margin' );
assert_drain_contains( "'stop_before_timeout'", $drain_src, 'drain accepts worker-style timeout safety margin option' );
assert_drain_contains( 'normalizeJobIds', $drain_src, 'drain normalizes optional job-id scope' );
assert_drain_contains( 'hookWhereSql( $hooks, $job_ids )', $drain_src, 'drain supports optional hook and job-id scopes' );
assert_drain_contains( 'a.args LIKE %s', $drain_src, 'drain can filter pending actions by serialized job_id args' );
Expand All @@ -56,6 +58,8 @@ function assert_drain_contains( string $needle, string $haystack, string $messag
assert_drain_contains( "'other_actions'", $drain_src, 'drain reports non-pipeline action counts' );
assert_drain_contains( "'completions'", $drain_src, 'drain reports completions' );
assert_drain_contains( "'failures'", $drain_src, 'drain reports failures' );
assert_drain_contains( "'stop_reason'", $drain_src, 'drain reports why it stopped' );
assert_drain_contains( "'timeout_margin'", $drain_src, 'drain reports timeout margin stops' );

$run_flow_start = strpos( $src, 'private function runFlow' );
assert_true( false !== $run_flow_start, 'runFlow method found' );
Expand Down
Loading