Docker and Apache Airflow

Objective

I am going to show how to set up Apache Airflow inside Docker as a basic container for a data analysis pipeline.

Docker

Install docker

sudo apt-get install docker.io
sudo adduser $USER docker
sudo apt-get install debootstrap
# Log out and back in so the docker group membership takes effect
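
After logging back in, you can sanity-check that Docker works without sudo (this pulls the small hello-world test image, so it assumes network access)

docker run --rm hello-world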

Now bootstrap the minbase variant, the smallest flavour of a GNU/Linux Debian system

mkdir debian_root && cd debian_root
sudo debootstrap --variant=minbase sid .

From the debian_root directory, import it as a Docker image, tagging it 'raw'

sudo tar -c * | docker import - minidebian:raw

This gives a reasonably small base image (~181 MB).
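
To confirm the import and check the size (which will vary with the current sid snapshot), list the image

docker images minidebian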

Now create a Dockerfile for Debian with Apache Airflow. This will be a quick setup with the default SQLite database and local task execution.

mkdir -p docker && cd docker
cat << EOF > Dockerfile
FROM minidebian:raw
LABEL description="Minimal GNU/Linux Debian"
MAINTAINER Mateusz Kaduk <mateusz.kaduk@gmail.com>
RUN apt-get update && apt-get install -y python3 python3-pip ipython3 && pip3 install apache-airflow[gcp_api]==1.8.2 && airflow initdb
EXPOSE 8080
CMD ["/usr/local/bin/airflow","webserver"]
EOF

Build the final Docker image

docker build . -t minidebian:latest
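
As a quick sanity check of the build, you can run the Airflow CLI in a throwaway container (this assumes the pip install above put airflow on the PATH, which it does under /usr/local/bin)

docker run --rm minidebian:latest airflow version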

Start container

Start an interactive container based on the latest image, expose the ports, and bind-mount the dags directory

cd ..
docker run -P -it --name python-gcloud -v /home/mateusz/Debian/airflow/:/root/airflow/dags minidebian:latest

Check which port the Airflow webserver is exposed on

sudo lsof -i -n | grep docker
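
Alternatively, Docker can report the mapping for the exposed port directly

docker port python-gcloud 8080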

Go there with a browser. Additionally, you can spawn a shell in the running container

docker exec -it python-gcloud "/bin/bash"
# Other useful commands
# docker start python-gcloud
# docker stop python-gcloud
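
If you later run the container detached (docker start python-gcloud), the webserver output can still be followed from the host

docker logs -f python-gcloud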

Simple DAG

As an example, the following DAG can be constructed and placed in the dags directory

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# from airflow.contrib.operators.gcs_download_operator import GoogleCloudStorageDownloadOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 27),
    'email': ['mateusz.kaduk@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'data',
    default_args=default_args,
    schedule_interval="@once",
    # schedule_interval=timedelta(1),
)

# Tasks
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

To test the simple task executing the date bash command, run it with your execution date

airflow test data print_date 2018-05-27
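
You can also confirm that Airflow picks up the DAG from inside the container (for example via the docker exec shell above), using the 1.8 CLI

airflow list_dags
airflow list_tasks data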

Next

In future posts, I plan to add tasks fetching data from Google Cloud Storage, processing it, and producing reports.
