Apache Airflow: how to add a task to a DAG without changing the DAG file?

Iuliia Volkova
Oct 29, 2019


This article could also be called “How to dynamically copy your DAG and add an additional task to it?” I describe everything step by step, so the article is fairly long. If you only want the end result, scroll down to the end of the article and get the source code.

When you create an Airflow DAG, you usually want to have tests for it. You must be sure that the workflow finishes successfully and that every manipulation was done correctly.

One possible way is to run the end-2-end pipeline in a test/dev cluster, and here we face 2 things:

1) You must test the same 1-to-1 DAG that you use in production (of course, with different params for each environment).

2) Usually you don’t want to have validation tasks in the production pipeline.

Let’s take a look at an example. We have an ETL process in Apache Airflow that migrates data from, for example, Google Storage Buckets to BigQuery and PostgreSQL. We run the DAG hourly, and we know that data may not arrive every hour (we can have data for hours 15, 16 and 17, then a gap, and then data again for hours 20 and 21). So if the bucket for an hour does not exist, we skip the DAGRun.

Our pipeline will look like this:

Check that the bucket for the hour exists → if it exists, run the tasks → [insert data into BigQuery, insert data into PostgreSQL]

How can we create an end-2-end test for a DAG with a guarantee that we test the same DAG that goes to production?

Create wrapper

Very simple! We will create a wrapper that imports your production DAG and adds validation tasks at the end of it.

This guarantees that we test the same DAG, because we import it and do not modify anything in it. We only add validation tasks at the end.

Let’s take a simple DAG as an example.

Our ‘production’ DAG will contain only one task: a BashOperator that creates a file on disk. Our validation task will check that the file was really created with the expected name.

Pay attention: this is only an example. Because the validation task must run on the same machine that created the file, it will work only with SequentialExecutor, LocalExecutor or CeleryExecutor with one worker!

Define example DAG

First, define our ‘production’ DAG. Create the file bash_dag.py and define a DAG in it:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# use your own path here; I use Docker, so for simplicity this is the main airflow folder
file_path = "/usr/local/airflow/target.txt"

with DAG(dag_id="Create_file_DAG", schedule_interval=None,
         start_date=datetime(2019, 10, 29)) as dag:
    BashOperator(task_id="create_file", bash_command=f"touch {file_path}")

Start the UI, run the DAG, and we will see:

simple airflow DAG

Great. Now let’s create our ‘test wrapper’.

Create test tasks

Create the file test_tasks.py and define a task that validates the file name. We will use PythonOperator for this.

# apache airflow DAG
from os import path
from airflow.operators.python_operator import PythonOperator
from bash_dag import dag

# use your own path here; I use Docker, so for simplicity this is the main airflow folder
file_path = "/usr/local/airflow/target.txt"

# the callable checks the same path that the production task writes to
validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))

# there is also dag.add_tasks, which accepts a list of tasks
dag.add_task(validation_task_file_name)

Pay attention to the “# apache airflow DAG” comment: if your file does not contain both words, airflow and DAG, Airflow will not parse it.
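The check described above can be pictured with a few lines of plain Python. This is a minimal sketch, assuming the file-discovery heuristic of Airflow 1.10 in its default safe mode (a case-sensitive search for both strings in the file content). The function name looks_like_dag_file is hypothetical, and the real implementation may differ between Airflow versions.

```python
def looks_like_dag_file(source: bytes) -> bool:
    """Return True when both marker strings appear somewhere in the file content."""
    return all(marker in source for marker in (b"DAG", b"airflow"))


# Hypothetical file contents: one with the marker comment, one without it.
with_marker = b"# apache airflow DAG\nfrom bash_dag import dag\n"
without_marker = b"from bash_dag import dag\n"

print(looks_like_dag_file(with_marker))     # True
print(looks_like_dag_file(without_marker))  # False
```

In test_tasks.py the word airflow already appears in the import line, so under this assumption it is the upper-case DAG in the comment that makes the difference.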

Each DAG object has the methods “add_task” and “add_tasks” for manually adding tasks to the DAG object from other places (without passing the ‘dag’ argument to the task and without defining the task inside the DAG context).

In this example we could, of course, pass the dag argument, but we want to re-use this test task on different DAGs later.
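One caveat about the validation callable in test_tasks.py: PythonOperator marks a task as failed only when the callable raises an exception, so a lambda that merely returns False would still finish as a success. A minimal sketch of a stricter callable follows. The name assert_file_exists is hypothetical and not part of the original code.

```python
import os
import tempfile


def assert_file_exists(file_path):
    """Raise if file_path is missing, so that the calling task would be marked failed."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Expected file was not created: {file_path}")
    return file_path


# Quick local check without Airflow: one existing file and one missing file.
with tempfile.TemporaryDirectory() as tmp_dir:
    existing = os.path.join(tmp_dir, "target.txt")
    open(existing, "w").close()
    assert_file_exists(existing)  # passes silently

    try:
        assert_file_exists(os.path.join(tmp_dir, "missing.txt"))
    except FileNotFoundError as error:
        print(f"validation failed as expected: {error}")
```

Such a function could be passed as python_callable together with op_args=[file_path] in place of the lambda.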

Now open the UI and check the result.

task in ‘removed’ state

And you will see this. Why? Because you have 2 files that Airflow parses, and they overwrite each other. The first file, bash_dag.py, has no ‘validate_file_exists’ task, so the task shows up only after the second file is parsed. Then Airflow reads bash_dag.py again, the task is missing again, and so on. What do we need to do?

Copy production DAG

Change our test_tasks.py. We will copy the ‘DAG’ object and overwrite its name, so that we have 2 separate DAGs. The test DAG is still 1-to-1 with production: we only change the name and add tasks at the end.

# apache airflow DAG
from os import path
from copy import copy
from airflow.operators.python_operator import PythonOperator
from bash_dag import dag

file_path = "/usr/local/airflow/target.txt"

test_dag = copy(dag)
# we overwrite DAG name
test_dag._dag_id = test_dag._dag_id + "_test"
validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))

# also exists method add_tasks to pass list of tasks
test_dag.add_task(validation_task_file_name)

Now open the UI and you will see 2 DAGs:

test DAG duplicated from production

Let’s run the DAG and take a look at the console. You will see an error.

error after duplicate DAG object

Why? Because you duplicated the DAG and changed its name, but the tasks that came from your production DAG still hold the old DAG in their _dag attribute.

We need to switch the existing task instances from the old DAG to the new test DAG.
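The effect can be reproduced without Airflow. The sketch below uses two hypothetical stand-in classes, FakeDag and FakeTask, which are not Airflow classes. It only assumes that copy() makes a shallow copy, so the copied object shares its task dictionary and task objects with the original.

```python
from copy import copy


class FakeTask:
    """Stand-in for an operator: it only remembers which DAG owns it."""
    def __init__(self, task_id, dag):
        self.task_id = task_id
        self._dag = dag


class FakeDag:
    """Stand-in for a DAG: an id plus a dict of tasks keyed by task_id."""
    def __init__(self, dag_id):
        self._dag_id = dag_id
        self.task_dict = {}

    def add_task(self, task_id):
        self.task_dict[task_id] = FakeTask(task_id, self)


prod_dag = FakeDag("Create_file_DAG")
prod_dag.add_task("create_file")

test_dag = copy(prod_dag)  # shallow copy: new object, shared attributes
test_dag._dag_id += "_test"

task = test_dag.task_dict["create_file"]
print(test_dag.task_dict is prod_dag.task_dict)  # True: the dict is shared
print(task._dag._dag_id)                         # Create_file_DAG: stale owner

# Re-point every task at the copy.
for task_id in test_dag.task_dict:
    test_dag.task_dict[task_id]._dag = test_dag
print(task._dag._dag_id)                         # Create_file_DAG_test
```

Because the task objects are shared, re-pointing them also changes what the imported production object sees inside the same module. If that matters for your setup, copy.deepcopy may be worth evaluating, although this article does not use it.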

Let’s modify our test_tasks.py.

test_dag = copy(dag)
# we overwrite DAG name
test_dag._dag_id = test_dag._dag_id + "_test"
# we added only these 2 lines, all the old code is the same
for task in test_dag.task_dict:
    test_dag.task_dict[task]._dag = test_dag
....

Now, check UI:

success test DAG

Great! Now everything works well. But wait: we have 2 tasks running in parallel. We forgot about dependencies!

Of course, our validation task must start only after the whole DAG has finished.

Let’s make some more changes. We need to be sure that the task order is correct and that we depend on the last task of the production DAG. Our example DAG has only one task, so we cannot check this with it. Let’s modify the production DAG: add 3 DummyOperator tasks before the file creation task, to be sure that we catch the last one. This is how the tasks in bash_dag.py look now:

# add this import at the top of bash_dag.py
from airflow.operators.dummy_operator import DummyOperator
...
    # inside the "with DAG(...) as dag:" block
    create_file_task = BashOperator(task_id="create_file",
                                    bash_command=f"touch {file_path}")
    dummy_one = DummyOperator(task_id="dummy_one")
    dummy_two = DummyOperator(task_id="dummy_two")
    dummy_three = DummyOperator(task_id="dummy_three")
    dummy_one >> dummy_two >> dummy_three >> create_file_task

Depend on last tasks in production DAG

Nice. Now modify our test_tasks.py. We need to attach our test tasks to the last tasks of the production DAG. For us, ‘last’ means a task that has nothing downstream of it in the production DAG (a DAG can have several parallel paths, and we need to wait for all of them). To find out whether a task has downstream tasks, check its ‘downstream_task_ids’ attribute. Let’s add the code changes. Now our test_tasks.py looks like this:

# apache airflow DAG
from os import path
from copy import copy
from airflow.operators.python_operator import PythonOperator
from bash_dag import dag

file_path = "/usr/local/airflow/target.txt"

test_dag = copy(dag)
# we overwrite DAG name
test_dag._dag_id = test_dag._dag_id + "_test"
# ids of tasks that have nothing downstream in the production DAG
last_tasks_list = []
for task in test_dag.task_dict:
    test_dag.task_dict[task]._dag = test_dag
    if not test_dag.task_dict[task].downstream_task_ids:
        last_tasks_list.append(task)
validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))

# also exists method add_tasks to pass list of tasks
test_dag.add_task(validation_task_file_name)
# set last tasks of production DAG to downstream our validation tasks
last_tasks_list >> validation_task_file_name
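The leaf-selection step from the loop above can be tried in isolation. This is a minimal sketch in plain Python, where a dictionary stands in for the downstream_task_ids attribute of each task. The function name find_leaf_task_ids and the extra parallel_branch task are hypothetical and serve only as an illustration.

```python
def find_leaf_task_ids(downstream_by_task):
    """Return ids of tasks that have no downstream tasks, in insertion order."""
    return [task_id for task_id, downstream in downstream_by_task.items()
            if not downstream]


# Hypothetical graph: the chain from bash_dag.py plus one extra parallel branch.
downstream_by_task = {
    "dummy_one": {"dummy_two", "parallel_branch"},
    "dummy_two": {"dummy_three"},
    "dummy_three": {"create_file"},
    "create_file": set(),
    "parallel_branch": set(),
}

print(find_leaf_task_ids(downstream_by_task))
# ['create_file', 'parallel_branch']
```

Both leaves are returned, which is why the validation task has to be attached to a list of tasks and not to a single one.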

Time to trigger the DAG, check the scheduler log and see an error :) I know you like it!

error

What is this? Very simple, let me explain! The validation_task_file_name object was created without a DAG. We added it to the DAG with add_task, so it became part of the DAG, but the object we hold was still created without a ‘dag’ argument, so we cannot set its upstream. What to do? Go back to the start and pass the dag argument manually to the operator? No way! That’s too easy for us! Let’s get the correct task from the DAG object instead, using the DAG method ‘get_task’. Add some changes to our code:

...
validate_task_id = "validate_file_exists"
validation_task_file_name = PythonOperator(task_id=validate_task_id,
                                           python_callable=lambda: path.exists(file_path))

# also exists method add_tasks to pass list of tasks
test_dag.add_task(validation_task_file_name)
# set last tasks of production DAG to downstream our validation tasks
validation_task_in_dag_file_name = test_dag.get_task(validate_task_id)
# pay attention, we change this line
# last_tasks_list holds task ids, so resolve them to task objects first
[test_dag.get_task(task_id) for task_id in last_tasks_list] >> validation_task_in_dag_file_name

Nice. Now trigger the DAG and get:

correct test workflow

Great! We have just created a wrapper that adds end-2-end test tasks to our production DAG.

Create separate module for wrapper and re-use it

Now let’s make it more universal so that we can re-use it. Create test_dag_wrapper.py: a module that contains our universal function for adding test tasks to any DAG. Move all the logic inside the function. The function also takes a list of tasks as input, so that we can add several tasks to a DAG. You will have:

from copy import copy


def create_test_dag(production_dag, tasks_to_add):
    test_dag = copy(production_dag)
    # we overwrite airflow DAG name
    test_dag._dag_id = test_dag._dag_id + "_test"
    # ids of tasks that have nothing downstream in the production DAG
    last_tasks_list = []
    for task in test_dag.task_dict:
        test_dag.task_dict[task]._dag = test_dag
        if not test_dag.task_dict[task].downstream_task_ids:
            last_tasks_list.append(task)
    for task in tasks_to_add:
        test_dag.log.info('Adding task: %s', task.task_id)
        # also exists method add_tasks to pass list of tasks
        if task.task_id not in test_dag.task_dict:
            # add only if it was not added before, to avoid errors
            test_dag.add_task(task)
        # set last tasks of production DAG to downstream our validation tasks
        task_in_dag = test_dag.get_task(task.task_id)
        [test_dag.get_task(task_id) for task_id in last_tasks_list] >> task_in_dag

    return test_dag

Now modify your test_tasks.py to use the wrapper from the new module.

# apache airflow DAG
from os import path
from airflow.operators.python_operator import PythonOperator
from test_dag_wrapper import create_test_dag
from bash_dag import dag

file_path = "/usr/local/airflow/target.txt"

validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))
# create test DAG using wrapper
test_dag = create_test_dag(dag, [validation_task_file_name])
globals()[test_dag._dag_id] = test_dag
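A note on the last line of the listing: Airflow picks up DAG objects that it finds among the module-level names of a parsed file. With a single test DAG the variable test_dag is already such a name, so the globals() registration mainly pays off when several test DAGs are created in a loop and each one needs its own name. The sketch below illustrates the idea with a plain dictionary in place of globals(). FakeDag, collect_dags and the second DAG id Load_data_DAG are hypothetical.

```python
class FakeDag:
    """Stand-in for a DAG object; only the id matters here."""
    def __init__(self, dag_id):
        self.dag_id = dag_id


def collect_dags(namespace):
    """Return sorted ids of FakeDag objects found at the top level of a namespace."""
    return sorted(obj.dag_id for obj in namespace.values()
                  if isinstance(obj, FakeDag))


# A plain dict standing in for what globals() returns inside a DAG file.
namespace = {}
for production_id in ("Create_file_DAG", "Load_data_DAG"):
    test_dag = FakeDag(production_id + "_test")
    # Register each object under its own unique name, as globals()[dag_id] = dag does.
    namespace[test_dag.dag_id] = test_dag

print(collect_dags(namespace))
# ['Create_file_DAG_test', 'Load_data_DAG_test']
```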

That’s it! You now have a wrapper that imports your production DAGs and adds test/validation tasks to them. Enjoy!

Source code: https://github.com/xnuinside/airflow_examples/tree/master/wrapper_add_test_tasks_to_dag
