Defining your workflow: Why Not Airflow?

 

What is Apache Airflow?

Airflow is a platform to programmatically author, schedule and monitor workflows or data pipelines. It does this by representing a workflow as a Directed Acyclic Graph (DAG) of tasks. It is open source and, at the time of writing, still in the Apache incubator stage. It was started in 2014 at Airbnb and has since earned an excellent reputation, with approximately 800 contributors and 13,000 stars on GitHub.

Apache Airflow as a Solution

  • Failures: retry if failure happens
  • Monitoring: success or failure status
  • Dependencies:
    • Data Dependencies – upstream data is missing
    • Execution Dependency – job 2 runs after job 1 is finished
  • Scalability: there is no centralized scheduler between different cron machines
  • Deployment: deploy new changes constantly
  • Process Historic Data: backfill/rerun historical data
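
Several of the points above map onto ordinary task arguments. The following is a minimal sketch, assuming illustrative values: the keys (retries, retry_delay, depends_on_past, start_date, email_on_failure) are standard Airflow task arguments, while the owner name and the numbers are made up for this example.

```python
from datetime import datetime, timedelta

# Illustrative values only; the keys are standard Airflow task arguments.
default_args = {
    "owner": "data-team",                 # hypothetical owner name
    "depends_on_past": True,              # run only if the previous run of this task succeeded
    "start_date": datetime(2018, 9, 1),   # a start date in the past leaves runs to backfill
    "retries": 3,                         # Failures: retry up to three times
    "retry_delay": timedelta(minutes=5),  # wait between two attempts
    "email_on_failure": False,
}

# Total time spent waiting between attempts if every retry is used.
max_retry_wait = default_args["retries"] * default_args["retry_delay"]
print(max_retry_wait)
```

A dictionary like this is typically passed to the DAG as default_args, so that every task in the DAG inherits the same failure handling.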

How Does Apache Airflow Work?

  • Apache Airflow executes the tasks of a DAG (Directed Acyclic Graph) on an array of workers while following the dependencies the DAG specifies.
  • DAGs are written in Python itself, which makes them easy to reuse in other processes.
  • This turns a workflow into well-defined code, which makes the workflow testable, maintainable, collaborative and versionable.
  • Throughout this process, tasks are not allowed to exchange data with each other, although they can exchange metadata. Airflow is therefore not a data streaming solution.
  • For the same reason, Airflow is not comparable to tools in the “Spark Streaming” or “Storm” space. It is closer to Oozie.
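
To make the role of the DAG concrete, here is a small sketch of how a dependency graph determines a valid execution order. It uses only Python's standard library (graphlib, Python 3.9+) and is not Airflow's scheduler; the task names extract, transform and load are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

def execution_order(deps):
    """Return one valid order in which the tasks can run."""
    return list(TopologicalSorter(deps).static_order())

order = execution_order(dependencies)
print(order)  # ['extract', 'transform', 'load']
```

Because the graph is acyclic, such an order always exists. If two tasks depended on each other, TopologicalSorter would raise a CycleError, which is why a workflow has to be a DAG.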

Why Apache Airflow?

Apache Airflow offers the following advantages:
  • The log entries of all executions are gathered in one place.
  • It can automate workflow development because it treats the workflow configuration as code.
  • It can send a report message through Slack if a DAG fails.
  • Inside the DAGs, it gives a clear picture of the dependencies.
  • Its ability to generate metadata makes it easier to regenerate uploads.

Principles

Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.
Extensible: Easily define your own operators, executors and extend the library so that it fits the level of abstraction that suits your environment.
Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.
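
As a rough illustration of the "Dynamic" principle, the sketch below builds a pipeline structure in a loop from a list of table names. It is plain Python rather than Airflow code: the table names and the build_pipeline helper are hypothetical, and in a real DAG file the loop body would create operators instead of dictionary entries.

```python
# Hypothetical list of source tables; in practice this could come from a config file.
TABLES = ["users", "orders", "payments"]

def build_pipeline(tables):
    """Return {task_id: [upstream task_ids]} with an extract -> load pair per table."""
    pipeline = {}
    for table in tables:
        extract_id = "extract_{}".format(table)
        load_id = "load_{}".format(table)
        pipeline[extract_id] = []            # extract tasks have no upstream task
        pipeline[load_id] = [extract_id]     # each load waits for its own extract
    return pipeline

pipeline = build_pipeline(TABLES)
for task_id, upstream in pipeline.items():
    print(task_id, "<-", upstream)
```

Adding a table to the list adds two tasks and one dependency without touching the rest of the code, which is the practical benefit of defining pipelines as code.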

Basic concepts

DAG: a description of the order in which work should take place
Operator: a class that acts as a template for carrying out some work
Task: a parameterized instance of an operator
Task Instance: a task that
  • has been assigned to a DAG and
  • has a state associated with a specific run of the DAG
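
The difference between an operator, a task and a task instance can be sketched with a toy model. The classes below are hypothetical stand-ins written for this example, not Airflow's own classes, and the state names are only illustrative.

```python
from dataclasses import dataclass
from datetime import datetime

class EchoOperator:
    """Operator: a reusable template for one kind of work (toy class)."""

    def __init__(self, task_id, message):
        self.task_id = task_id
        self.message = message

    def execute(self):
        return self.message

@dataclass
class ToyTaskInstance:
    """Task instance: a task tied to one specific run, with its own state."""
    task: EchoOperator
    execution_date: datetime
    state: str = "scheduled"

    def run(self):
        result = self.task.execute()
        self.state = "success"
        return result

# Task: a parameterized instance of the operator.
hello = EchoOperator(task_id="say_hello", message="hello")

# Two runs of the same task give two task instances with independent states.
ti_day1 = ToyTaskInstance(hello, datetime(2018, 9, 12))
ti_day2 = ToyTaskInstance(hello, datetime(2018, 9, 13))
ti_day1.run()
print(ti_day1.state, ti_day2.state)  # success scheduled
```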

How to use Airflow?

- Create a connection with the user interface.
- Edit a connection with the user interface.
- Create a connection with environment variables.
- Configure the connection types.
- Configure Apache Airflow to write logs.
- Scale out Apache Airflow with Celery, Dask or Mesos.
- Run Airflow with systemd or with upstart.
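
For the environment-variable route, Airflow looks up connections in variables named AIRFLOW_CONN_ followed by the upper-cased connection id, with the connection written as a URI. The sketch below only builds and inspects such a variable; the connection id, host and credentials are hypothetical.

```python
import os
from urllib.parse import urlparse

# Hypothetical connection id and credentials, for illustration only.
conn_id = "my_postgres"
conn_uri = "postgres://etl_user:secret@db.example.com:5432/analytics"

# Airflow reads connections from variables named AIRFLOW_CONN_<CONN_ID>.
env_name = "AIRFLOW_CONN_{}".format(conn_id.upper())
os.environ[env_name] = conn_uri

# Parse the URI back to check which parts end up where.
parsed = urlparse(os.environ[env_name])
print(env_name, parsed.hostname, parsed.port, parsed.path.lstrip("/"))
```

Setting os.environ from Python only affects the current process. In practice the variable would usually be exported in the shell or service definition that starts Airflow.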

A simple workflow

Let’s see how we can implement a simple pipeline composed of two tasks.
The first task generates a .txt file containing a word (“pipeline” in this case); the second task reads the file and decorates the line by prepending “My ” to it.
First, we define and initialise the DAG, then we add two operators to it.
The first one is a BashOperator, which can run essentially any bash command or script; the second one is a PythonOperator, which executes Python code.

import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

OUTPUT_ONE_PATH = "data/output_one.txt"
OUTPUT_TWO_PATH = "data/output_two.txt"

def decorate_file(input_path, output_path):
    with open(input_path, "r") as in_file:
        line = in_file.read()

    with open(output_path, "w") as out_file:
        out_file.write("My "+line)

default_args = {
    "owner": "lorenzo",
    "depends_on_past": False,
    "start_date": datetime(2018, 9, 12),
    "email": ["l.peppoloni@gmail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}

# DAG: runs every day at 12:00
dag = DAG(
    "simple_dag",
    default_args=default_args,
    schedule_interval="0 12 * * *",
    )

# Define the tasks
t1 = BashOperator(
    task_id="print_file",
    bash_command='echo "pipeline" > {}'.format(OUTPUT_ONE_PATH),
    dag=dag)

t2 = PythonOperator(
    task_id="decorate_file",
    python_callable=decorate_file,
    op_kwargs={"input_path": OUTPUT_ONE_PATH, "output_path": OUTPUT_TWO_PATH},
    dag=dag)

# Set the dependency: t1 is upstream of t2, so t2 runs after t1
t1 >> t2
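
Because the second task is an ordinary Python function, its logic can be checked without a running Airflow installation. The sketch below repeats decorate_file and feeds it a temporary file; run_pipeline_locally is a hypothetical helper in which a plain file write stands in for the BashOperator.

```python
import os
import tempfile

def decorate_file(input_path, output_path):
    # Same logic as the callable used by the PythonOperator above.
    with open(input_path, "r") as in_file:
        line = in_file.read()

    with open(output_path, "w") as out_file:
        out_file.write("My " + line)

def run_pipeline_locally():
    """Run both steps in a temporary directory and return the final file content."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        first = os.path.join(tmp_dir, "output_one.txt")
        second = os.path.join(tmp_dir, "output_two.txt")
        # Stand-in for the BashOperator: echo appends a trailing newline.
        with open(first, "w") as f:
            f.write("pipeline\n")
        decorate_file(first, second)
        with open(second) as f:
            return f.read()

result = run_pipeline_locally()
print(repr(result))  # 'My pipeline\n'
```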
 
This is how a workflow can be designed by defining tasks with different operators.
In our next Airflow blog, we will discuss securing and implementing Airflow.

If you like this blog, please show your appreciation by hitting the like button and sharing it. Also, drop any comments about the post and suggested improvements. Till then, HAPPY LEARNING.
