Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 33 additions & 50 deletions inc/Abilities/Engine/ExecuteStepAbility.php
Original file line number Diff line number Diff line change
Expand Up @@ -530,50 +530,6 @@ private function routeAfterExecution(
$next_flow_step_id = $navigator->get_next_flow_step_id( $flow_step_id, $payload );

if ( $next_flow_step_id ) {
$packet_count = count( $dataPackets );

// Inline continuation: when a step produces 0-1 DataPackets,
// schedule the next step directly on the same job instead of
// creating child jobs. This eliminates recursive fan-out where
// children spawn grandchildren (e.g., AI step → upsert step).
//
// Fan-out is only meaningful when a step produces MULTIPLE
// packets that need parallel processing (e.g., fetch step
// producing one packet per event). A single packet is just
// the same job continuing to the next step.
if ( $packet_count <= 1 ) {
// For fetch/event_import steps with a single item (inline continuation),
// seed dedup context into engine_data so markCompletedItemProcessed()
// can find it when the last step completes. For fan-out children this
// is handled in PipelineBatchScheduler::createChildJob().
if ( in_array( $step_type, array( 'fetch', 'event_import' ), true ) && ! empty( $dataPackets ) ) {
$packet_meta = $dataPackets[0]['metadata'] ?? array();
$seed_data = array();
if ( ! empty( $packet_meta['item_identifier'] ) ) {
$seed_data['item_identifier'] = $packet_meta['item_identifier'];
}
if ( ! empty( $packet_meta['source_type'] ) ) {
$seed_data['source_type'] = $packet_meta['source_type'];
}
if ( ! empty( $seed_data ) ) {
datamachine_merge_engine_data( $job_id, $seed_data );
}
}

do_action(
'datamachine_schedule_next_step',
$job_id,
$next_flow_step_id,
$dataPackets
);

return array(
'success' => true,
'step_success' => true,
'outcome' => 'inline_continuation',
);
}

$engine = $payload['engine'] ?? null;
$next_flow_step_config = $engine instanceof EngineData ? $engine->getFlowStepConfig( $next_flow_step_id ) : array();
$transition_route = self::resolveTransitionRoute( $flow_step_config, $next_flow_step_config, $dataPackets );
Expand Down Expand Up @@ -601,15 +557,42 @@ private function routeAfterExecution(
);
}

$fanout_packets = $transition_route['packets'];
$routed_packets = $transition_route['packets'];
$packet_count = count( $routed_packets );

// Inline continuation: when a step produces 0-1 DataPackets,
// schedule the next step directly on the same job instead of
// creating child jobs. This eliminates recursive fan-out where
// children spawn grandchildren (e.g., AI step → upsert step).
//
// Fan-out is only meaningful when a step produces MULTIPLE
// packets that need parallel processing (e.g., fetch step
// producing one packet per event). A single packet is just
// the same job continuing to the next step.
if ( 'inline' === $transition_route['mode'] || $packet_count <= 1 ) {
// For fetch/event_import steps with a single item (inline continuation),
// seed dedup context into engine_data so markCompletedItemProcessed()
// can find it when the last step completes. For fan-out children this
// is handled in PipelineBatchScheduler::createChildJob().
if ( in_array( $step_type, array( 'fetch', 'event_import' ), true ) && ! empty( $routed_packets ) ) {
$packet_meta = $routed_packets[0]['metadata'] ?? array();
$seed_data = array();
if ( ! empty( $packet_meta['item_identifier'] ) ) {
$seed_data['item_identifier'] = $packet_meta['item_identifier'];
}
if ( ! empty( $packet_meta['source_type'] ) ) {
$seed_data['source_type'] = $packet_meta['source_type'];
}
if ( ! empty( $seed_data ) ) {
datamachine_merge_engine_data( $job_id, $seed_data );
}
}

// After filtering, check if we're back to ≤1 packet — inline instead of fan-out.
if ( 'inline' === $transition_route['mode'] || count( $fanout_packets ) <= 1 ) {
do_action(
'datamachine_schedule_next_step',
$job_id,
$next_flow_step_id,
$fanout_packets
$routed_packets
);

return array(
Expand All @@ -619,14 +602,14 @@ private function routeAfterExecution(
);
}

// Fan out: each handler DataPacket becomes its own child job
// Fan out: each routed DataPacket becomes its own child job
// continuing through the remaining pipeline steps.
$engine_snapshot = datamachine_get_engine_data( $job_id );
$batch_scheduler = new PipelineBatchScheduler();
$batch_result = $batch_scheduler->fanOut(
$job_id,
$next_flow_step_id,
$fanout_packets,
$routed_packets,
$engine_snapshot
);

Expand Down
32 changes: 32 additions & 0 deletions tests/Unit/Abilities/Engine/ExecuteStepFanOutFilterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,38 @@ public function test_ai_to_handler_step_without_handler_packets_fails_transition
$this->assertSame( 'handler_requiring_step_missing_handler_packets', $route['reason'] );
}

public function test_ai_to_handler_step_with_single_non_handler_packet_fails_transition(): void {
$route = ExecuteStepAbility::resolveTransitionRoute(
array( 'step_type' => 'ai' ),
array(
'step_type' => 'publish',
'handler_slugs' => array( 'publish_post' ),
),
array(
$this->make_packet( 'tool_result', array( 'tool_name' => 'search' ) ),
)
);

$this->assertSame( 'fail', $route['mode'] );
$this->assertSame( array(), $route['packets'] );
$this->assertSame( 'handler_requiring_step_missing_handler_packets', $route['reason'] );
}

public function test_ai_to_handler_step_with_no_packets_fails_transition(): void {
$route = ExecuteStepAbility::resolveTransitionRoute(
array( 'step_type' => 'ai' ),
array(
'step_type' => 'upsert',
'handler_slugs' => array( 'upsert_event' ),
),
array()
);

$this->assertSame( 'fail', $route['mode'] );
$this->assertSame( array(), $route['packets'] );
$this->assertSame( 'handler_requiring_step_missing_handler_packets', $route['reason'] );
}

public function test_ai_to_multi_handler_step_keeps_handler_packets_together(): void {
$route = ExecuteStepAbility::resolveTransitionRoute(
array( 'step_type' => 'ai' ),
Expand Down
14 changes: 14 additions & 0 deletions tests/transition-fanout-policy-smoke.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ function transition_fanout_assert_same( $expected, $actual, string $name, array
transition_fanout_assert_same( 'fail', $route['mode'], 'ai non-handler packets fail before handler step', $failures, $passes );
transition_fanout_assert_same( 'handler_requiring_step_missing_handler_packets', $route['reason'], 'failure reason is explicit', $failures, $passes );

$route = ExecuteStepAbility::resolveTransitionRoute(
array( 'step_type' => 'ai' ),
array(
'step_type' => 'publish',
'handler_slugs' => array( 'publish_post' ),
),
array(
transition_fanout_packet( 'tool_result', array( 'tool_name' => 'search' ) ),
)
);

transition_fanout_assert_same( 'fail', $route['mode'], 'single ai non-handler packet fails before handler step', $failures, $passes );
transition_fanout_assert_same( 'handler_requiring_step_missing_handler_packets', $route['reason'], 'single-packet failure reason is explicit', $failures, $passes );

$route = ExecuteStepAbility::resolveTransitionRoute(
array( 'step_type' => 'ai' ),
array(
Expand Down
Loading