We have a DAG whose first task aggregates a table (A) into a staging table (B). A second task then reads from the staging table (B) and writes to another table (C).
However, the second task reads from the staging table (B) before it has been fully updated, which leaves table C with stale data or, sometimes, no data at all. Airflow still logs everything as successful.
Updating table B is done as (pseudo): delete all rows; insert into table b select xxxx from table A;
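For context, a minimal sketch of how such a refresh task might be defined, using Airflow 1.x-style imports; the connection id redshift_default, the table names staging_b and source_a, and the column list are all placeholders, not taken from the original post.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

dag = DAG(
    dag_id="aggregate_a_to_c",            # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
)

# Both statements are sent in one call, so they run in the same session.
refresh_staging_b = PostgresOperator(
    task_id="refresh_staging_b",
    postgres_conn_id="redshift_default",  # assumed connection id
    sql="""
        DELETE FROM staging_b;
        INSERT INTO staging_b
        SELECT col_1, col_2               -- placeholder for the real column list
        FROM source_a;
    """,
    autocommit=True,
    dag=dag,
)
```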
Task concurrency is set to 10, pool size to 5, and max_overflow to 10. We are using the LocalExecutor.
Redshift appears to have a commit queue. Could it be that Redshift tells Airflow a commit has completed while it is in fact still sitting in that queue, so the next task reads before the real commit takes place?
We have tried wrapping the update of table B in a transaction as (pseudo):
begin; delete all rows; insert into table b select xxxx from table A; commit;
But even that does not work. For some reason Airflow manages to start the second task before the first task has fully committed.
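The same idea expressed in Python, driving both statements through one explicit transaction via PostgresHook; this is only a sketch under the same assumed connection, table, and column names as above, and could be wired into a PythonOperator.

```python
from airflow.hooks.postgres_hook import PostgresHook

def refresh_staging_b_in_transaction():
    """Delete and reload staging_b inside a single explicit transaction."""
    hook = PostgresHook(postgres_conn_id="redshift_default")  # assumed connection id
    conn = hook.get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM staging_b;")
            cur.execute("INSERT INTO staging_b SELECT col_1, col_2 FROM source_a;")
        conn.commit()  # nothing becomes visible to other sessions before this returns
    finally:
        conn.close()
```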
UPDATE
It turned out there was a mistake in the dependencies: downstream tasks were waiting for the wrong task to finish.
For future reference: never assume you have checked everything. Check and recheck the whole flow.
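In other words, the fix was to point the downstream task at the task that actually refreshes B. A minimal sketch with placeholder task names (the exact wrong wiring in the original DAG is not stated; this is only an illustration):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id="aggregate_a_to_c",        # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
)

# Stand-ins for the real aggregation and load tasks.
refresh_staging_b = DummyOperator(task_id="refresh_staging_b", dag=dag)
load_c_from_b = DummyOperator(task_id="load_c_from_b", dag=dag)

# The crucial line: C must wait for the task that refreshes B,
# not for some other, unrelated upstream task.
refresh_staging_b >> load_c_from_b
```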
Answer
You can achieve this goal by setting wait_for_downstream to True.
From https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html:
when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully before it runs.
You can set this parameter at the default_dag_args level or at the task (operator) level.
default_dag_args = {
    'wait_for_downstream': True,
}
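Put together, a minimal sketch of how this might be wired up, using Airflow 1.x-style imports; the DAG id and task ids are placeholders, not from the original question or answer.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_dag_args = {
    'start_date': datetime(2020, 1, 1),
    'wait_for_downstream': True,   # applied to every task in the DAG
}

dag = DAG(
    dag_id='aggregate_a_to_c',     # hypothetical DAG id
    schedule_interval='@daily',
    default_args=default_dag_args,
)

refresh_staging_b = DummyOperator(task_id='refresh_staging_b', dag=dag)

# Alternatively, set the flag on an individual task instead of in default_args.
load_c_from_b = DummyOperator(
    task_id='load_c_from_b',
    wait_for_downstream=True,
    dag=dag,
)

refresh_staging_b >> load_c_from_b
```

Note that wait_for_downstream acts across DAG runs (run N waits for the downstream tasks of run N-1); ordering within a single run still comes from the task dependencies themselves, which is exactly what was wrong in this case.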