diff --git a/inc/Abilities/Engine/ExecuteStepAbility.php b/inc/Abilities/Engine/ExecuteStepAbility.php index 3718846b5..d1a5a2358 100644 --- a/inc/Abilities/Engine/ExecuteStepAbility.php +++ b/inc/Abilities/Engine/ExecuteStepAbility.php @@ -530,50 +530,6 @@ private function routeAfterExecution( $next_flow_step_id = $navigator->get_next_flow_step_id( $flow_step_id, $payload ); if ( $next_flow_step_id ) { - $packet_count = count( $dataPackets ); - - // Inline continuation: when a step produces 0-1 DataPackets, - // schedule the next step directly on the same job instead of - // creating child jobs. This eliminates recursive fan-out where - // children spawn grandchildren (e.g., AI step → upsert step). - // - // Fan-out is only meaningful when a step produces MULTIPLE - // packets that need parallel processing (e.g., fetch step - // producing one packet per event). A single packet is just - // the same job continuing to the next step. - if ( $packet_count <= 1 ) { - // For fetch/event_import steps with a single item (inline continuation), - // seed dedup context into engine_data so markCompletedItemProcessed() - // can find it when the last step completes. For fan-out children this - // is handled in PipelineBatchScheduler::createChildJob(). - if ( in_array( $step_type, array( 'fetch', 'event_import' ), true ) && ! empty( $dataPackets ) ) { - $packet_meta = $dataPackets[0]['metadata'] ?? array(); - $seed_data = array(); - if ( ! empty( $packet_meta['item_identifier'] ) ) { - $seed_data['item_identifier'] = $packet_meta['item_identifier']; - } - if ( ! empty( $packet_meta['source_type'] ) ) { - $seed_data['source_type'] = $packet_meta['source_type']; - } - if ( ! empty( $seed_data ) ) { - datamachine_merge_engine_data( $job_id, $seed_data ); - } - } - - do_action( - 'datamachine_schedule_next_step', - $job_id, - $next_flow_step_id, - $dataPackets - ); - - return array( - 'success' => true, - 'step_success' => true, - 'outcome' => 'inline_continuation', - ); - } - $engine = $payload['engine'] ?? null; $next_flow_step_config = $engine instanceof EngineData ? $engine->getFlowStepConfig( $next_flow_step_id ) : array(); $transition_route = self::resolveTransitionRoute( $flow_step_config, $next_flow_step_config, $dataPackets ); @@ -601,15 +557,42 @@ private function routeAfterExecution( ); } - $fanout_packets = $transition_route['packets']; + $routed_packets = $transition_route['packets']; + $packet_count = count( $routed_packets ); + + // Inline continuation: when a step produces 0-1 DataPackets, + // schedule the next step directly on the same job instead of + // creating child jobs. This eliminates recursive fan-out where + // children spawn grandchildren (e.g., AI step → upsert step). + // + // Fan-out is only meaningful when a step produces MULTIPLE + // packets that need parallel processing (e.g., fetch step + // producing one packet per event). A single packet is just + // the same job continuing to the next step. + if ( 'inline' === $transition_route['mode'] || $packet_count <= 1 ) { + // For fetch/event_import steps with a single item (inline continuation), + // seed dedup context into engine_data so markCompletedItemProcessed() + // can find it when the last step completes. For fan-out children this + // is handled in PipelineBatchScheduler::createChildJob(). + if ( in_array( $step_type, array( 'fetch', 'event_import' ), true ) && ! empty( $routed_packets ) ) { + $packet_meta = $routed_packets[0]['metadata'] ?? array(); + $seed_data = array(); + if ( ! empty( $packet_meta['item_identifier'] ) ) { + $seed_data['item_identifier'] = $packet_meta['item_identifier']; + } + if ( ! empty( $packet_meta['source_type'] ) ) { + $seed_data['source_type'] = $packet_meta['source_type']; + } + if ( ! empty( $seed_data ) ) { + datamachine_merge_engine_data( $job_id, $seed_data ); + } + } - // After filtering, check if we're back to ≤1 packet — inline instead of fan-out. - if ( 'inline' === $transition_route['mode'] || count( $fanout_packets ) <= 1 ) { do_action( 'datamachine_schedule_next_step', $job_id, $next_flow_step_id, - $fanout_packets + $routed_packets ); return array( @@ -619,14 +602,14 @@ private function routeAfterExecution( ); } - // Fan out: each handler DataPacket becomes its own child job + // Fan out: each routed DataPacket becomes its own child job // continuing through the remaining pipeline steps. $engine_snapshot = datamachine_get_engine_data( $job_id ); $batch_scheduler = new PipelineBatchScheduler(); $batch_result = $batch_scheduler->fanOut( $job_id, $next_flow_step_id, - $fanout_packets, + $routed_packets, $engine_snapshot ); diff --git a/tests/Unit/Abilities/Engine/ExecuteStepFanOutFilterTest.php b/tests/Unit/Abilities/Engine/ExecuteStepFanOutFilterTest.php index 6349a52ee..859a77aec 100644 --- a/tests/Unit/Abilities/Engine/ExecuteStepFanOutFilterTest.php +++ b/tests/Unit/Abilities/Engine/ExecuteStepFanOutFilterTest.php @@ -198,6 +198,38 @@ public function test_ai_to_handler_step_without_handler_packets_fails_transition $this->assertSame( 'handler_requiring_step_missing_handler_packets', $route['reason'] ); } + public function test_ai_to_handler_step_with_single_non_handler_packet_fails_transition(): void { + $route = ExecuteStepAbility::resolveTransitionRoute( + array( 'step_type' => 'ai' ), + array( + 'step_type' => 'publish', + 'handler_slugs' => array( 'publish_post' ), + ), + array( + $this->make_packet( 'tool_result', array( 'tool_name' => 'search' ) ), + ) + ); + + $this->assertSame( 'fail', $route['mode'] ); + $this->assertSame( array(), $route['packets'] ); + $this->assertSame( 'handler_requiring_step_missing_handler_packets', $route['reason'] ); + } + + public function test_ai_to_handler_step_with_no_packets_fails_transition(): void { + $route = ExecuteStepAbility::resolveTransitionRoute( + array( 'step_type' => 'ai' ), + array( + 'step_type' => 'upsert', + 'handler_slugs' => array( 'upsert_event' ), + ), + array() + ); + + $this->assertSame( 'fail', $route['mode'] ); + $this->assertSame( array(), $route['packets'] ); + $this->assertSame( 'handler_requiring_step_missing_handler_packets', $route['reason'] ); + } + public function test_ai_to_multi_handler_step_keeps_handler_packets_together(): void { $route = ExecuteStepAbility::resolveTransitionRoute( array( 'step_type' => 'ai' ), diff --git a/tests/transition-fanout-policy-smoke.php b/tests/transition-fanout-policy-smoke.php index 694eccb04..ed9135aef 100644 --- a/tests/transition-fanout-policy-smoke.php +++ b/tests/transition-fanout-policy-smoke.php @@ -86,6 +86,20 @@ function transition_fanout_assert_same( $expected, $actual, string $name, array transition_fanout_assert_same( 'fail', $route['mode'], 'ai non-handler packets fail before handler step', $failures, $passes ); transition_fanout_assert_same( 'handler_requiring_step_missing_handler_packets', $route['reason'], 'failure reason is explicit', $failures, $passes ); +$route = ExecuteStepAbility::resolveTransitionRoute( + array( 'step_type' => 'ai' ), + array( + 'step_type' => 'publish', + 'handler_slugs' => array( 'publish_post' ), + ), + array( + transition_fanout_packet( 'tool_result', array( 'tool_name' => 'search' ) ), + ) +); + +transition_fanout_assert_same( 'fail', $route['mode'], 'single ai non-handler packet fails before handler step', $failures, $passes ); +transition_fanout_assert_same( 'handler_requiring_step_missing_handler_packets', $route['reason'], 'single-packet failure reason is explicit', $failures, $passes ); + $route = ExecuteStepAbility::resolveTransitionRoute( array( 'step_type' => 'ai' ), array(