Four lessons from managing a company-wide Airflow plugin

At my current company (Carpe Data, we are hiring!), one of my tasks is to maintain an internal Airflow plugin (common-airflow-utils) with Airflow-related utilities. The library is used by 5 different teams and powers a big part of our production workflows. It provides higher-level abstractions on top of the Airflow API, with the goals of standardizing Airflow practices and making DAG writing much easier, particularly for teams where Python expertise is scarce.

For example, instead of launching an EMR job via an EmrCreateJobFlowOperator, a regular DAG owner (the person writing DAG code) can just call the common utilities function create_emr_job_flow(*args, **kwargs).
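To make that concrete, here is a minimal sketch of what such a wrapper could look like; the body and defaults below are hypothetical, not the actual library code:

from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

# Hypothetical thin wrapper of the kind common-airflow-utils provides
def create_emr_job_flow(task_id, job_flow_overrides=None, **kwargs):
    """Create an EMR job flow with company-standard defaults."""
    return EmrCreateJobFlowOperator(
        task_id=task_id,
        job_flow_overrides=job_flow_overrides or {},
        **kwargs,
    )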

This library has been in production for a year, and there are a few things I have learned from building and maintaining it:

1. The TaskFlow API is great, but not for internal library functions

When I joined Carpe, there was already some Airflow code lying around. It wasn't great, but since my goal was to set up new Airflow libraries with (hopefully) better standards, I tried to keep existing code whenever possible.

This meant keeping some internal Airflow plugin functions that followed the Airflow TaskFlow API. In a nutshell, the TaskFlow API is a new way (for Airflow 2.0.0 or higher) to define operators, using standard Python functions instead of the class-based operators that were used before the TaskFlow API was released.
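For readers who haven't used it, here is a minimal sketch of the difference, with hypothetical task names (assuming Airflow 2.4+ for the schedule argument):

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator

with DAG(dag_id="taskflow_demo", start_date=pendulum.datetime(2023, 1, 1), schedule=None):
    # Pre-2.0 class-based style: wrap a callable in an operator class
    classic = PythonOperator(task_id="classic", python_callable=lambda: print("hello"))

    # TaskFlow style: the decorator turns the function itself into a task,
    # and return values travel between tasks as XComs automatically
    @task
    def taskflow():
        print("hello")

    classic >> taskflow()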


At my previous company, we were using Airflow < 2.0.0, which meant I was not used to the TaskFlow API. When I saw how easy it was to define operators using regular Python functions, I was hooked. So much easier to use! So elegant! It's just Python with a magical @task decorator!

So I released the first version of the common utils keeping some of the legacy code that used the TaskFlow API to build internal operators.

In retrospect, this wasn't a great idea.

A year has passed, and this library has grown, not only in the number of operators it contains, but also in the number of teams that have adopted it and the number of people contributing to it.

Recently, we have had some major issues with the library, and one of the main reasons is the choice of the TaskFlow API for internal custom operators.

I will explain this with an example.

Let's assume we have a very simple DAG that performs the following steps:
1. Run a SQL query in Snowflake, via a library call to snowflake_operator
2. Decide, based on the output of that query, whether to run the next step
3. If the branch in 2) is true, print the output of the result of 1)

Here is what our snowflake_operator looks like inside the common-airflow-utils library. We use AWS ECS to run our operators inside containers (more on that later):

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

from common_airflow_utils.ecs import run_ecs_operator


def snowflake_operator(
    task_id: str,
    sql_query: str,
    trigger_rule: str = "all_success",
):
    """
    Runs the ECS snowflake job.
    """

    @task(multiple_outputs=True)
    def _setup_operator_args(
        sql_query,
    ):
        """
        Evaluates the lazy Airflow parameters so they can be used as regular arguments downstream.
        """
        return {
            "sql_query": sql_query,
        }

    @task
    def _setup_ecs_command(
        sql_query,
    ):
        """
        Generates the full ECS docker command.
        """
        command = ["--sql_query", sql_query]
        return command

    with TaskGroup(group_id=task_id) as task_group:
        args = _setup_operator_args(
            sql_query=sql_query,
        )

        command = _setup_ecs_command.override(trigger_rule=trigger_rule)(
            sql_query=args['sql_query'],
        )

        run_ecs_operator.override(task_id='run_snowflake_query')(
            task_id=task_id,
            container_name="snowflake",
            command=command,
        )

    return task_group




And here is what our DAG would look like using the snowflake operator:


from airflow.decorators import task

from common_airflow_utils.dag import DAG
from common_airflow_utils.operators.snowflake.snowflake_secretsmanager import snowflake_operator

DAG_ID = "super_dag"

with DAG(
    dag_id=DAG_ID,
    doc_md=__doc__
) as dag:
    # this operator queries snowflake and randomly returns a 1 or a 2
    run_snowflake_1 = snowflake_operator(
        task_id='run_snowflake_1',
        sql_query="""
                WITH arr AS (SELECT array_construct(1, 2) arr),
                number_selection AS (SELECT arr[ABS(MOD(RANDOM(), array_size(arr)))] number FROM arr)
                SELECT number FROM number_selection;
        """
    )

    @task.branch
    def choose_run_next_step(number_selection):
        # str() so the comparison works whether the value arrives as an int or a templated string
        if str(number_selection) == '2':
            return 'run_snowflake_2'
        return 'skip'

    @task
    def skip():
        print("SKIPPING")

    # snowflake python output has a weird format
    run_snowflake_2 = snowflake_operator(
        task_id='run_snowflake_2',
        sql_query="""
                SELECT {{ task_instance.xcom_pull(task_ids='run_snowflake_1')[0]['result'][0][0] }} + 1;
        """
    )

    @task
    def print_snowflake_2_result(result):
        print(result)

    # the task group has no output, so the branch reads the query result via a templated xcom_pull
    branch = choose_run_next_step(
        "{{ task_instance.xcom_pull(task_ids='run_snowflake_1')[0]['result'][0][0] }}"
    )

    run_snowflake_1 >> branch >> [skip(), run_snowflake_2]
    run_snowflake_2 >> print_snowflake_2_result(run_snowflake_2)

The DAG looks like this in Airflow:

Very simple so far.

Now let's imagine that the business logic changes, and we decide we want to change the workflow a bit:

1. Run a SQL query in Snowflake, via a library call to snowflake_operator
2. Add one to the output of 1) (*product decision!*)
3. Decide, based on the output of 2), whether to run the next step
4. If the branched value in 3) is equal to `2`, print the result of 2)


No problem, we just have to update our DAG to add the step:

    # this operator queries snowflake and randomly returns a 1 or a 2
    run_snowflake_1 = snowflake_operator(
        task_id='run_snowflake_1',
        sql_query="""
                WITH arr AS (SELECT array_construct(1, 2) arr),
                number_selection AS (SELECT arr[ABS(MOD(RANDOM(), array_size(arr)))] number FROM arr)
                SELECT number FROM number_selection;
        """
    )

    @task
    def add_one_to_snowflake_1_result(snowflake_result):
        print(snowflake_result)
        return snowflake_result[0]['result'][0][0] + 1

    # the new step: it takes the operator's output as an argument
    snowflake_1_output_plus_one = add_one_to_snowflake_1_result(run_snowflake_1)

    @task.branch
    def choose_run_next_step(number_selection):
        if str(number_selection) == '2':
            return 'run_snowflake_2'
        return 'skip'

    @task
    def skip():
        print("SKIPPING")

    @task
    def print_snowflake_result(result):
        print(result)

    run_snowflake_2 = snowflake_operator(
        task_id='run_snowflake_2',
        sql_query="""
                SELECT {{ task_instance.xcom_pull(task_ids='add_one_to_snowflake_1_result') }} + 1;
        """
    )

    print_snowflake_2_result = print_snowflake_result(run_snowflake_2)

    branch = choose_run_next_step(snowflake_1_output_plus_one)

    run_snowflake_1 >> snowflake_1_output_plus_one >> branch >> [skip(), run_snowflake_2]
    run_snowflake_2 >> print_snowflake_2_result

We go to test the updated DAG, and we get an error. PANIC!!

We see our DAG failing. Why?

Well, the error message is clear:

Sadness


The function snowflake_operator returns a task group, and task groups have no output, since they are not tasks per se. Task groups are not lazily evaluated like tasks are, so you can't really use {{ }} templated params or XComs with them (only with the tasks inside them).

No problem, you think, let's fix that. Instead of returning the task group, we can return the output of the last task inside it, since the operator essentially runs an ECS Docker command and its output is the only thing we care about. That way downstream tasks can easily interact with the output of snowflake_operator.

Here is what snowflake_operator looks like with that slight modification:

from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

from common_airflow_utils.ecs import run_ecs_operator


def snowflake_operator(
    task_id: str,
    sql_query: str,
    trigger_rule: str = "all_success",
):
    """
    Runs the ECS snowflake job.
    """

    @task(multiple_outputs=True)
    def _setup_operator_args(
        sql_query,
    ):
        """
        Evaluates the lazy Airflow parameters so they can be used as regular arguments downstream.
        """
        return {
            "sql_query": sql_query,
        }

    @task
    def _setup_ecs_command(
        sql_query,
    ):
        """
        Generates the full ECS docker command.
        """
        command = ["--sql_query", sql_query]
        return command

    with TaskGroup(group_id=task_id):
        args = _setup_operator_args(
            sql_query=sql_query,
        )

        command = _setup_ecs_command.override(trigger_rule=trigger_rule)(
            sql_query=args['sql_query'],
        )

        ecs_output = run_ecs_operator.override(task_id='run_snowflake_query')(
            task_id=task_id,
            container_name="snowflake",
            command=command,
        )

    # return the last task's output instead of the task group itself
    return ecs_output

We run the DAG with the new version of the snowflake operator and, with a few modifications, the DAG works. Yay!

Wait a second, let's look at the DAG structure:

Madness



The dependencies are all messed up! They now point only to the last task inside snowflake_operator's internal task group.

To make the DAG work with this operator change, we have to rewrite the branch operator to look like this:

    @task.branch
    def choose_run_next_step(number_selection):
        if str(number_selection) == '2':
            # we now have to reference the task inside the task group
            return 'run_snowflake_2.run_snowflake_query'
        return 'skip'

Since we now have to reference the internal task inside the task group as the next step in the branch operator, the internal API of the common-airflow-utils library is now used by everyone, and it effectively becomes part of the public API (if you change it, things will break).

All of this effort for what? All the benefits of writing vanilla Python functions to build TaskFlow operators are gone, since we are adding a lot of complexity just to manage the fact that task groups are not tasks, and thus are not lazily evaluated and cannot interact with the Airflow context by themselves.

Now let's look at snowflake_operator following the standard class-based method of developing custom operators:

from airflow.models.baseoperator import BaseOperator

from common_airflow_utils.ecs import run_ecs_operator_function


class SnowflakeEcsOperator(BaseOperator):
    """
    ECS Operator to run sql queries on Snowflake.
    """

    ui_color = "#abf0ff"
    template_fields = ("sql_query",)
    container_name = "snowflake"

    def __init__(
            self,
            sql_query: str,
            **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.sql_query = sql_query

    def _setup_ecs_command(self):
        command = ["--sql-query", self.sql_query]
        return command

    def execute(self, context):  # pylint: disable=unused-argument
        """
        Executes the ECS docker command and returns its output.
        """
        command = self._setup_ecs_command()

        ecs_output = run_ecs_operator_function(
            task_id=self.task_id,
            container_name=self.container_name,
            command=command,
        )
        return ecs_output


def snowflake_operator(
    task_id: str,
    sql_query: str,
):
    """
    Runs the ECS snowflake job
    """
    return SnowflakeEcsOperator(task_id=task_id, sql_query=sql_query)

This operator is a single Airflow task. This means branching and sequencing work out of the box, and the output can be easily accessed via the .output attribute. Since it inherits from BaseOperator, we can pass it all the standard arguments that Airflow operators support (trigger rules, retries, hooks, and so on). And since it's a single task, the scheduler has to track a third as many objects.
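Here is a sketch of how the branching DAG can consume it now, reusing the task names from the earlier example (shown as a fragment of the DAG body):

    run_snowflake_1 = snowflake_operator(
        task_id='run_snowflake_1',
        sql_query="SELECT 1;",  # any query; shortened here
    )

    @task.branch
    def choose_run_next_step(number_selection):
        # 'run_snowflake_2' is a real task id again, not a path into a task group
        return 'run_snowflake_2' if str(number_selection) == '2' else 'skip'

    # .output is an XComArg: passing it wires both the data flow and the
    # task dependency, with no internal task ids leaking into the DAG
    branch = choose_run_next_step(run_snowflake_1.output)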

Here is what the DAG graph looks like now that we are using class-based operators:

Much simpler! And the thing is, from the point of view of the DAG writer, the internals of the snowflake operator don't matter; all they care about is that it receives a SQL query and runs it!



2. Airflow unit tests are hard, smoke tests are less hard

Unit tests are the first line of defense for software engineers: they check that all the individual parts of your codebase are working as you expect.

Unit tests on Airflow are very tricky, for a couple of reasons.

First and foremost, Airflow DAGs and operators require an Airflow context to work: in production there is a separate process that takes care of triggering, computing, and state checking for the workflows.

If you read tutorials about Airflow testing, you can see that it's easy to test that the DAGs produce valid Airflow code (as seen in the official docs), or that the DAGs have the structure you want (meaning, if you want task2 to run after task1, that is indeed the case).
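A minimal sketch of those two kinds of tests, assuming the DAG files live in a dags/ folder and hypothetical task ids task1/task2:

from airflow.models import DagBag

dag_bag = DagBag(dag_folder="dags/", include_examples=False)

def test_dags_import_cleanly():
    # any syntax or import error in a DAG file shows up here
    assert not dag_bag.import_errors

def test_task2_runs_after_task1():
    dag = dag_bag.get_dag("super_dag")
    assert "task2" in dag.get_task("task1").downstream_task_ids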

However, in practice most of the errors that happen with Airflow DAGs have nothing to do with those kinds of bugs. In my experience, errors usually happen because of interdependencies between tasks. Those are hard to unit test on Airflow, and to unit test them properly you are forced to modify the actual implementation of your DAGs (see an example here). Changing production code to make testing easier is a big no-no in my book (tests should adapt to production, not the other way around).

Another big set of issues has nothing to do with the code itself, but is related to the environment. Airflow, being a monolithic orchestrator, requires tons of secrets/variables/connections to work properly. Implementing all of that complexity in a pure unit test suite is hard. That is why I usually have very lightweight unit tests that ensure we avoid silly mistakes (for example, if a DAG is supposed to run daily, I want to make sure a dev doesn't remove the cron argument to work locally and then push to prod), but I mostly test DAGs via local runs (using the excellent AWS repo for running Airflow locally).
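For example, a guard as small as this (a sketch, assuming super_dag is supposed to run daily):

from airflow.models import DagBag

def test_super_dag_keeps_its_daily_schedule():
    dag = DagBag(dag_folder="dags/", include_examples=False).get_dag("super_dag")
    assert dag is not None
    # fails if someone sets the schedule to None to run locally and pushes it
    assert dag.schedule_interval == "@daily"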

3. Airflow code is not standard Python code

Every framework is by definition based on a set of assumptions/modifications that make some things easier, at the cost of making other things harder, or plain impossible.

Airflow takes this to the next level, and every dev dealing with Airflow code has probably struggled to apply standard code quality tooling in a way that does not raise false positives on DAG code.

At my current company, we use standard pre-commit plugins for code quality, plus Sonar for company-wide static code analysis.

We have had to modify our pylint settings quite a bit to fit Airflow-specific quirks. One of the most common is how Airflow recommends setting up task dependencies via bitwise operators.

For example, if you want to define 2 tasks in a way that task2 starts after task1, the recommended way to implement that dependency is this:

task1 = DummyOperator(...)
task2 = DummyOperator(...)
task1 >> task2

Pylint will freak out at this with the very apt error code pointless-statement: why would you do a bitwise operation if you are not going to assign the result to anything? Sonar might also freak out, depending on your company settings.

The way to silence these false alarms is to either disable them project-wide (which means real issues won't be detected), or pepper your DAG code with comments like these:

task1 = DummyOperator(...)
task2 = DummyOperator(...)
# this is sad
task1 >> task2  # pylint: disable=pointless-statement #NOSONAR

Another of Airflow's quirks that doesn't play well with vanilla code analyzers is top-level imports. As stated in Airflow's Best Practices docs:

...if you have an import statement that takes a long time or the imported module itself executes code at the top-level, that can also impact the performance of the scheduler.

This pushes you toward moving heavy imports inside functions, which goes against one of the most basic PEP 8 guidelines (imports at the top of the file).

Pylint will rightly throw the error import-outside-toplevel (which, again, you can bypass with a pylint disable comment).
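A sketch of the pattern, using pandas as a stand-in for any heavy import:

from airflow.decorators import task

@task
def transform_data():
    # heavy import deferred to task runtime, so the scheduler's
    # DAG-parsing loop doesn't pay for it on every parse
    import pandas as pd  # pylint: disable=import-outside-toplevel

    return int(pd.DataFrame({"a": [1, 2, 3]})["a"].sum())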

4. Containerized operators are great

My current job is the third one in which I am in charge of managing Airflow environments. This time, I knew I had to do something different to avoid dependency conflicts.

One of the biggest drawbacks of Airflow, in my opinion, is that software dependencies (i.e. Python packages) are shared. If an Airflow environment is used by multiple tenants, it needs to support the requirements of every single workflow of every single team using it. If one team requires requests<2.0.0 and another requires requests>2.0.0, both teams can't use the same environment.

Worse still, when updating the environment, no matter if you are running Airflow on premises or using a service like AWS MWAA, the installation does not validate compatibility between packages. A conflict between packages can bring the whole scheduler down, putting the admin in the uncomfortable position of knowing that, for every minute the Airflow environment is broken, not a single job can run. I have been there a few times and trust me, it's quite stressful.

Fortunately, there is a very nice way to avoid dependency issues on Airflow: container operators!

What are container operators? Simply put, they are Airflow operators where the computation doesn't happen in the environment the Airflow scheduler or workers run in, but inside a container, whether one you manage yourself (DockerOperator or KubernetesPodOperator) or a cloud-managed one (EcsRunTaskOperator for AWS, or CloudRunExecuteJobOperator for Google Cloud).

Since these containers run Docker images, they can be tagged, versioned, and isolated. Even better, they enable Airflow to run workflows in any language! The best part, though, is that their compute requirements are isolated, meaning heavy tasks can't bring down the worker. We had a task at my current company that ran occasionally but required a lot of memory, and it would sometimes bring the worker down. Moving it to ECS allowed us to define specific memory requirements for the task.
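As an illustration, here is roughly how such a task can be declared with the Amazon provider's EcsRunTaskOperator; the cluster, task definition, and container names below are hypothetical:

from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

heavy_memory_job = EcsRunTaskOperator(
    task_id="heavy_memory_job",
    cluster="my-airflow-cluster",    # hypothetical ECS cluster
    task_definition="heavy-job:3",   # hypothetical task definition revision
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            # memory is reserved on the ECS side, not on the Airflow worker
            {"name": "heavy-job", "memory": 8192},
        ]
    },
)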

That's it, I hope you liked the article!