Contributing to Airflow
When deploying Apache Airflow to remote environments, the application container first needs to check whether Airflow's backend database can be reached before launching whichever component (webserver, scheduler, worker, etc.) it is responsible for. The first draft of this database check logic (written by the data engineer before me) uses the psql CLI app to attempt the connection with retries. It works well enough, but has two major drawbacks:
- Using psql means this database check is vendor-specific. If we one day switch to MySQL or another backend, we will need to write another function around a different CLI command.
- While the retry logic is not complex, handwriting it in shell script is still unpleasant, and the resulting code is ugly.
When my team upgraded our Airflow setup to version 2.5, we started using the Airflow CLI command airflow db check to perform the database check. Using Airflow's CLI addresses the lack of portability to other database backends (thanks to Python DB API 2.0 and SQLAlchemy), but we still have to handwrite the retry logic in shell script:
# Repeatedly run "airflow db check" until either the command succeeds or
# the retries are exhausted
#
# Usage: airflow_db_check 10 1
airflow_db_check() {
    local retries=$1 delay=$2
    airflow db check
    if [[ $? == "0" ]]; then
        return 0
    fi
    while [[ $retries -gt "0" ]]; do
        sleep $delay
        airflow db check
        if [[ $? == "0" ]]; then
            return 0
        fi
        retries=$(($retries - 1))
        echo "$retries retries remaining; will retry in $delay sec"
    done
    echo "Could not connect to Airflow's backend"
    return 1
}
Knowing that shell script is probably not the average web developer's favorite Turing-complete programming language, and motivated by earning some coveted "open source contribution" street cred, I decided to submit a feature request on the apache/airflow repository.
To my surprise, a committer responded the next day, acknowledging the usefulness of incorporating retry logic into the Airflow CLI command itself and encouraging me to submit a pull request. Hence began my quest to become a contributor to the Airflow project.
Setup
Apache Airflow is a genuinely massive project, in no small part because it is both a library for writing workflows as DAGs and a full-stack application. Consequently, even setting up the development environment, including the test suite, is a non-trivial task.
This is not the first time I have tried to set up Airflow's development environment: in my previous attempt a few months ago, I had trouble installing the dependencies, and when the CI/CD utility breeze presented me with a root shell prompt, I was spooked and immediately wiped the local copy clean. Knowing that breeze uses Docker Compose on the backend to spawn a complete Airflow cluster, and suspecting that the virtual environment setup could have irreversible consequences on my laptop, my first instinct was to take development to a remote server.
With Codespace
GitHub conveniently offers a free tier on its cloud IDE Codespace, which is listed as one of the officially supported development setups in the contribution guide, making it my first choice.
Something that I did not understand until later is that each Codespace instance is not a virtual machine, but a container given virtual hardware limits (most notably CPU cores, RAM, and disk). This means that, just like any other container, exceeding the memory constraint will get your Codespace instance killed without warning (thanks, containerd!). This is what happened when I tried to run breeze shell and Breeze spawned a gazillion other containers with Docker Compose; I simply could not resurrect my Codespace instance after it died (trying to restart the instance fails with the unhelpful message "oops, something went wrong").
Another issue I had with developing Airflow on Codespace is that the dev container setup is neither very transparent nor "standard." After the Codespace instance starts, the terminal always opens a shell session with a root prompt (which makes pip very unhappy about installing Python packages as root). Again, I only learned later that the dev container is configured to drop me straight into a Breeze shell (hence the root prompt), which I find too opinionated and inflexible for a dev container setup.
With local setup
After struggling with Codespace for a few attempts, I decided to go back to developing locally. Luckily, because of the limited scope of my code change (as you will see later), the setup process is not too involved.
I manage my Python installation with pyenv, and I use venv for my virtual environment needs, so the first step is to set those up:
python -m venv .venv
source .venv/bin/activate
pip install --upgrade pip setuptools wheel
Then, we need to install the local copy of Airflow:
# celery and postgres are needed later
pip install -e ".[devel,celery,postgres]"
During the installation process, on some machines I ran into an issue with pygraphviz, which has a C library dependency that needs to be installed with brew install graphviz and might require an additional CFLAGS="-I/path/to/graphviz/include" for pip to be able to compile pygraphviz against graphviz. I could not consistently reproduce the same error on my other laptops, so this issue will not be discussed further.
Finally, we need to install pipx and use it to set up Breeze:
# The --user flag is not necessary since we are in a virtual environment
pip install pipx
pipx ensurepath # after this, restart shell, or source ~/.zshrc
pipx install -e ./dev/breeze
The steps above install the breeze script into ~/.local/bin, from where it can be removed simply by deleting the file.
To validate the setup, run breeze (which is shorthand for breeze shell). Breeze will spawn a container that uses SQLite as Airflow's backend and give you a shell for interacting with that container. From the shell, we can run pytest tests/cli to roughly validate that the setup works, after which we exit the shell and stop the Airflow cluster with breeze down.
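Put together, the validation round trip looks roughly like this (the test selection is just what I used; adjust to taste):
breeze              # shorthand for "breeze shell"; spawns an SQLite-backed Airflow container
pytest tests/cli    # inside the Breeze shell: a rough smoke test of the setup
exit                # leave the Breeze shell
breeze down         # back on the host: stop the Airflow cluster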
Static checks are run using the pre-commit program, which should already have been installed by the pip install step above. We can validate that the static checks execute correctly with:
pre-commit run --all-files
The first run will be slow as pre-commit needs to set up a lot of things, but subsequent runs should be faster; without --all-files, pre-commit only runs the checks against staged files (after git add but before git commit).
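For example, to check only the files touched by a change (the path below is just illustrative):
git add airflow/cli/commands/db_command.py
pre-commit run    # without --all-files, only staged files are checked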
Development
Scope of change
The airflow db
subcommands are defined in airflow/cli/commands/db_command.py
. The original implementation of the airflow db check
command is as follows:
# airflow.cli.commands.db_command
@cli_utils.action_cli(check_db=False)
def check(_):
    """Runs a check command that checks if db is available."""
    db.check()
The db.check function it calls is implemented in airflow/utils/db.py:
# airflow.utils.db
@provide_session
def check(session: Session = NEW_SESSION):
    """
    Checks if the database works.

    :param session: session of the sqlalchemy
    """
    session.execute(text("select 1 as is_alive;"))
    log.info("Connection successful.")
Since the airflow db check command needs two additional arguments, we also need to modify airflow/cli/cli_config.py to define them:
# Add the two Arg tuples
ARG_DB_RETRY = Arg(
    ("--retry",),
    default=0,
    type=positive_int(allow_zero=True),
    help="Retry database check upon failure",
)
ARG_DB_RETRY_DELAY = Arg(
    ("--retry-delay",),
    default=1,
    type=positive_int(allow_zero=False),
    help="Wait time between retries in seconds",
)
# Add these two arguments to the `db check` command
DB_COMMANDS = (
    # ... other commands ...
    ActionCommand(
        name="check",
        help="Check if the database can be reached",
        func=lazy_load_command("airflow.cli.commands.db_command.check"),
        args=(ARG_VERBOSE, ARG_DB_RETRY, ARG_DB_RETRY_DELAY),
    ),
    # ... other commands ...
)
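As an aside, positive_int is the argparse type factory Airflow uses to validate integer flags; conceptually it behaves roughly like the sketch below (a simplified illustration, not the actual Airflow source):
import argparse

def positive_int(*, allow_zero):
    # Build an argparse "type" callable that accepts positive integers
    # (optionally also zero) and rejects everything else.
    def _check(value):
        try:
            number = int(value)
            if number > 0 or (allow_zero and number == 0):
                return number
        except ValueError:
            pass
        raise argparse.ArgumentTypeError(f"invalid positive int value: {value!r}")
    return _check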
Proposed change
My first try at the implementation was to write the retry logic myself. Since utils.db.check indicates a connection failure by letting session.execute raise an unhandled exception, each check's logic gets rather ugly:
def check():
    try:
        db.check()
    except:
        pass
    while has_retries:
        sleep(...)
        try:
            db.check()
        except:
            pass
        retries -= 1
    # Now what? Re-raise the last exception? SystemExit?
Hence, I decided to also refactor the utils.db.check function so that it catches the OperationalError that session.execute could throw and returns a boolean indicating the status of the database check:
# airflow.utils.db
@provide_session
def check(session: Session = NEW_SESSION) -> bool:
    try:
        session.execute(text("select 1 as is_alive;"))
        log.info("Connection successful.")
        return True
    except OperationalError as e:
        log.debug(e)
        return False
With that, the CLI command implementation also becomes a lot cleaner:
@cli_utils.action_cli(check_db=False)
def check(args):
    """Runs a check command that checks if db is available."""
    retries: int = args.retry
    retry_delay: int = args.retry_delay
    if db.check():
        raise SystemExit(0)
    while retries > 0:
        time.sleep(retry_delay)
        if db.check():
            raise SystemExit(0)
        retries -= 1
        print(f"Warning: will retry in {retry_delay} seconds. {retries} retries left")
    raise SystemExit(1)
Note that we use raise SystemExit because this is how other CLI commands force the program to exit as well. It also makes testing a bit easier, since we can use pytest.raises to catch SystemExit while still retaining the early exit of the function.
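For instance, a test can assert both the exit and the exit code without terminating the test session (check and args below stand in for the command function and parsed arguments above):
import pytest

def test_check_exit_code():
    # SystemExit propagates out of check(), pytest.raises catches it,
    # and the captured exception still carries the exit code.
    with pytest.raises(SystemExit) as exc_info:
        check(args)  # check/args: the CLI function and parsed args shown above
    assert exc_info.value.code == 0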
With airflow.utils.db.check, unit testing is straightforward: the function has three possible outcomes:
- Return True when the session successfully executes the trivial query
- Return False when the session raises sqlalchemy.exc.OperationalError at query execution
- Raise any other exception that the session raises
This means mocking a session whose execute method either runs without issues, raises OperationalError, or raises some other error:
def test_check(...):
    session_mock = MagicMock()
    assert check(session_mock)

    session_mock.execute = mock.Mock(side_effect=OperationalError("FOO", None, None))
    assert not check(session_mock)

    session_mock.execute = mock.Mock(side_effect=DatabaseError("BAR", None, None))
    with pytest.raises(DatabaseError, match="BAR"):
        check(session_mock)
The test case for db_commands.check is more involved, in no small part because this function calls time.sleep and exit, both of which I need to mock and patch, then check that they are called with the correct arguments the correct number of times, all without invoking the actual functions: the first would make the test very slow, and the second would simply cause the test session to exit.
Patching functions is achieved using the patch
function within the unittest.mock
module, with the target being a valid import path:
def check():
    ...
    time.sleep(x)
    ...

def test_check():
    with patch("time.sleep"):
        # The "time.sleep" call is patched with a call to a mock
        check()
A new
argument can be supplied with a named mock variable, which can then be used for call assertion:
def test_check():
    mock_sleep = MagicMock()
    with patch("time.sleep", new=mock_sleep):
        check()
    mock_sleep.assert_called()
A particularly neat method in the mock module is assert_has_calls, which can be used to assert a sequence of successive calls:
import time
from unittest.mock import patch, call, MagicMock

def check():
    time.sleep(1)
    time.sleep(2)
    time.sleep(3)
    time.sleep(4)

def test_check():
    mock_sleep = MagicMock()
    with patch("time.sleep", new=mock_sleep):
        check()
    mock_sleep.assert_has_calls([
        call(1), call(2), call(3), call(4)
    ])
Updated proposal
Thanks to the feedback from other contributors, I learned that the Apache Airflow project actually uses retry logic from another, more mature project called tenacity.
The core data structure of tenacity is the Retrying object, which can be instantiated directly or created through the retry decorator. tenacity uses unhandled exceptions to indicate whether a function call succeeded, so I reverted my code change to airflow.utils.db.check back to its original implementation. This has the nice side effect of reducing the number of unit tests I have to write.
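For a minimal, standalone illustration of the tenacity API used here (not Airflow code; flaky_call and the retry counts are just placeholders), the loop form below is equivalent to decorating flaky_call with @retry(...) using the same arguments:
from tenacity import Retrying, stop_after_attempt, wait_fixed

def flaky_call():
    ...  # e.g. a database probe that may raise on failure

# Up to 3 attempts total, 1 second apart; with reraise=True, the last
# exception is re-raised (instead of a RetryError) if every attempt fails.
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(1), reraise=True):
    with attempt:
        flaky_call()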
In the end, my updated code change only affects airflow.cli.commands.db_command.check
:
@cli_utils.action_cli(check_db=False)
def check(args):
    """Runs a check command that checks if db is available."""
    retries: int = args.retry
    retry_delay: int = args.retry_delay

    def _warn_remaining_retries(retrystate: RetryCallState):
        remain = retries - retrystate.attempt_number
        log.warning(f"{remain} retries remain, will retry after {retry_delay} seconds")

    for attempt in Retrying(
        stop=stop_after_attempt(1 + retries),
        wait=wait_fixed(retry_delay),
        reraise=True,
        before_sleep=_warn_remaining_retries,
    ):
        with attempt:
            db.check()
And my unit tests still mock sleep
and airflow.utils.db.check
to count the number of times they are called and the arguments they are called with:
def test_check(self):
    retry, retry_delay = 6, 9  # arbitrary but distinct numbers
    args = self.parser.parse_args(
        ["db", "check", "--retry", str(retry), "--retry-delay", str(retry_delay)])
    sleep = MagicMock()
    always_pass = Mock()
    always_fail = Mock(side_effect=OperationalError("", None, None))

    with patch("time.sleep", new=sleep), patch("airflow.utils.db.check", new=always_pass):
        db_command.check(args)
        always_pass.assert_called_once()
        sleep.assert_not_called()

    with patch("time.sleep", new=sleep), patch("airflow.utils.db.check", new=always_fail):
        with pytest.raises(OperationalError):
            db_command.check(args)
        # With N retries there are N+1 total checks, hence N sleeps
        always_fail.assert_has_calls([call()] * (retry + 1))
        sleep.assert_has_calls([call(retry_delay)] * retry)
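With this change, the hand-rolled shell function from the beginning of this post would reduce to a single command in the deployment script (the retry counts below are just examples):
# Retry up to 10 times, waiting 1 second between attempts
airflow db check --retry 10 --retry-delay 1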
Conclusion
This is all the code change involved; all that remains is to submit the pull request and wait for a committer to approve it.