Airflow Plugins

Now let’s take the scripts we created in the previous section, ‘Basic DAG configuration’, and turn them into plugins.  In this section we will:

  1. Use inheritance to extend the BaseOperator class and create our own operators.
  2. Clean up our Python code to hide passwords and instead use a password file.
  3. Add more parameters, so that our plugins can be used for any file.
  4. Update the DAG to use our new plugins.

First let’s create a plugins directory.

(venv)$cd ~
(venv)$mkdir airflow/plugins
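
By default Airflow looks for plugins in $AIRFLOW_HOME/plugins, so this new directory should be picked up automatically. If your setup differs, the plugins_folder setting in airflow.cfg controls the location; for the layout used in this post it would be:

plugins_folder = /home/ubuntu/airflow/plugins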

The first script we will create is my_operparams.py.  I’m pretty lazy about my naming conventions for blog examples: this started as ‘my_operator.py’, then I changed it to pass more parameters, and hence we now have ‘my_operparams.py’.

This script is the plugin/custom-operator version of s3transfer.py from the previous section.  I changed it to pass more parameters and created a JSON password file for it to reference.

Note:  For the scripts below I created a new directory, /home/ubuntu/.credfiles, to hold our password files, and added a couple of simple JSON files containing usernames and passwords to it.  This directory can then be locked down so that only my user has access (a quick example follows below).  Alternatively, you could use environment variables to store passwords.
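
As a rough sketch, creating the directory and restricting it to your own user might look like this (adjust the paths to your setup):

(venv)$mkdir /home/ubuntu/.credfiles
(venv)$chmod 700 /home/ubuntu/.credfiles
(venv)$chmod 600 /home/ubuntu/.credfiles/*.json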

The contents of david_cred.json look like this:

{"ACCESS_KEY":"AKIEIAFZ7QP6MF3JXQGB", "SECRET_ACCESS_KEY":"FypwpEr1fnMCK+sejDZQIEda7DOVAdosSi80AJOI"}
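
The Snowflake credential file referenced later, david_sf_cred.json, follows the same pattern.  Based on the keys the Snowflake operator reads below, it would look something like this (the values here are just placeholders):

{"USER":"your_snowflake_user", "PASSWORD":"your_snowflake_password", "ACCOUNT":"your_account_identifier"}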

my_operparams.py

from boto3.s3.transfer import S3Transfer
import boto3
import logging
import json

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)

class S3TransferOperator(BaseOperator):
    """Upload a local file to an S3 bucket using credentials read from a JSON key file."""

    @apply_defaults
    def __init__(self, filepath, filename, s3bucket,
                 path_to_key='/home/ubuntu/.credfiles',
                 key_file='david_cred.json',
                 *args, **kwargs):

        self.file_path = filepath
        self.filename = filename
        self.s3_bucket = s3bucket
        self.path_to_key = path_to_key
        self.key_file = key_file
        super(S3TransferOperator, self).__init__(*args, **kwargs)

    def execute(self, context):

        # Read the AWS credentials from the JSON key file
        with open(self.path_to_key + '/' + self.key_file, 'r') as f:
            keys = json.load(f)

        self.access_key = keys['ACCESS_KEY']
        self.secret_key = keys['SECRET_ACCESS_KEY']
        self.s3_filename = self.filename

        # Build the boto3 S3 client with those credentials
        self.client = boto3.client('s3',
                                   aws_access_key_id=self.access_key,
                                   aws_secret_access_key=self.secret_key)

        log.info('created S3 client')

        self.transfer = S3Transfer(self.client)

        log.info('uploading to S3 bucket - ' + self.s3_bucket)

        # Upload the local file to the target bucket
        self.transfer.upload_file(self.file_path + self.filename,
                                  self.s3_bucket, self.s3_filename)

class S3TransferPlugin(AirflowPlugin):

    name = "S3_transfer_plugin"
    operators = [S3TransferOperator]

The next plugin is the custom-operator version of snowflake_copy.py from the previous section.  At least here my naming is a little more in line.  The changes follow the same pattern as the s3transfer.py -> my_operparams.py updates.

my_snowflake_copy_operator.py

import logging
import json

import snowflake.connector

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)

class SnowflakeCopyOperator(BaseOperator):
    """Copy a staged S3 file into a Snowflake table using credentials read from JSON key files."""

    @apply_defaults
    def __init__(self, filename, s3bucket,
                 path_to_key='/home/ubuntu/.credfiles',
                 key_file='david_cred.json',
                 snowflake_key_file='david_sf_cred.json',
                 *args, **kwargs):

        self.filename = filename
        self.s3_bucket = s3bucket
        self.path_to_key = path_to_key
        self.key_file = key_file
        self.snowflake_key_file = snowflake_key_file
        super(SnowflakeCopyOperator, self).__init__(*args, **kwargs)

    def execute(self, context):

        # Read the Snowflake credentials from their JSON key file
        with open(self.path_to_key + '/' + self.snowflake_key_file, 'r') as sf:
            sf_keys = json.load(sf)

        self.sf_user = sf_keys['USER']
        self.sf_password = sf_keys['PASSWORD']
        self.sf_account = sf_keys['ACCOUNT']

        # Read the AWS credentials used in the COPY INTO statement
        with open(self.path_to_key + '/' + self.key_file, 'r') as f:
            keys = json.load(f)

        self.access_key = keys['ACCESS_KEY']
        self.secret_key = keys['SECRET_ACCESS_KEY']

        # Connect to Snowflake
        self.cnx = snowflake.connector.connect(
            user=self.sf_user,
            password=self.sf_password,
            account=self.sf_account
        )
        log.info('connected to Snowflake')

        # Copy the staged S3 file into the target table
        self.cnx.cursor().execute("""
        COPY INTO MOVIES.PUBLIC.TITLE_RATINGS FROM '{s3_bucket_file}'
            CREDENTIALS = (
                aws_key_id='{aws_access_key_id}',
                aws_secret_key='{aws_secret_access_key}')
            FILE_FORMAT=(field_delimiter='\t', skip_header=1)
        """.format(
            s3_bucket_file='s3://' + self.s3_bucket + '/' + self.filename,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key))

        self.cnx.close()
        log.info('copy complete')

class SnowflakeCopyPlugin(AirflowPlugin):
    name = "snowflake_copy_plugin"
    operators = [SnowflakeCopyOperator]

Since each plugin is just a Python script, we can do an initial test by simply running it.  Note that since we are not actually creating any objects or passing any parameters this way, we are really just checking for syntax errors.

(venv)$python my_operparams.py
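
If you want a slightly more thorough check than a syntax pass, you can instantiate the operator and call execute() yourself outside of Airflow. This is just a rough smoke-test sketch, assuming the sample file and bucket from the previous section and that your credential file is in place; note that it will attempt a real upload:

# hypothetical standalone smoke test, run from the plugins directory
from my_operparams import S3TransferOperator

op = S3TransferOperator(task_id='standalone_s3_test',      # task_id is required by BaseOperator
                        filepath='/home/ubuntu/',           # local path to the sample file
                        filename='title.ratings.tsv.gz',    # sample file from the previous section
                        s3bucket='movie-etl-staging')       # staging bucket from the previous section
op.execute(context={})                                      # runs the upload outside of a DAG run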

Update our DAG

from datetime import datetime
from airflow import DAG
from airflow.operators import S3TransferOperator
from airflow.operators import SnowflakeCopyOperator

dag = DAG('test_plugin', description='Test plugin call',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

plugin_operator_task = S3TransferOperator(filepath='/home/ubuntu/',
                                          filename='title.ratings.tsv.gz',
                                          s3bucket='movie-etl-staging',
                                          path_to_key='/home/ubuntu/airflow/.credfiles',
                                          task_id='S3TransferOperatorTest',
                                          dag=dag)

snowflake_copy_operator = SnowflakeCopyOperator(filename='title.ratings.tsv.gz',
                                                s3bucket='movie-etl-staging',
                                                task_id='sf_copy',
                                                dag=dag)

plugin_operator_task >> snowflake_copy_operator
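
One note on the imports at the top of the DAG: depending on your Airflow version, plugin operators may instead need to be imported from the namespace created by each plugin's name attribute. If the plain airflow.operators imports fail, this variant is worth trying:

from airflow.operators.S3_transfer_plugin import S3TransferOperator
from airflow.operators.snowflake_copy_plugin import SnowflakeCopyOperator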

Test the initial DAG config by just running the script.

(venv)$python plugin_orch1.py

[Screenshot: output of running python plugin_orch1.py]

Now let’s test our operators one by one.

(venv)$ airflow test test_plugin S3TransferOperatorTest 2018-01-01

[Screenshot: airflow test output for the S3TransferOperatorTest task]

(venv)$ airflow test test_plugin sf_copy 2018-01-01

[Screenshot: airflow test output for the sf_copy task]

Now we can run our DAG through the web interface.

[Screenshot: triggering the DAG with the play icon in the web interface]

Select Graph View.

[Screenshot: Graph View, where bright green shows a task is running]

Click on a task that is running or has already run, and select View Log.

[Screenshot: Graph View showing both tasks succeeded]

[Screenshot: selecting View Log from the task dialog in Graph View]

[Screenshot: the task log view]