
Task in same Airflow DAG starts before previous task is committed

We have a DAG whose first task aggregates a table (A) into a staging table (B). A second task then reads from the staging table (B) and writes to another table (C).

However, the second task reads from the staging table (B) before it has been fully updated, so table C ends up with stale data or is sometimes empty. Airflow still logs everything as successful.

Table B is updated as follows (pseudo):

DELETE FROM b;
INSERT INTO b
SELECT xxxx FROM a;
The relevant configuration:

task_concurrency: 10
pool size: 5
max_overflow: 10
executor: LocalExecutor
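
For context, here is a minimal sketch of the flow as described, assuming Airflow 2.x with the Postgres provider installed (Redshift speaks the Postgres protocol); the dag_id, task ids, and the "redshift" connection id are placeholders, and xxxx stands in for the real column list from the question:

from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="staging_aggregate",          # placeholder name
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Task 1: rebuild staging table B from table A. With the default
    # autocommit=False the hook runs both statements in one transaction
    # and commits when the task finishes.
    aggregate_to_staging = PostgresOperator(
        task_id="aggregate_to_staging",
        postgres_conn_id="redshift",     # assumed connection id
        sql="""
            DELETE FROM b;
            INSERT INTO b SELECT xxxx FROM a;
        """,
    )

    # Task 2: read from staging table B and write to target table C
    load_target = PostgresOperator(
        task_id="load_target",
        postgres_conn_id="redshift",
        sql="INSERT INTO c SELECT xxxx FROM b;",
    )

    # load_target must not start until aggregate_to_staging has succeeded
    aggregate_to_staging >> load_target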

Redshift seems to have a commit queue. Could it be that Redshift tells Airflow the commit has completed while it is in fact still sitting in the queue, so the next task reads before the real commit takes place?

We have tried wrapping the update of table B in a transaction (pseudo):

BEGIN;
DELETE FROM b;
INSERT INTO b
SELECT xxxx FROM a;
COMMIT;

But even that does not help. Somehow Airflow manages to start the second task before the first task's transaction is fully committed.
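
For reference, one way to make the commit unambiguous is to drive the whole rebuild from a single Python callable through a hook, so the task cannot report success before COMMIT has returned. A sketch under the same assumptions as above (Postgres provider, placeholder connection id, xxxx for the real column list):

from airflow.providers.postgres.hooks.postgres import PostgresHook

def rebuild_staging_table():
    """Rebuild staging table B atomically from table A."""
    hook = PostgresHook(postgres_conn_id="redshift")  # assumed conn id
    conn = hook.get_conn()  # DB-API connection, autocommit off by default
    try:
        with conn.cursor() as cur:
            cur.execute("DELETE FROM b;")
            cur.execute("INSERT INTO b SELECT xxxx FROM a;")
        conn.commit()  # readers see the new rows only after this returns
    finally:
        conn.close()

The callable would be wrapped in a PythonOperator in place of the first task.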

UPDATE

It turned out there was a mistake in the dependencies: downstream tasks were waiting on the wrong task to finish.

For future reference: never assume you have checked everything. Check and recheck the whole flow.
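
A quick sanity check for this kind of mistake is to inspect a task's upstream set directly, for example (task ids from the sketch above, still placeholders):

print(dag.get_task("load_target").upstream_task_ids)
# expected: {'aggregate_to_staging'}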


Answer

You can achieve this goal by setting wait_for_downstream to True.

From https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html :

when set to true, an instance of task X will wait for tasks immediately downstream of the previous instance of task X to finish successfully before it runs.

You can set this parameter at the default_dag_args level or at the task (operator) level.

default_dag_args = {
    # each task waits for the downstream tasks of its own previous run
    'wait_for_downstream': True,
}
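
For example, passing the dict to the DAG applies it to every task (the dag_id here is a placeholder):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="staging_aggregate",
    start_date=datetime(2021, 1, 1),
    default_args=default_dag_args,  # every task gets wait_for_downstream=True
) as dag:
    ...

Alternatively, pass wait_for_downstream=True to an individual operator. Note that wait_for_downstream coordinates consecutive DAG runs of the same task; within a single run, ordering still comes from the task dependencies themselves.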