Basic DAG configuration

“In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

For example, a simple DAG could consist of three tasks: A, B, and C. It could say that A has to run successfully before B can run, but C can run anytime. It could say that task A times out after 5 minutes, and B can be restarted up to 5 times in case it fails. It might also say that the workflow will run every night at 10pm, but shouldn’t start until a certain date.”(https://airflow.apache.org/concepts.html)
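
For illustration only, here is a rough sketch (not part of this project's pipeline) of how the behaviors described in that quote map onto Airflow arguments; the DAG name and values below are made up for the example.

# Illustrative sketch only: the quoted behaviors expressed as Airflow arguments.
from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    'retries': 5,                                # "B can be restarted up to 5 times"
    'execution_timeout': timedelta(minutes=5),   # "task A times out after 5 minutes"
}

example_dag = DAG('example_dag', default_args=default_args,
                  schedule_interval='0 22 * * *',  # "run every night at 10pm"
                  start_date=datetime(2017, 3, 20))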

FYI: Airflow has great documentation on DAG configuration in its Tutorial Section, which I suggest you take a look at.  

Big shout out: this article helped me out a lot. Very useful stuff here: http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/.

In this series, I’m going to create some Python scripts and configure Airflow to load our imdb.com data to S3 and then Snowflake.  I’ll set up a simple example, step through it, and then build on it to make it slightly more sophisticated.  Hopefully I will demonstrate as simply as possible how to create a data pipeline using Airflow.

In this first iteration we are going to set up a simple DAG that calls a few Python scripts using bash commands.  This demonstrates simple Airflow job scheduling. I’ll walk through it line by line.

If there is not one already, create a dags directory under ~/airflow.

(venv)$ mkdir ~/airflow/dags

(venv)$ cd ~/airflow/dags

If you followed the setup instructions, you can use IPython Notebook. Or you can use your favorite editor.

>ssh -L 8000:localhost:8888 ubuntu@35.174.154.82 -i MyPOCKeyPair.pem.txt 

$ ipython notebook

Then navigate to localhost:8000 on your machine.

[Screenshot: localhost:8000 in the browser]

[Screenshot: Jupyter notebook home page]

Create a DAG file.  I named mine bash_orch.py.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('test_bash', description='Test Bash calls',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

s3_copy_operator = BashOperator(task_id='s3_copy', bash_command='python /home/ubuntu/airflow/scripts/s3transfer.py',dag=dag)

snowflake_copy_operator = BashOperator(task_id='sf_copy', bash_command='python /home/ubuntu/airflow/scripts/snowflake_copy.py',dag=dag)

s3_copy_operator >> snowflake_copy_operator

First we import the datetime library, which allows us to easily construct a date (i.e., the start_date below).

from datetime import datetime

Then we import the DAG class so we can create a DAG object for Airflow to use.  We also import the BashOperator. There are many other operators, like the PythonOperator, DummyOperator, and BaseOperator (which we will extend to make our own operator later). You can read more about them here -> https://airflow.apache.org/code.html.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

Creating a DAG.  We will give it an ID, in this case ‘test_bash’, and a description.  The schedule here is built out in cron form (here is a cron schedule translator if you are not familiar: https://crontab.guru/#*_12_*_*_*).  In this example, ‘0 12 * * *’ runs the DAG once a day at 12:00 noon.  The start date is the earliest date this will run.

If the ‘catchup’ parameter is set to ‘True’, then Airflow will create a job run for each schedule interval between 2017-03-20 and today.  This can be useful in some cases, but not here.

dag = DAG('test_bash', description='Test Bash calls',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)
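
For illustration (this variant is not used in our pipeline): if you did want Airflow to backfill, a hypothetical DAG like the sketch below, with catchup=True, would create one run per schedule interval from the start_date up to today. Airflow also accepts preset strings such as ‘@daily’ in place of a cron expression.

# Hypothetical variant, not part of this project: catchup=True makes Airflow
# backfill one run per schedule interval from start_date through today.
backfill_dag = DAG('test_bash_backfill', description='Test Bash calls with backfill',
                   schedule_interval='@daily',
                   start_date=datetime(2017, 3, 20), catchup=True)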

Now let’s create BashOperator instances to run our two scripts.  Basically, we instantiate a BashOperator, passing a task_id, a bash_command, and a dag (I named our DAG instance ‘dag’ above, which may make the dag=dag below a little confusing; nonetheless, dag={name of your DAG instance}).

FYI – Once you instantiate an Operator, it is referred to as a task. 

s3_copy_operator = BashOperator(task_id='s3_copy', bash_command='python /home/ubuntu/airflow/scripts/s3transfer.py',dag=dag)

snowflake_copy_operator = BashOperator(task_id='sf_copy', bash_command='python /home/ubuntu/airflow/scripts/snowflake_copy.py',dag=dag)
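
As an aside, since I mentioned the PythonOperator earlier: you could also run this logic in-process instead of shelling out with bash. The sketch below is hypothetical and assumes you have wrapped the upload code from s3transfer.py in a function; it is not part of this iteration.

# Hypothetical alternative using the PythonOperator (Airflow 1.x import path).
from airflow.operators.python_operator import PythonOperator

def upload_to_s3():
    # ... the same boto3 upload code that lives in s3transfer.py ...
    pass

s3_copy_python = PythonOperator(task_id='s3_copy_python',
                                python_callable=upload_to_s3,
                                dag=dag)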

Now let’s set up dependencies for our scripts (using bitshift operators).  First we want to run the script to copy data into S3 then the script to load tables into Snowflake:

s3_copy_operator >> snowflake_copy_operator

 

Note: Traditionally, operator relationships are set with the set_upstream() and set_downstream() methods. In Airflow 1.8, this can also be done with the Python bitshift operators >> and <<. The following four statements are all functionally equivalent:

op1 >> op2
op1.set_downstream(op2)
op2 << op1
op2.set_upstream(op1)

Let’s create an S3 bucket and a table in Snowflake as our targets.  Then we will create a few Python scripts to move data to our S3 bucket and copy it into our Snowflake table.

Let’s first create an S3 bucket to stage our movie data files.  Navigate to Services->S3.

[Screenshot: AWS console, S3 service]

Select Create bucket

[Screenshot: S3 Create bucket dialog]

Give the bucket a name. Note: Bucket names must be unique across all of AWS.

[Screenshot: naming the S3 bucket]

Check to make sure your bucket exists.

 

[Screenshot: the new bucket listed in S3]
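
As an aside, if you would rather script the bucket creation than click through the console, a hypothetical boto3 sketch (assuming your AWS credentials are already configured for the CLI user) would look something like this:

# Hypothetical alternative to the console: create the staging bucket with boto3.
import boto3

client = boto3.client('s3')  # picks up credentials from the environment/CLI config
client.create_bucket(Bucket='movie-etl-staging')  # bucket names must be globally unique
# Note: in regions other than us-east-1, also pass CreateBucketConfiguration.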

 

Create a staging table in Snowflake.

create or replace TABLE TITLE_RATINGS (
    tconst VARCHAR(9),
    averageRating NUMBER(10,2),
    numVotes NUMBER(10,0),
    primary key (tconst)
);

[Screenshot: creating the TITLE_RATINGS table in Snowflake]

In this section we will set up a few simple Python scripts to move data from our Airflow server to S3, and from S3 into Snowflake.  I created a scripts directory underneath my ~/airflow directory to house these scripts.

(venv)$ mkdir ~/airflow/scripts

 

Disclaimer:  I am not a Pythonista and don’t play one on the internet.  I will do some things later on in the plugins section to clean these scripts up and hide passwords.  Since this is for educational purposes, I’m starting with what I would consider basic and verbose scripts and cleaning them up in iterations.  So check the later versions of the scripts, and if you have a better way of writing some code, let me know. I’m happy to learn.

s3transfer.py

from boto3.s3.transfer import S3Transfer
import boto3

access_key='AKIAEAFY6QP6NH3JXQGA'
secret_key='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'
filepath='/Users/davidrbarlow/Downloads/movies/title.ratings.tsv.gz'
s3_bucket_name='movie-etl-staging'
s3_filename='title.ratings.tsv.gz'
client = boto3.client('s3', aws_access_key_id=access_key,aws_secret_access_key=secret_key)

print('client')

transfer = S3Transfer(client)

print('transfer - '+s3_bucket_name)

transfer.upload_file(filepath, s3_bucket_name, s3_filename)

First we will import the S3Transfer class from boto3.  Obviously this will help us transfer data to S3.  The ‘from boto3.s3.transfer import S3Transfer’ clause allows us to access this class by simply referencing S3Transfer.  We could omit this line and later on use the fully qualified name (i.e., ‘transfer = boto3.s3.transfer.S3Transfer(client)’).

from boto3.s3.transfer import S3Transfer
import boto3

Next we will define some useful variables.  In future sections we will move the username/password stuff to config files, and use arguments to pass in reusable items such as filenames and directories.

Variables for the AWS CLI user’s access key and secret key.  You will want to change these to your own user’s credentials.

access_key='AKIAEAFY6QP7NH3JXQGA'
secret_key='FzqepDr1rmNCK+tcjDYQIEda6DOVAdosSi80AJOI'
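
As a preview of the cleanup coming in the plugins section, here is a minimal sketch of pulling the keys from environment variables instead of hardcoding them. The variable names below are the standard AWS ones, and you would need to export them yourself; nothing in this project sets them up yet.

# Hypothetical cleanup: read credentials from the standard AWS environment variables.
import os

access_key = os.environ['AWS_ACCESS_KEY_ID']
secret_key = os.environ['AWS_SECRET_ACCESS_KEY']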

Path and filename (on the Airflow server) of the file we want to upload to S3.

filepath='/Users/adataguru/Downloads/movies/title.ratings.tsv.gz'

S3 bucket and S3 file name.  This will be the S3 bucket you created for this project.

s3_bucket_name='movie-etl-staging'
s3_filename='title.ratings.tsv.gz'

Create a boto3 client by passing the type of client (‘s3’), the AWS access key, and the secret key.

client = boto3.client('s3', aws_access_key_id=access_key,aws_secret_access_key=secret_key)

I put print statements in to create some logging.  This lets me know we successfully got as far as creating a boto3 client.

print('client')

Create transfer object.

transfer = S3Transfer(client)

Log that the transfer client was created.

print('transfer - '+s3_bucket_name)

Use the upload_file method on the transfer instance of the S3Transfer object we created.  We pass this method the local filepath (including the filename), the S3 bucket we are moving the file to, and the resulting S3 filename.

transfer.upload_file(filepath, s3_bucket_name, s3_filename)
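
If you want a quick sanity check after the upload, a hypothetical addition like the one below uses boto3’s head_object call (not in the original script) to confirm the object landed in the bucket:

# Hypothetical sanity check: confirm the uploaded object exists and report its size.
response = client.head_object(Bucket=s3_bucket_name, Key=s3_filename)
print('uploaded ' + str(response['ContentLength']) + ' bytes')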

Now let’s write a script to perform the copy into Snowflake.  I named mine snowflake_copy.py, matching the path referenced in the DAG.  Again, we’ll clean this up in the plugins section.

import snowflake.connector


cnx = snowflake.connector.connect(
    user='adataguru',
    password='Password1',
    account='dg04864'
)

print('connected')

cnx.cursor().execute("""
COPY INTO MOVIES.PUBLIC.TITLE_RATINGS FROM s3://movie-etl-staging/title.ratings.tsv.gz
    CREDENTIALS = (
        aws_key_id='{aws_access_key_id}',
        aws_secret_key='{aws_secret_access_key}')
    FILE_FORMAT=(field_delimiter='\t',skip_header=1)
""".format(
    aws_access_key_id='AKIAEAFY6QP7NH3JXQGA',
    aws_secret_access_key='FzqepDr1rmNCK+tcjDYQIEda6DOVAdosSi80AJOI'))

print('done')
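
As an optional follow-up (not in the script above), you could confirm the load by counting the rows in the staging table and then closing the connection; a rough sketch:

# Hypothetical check: confirm rows landed in the staging table, then clean up.
row_count = cnx.cursor().execute(
    "SELECT count(*) FROM MOVIES.PUBLIC.TITLE_RATINGS").fetchone()[0]
print('TITLE_RATINGS row count: ' + str(row_count))
cnx.close()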

 

You may have to restart the Airflow services.

(venv)$ ps -ef | grep airflow

(venv)$ kill (insert the process ID from the previous output)

(venv)$ airflow scheduler -D

(venv)$ airflow webserver -D
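
If you want to confirm the scheduler has picked up the new DAG before testing it, the Airflow 1.x CLI can list the DAGs it sees:

(venv)$ airflow list_dags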

Airflow has a nifty way of testing an operator, which is awesome because it lets you test operators without running all their dependencies.  It also sends the logging to standard output, so you don’t have to go digging for log files.

Since these are just Python scripts, you can run each script individually to test it.  Then use the airflow test feature to test the operators that run them. The command is airflow test {DAG id} {task id} {execution date}.
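
For example, to run the S3 script on its own (this is the same path the bash_command points at):

(venv)$ python /home/ubuntu/airflow/scripts/s3transfer.py

And the airflow test call for the s3_copy task looks like this: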

(venv)$ airflow test test_bash s3_copy 2015-06-01

[Screenshot: output of the airflow test command]

Next, let’s test the actual DAG config.  Log into the Airflow admin console through your web browser:

{Airflow EC2 server public IP}:8080.

Set test_bash to ‘On’ and click the ‘play’ button to execute it now.  Then click Graph View to check out the progress.

[Screenshot: Airflow admin screen]

 

[Screenshot: test_bash Graph View]