CC Catalog is a project that gathers information about images from around the internet, and stores the information so that these images can eventually be indexed in CC Search. A portion of the process is directed by Apache Airflow, which is a tool commonly used to organize workflows and data pipelines.
The nature of Airflow leads to some particular challenges when it comes to testing, and special care must be taken to make tests independent from the global state of the system where they are run. This blog post will describe a few of the challenges we faced when writing tests for Airflow jobs, and some tricks we used to solve those challenges.
Brief description of Apache Airflow
Apache Airflow is an open source piece of software that loads Directed Acyclic Graphs (DAGs) defined via Python files. The DAG is what defines a given workflow: the nodes are the pieces of work that need to be accomplished, and the directed edges of the graph define the dependencies between those pieces. By default, the Airflow daemon only looks for DAGs to load from a global location in the user's home folder: ~/airflow/dags/. When a DAG is 'run', i.e., the tasks defined by the nodes of the DAG are each performed in the order defined by the directed edges of the DAG, the Airflow daemon stores information about the DAG run in ~/airflow/. The daemon also stores general information about which DAGs exist on the system, and all of their current statuses, in that directory. For more details, please see the Airflow documentation.
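To make this concrete, here is a minimal sketch of such a DAG file (not taken from the CC Catalog code; the dag_id and commands are illustrative only): two nodes (tasks) connected by one directed edge, declared with the >> operator.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A hypothetical, minimal DAG definition.
dag = DAG(
    dag_id='minimal_example',
    start_date=datetime(2020, 1, 1),
    schedule_interval=None
)

# Two nodes of the graph...
first_task = BashOperator(task_id='first', bash_command='echo first', dag=dag)
second_task = BashOperator(task_id='second', bash_command='echo second', dag=dag)

# ...and the directed edge between them: second_task runs only after first_task.
first_task >> second_task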
Challenge: Localize Airflow to the project directory
Even when installed using pip within a virtualenv environment, all airflow commands will be run against the default locations in the user's home directory. In particular, if you want to test a DAG from your project directory, the method given in the Airflow documentation is to copy the DAG into the default location ~/airflow/dags/, and then use the command-line airflow tool to run the tasks defined by its nodes. Information about the success and failure of the tests will be stored by the Airflow daemon in the ~/airflow/ directory. We'd rather keep all input and output from our tests in the project directory instead. This helps avoid any side effects which might arise from running tests for different projects, and also ensures that tests can't affect anything in the default directory, which may be used for production in many cases.
The solution is to choose a directory in your project, and set the environment variable $AIRFLOW_HOME whenever you run the tests, or use the airflow command on the project DAGs. I recommend you add the command export AIRFLOW_HOME=/your/desired/full/path/ to a script (ours is called env.sh) that will be run in any shell dealing with the 'localized' Airflow instance, because forgetting to set the variable for even one airflow command will corrupt the DAG states stored in the global area. Note that setting this variable is necessary even when running in a virtualenv environment.
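One possible extra safety net, sketched here rather than taken from our setup, is to pin the variable from inside the test suite as well, so that a shell which forgot to source env.sh still cannot touch the global ~/airflow directory. The directory name used below is an assumption about the project layout:

# conftest.py (hypothetical): pytest imports this file before any test module,
# so this runs before Airflow itself is imported and reads AIRFLOW_HOME.
import os
from pathlib import Path

# Assumes the localized Airflow directory is a folder named 'airflow'
# next to this conftest.py; adjust the name to match your layout.
PROJECT_AIRFLOW_HOME = Path(__file__).resolve().parent / 'airflow'

# setdefault keeps any value already exported via env.sh.
os.environ.setdefault('AIRFLOW_HOME', str(PROJECT_AIRFLOW_HOME))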
Now that you have $AIRFLOW_HOME set, you'll likely want to load some DAGs that you've written. This is made easier if you put the files defining them into a dags directory within the directory denoted by $AIRFLOW_HOME. I.e., it's wise to structure the project sub-directory dealing with Airflow and Airflow DAGs similarly to the default location, but inside your project directory. At this point, you should have an $AIRFLOW_HOME directory as a subdirectory of your project directory, and an $AIRFLOW_HOME/dags directory within it, where you keep any Python files defining Airflow DAGs and their dependencies. Another advantage of this structure is that it's likely the directory structure you'll use in production, so replicating it simplifies deployment.
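Under these conventions, the relevant part of the project tree might look something like the following (the directory names are illustrative; common_api_workflows.py is the DAG file used in the examples below):

your_project/
    env.sh
    airflow/                      <- point $AIRFLOW_HOME here
        dags/
            common_api_workflows.py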
Finally, Airflow will leave a number of files in the $AIRFLOW_HOME directory which you are not likely to want to track in source control (e.g., git). These files are:

$AIRFLOW_HOME/airflow.cfg
$AIRFLOW_HOME/airflow.db
$AIRFLOW_HOME/logs/
$AIRFLOW_HOME/unittests.cfg

Add these files to .gitignore or the equivalent.
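For example, if $AIRFLOW_HOME points at a directory named airflow/ in the repository root (an assumption about the layout, matching the sketch above), the corresponding .gitignore entries would be:

airflow/airflow.cfg
airflow/airflow.db
airflow/logs/
airflow/unittests.cfg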
Smoketesting: Can the Airflow daemon load the DAGs?
Note that we're using pytest for our unit testing, and so most examples assume this.
The most basic test you'll want is to determine whether your DAGs can load without errors. To do this, you can use the following function:
from airflow.models import DagBag

def test_dags_load_with_no_errors():
    dag_bag = DagBag(include_examples=False)
    dag_bag.process_file('common_api_workflows.py')
    assert len(dag_bag.import_errors) == 0
We initialize a DagBag (this loads DAG files). With the process_file method, we attempt to load any DAGs defined in the common_api_workflows.py file. We then check to make sure loading the DAGs didn't produce any errors.
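A natural extension, sketched here rather than taken from our tests, is to point the DagBag at the entire $AIRFLOW_HOME/dags folder so that every DAG file in the project gets the same smoketest:

import os

from airflow.models import DagBag

def test_all_project_dags_load_with_no_errors():
    # Assumes AIRFLOW_HOME has been exported as described above.
    dag_folder = os.path.join(os.environ['AIRFLOW_HOME'], 'dags')
    dag_bag = DagBag(dag_folder=dag_folder, include_examples=False)
    # import_errors maps each failing file to its stack trace.
    assert len(dag_bag.import_errors) == 0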
Hint: Use functions to create DAGs.
This will increase testability. You can test the function directly, bypassing the need to load the DAG into the DagBag (except when you're actually testing that it can be loaded). This may seem obvious, but none of the Airflow documentation uses this pattern. Here is an example of a function that creates a simple DAG, and a test of that function:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def create_dag(
        source,
        script_location,
        dag_id,
        crontab_str=None,
        default_args=DAG_DEFAULT_ARGS):
    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=crontab_str,
        catchup=False
    )

    with dag:
        start_task = BashOperator(
            task_id='{}_starting'.format(source),
            bash_command='echo Starting {} workflow'.format(source),
            dag=dag
        )
        run_task = BashOperator(
            task_id='get_{}_images'.format(source),
            bash_command='python {} --mode default'.format(script_location),
            dag=dag
        )
        start_task >> run_task

    return dag


def test_create_dag_creates_correct_dependencies():
    dag = create_dag(
        'test_source',
        'test_script_location',
        'test_dag_id'
    )
    start_id = 'test_source_starting'
    run_id = 'get_test_source_images'
    start_task = dag.get_task(start_id)
    assert start_task.upstream_task_ids == set()
    assert start_task.downstream_task_ids == set([run_id])
    run_task = dag.get_task(run_id)
    assert run_task.upstream_task_ids == set([start_id])
    assert run_task.downstream_task_ids == set([])
Here, we assume that DAG_DEFAULT_ARGS is defined earlier in the file. See the Airflow documentation for details about default DAG arguments. Now, this function is testable (great!), but it doesn't actually make the DAG it creates known to the Airflow daemon. To do that, we have to place the created DAG into the global scope of the module defined by the file, which can be done with the following snippet:
globals()[dag_id] = create_dag(
    source,
    script_location,
    dag_id
)
Here, it's assumed that source, script_location, and dag_id are defined earlier in the Python file.
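To illustrate how this plays out when one module defines several DAGs, the same pattern might be driven by a small loop; the source names, script locations, and crontab strings below are purely hypothetical:

# Hypothetical list of workflows; each entry produces one DAG in the
# module's global scope, which is what the Airflow daemon picks up.
SOURCES = [
    ('flickr', '/path/to/flickr_workflow.py', 'flickr_workflow', '0 16 * * *'),
    ('met', '/path/to/met_workflow.py', 'met_workflow', '0 17 * * *'),
]

for source, script_location, dag_id, crontab_str in SOURCES:
    globals()[dag_id] = create_dag(
        source,
        script_location,
        dag_id,
        crontab_str=crontab_str
    )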
We hope that these hints are helpful to the reader. For more, and for the context around the snippets shown here, please take a look at the repo.