@@ -588,7 +588,9 @@ WITH retries AS (
588588 s." id" AS " stepId" ,
589589 s." timeout" AS " stepTimeout" ,
590590 s." scheduleTimeout" AS " scheduleTimeout" ,
591- wr." id" AS " workflowRunId"
591+ sr." jobRunId" ,
592+ wr." id" AS " workflowRunId" ,
593+ wr." concurrencyGroupId" AS " workflowConcurrencyGroupId"
592594 FROM
593595 retries
594596 JOIN
@@ -601,23 +603,98 @@ WITH retries AS (
601603 " WorkflowRun" wr ON jr." workflowRunId" = wr." id"
602604 WHERE
603605 sr." status" NOT IN (' SUCCEEDED' , ' FAILED' , ' CANCELLED' )
604- ), updated_step_runs AS (
606+ ),
607+ srs_with_concurrency AS (
608+ SELECT
609+ srs.* ,
610+ srs." workflowConcurrencyGroupId"
611+ FROM
612+ srs
613+ WHERE
614+ srs." workflowConcurrencyGroupId" IS NOT NULL
615+ ),
616+ srs_without_concurrency AS (
617+ SELECT
618+ srs.* ,
619+ NULL AS " workflowConcurrencyGroupId"
620+ FROM
621+ srs
622+ WHERE
623+ srs." workflowConcurrencyGroupId" IS NULL
624+ ),
625+ updated_step_runs_with_concurrency AS (
605626 UPDATE " StepRun" sr
606627 SET
607- " scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs ." scheduleTimeout" ), INTERVAL ' 5 minutes' ),
628+ " scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs_with_concurrency ." scheduleTimeout" ), INTERVAL ' 5 minutes' ),
608629 " updatedAt" = CURRENT_TIMESTAMP ,
609- " retryCount" = srs ." retryCount" + 1
610- FROM srs
611- WHERE sr." id" = srs ." id"
630+ " retryCount" = srs_with_concurrency ." retryCount" + 1
631+ FROM srs_with_concurrency
632+ WHERE sr." id" = srs_with_concurrency ." id"
612633 RETURNING sr." id"
613- ), updated_workflow_runs AS (
634+ ),
635+ updated_workflow_runs_with_concurrency AS (
614636 UPDATE " WorkflowRun" wr
615637 SET
616638 " status" = ' QUEUED' ,
617639 " updatedAt" = CURRENT_TIMESTAMP
618- FROM srs
619- WHERE wr." id" = srs ." workflowRunId"
640+ FROM srs_with_concurrency
641+ WHERE wr." id" = srs_with_concurrency ." workflowRunId"
620642 RETURNING wr." id"
643+ ),
644+ updated_step_runs_without_concurrency AS (
645+ UPDATE " StepRun" sr
646+ SET
647+ " status" = ' PENDING_ASSIGNMENT' ,
648+ " scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs_without_concurrency." scheduleTimeout" ), INTERVAL ' 5 minutes' ),
649+ " updatedAt" = CURRENT_TIMESTAMP ,
650+ " retryCount" = srs_without_concurrency." retryCount" + 1
651+ FROM srs_without_concurrency
652+ WHERE sr." id" = srs_without_concurrency." id"
653+ RETURNING sr." id"
654+ ),
655+ updated_workflow_runs_without_concurrency AS (
656+ UPDATE " WorkflowRun" wr
657+ SET
658+ " status" = ' RUNNING' ,
659+ " updatedAt" = CURRENT_TIMESTAMP
660+ FROM srs_without_concurrency
661+ WHERE wr." id" = srs_without_concurrency." workflowRunId"
662+ RETURNING wr." id"
663+ ),
664+ update_job_runs_without_concurrency AS (
665+ UPDATE " JobRun" jr
666+ SET
667+ " status" = ' RUNNING' ,
668+ " updatedAt" = CURRENT_TIMESTAMP
669+ FROM srs_without_concurrency
670+ WHERE jr." id" = srs_without_concurrency." jobRunId"
671+ RETURNING jr." id"
672+ ), inserted_sqs_without_concurrency AS (
673+ INSERT INTO " QueueItem" (
674+ " stepRunId" ,
675+ " stepId" ,
676+ " actionId" ,
677+ " scheduleTimeoutAt" ,
678+ " stepTimeout" ,
679+ " priority" ,
680+ " isQueued" ,
681+ " tenantId" ,
682+ " queue"
683+ )
684+ SELECT
685+ srs_without_concurrency." id" ,
686+ srs_without_concurrency." stepId" ,
687+ srs_without_concurrency." actionId" ,
688+ CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs_without_concurrency." scheduleTimeout" ), INTERVAL ' 5 minutes' ),
689+ srs_without_concurrency." stepTimeout" ,
690+ -- Queue with priority 4 so that retry gets highest priority
691+ 4 ,
692+ true,
693+ srs_without_concurrency." tenantId" ,
694+ srs_without_concurrency." actionId"
695+ FROM
696+ srs_without_concurrency
697+ RETURNING " stepRunId"
621698)
622699SELECT * FROM retries;
623700
0 commit comments