Apache Airflow: how to add a task to a DAG without changing the DAG file?

Create wrapper

Define example DAG

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# adjust the path for your setup; I use Docker, so for simplicity it's the main Airflow folder
file_path = "/usr/local/airflow/target.txt"

with DAG(dag_id="Create_file_DAG", schedule_interval=None,
         start_date=datetime(2019, 10, 29)) as dag:
    BashOperator(task_id="create_file", bash_command=f"touch {file_path}")
simple airflow DAG
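With this file saved as bash_dag.py in the dags folder (the name the later snippets import), a quick way to confirm it parses and registers the task is to import it in a Python shell in the same environment as Airflow:

from bash_dag import dag

print(dag.dag_id)    # Create_file_DAG
print(dag.task_ids)  # ['create_file']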

Create test tasks

# apache airflow DAG
from os import path
from airflow.operators.python_operator import PythonOperator
from bash_dag import dag

# adjust the path for your setup; I use Docker, so for simplicity it's the main Airflow folder
file_path = "/usr/local/airflow/target.txt"

validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))

# the method dag.add_tasks also exists, to pass a list of tasks
dag.add_task(validation_task_file_name)
task in ‘removed’ state

The task lands in the ‘removed’ state because the scheduler keeps re-parsing bash_dag.py on its own, and there the validation task does not exist: the same dag_id is reported with and without the extra task, so Airflow marks it as removed. The way around this is to register the extra tasks under a new DAG: copy the production DAG and give the copy its own id.

Copy production DAG

# apache airflow DAG
from os import path
from copy import copy
from airflow.operators.python_operator import PythonOperator
from bash_dag import dag

file_path = "/usr/local/airflow/target.txt"

test_dag = copy(dag)
# we overwrite the DAG name
test_dag._dag_id = test_dag._dag_id + "_test"
validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))

# the method add_tasks also exists, to pass a list of tasks
test_dag.add_task(validation_task_file_name)
test DAG duplicated from production
error after duplicating the DAG object
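Why the error? copy.copy makes a shallow copy: the new DAG object is distinct, but the task objects inside it are the very same ones, and each still points back at the original DAG. A plain-Python sketch of the effect (no Airflow required, the class names are only illustrative):

from copy import copy


class Dag:
    pass


class Task:
    pass


dag = Dag()
task = Task()
task._dag = dag          # the task holds a reference back to its DAG
dag.task_dict = {"t": task}

test_dag = copy(dag)     # shallow copy: attribute values are shared, not duplicated
print(test_dag.task_dict["t"] is task)      # True — same task object
print(test_dag.task_dict["t"]._dag is dag)  # True — still the ORIGINAL dag

So after copying we have to re-point every task at the new DAG ourselves: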
test_dag = copy(dag)
# we overwrite the DAG name
test_dag._dag_id = test_dag._dag_id + "_test"
# we add only these two lines; all the old code stays the same
for task in test_dag.task_dict:
    test_dag.task_dict[task]._dag = test_dag
...
successful test DAG
Now extend the production DAG in bash_dag.py with a few more tasks (add from airflow.operators.dummy_operator import DummyOperator to its imports):

...
    # inside the `with DAG(...)` block from before
    create_file_task = BashOperator(task_id="create_file",
                                    bash_command=f"touch {file_path}")
    dummy_one = DummyOperator(task_id="dummy_one")
    dummy_two = DummyOperator(task_id="dummy_two")
    dummy_three = DummyOperator(task_id="dummy_three")
    dummy_one >> dummy_two >> dummy_three >> create_file_task
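To see which tasks the chain now ends with (the "last tasks" the next section relies on), a quick check in a Python shell works; this just walks the DAG from the file above:

from bash_dag import dag

# tasks with no downstream dependencies are the end of the workflow
for task in dag.tasks:
    if not task.downstream_task_ids:
        print(task.task_id)  # -> create_file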

Depend on the last tasks in the production DAG

# apache airflow DAG
from os import path
from copy import copy
from airflow.operators.python_operator import PythonOperator
from bash_dag import dag

file_path = "/usr/local/airflow/target.txt"

test_dag = copy(dag)
# we overwrite the DAG name
test_dag._dag_id = test_dag._dag_id + "_test"
last_tasks_list = []
for task in test_dag.task_dict:
    test_dag.task_dict[task]._dag = test_dag
    # collect the ids of tasks with no downstream dependencies
    if not test_dag.task_dict[task].downstream_task_ids:
        last_tasks_list.append(task)
validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))

# the method add_tasks also exists, to pass a list of tasks
test_dag.add_task(validation_task_file_name)
# make our validation task run downstream of the production DAG's last tasks
last_tasks_list >> validation_task_file_name
error

The >> line fails because last_tasks_list holds task ids (plain strings), and Airflow only sets relationships between operator objects. The fix is to resolve each id to its task object through test_dag.get_task:
...
validate_task_id = "validate_file_exists"
validation_task_file_name = PythonOperator(task_id=validate_task_id,
                                           python_callable=lambda: path.exists(file_path))

# the method add_tasks also exists, to pass a list of tasks
test_dag.add_task(validation_task_file_name)
# make our validation task run downstream of the production DAG's last tasks
validation_task_in_dag_file_name = test_dag.get_task(validate_task_id)
# pay attention, we changed this line: the ids are resolved to task objects first
[test_dag.get_task(task_id) for task_id in last_tasks_list] >> validation_task_in_dag_file_name
correct test workflow
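As the comments above mention, the DAG also has a plural add_tasks helper that simply loops over add_task; handy when registering several validation tasks at once (second_check_task here is hypothetical, just to show the shape of the call):

# equivalent to calling add_task for each item of the list
test_dag.add_tasks([validation_task_file_name, second_check_task])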

Create a separate module for the wrapper and re-use it

# test_dag_wrapper.py
from copy import copy


def create_test_dag(production_dag, tasks_to_add):
    test_dag = copy(production_dag)
    # we overwrite the Airflow DAG name
    test_dag._dag_id = test_dag._dag_id + "_test"
    last_tasks_list = []
    for task_id in test_dag.task_dict:
        # re-point every copied task at the new DAG object
        test_dag.task_dict[task_id]._dag = test_dag
        # remember the ids of tasks with no downstream dependencies
        if not test_dag.task_dict[task_id].downstream_task_ids:
            last_tasks_list.append(task_id)
    for task in tasks_to_add:
        test_dag.log.info('Adding task: %s', task)
        # the method add_tasks also exists, to pass a list of tasks
        if task.task_id not in test_dag.task_dict:
            # only add the task if it is not there yet, to avoid duplicate-task errors
            test_dag.add_task(task)
        # make the added task run downstream of the production DAG's last tasks
        task_in_dag = test_dag.get_task(task.task_id)
        [test_dag.get_task(t) for t in last_tasks_list] >> task_in_dag

    return test_dag
With the wrapper in place, the test DAG file shrinks to:

# apache airflow DAG
from os import path
from airflow.operators.python_operator import PythonOperator
from test_dag_wrapper import create_test_dag
from bash_dag import dag

file_path = "/usr/local/airflow/target.txt"

validation_task_file_name = PythonOperator(task_id="validate_file_exists",
                                           python_callable=lambda: path.exists(file_path))
# create the test DAG using the wrapper
test_dag = create_test_dag(dag, [validation_task_file_name])
# the scheduler only picks up DAG objects it finds at module top level,
# so expose the new DAG under its own name
globals()[test_dag._dag_id] = test_dag
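To close the loop, here is a minimal sketch of how one might sanity-check the wrapper with pytest; the file name test_create_test_dag.py and the extra_check task are my own illustration, not part of the setup above:

# test_create_test_dag.py (hypothetical)
from airflow.operators.dummy_operator import DummyOperator

from bash_dag import dag
from test_dag_wrapper import create_test_dag


def test_wrapper_adds_task_and_dependency():
    extra_task = DummyOperator(task_id="extra_check")
    test_dag = create_test_dag(dag, [extra_task])

    # the copy got its own id and contains the new task
    assert test_dag.dag_id == dag.dag_id + "_test"
    assert "extra_check" in test_dag.task_dict
    # the new task runs after the last task of the production DAG
    assert "create_file" in test_dag.get_task("extra_check").upstream_task_ids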
