Ben Ryves is a Staff Software Engineer based in our Zurich office. As part of his work on the Marketing Platform Team, he builds and maintains infrastructure across our architecture. Here, he explains how the team supported a migration to the newest version of Apache Airflow in a way that minimized risk and ensured a smooth transition for other user teams.
At GetYourGuide we use Apache Airflow for scheduling the majority of our data generation and transformation tasks. If you're not familiar with Airflow, it allows engineers to write Python files that declare operators and compose them into a directed acyclic graph (DAG). Combined with a schedule, this ensures that tasks run in a periodic and predictable fashion and that the dependencies between all of our DAGs are properly encoded.
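As a quick illustration of what this looks like in practice, here is a minimal Airflow 2 DAG; the DAG id, task names, and commands are invented for the example and don't correspond to any real GetYourGuide pipeline:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Declare the DAG: its id, when it starts, and how often it runs.
with DAG(
    dag_id="example_daily_etl",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extract")
    transform = BashOperator(task_id="transform", bash_command="echo transform")
    load = BashOperator(task_id="load", bash_command="echo load")

    # The >> operator encodes the dependency edges of the graph:
    # extract must finish before transform, which must finish before load.
    extract >> transform >> load
```

The Python file only *declares* the graph; the scheduler then runs each task at the appropriate time, respecting the declared dependencies.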
Recently we migrated to the latest major version of Airflow in order to take advantage of architectural and performance improvements, as well as ensuring that we could use the most up-to-date provider code. While necessary, this move required a large number of modifications to our DAGs due to breaking changes in the Airflow API, as well as in various providers which we use. It also required coordination between all the teams using Airflow, because DAGs often depend on other DAGs. This blog details how we organized that migration, and the tools that we put in place in order to reduce the risk.
When planning the migration, one of the key things we wanted to ensure was making it possible for teams to migrate at their own pace within the migration window. This was important both to avoid rushing through any changes and to give teams time to test those changes. Additionally, we wanted to make it possible to easily switch DAGs on and off in both environments, such that if there was an issue it was operationally easy to swap DAGs temporarily back to the old environment.
To enable this, we needed to extend Airflow's existing ability to depend on tasks in other DAGs so that our DAGs could also depend on tasks in other clusters. That is, we wanted tasks in our old Airflow 1 cluster to be able to depend on tasks in the new Airflow 2 cluster, and vice versa. Further, we wanted to make this process automatic such that when writing or migrating a DAG, it wasn't necessary for either DAG to know which cluster the other DAG was in. That way teams would be able to move their DAGs without having to notify other teams, or write migration-specific code to handle DAGs existing in other clusters.
To achieve this we implemented a new type of cluster-independent sensor, which we call a CrossAirflowSensor, that waits for upstream tasks and DAGs to finish executing. This sensor relies on a service outside of Airflow which acts as a registry for DAGs. Every time a DAG or task runs in either environment, a callback executes after the task finishes which registers the start time, DAG name, and task name. This ensures that the service has a complete picture of all tasks running in both environments. Then, when a sensor is scheduled, the poke method of the sensor queries the external task registry service, and completes only when the expected task is found to have been completed.
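The core logic can be sketched in plain Python, without the Airflow machinery. The sketch below uses an in-memory dictionary to stand in for the external registry service, and all class and method names (`TaskRegistry`, `register_completion`, `is_complete`) are illustrative rather than the actual service's API:

```python
from datetime import datetime, timezone


class TaskRegistry:
    """In-memory stand-in for the external task-registry service.

    In production this would be a service outside of Airflow that both
    clusters report to via a post-task callback.
    """

    def __init__(self):
        # (dag_id, task_id) -> set of run dates that have completed
        self._completions = {}

    def register_completion(self, dag_id, task_id, run_date):
        # Called from a callback after a task finishes, regardless of
        # which cluster it ran in.
        self._completions.setdefault((dag_id, task_id), set()).add(run_date)

    def is_complete(self, dag_id, task_id, run_date):
        # A sensor's poke() reduces to this lookup: has the upstream
        # task finished for the run we are waiting on?
        return run_date in self._completions.get((dag_id, task_id), set())


registry = TaskRegistry()
run = datetime(2023, 1, 1, tzinfo=timezone.utc)

# Before the upstream task reports in, the sensor keeps poking.
assert not registry.is_complete("upstream_dag", "export", run)

# The success callback fires -- possibly in the other cluster...
registry.register_completion("upstream_dag", "export", run)

# ...and the next poke succeeds, letting the downstream DAG proceed.
assert registry.is_complete("upstream_dag", "export", run)
```

Because neither side queries Airflow directly, the downstream DAG never needs to know which cluster the upstream DAG currently lives in.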
While simple, this sensor enabled us to quickly move DAGs back and forth between environments, and completely decoupled our teams from the migrations of other teams, meaning that we could iteratively migrate DAGs between environments at a measured pace. Further, when we did run into operational issues with the new environment, the sensor enabled us to quickly and safely revert back to the previous setup while those issues were being resolved, allowing us to avoid downtime.
One common pattern for sensors in our Airflow environment is to depend on specific tasks and DAGs. Typically this is done by referencing the dag_id and task_id in the sensor definition. An issue we found with this approach is that it couples DAGs together, making it difficult to rename tasks or DAGs without having to change code in other DAGs. While we would ideally depend directly on the data (for example the Hive partition), this isn't always feasible, so during the migration we also took the opportunity to build tooling which could remove this coupling between our DAGs.
The tooling we built maintains a mapping between dataset names and the tasks that produce them, and then provides code that, at runtime, inspects the mapping to look up the dag_id and task_ids for a specific datasource. To generate the mapping, we use the outlets field of Airflow operators, which also has the benefit that our code is compatible with other Airflow features such as data-aware scheduling, as well as providing us with metadata that we can use to generate lineage information. To build the mapping we run a script which creates a production-like containerized Airflow environment. The script iterates over all of the tasks in that environment, and for every outlet found, an entry is added to the list of mappings. We also run the script in our CI pipelines to verify that the currently committed version of the mapping file is correct, and we have an integration test that verifies that all of the tasks and DAGs referred to by cross-DAG sensors exist as expected.
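A miniature version of the mapping build might look like the following. The task data, dataset URIs, and function names are all hypothetical; the real tooling walks a containerized Airflow environment and reads each operator's `outlets` field, whereas here the tasks are plain dictionaries:

```python
def build_mapping(tasks):
    """Invert outlet declarations into a dataset -> producer lookup."""
    mapping = {}
    for task in tasks:
        for dataset in task["outlets"]:
            # Fail fast if two tasks claim to produce the same dataset;
            # our CI check would reject such a mapping.
            if dataset in mapping:
                raise ValueError(f"duplicate producer for {dataset}")
            mapping[dataset] = (task["dag_id"], task["task_id"])
    return mapping


def lookup_producer(mapping, dataset):
    # At runtime, a sensor resolves a dataset name to the dag_id and
    # task_id it should wait on, so DAGs never hard-code each other's ids.
    return mapping[dataset]


# Illustrative task definitions, as if extracted from the parsed DAGs.
tasks = [
    {"dag_id": "bookings_etl", "task_id": "write_bookings",
     "outlets": ["hive://warehouse/bookings"]},
    {"dag_id": "marketing_etl", "task_id": "write_spend",
     "outlets": ["hive://warehouse/ad_spend"]},
]

mapping = build_mapping(tasks)
assert lookup_producer(mapping, "hive://warehouse/bookings") == (
    "bookings_etl", "write_bookings")
```

The key property is that a dataset name is stable even when the DAG or task that produces it is renamed: only the generated mapping changes, not the downstream DAG code.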
Using this approach we were able to drastically reduce the number of times different teams have to ask about which DAG/task produces a specific dataset, and we were also able to completely remove coupling between our DAGs. This allowed us to refactor several old DAGs that were used for synchronization purposes, which simplified our environment significantly. Further, we were able to increase velocity for teams writing new sensors, as instead of knowing the ids of tasks in Airflow, they now just need to know the names of datasets – something that they would already have to know in order to write the code that the task runs.
The final tool that we added that had a significant impact on our migration was DAG diff tooling. The goal of this tool was to minimize the risk when making changes to DAG code. Airflow code being Python can be both a blessing and a curse – it enables you to be extremely productive, but it also leads to code that often touches multiple DAGs in ways that are not always easy to predict. This became especially problematic during the migration, when older legacy code which had often not been touched for a significant period of time suddenly became relevant and had to be updated for the new environment.
To solve this problem, we wanted tooling which would enable us to identify exactly what had changed as a result of a code change – that is, we wanted to go deeper than the Python code diff, and go all the way to being able to produce a diff of a DAG between two commits. This was possible in theory because the Python code declaring a DAG is broadly split into two parts – code that declares the topological structure of the DAG, and code which is executed as different tasks are scheduled. Because of this two-part structure, by parsing the DAGs in a production-like environment but not executing them, it's possible to produce a JSON representation of the DAG. By doing this twice – once for the new commit, and once for the old commit – we were then able to write code to diff the two versions and produce a readable list of differences.
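The diffing step itself can be sketched as follows. Assume each snapshot is a JSON-like dictionary mapping `task_id` to its operator type and upstream task ids (a simplified shape chosen for this example, not the actual serialization format we use):

```python
def dag_snapshot_diff(old, new):
    """Produce a readable list of differences between two DAG snapshots.

    Each snapshot maps task_id -> {"operator": ..., "upstream": [...]}.
    Upstream lists are compared as sets, since their order is not
    meaningful.
    """
    diffs = []
    for task_id in sorted(set(old) | set(new)):
        if task_id not in new:
            diffs.append(f"removed task {task_id}")
        elif task_id not in old:
            diffs.append(f"added task {task_id}")
        else:
            before, after = old[task_id], new[task_id]
            if set(before["upstream"]) != set(after["upstream"]):
                diffs.append(
                    f"{task_id}: upstream changed "
                    f"{sorted(before['upstream'])} -> {sorted(after['upstream'])}")
            if before["operator"] != after["operator"]:
                diffs.append(f"{task_id}: operator changed")
    return diffs


# Snapshots of the same DAG at two commits: the new commit inserts a
# "transform" task between "extract" and "load".
old = {"extract": {"operator": "BashOperator", "upstream": []},
       "load": {"operator": "BashOperator", "upstream": ["extract"]}}
new = {"extract": {"operator": "BashOperator", "upstream": []},
       "transform": {"operator": "PythonOperator", "upstream": ["extract"]},
       "load": {"operator": "BashOperator", "upstream": ["transform"]}}

assert dag_snapshot_diff(old, new) == [
    "load: upstream changed ['extract'] -> ['transform']",
    "added task transform",
]
```

A reviewer reading this output sees the structural effect of a change directly, rather than having to trace it through layers of shared Python code.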
Having this capability has been invaluable for checking the impact of a change. Using this tooling we've been able to increase our confidence that a patch is doing what it claims to do, and also make it easier for reviewers to understand the actual impact of a patch in terms of how it affects the DAGs. It's also allowed us to catch numerous issues before they made it to production. Since introducing the tooling we've added further automation to make it easier to run a diff, and we've also improved the difference checking algorithm to account for fields in tasks which are order independent. Eventually we would like to automate the diff by having an integration with our pull requests so that the diff can be added automatically when a pull request is opened, and we'd also love to open source it for the wider community – the only blocker for doing that is that it's somewhat specific to our Airflow setup at present.
Beyond these major changes, we also made a number of smaller impactful changes to our environment including fully supporting containerized development environments, and making all scripts runnable both locally and on CI/CD. While a lot of these tools are very simple in nature, they had an outsized impact on our developer experience – and we believe they demonstrate that a lot of value can be captured with simple solutions.