CC Open Source Blog

Apache Airflow testing with Pytest

author's gravatar

by Brent Moran on 2020-01-23

CC Catalog is a project that gathers information about images from around the internet, and stores the information so that these images can eventually be indexed in CC Search. A portion of the process is directed by Apache Airflow, which is a tool commonly used to organize workflows and data pipelines.

The nature of Airflow leads to some particular challenges when it comes to testing, and special care must be taken to make tests independent from the global state of the system where they are run. This blog post will describe a few of the challenges we faced when writing tests for Airflow jobs, and some tricks we used to solve those challenges.

Brief description of Apache Airflow

Apache Airflow is an open source piece of software that loads Directed Acyclic Graphs (DAGs) defined via python files. The DAG is what defines a given workflow. The nodes are pieces of jobs that need to be accomplished, and the directed edges of the graph define dependencies between the various pieces. By default, the Airflow daemon only looks for DAGs to load from a global location in the user's home folder: ~/airflow/dags/. When a DAG is 'run', i.e., the tasks defined by the nodes of the DAG are each performed in the order defined by the directed edges of the DAG, the Airflow daemon stores information about the dag run in ~/airflow/. The daemon also stores general information about what DAGs exist on the system, and all of their current statuses in that directory. For more details, please see the documentation

Challenge: Localize Airflow to the project directory

Even when installed using pip within a virtualenv environment, all airflow commands will be run against the default locations in the user's home directory. In particular, if you want to test a DAG from your project directory, the method given in the Airflow documentation is to copy the dag into the default location ~/airflow/dags/, and use the command-line airflow tool to run the tasks defined by the nodes. The information about success and failure of the tests will be stored by the Airflow daemon in the ~/airflow/ directory. We'd rather keep all input and output from our tests to the project directory instead. This helps avoid any side effects which might arise by running tests for different projects, and also ensures that tests can't affect anything in the default directory, which may be used for production in many cases.

The solution is to choose a directory in your project, and set the environment variable $AIRFLOW_HOME whenever you run the tests, or use the airflow command on the project DAGs. I recommend you add the command

export AIRFLOW_HOME=/your/desired/full/path/

to a script (ours is called env.sh) that will be run in any shell dealing with the 'localized' Airflow instance, because forgetting to set the variable for even one airflow command will corrupt the DAG states stored in the global area. Note that setting this variable is necessary even when running in a virtualenv environment.

Now that you have $AIRFLOW_HOME set, you'll likely want to load some DAGs that you've written. This is made easier if you put the files defining them into a dags directory in the directory denoted by $AIRFLOW_HOME. I.e., it's wise to structure the project sub-directory dealing with Airflow and Airflow DAGs similarly to the default location, but in your project directory. At this point, you should have some $AIRFLOW_HOME directory as a subdirectory of your project directory, and then some $AIRFLOW_HOME/dags directory, where you keep any python files defining Airflow DAGs, and their dependencies. Another advantage of this structure is it's likely the directory structure you'll use in production, and replicating simplifies deployment.

Finally, Airflow will leave a number of files in the $AIRFLOW_HOME directory which you are not likely to want to track in source control (e.g., git). These files are:

Add these files to .gitignore or the equivalent.

Smoketesting: Can the Airflow daemon load the DAGs?

Note that we're using pytest for our unit testing, and so most examples assume this.

The most basic test you'll want is to determine whether your DAGs can load without errors. To do this, you can use the following function:

from airflow.models import DagBag

def test_dags_load_with_no_errors():
    dag_bag = DagBag(include_examples=False)
    dag_bag.process_file('common_api_workflows.py')
    assert len(dag_bag.import_errors) == 0

We initialize a DagBag (this loads DAG files). With the process_file method, we instruct the Airflow daemon to attempt to load any DAGs defined in the common_api_workflows.py file. We then check to make sure loading the DAGs didn't produce any errors.

Hint: Use functions to create DAGs.

This will increase testability. You can test the function, bypassing the need to load the DAG into the DagBag (except when you're actually testing that it can be loaded). This may seem obvious, but none of the Airflow documentation uses this pattern. Here is an example of a function that creates a simple dag, and a test of the function:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

def create_dag(
        source,
        script_location,
        dag_id,
        crontab_str=None,
        default_args=DAG_DEFAULT_ARGS):

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=crontab_str,
        catchup=False
    )

    with dag:
        start_task = BashOperator(
            task_id='{}_{}'.format(source, status),
            bash_command='echo Starting {} workflow'.format(status),
            dag=dag
        )

        run_task =  BashOperator(
            task_id='get_{}_images'.format(source),
            bash_command='python {} --mode default'.format(script_location),
            dag=dag
        )

        start_task >> run_task

    return dag

def test_create_dag_creates_correct_dependencies():
    dag = create_dag(
        'test_source',
        'test_script_location',
        'test_dag_id'
    )
    start_id = 'test_source_starting'
    run_id = 'get_test_source_images'
    start_task = dag.get_task(start_id)
    assert start_task.upstream_task_ids == set()
    assert start_task.downstream_task_ids == set([run_id])
    run_task = dag.get_task(run_id)
    assert run_task.upstream_task_ids == set([start_id])
    assert run_task.downstream_task_ids == set([])

Here, we assume that DAG_DEFAULT_ARGS is defined earlier in the file. See the Airflow documentation for details about default DAG arguments. Now, this function is testable (great!) but it doesn't acutally make the DAG it creates known to the Airflow daemon. To do that, we have to create the created dag into the global scope of the module defined by the file, which can be done with the following snippet:

globals()[dag_id] = create_dag(
    source,
    script_location,
    dag_id
)

Here, it's assumed that source, script_location, and dag_id are defined earlier in the python file.

We hope that these hints are helpful to the reader. For more, and for the context around the snippets shown here, please take a look at the repo.