Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dagster._core.storage.dagster_run import (
CANCELABLE_RUN_STATUSES,
NOT_FINISHED_STATUSES,
DagsterRunStatus,
RunsFilter,
)
from dagster._core.storage.tags import BACKFILL_ID_TAG, USER_TAG
Expand Down Expand Up @@ -557,26 +558,50 @@ def cancel_backfill_runs_and_cancellation_complete(

while True:
# Cancel all cancelable runs for the backfill in batches

# start with the queued runs since those will be faster to cancel
runs_to_cancel_in_iteration = instance.run_storage.get_runs(
filters=RunsFilter(
statuses=CANCELABLE_RUN_STATUSES,
statuses=[DagsterRunStatus.QUEUED],
tags={
BACKFILL_ID_TAG: backfill_id,
},
),
limit=CANCELABLE_RUNS_BATCH_SIZE,
ascending=True,
)

if not runs_to_cancel_in_iteration:
break
# once all queued runs are canceled, cancel all other cancelable runs
runs_to_cancel_in_iteration = instance.run_storage.get_runs(
filters=RunsFilter(
statuses=CANCELABLE_RUN_STATUSES,
tags={
BACKFILL_ID_TAG: backfill_id,
},
),
limit=CANCELABLE_RUNS_BATCH_SIZE,
ascending=True,
)
if not runs_to_cancel_in_iteration:
break

canceled_any_runs = True
for run in runs_to_cancel_in_iteration:
run_id = run.run_id
logger.info(f"Terminating submitted run {run_id}")
# calling cancel_run will immediately set its status to CANCELING or CANCELED,

# in both cases this will synchonrously set its status to CANCELING or CANCELED,
# ensuring that it will not be returned in the next loop
instance.run_coordinator.cancel_run(run_id)

if run.status == DagsterRunStatus.QUEUED:
instance.report_run_canceling(
run,
message="Canceling run from the queue.",
)
instance.report_run_canceled(run)
else:
instance.run_launcher.terminate(run_id)

if canceled_any_runs:
# since we are canceling some runs in this iteration, we know that there is more work to do.
Expand Down