Now let’s take the scripts we created in the previous section, ‘Basic DAG configuration’, and turn them into plugins. In this section we will:
- Use inheritance to extend the BaseOperator class and create our own operators (a bare-bones skeleton of the pattern is sketched just after this list).
- Clean up our Python code to hide passwords and instead use a password file.
- Add more parameters, so that our plugins can be used for any file.
- Update our DAG to use the new plugins.
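To make the pattern concrete before diving into the real scripts, here is a bare-bones sketch. The names MyCustomOperator, my_custom_plugin, and my_param are placeholders, not part of the project: a custom operator subclasses BaseOperator and implements execute(), and an AirflowPlugin subclass registers it with Airflow.

import logging

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)


class MyCustomOperator(BaseOperator):          # placeholder name
    @apply_defaults
    def __init__(self, my_param, *args, **kwargs):
        self.my_param = my_param               # parameters come from the DAG definition
        super(MyCustomOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # the actual work happens here when the task runs
        log.info('running with %s', self.my_param)


class MyCustomPlugin(AirflowPlugin):
    name = "my_custom_plugin"
    operators = [MyCustomOperator]             # registers the operator with Airflow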
First let’s create a plugins directory.
(venv)$ cd ~
(venv)$ mkdir airflow/plugins
The first script we will create is my_operparams.py. I’m pretty lazy about my naming conventions for blog examples: this started as ‘my_operator.py’, then I changed it to pass more parameters, and hence we now have ‘my_operparams.py’.
This script is the plugin/custom operator version of s3transfer.py from the previous section. I changed it to pass more parameters and to reference a JSON password file.
Note: For the scripts below I created a new directory, /home/ubuntu/.credfiles, to hold our password files, and added a couple of simple JSON files containing usernames and passwords. This directory can then be locked down so that only my user has access. Alternatively, you could store the passwords in environment variables (see the short sketch after the JSON example below).
The contents of david_cred.json look like this:
{"ACCESS_KEY":"AKIEIAFZ7QP6MF3JXQGB", "SECRET_ACCESS_KEY":"FypwpEr1fnMCK+sejDZQIEda7DOVAdosSi80AJOI"}
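If you went the environment-variable route instead, the operator could read the keys from the process environment rather than opening a file. A minimal sketch, assuming variables named ACCESS_KEY and SECRET_ACCESS_KEY were exported before the Airflow worker was started (the names are just placeholders):

import os

# assumes these were exported in the shell that launched Airflow, e.g.
#   export ACCESS_KEY=...
#   export SECRET_ACCESS_KEY=...
access_key = os.environ['ACCESS_KEY']
secret_key = os.environ['SECRET_ACCESS_KEY']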
my_operparams.py
from boto3.s3.transfer import S3Transfer
import boto3
import logging
import json
import sys

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)


class S3TransferOperator(BaseOperator):

    @apply_defaults
    def __init__(self, filepath, filename, s3bucket,
                 path_to_key='/home/ubuntu/.credfiles',
                 key_file='david_cred.json',
                 *args, **kwargs):
        self.file_path = filepath
        self.filename = filename
        self.s3_bucket = s3bucket
        self.path_to_key = path_to_key
        self.key_file = key_file
        super(S3TransferOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # Pull the AWS keys out of the JSON credentials file
        with open(self.path_to_key + '/' + self.key_file, 'r') as f:
            keys = json.load(f)
        self.access_key = keys['ACCESS_KEY']
        self.secret_key = keys['SECRET_ACCESS_KEY']
        self.s3_filename = self.filename
        self.client = boto3.client('s3',
                                   aws_access_key_id=self.access_key,
                                   aws_secret_access_key=self.secret_key)
        log.info('client')
        self.transfer = S3Transfer(self.client)
        log.info('transfer - ' + self.s3_bucket)
        # Upload the local file to the target S3 bucket
        self.transfer.upload_file(self.file_path + self.filename,
                                  self.s3_bucket, self.s3_filename)


class S3TransferPlugin(AirflowPlugin):
    name = "S3_transfer_plugin"
    operators = [S3TransferOperator]
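Note the S3TransferPlugin class at the bottom: registering the operator through AirflowPlugin is what lets the DAG later import it with 'from airflow.operators import S3TransferOperator' instead of a normal module path.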
The next plugin is the custom operator version of snowflake_copy.py; at least here my naming is a little more in line. The changes mirror the s3transfer.py -> my_operparams.py updates.
my_snowflake_copy_operator.py
from boto3.s3.transfer import S3Transfer
import boto3
import logging
import json
import sys
import snowflake.connector

from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

log = logging.getLogger(__name__)


class SnowflakeCopyOperator(BaseOperator):

    @apply_defaults
    def __init__(self, filename, s3bucket,
                 path_to_key='/home/ubuntu/.credfiles',
                 key_file='david_cred.json',
                 snowflake_key_file='david_sf_cred.json',
                 *args, **kwargs):
        self.filename = filename
        self.s3_bucket = s3bucket
        self.path_to_key = path_to_key
        self.key_file = key_file
        self.snowflake_key_file = snowflake_key_file
        super(SnowflakeCopyOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        # Snowflake login credentials
        with open(self.path_to_key + '/' + self.snowflake_key_file, 'r') as sf:
            sf_keys = json.load(sf)
        self.sf_user = sf_keys['USER']
        self.sf_password = sf_keys['PASSWORD']
        self.sf_account = sf_keys['ACCOUNT']
        # AWS keys used in the COPY command's CREDENTIALS clause
        with open(self.path_to_key + '/' + self.key_file, 'r') as f:
            keys = json.load(f)
        self.access_key = keys['ACCESS_KEY']
        self.secret_key = keys['SECRET_ACCESS_KEY']
        self.cnx = snowflake.connector.connect(
            user=self.sf_user,
            password=self.sf_password,
            account=self.sf_account)
        print('connected')
        # Copy the staged S3 file into the target table
        self.cnx.cursor().execute("""
            COPY INTO MOVIES.PUBLIC.TITLE_RATINGS
            FROM '{s3_bucket_file}'
            CREDENTIALS = (
                aws_key_id='{aws_access_key_id}',
                aws_secret_key='{aws_secret_access_key}')
            FILE_FORMAT=(field_delimiter='\t', skip_header=1)
        """.format(
            s3_bucket_file='s3://' + self.s3_bucket + '/' + self.filename,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key))
        print('done')


class SnowflakeCopyPlugin(AirflowPlugin):
    name = "snowflake_copy_plugin"
    operators = [SnowflakeCopyOperator]
Since each plugin is just a Python script, we can do an initial test by simply running it. Note that since we are not creating any objects or passing any parameters here, this really just checks for syntax errors.
(venv)$python my_operparams.py
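If you want a slightly more meaningful smoke test, you can also instantiate the operator yourself and call execute() outside of Airflow. This is just a hypothetical check, assuming you run it from the plugins directory and that the credentials file and title.ratings.tsv.gz exist at the default paths:

# quick_check.py - hypothetical manual test, not part of the DAG
from my_operparams import S3TransferOperator

op = S3TransferOperator(filepath='/home/ubuntu/',
                        filename='title.ratings.tsv.gz',
                        s3bucket='movie-etl-staging',
                        task_id='manual_test')
op.execute(context={})  # performs the upload immediately, outside the scheduler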
Update our DAG
plugin_orch1.py
from datetime import datetime

from airflow import DAG
from airflow.operators import S3TransferOperator
from airflow.operators import SnowflakeCopyOperator

dag = DAG('test_plugin',
          description='Test plugin call',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20),
          catchup=False)

plugin_operator_task = S3TransferOperator(filepath='/home/ubuntu/',
                                          filename='title.ratings.tsv.gz',
                                          s3bucket='movie-etl-staging',
                                          path_to_key='/home/ubuntu/airflow/.credfiles',
                                          task_id='S3TransferOperatorTest',
                                          dag=dag)

snowflake_copy_operator = SnowflakeCopyOperator(filename='title.ratings.tsv.gz',
                                                s3bucket='movie-etl-staging',
                                                task_id='sf_copy',
                                                dag=dag)

plugin_operator_task >> snowflake_copy_operator
Test the initial DAG config by just running the script.
$python plugin_orch1.py
Now let’s test our operators one by one.
(venv)$ airflow test test_plugin S3TransferOperatorTest 2018-01-01
(venv)$ airflow test test_plugin sf_copy 2018-01-01
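Note that airflow test runs a single task instance for the given execution date without checking dependencies or recording state in the metadata database, which makes it a handy way to exercise each operator in isolation.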
Now we can run our DAG through the web interface.
Select Graph View.
Click on a component that is running or ran, and select View Log.