I am trying to persist state between pokes of a sensor:

    from datetime import timedelta

    from airflow.sdk import task
    from airflow.sdk.types import RuntimeTaskInstanceProtocol


    @task.sensor(
        poke_interval=timedelta(minutes=5), timeout=timedelta(minutes=120), mode="reschedule"
    )
    def wait_for_file_count_to_not_increase(ti: RuntimeTaskInstanceProtocol):
        # Read the count stored by the previous poke (0 if nothing was stored yet)
        previous_file_count = ti.xcom_pull(key="file_count", default=0)
        # Code to get the current file count
        current_file_count = 100
        if previous_file_count == current_file_count:
            return True
        ti.xcom_push(key="file_count", value=current_file_count)
        return False
Replies: 2 comments 1 reply
Seems like this is not possible and I need to use Variables or Files instead.
I came to this conclusion after finding this Stack Overflow post:
https://stackoverflow.com/questions/67258269/use-xcom-pull-to-pull-a-keys-value-that-same-task-pushed-airflow

Also, there is a discussion about the need for persisting state in Airflow, which might lead to adding such a feature, so feel free to take part in it: https://lists.apache.org/thread/vftpzrwb34xr2xbfsx7qtbxn5w6h3f2b
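
For anyone landing here, the file-based workaround mentioned above could look roughly like the sketch below. It is not an Airflow API: the helper name `file_count_is_stable` and the JSON state file are assumptions made for illustration, and the state path would need to live on storage that every worker can reach, since rescheduled pokes may run on different machines. It uses `None` rather than `0` as the "no previous poke" marker so that an empty directory on the first poke is not mistaken for a stable count.

```python
import json
import tempfile
from pathlib import Path


def file_count_is_stable(state_path: Path, current_file_count: int) -> bool:
    """Return True if the count equals the one stored by the previous poke."""
    previous_file_count = None  # None means "no earlier poke recorded"
    if state_path.exists():
        previous_file_count = json.loads(state_path.read_text())["file_count"]

    if previous_file_count == current_file_count:
        # Count did not change between two pokes: clean up and succeed.
        state_path.unlink()
        return True

    # Count changed (or first poke): remember it for the next poke.
    state_path.write_text(json.dumps({"file_count": current_file_count}))
    return False


# Simulate three pokes that observe 90, 100 and 100 files.
with tempfile.TemporaryDirectory() as tmp_dir:
    state_file = Path(tmp_dir) / "file_count.json"
    results = [file_count_is_stable(state_file, n) for n in (90, 100, 100)]
    print(results)  # [False, False, True]
```

Inside the sensor, the body would reduce to something like `return file_count_is_stable(some_shared_path, current_file_count)`, where `some_shared_path` is a hypothetical path unique to the DAG run. The same read, compare, write pattern should carry over to Variables by swapping the file reads and writes for the Variable getter and setter.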