A quick Introduction to Apache Airflow

Comments · 17 Views

Apache Airflow is an open-source data pipeline task scheduling and coordinating tool. This tool was developed by Airbnb back in late 2014. The project was made open-source later in 2016 by donating it to the Apache Foundation.

Task Dependencies:

Airflow uses DAGs (Directed Acyclic Graphs) for execution and is based on the python programming language.

Image for post

If task a must execute before task b, we must define the execution as

a b

Conversely, if the task a must be executed after task b, we must define it as follows:

b a

Alternatively, we can also make use of upstream and downstream definition methods. These methods were used historically and since recent updates to Airflow, bitwise operators ( and ) are used.

a.set_downstream(b) # task a comes before task b

a.set_upstream(b) # task a comes after task b

Operators:

Operators represent a single task in workflow and usually execute independently. Generally, operators do not share information. There are several types of operators based on the nature of the task to be performed.

Each operator in the Airflow must have a unique task_id and the dag name to which the operator belongs to.

Some of the operators available within Airflow are:

  1. Dummy operator
  2. Bash Operator
  3. Python Operator
  4. Branch Operator
  5. redshift to s3 Operator

and many more…

  1. Dummy Operator

This operator does not perform any job and is usually used for debugging purposes.

from airflow.operators.dummy_operator import DummyOperatorDummyOperator(task_id = ‘example’, dag=dag)

2. Bash Operator

If you want to run any bash commands or bash scripts during the execution of data pipelines then BachOperator is what you need! These operators use the underlying OS for the execution of bash commands. The results can be observed in the log files.

from airflow.operators.bash_operator import BashOperatorexample_task = BashOperator(task_id = ‘bash_ex’, bash_command = ‘echo 1’, dag = dag)

Here, we provide the bash command to be executed along with the operator identifies and the dag name to which it belongs.

3. Python Operator

Apache airflow also provides flexibility to define python functions and call them during the runtime execution. This is achieved using PythonOperator.

Consider, we are scraping the contents from a URL. We can write a function to fetch data and later pass this function as an argument to the PythonOperator.

import requestsdef pull_file(URL, savepath): 
r = requests.get(URL)
with open(savepath, 'wb') as f:
f.write(r.content)
#Use the print method for logging
print(f"File pulled successfully from {URL} and saved to
{savepath}")
from airflow.operators.python_operator import PythonOperator#Create the task
pull_file_task = PythonOperator(task_id = 'pull_file',
python_callable = pull_file,
op_kwargs = {'URL': 'https://server/data.json',
'savepath': 'abc.json'}, dag = exec_dag)

In the above code block, we have defined the python user-defined function pull_file. This function fetches data from the URL and saves it to the provided save path. Later, this function is invoked by the PythonOperator by passing the function name and argument values.

Sensors:

Sensors are the special operators that wait for a certain condition to be true.

Eg. — Creation of file, upload of a database record, a certain response from a web request, etc.

While defining sensors, we can define how often the sensor needs to check the condition for its truthness.

Sensors are derived from airflow.sensors.base_sensor_operator.

In addition to default operator arguments ‘task_id’ ‘dag’, sensors also include the following arguments:

  1. mode: How to check for the condition.

a. mode = ‘poke’: The default mode that runs repeatedly. (This makes other tasks to wait and eventually slows down the system)

b. mode = ‘reschedule’: Gives up task slot and try again later. (Other tasks don't wait until the completion of the sensor).

2. poke_interval: How often to wait between checks.

3. timeout: How long to wait before failing the task

Sensors are used instead of operators when the task involves uncertainty of its execution. If failure is not immediately desired and we need to add task repetitions without the usage of loops then sensors are used.

Additional details about other operators can be found at https://airflow.apache.org/docs/stable/_api/airflow/operators/index.html

Executors:

Executors define how tasks are executed. Various executors available in Apache Airflow are:

  1. SequentialExecutor
  2. LocalExecutor
  3. CeleryExecutor

We can check the type of executors used by each dag using either airflow.cfg file or by executing the command “airflow list-dags”.

  1. SequentialExecutor

This is the default Airflow executor that runs one task at a time. It is useful for debugging. Although it is simple to understand, it is not really recommended for production data pipelines due to long execution time.

2. LocalExecutor

This executor runs on a single system and treats each task as a process. Parallelism is defined by the user and they can utilize all resources of a given host system.

3. CeleryExecutor

CeleryExecutor uses a Celery backend as a task manager. We can define multiple worker systems which makes it significantly more difficult to set up and configure. However, these types of executors are extremely powerful for organizations with extensive workflows.

SLAs (Service Level Agreements):

Image for post

SLAs are defined within the Airflow DAG to finish the execution of workflow within expected timelines. They dictate the amount of time a task or a DAG should require to run. An SLA miss is triggered, any time when the task/DAG does not meet the expected timing requirements.

If an SLA is missed, an email is sent out and the log is stored. We can view SLA misses in the web UI.

SLA can be defined in two ways:

  1. Using the ‘SLA’ argument on the task
task1 = BashOperator(task_id = 'task', bash_command = 'runcode.sh',
sla = timedelta(seconds = 30),
dag = dag)

2. In the default_args dictionary

default_args = {'sla': timedelta(minutes = 20,
'start_date': datetime(2020,2,20)
}
dag = DAG('sla_dag', default_args = default_args)

References:

https://learn.datacamp.com/courses/introduction-to-airflow-in-python

https://airflow.apache.org/docs/stable/_api/airflow/

https://en.wikipedia.org/wiki/Apache_Airflow

Comments