Objective
I am going to show how to set up Apache Airflow inside Docker as a basic container for a data analysis pipeline.
Docker
Install docker
sudo apt-get install docker.io
sudo adduser $USER docker
sudo apt-get install debootstrap
# Re-login so the docker group membership takes effect
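To confirm Docker works after logging back in, an optional sanity check is to run the hello-world image:
docker run --rm hello-world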
Now bootstrap the minbase variant, the smallest flavour of a Debian GNU/Linux system
mkdir debian_root && cd debian_root
sudo debootstrap --variant=minbase sid .
From the debian_root directory, import it as a Docker image tagged 'raw'
sudo tar -c * | docker import - minidebian:raw
This gives a reasonably small base image (~181 MB).
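Optionally, verify the imported image and its size:
docker images minidebian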
Now create a Dockerfile for Debian with Apache Airflow. This will be a quick setup with the default SQLite database and local task execution.
mkdir -p docker ; cd docker
cat << EOF > Dockerfile
FROM minidebian:raw
LABEL description="Minimal GNU/Linux Debian"
MAINTAINER Mateusz Kaduk <mateusz.kaduk@gmail.com>
RUN apt-get update && apt-get install -y python3 python3-pip ipython3 && pip3 install apache-airflow[gcp_api]==1.8.2 && airflow initdb
CMD ["/usr/local/bin/airflow","webserver"]
EXPOSE 8080
EOF
Build the final Docker image
docker build . -t minidebian:latest
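As an optional sanity check, you can confirm Airflow was installed into the image by printing its version (this uses the Airflow 1.x CLI):
docker run --rm minidebian:latest /usr/local/bin/airflow version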
Start container
Start an interactive container based on the latest image, expose its ports, and bind-mount the dags directory
cd ..
docker run -P -it --name python-gcloud -v /home/mateusz/Debian/airflow/:/root/airflow/dags minidebian:latest
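Note that -P publishes the exposed port 8080 on a random host port. If you prefer a fixed host port, the container can be started with an explicit mapping instead (same name and bind mount as above):
docker run -p 8080:8080 -it --name python-gcloud -v /home/mateusz/Debian/airflow/:/root/airflow/dags minidebian:latest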
Check which host port Docker mapped the Airflow webserver to
sudo lsof -i -n | grep docker
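Alternatively, Docker can report the published port for this container directly:
docker port python-gcloud 8080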
Open that port in your browser. Additionally, you can spawn a shell inside the running container
docker exec -it python-gcloud "/bin/bash"
# Other useful commands:
# docker start python-gcloud
# docker stop python-gcloud
Simple DAG
As an example, the following DAG can be constructed and placed in the dags directory
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# from airflow.contrib.operators.gcs_download_operator import GoogleCloudStorageDownloadOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 27),
    'email': ['mateusz.kaduk@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'data',
    default_args=default_args,
    schedule_interval="@once",
    # schedule_interval=timedelta(1),
)

# Tasks
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
To test the simple task that executes the date bash command, run the following inside the container with your execution date
airflow test data print_date 2018-05-27
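If the DAG file in the mounted dags directory was picked up, it should also appear in the Airflow 1.x CLI listings, which you can check inside the container:
airflow list_dags
airflow list_tasks data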
Next
In future posts, I plan to add tasks that fetch data from Google Cloud Storage, process it, and produce reports.