Skip to content

Commit 6f3b04c

Browse files
authored
Don't write a cursor during the global DA => DA sensor migration for sensors that don't target any asset keys (#32794)
Summary: Prevents an issue where: - there is a large cursor - we are migrating from global DA to sensors - there is one or more sensors that don't actually target any assets - the large cursor is written to that sensor - the large cursor is never removed because the sensor never executes/updates Simulate a DA transition via script Existing test coverage of the migration transition ## Summary & Motivation ## How I Tested These Changes ## Changelog > Insert changelog entry or delete this section.
1 parent 7f56fb3 commit 6f3b04c

File tree

2 files changed

+63
-13
lines changed

2 files changed

+63
-13
lines changed

python_modules/dagster/dagster/_daemon/asset_daemon.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -644,9 +644,35 @@ def _create_initial_sensor_cursors_from_raw_cursor(
644644

645645
result = {}
646646

647-
initial_cursor = asset_daemon_cursor_to_instigator_serialized_cursor(pre_sensor_cursor)
648-
649647
for sensor, repo in sensors_and_repos:
648+
selection = sensor.asset_selection
649+
if not selection:
650+
continue
651+
652+
repo_asset_graph = repo.asset_graph
653+
resolved_keys = selection.resolve(repo_asset_graph) | selection.resolve_checks(
654+
repo_asset_graph
655+
)
656+
657+
serialized_cursor = None
658+
659+
if len(resolved_keys) > 0:
660+
# filter down the cursor to just the keys targeted by the sensor
661+
condition_cursors = [
662+
condition_cursor
663+
for condition_cursor in (pre_sensor_cursor.previous_condition_cursors or [])
664+
if condition_cursor.key in resolved_keys
665+
]
666+
667+
cursor_to_use = dataclasses.replace(
668+
pre_sensor_cursor,
669+
previous_condition_cursors=condition_cursors,
670+
)
671+
672+
serialized_cursor = asset_daemon_cursor_to_instigator_serialized_cursor(
673+
cursor_to_use
674+
)
675+
650676
new_auto_materialize_state = InstigatorState(
651677
sensor.get_remote_origin(),
652678
InstigatorType.SENSOR,
@@ -657,7 +683,7 @@ def _create_initial_sensor_cursors_from_raw_cursor(
657683
),
658684
SensorInstigatorData(
659685
min_interval=sensor.min_interval_seconds,
660-
cursor=initial_cursor,
686+
cursor=serialized_cursor,
661687
last_sensor_start_timestamp=get_current_timestamp(),
662688
sensor_type=sensor.sensor_type,
663689
),

python_modules/dagster/dagster_tests/declarative_automation_tests/daemon_tests/test_asset_daemon.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from unittest import mock
77

88
import dagster as dg
9+
import dagster._check as check
910
import pytest
1011
from dagster import AutoMaterializeRule, AutomationCondition, DagsterInstance
1112
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
@@ -449,6 +450,12 @@ def test_daemon_skip_env_var() -> None:
449450
default_status=DefaultSensorStatus.STOPPED,
450451
minimum_interval_seconds=15,
451452
),
453+
dg.AutomationConditionSensorDefinition(
454+
name="auto_materialize_sensor_no_assets",
455+
target=AssetSelection.groups("nonexistant"),
456+
default_status=DefaultSensorStatus.STOPPED,
457+
minimum_interval_seconds=15,
458+
),
452459
# default sensor picks up "C"
453460
]
454461
).with_all_eager(3),
@@ -573,7 +580,7 @@ def test_auto_materialize_sensor_transition():
573580
instigator_type=InstigatorType.SENSOR
574581
)
575582

576-
assert len(sensor_states) == 3 # sensor states for each sensor were created
583+
assert len(sensor_states) == 4 # sensor states for each sensor were created
577584

578585
# Only sensor that was set with default status RUNNING turned on and ran
579586
_assert_sensor_state(
@@ -594,16 +601,33 @@ def test_auto_materialize_sensor_transition():
594601
expected_num_ticks=1,
595602
expected_status=InstigatorStatus.RUNNING,
596603
)
604+
_assert_sensor_state(
605+
instance,
606+
"auto_materialize_sensor_no_assets",
607+
expected_num_ticks=0,
608+
expected_status=InstigatorStatus.RUNNING,
609+
)
610+
611+
asset_graph = daemon_sensor_scenario.initial_spec.asset_graph
612+
defs = daemon_sensor_scenario.initial_spec.defs
597613

598614
for sensor_state in sensor_states:
599-
# cursor was propagated to each sensor, so all subsequent evaluation IDs are higher
600-
assert (
601-
asset_daemon_cursor_from_instigator_serialized_cursor(
615+
if sensor_state.instigator_name == "auto_materialize_sensor_no_assets":
616+
assert cast("SensorInstigatorData", sensor_state.instigator_data).cursor is None
617+
else:
618+
asset_daemon_cursor = asset_daemon_cursor_from_instigator_serialized_cursor(
602619
cast("SensorInstigatorData", sensor_state.instigator_data).cursor,
603620
None,
604-
).evaluation_id
605-
> pre_sensor_evaluation_id
606-
)
621+
)
622+
623+
assert asset_daemon_cursor.evaluation_id > pre_sensor_evaluation_id
624+
625+
assert (
626+
asset_daemon_cursor.previous_condition_cursors_by_key.keys()
627+
== check.not_none(
628+
defs.get_sensor_def(sensor_state.instigator_name).asset_selection
629+
).resolve(asset_graph)
630+
)
607631

608632

609633
# this scenario simulates the true default case in which we have an implicit default sensor
@@ -737,7 +761,7 @@ def test_auto_materialize_sensor_ticks(num_threads):
737761
instigator_type=InstigatorType.SENSOR
738762
)
739763

740-
assert len(sensor_states) == 3 # sensor states for each sensor were created
764+
assert len(sensor_states) == 4 # sensor states for each sensor were created
741765

742766
# Only sensor that was set with default status RUNNING turned on and ran
743767
_assert_sensor_state(
@@ -778,7 +802,7 @@ def test_auto_materialize_sensor_ticks(num_threads):
778802
sensor_states = instance.schedule_storage.all_instigator_state( # pyright: ignore[reportOptionalMemberAccess]
779803
instigator_type=InstigatorType.SENSOR
780804
)
781-
assert len(sensor_states) == 3
805+
assert len(sensor_states) == 4
782806

783807
# No new tick yet for A since only 15 seconds have passed
784808
_assert_sensor_state(
@@ -809,7 +833,7 @@ def test_auto_materialize_sensor_ticks(num_threads):
809833
instigator_type=InstigatorType.SENSOR
810834
)
811835

812-
assert len(sensor_states) == 3
836+
assert len(sensor_states) == 4
813837
_assert_sensor_state(
814838
instance,
815839
"auto_materialize_sensor_a",

0 commit comments

Comments
 (0)