Airflow의 핵심 개념인 DAG (Directed Acyclic Graph)는 작업(Task)들을 모아 실행 순서와 관계를 설정하여, 작업이 어떤 방식으로 실행되어야 하는지 정의하는 구조입니다. DAG 내의 작업들은 의존 관계와 순서가 있으며, 각 작업이 선행 작업의 완료 후에 실행되도록 구성됩니다. DAG는 Airflow에서 전체 워크플로의 청사진 역할을 하며, 특정한 실행 흐름을 따르게 만들어 줍니다.

DAG 예시

 

DAG 정의

DAG를 정의하는 방법에는 세가지가 있다.

  1. with 명령어를 사용. 대그 안에 암묵적으로 어떤 것이든 코드를 추가할 수 있다.
  2.  표준 생성자를 사용. DAG를 사용하는 모든 연산자에 전달할 수 있음.
  3. @dag 데코레이터 사용. 함수를 DAG 생성자로 변환할 때 사용. 
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")

# 1. with 명령어를 사용하는 방법.

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

my_dag = DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)

# 2. 표준 생성자를 사용하는 방법

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

# 3. @dag 데코레이터를 사용하는 방법

Task 의존성

Task나 Operator은 혼자 존재하지 않는다. Task간의 의존성을 선언하는 것이 DAG의 구조를 형성합니다.
task 의존성을 선언하는 방법은 여러가지 있습니다. 3, 4번 방법은 만약 task의 리스트를 다른 리스트에 의존하게끔 만들고 싶을 때 사용. 

  1. >>, << 연산자 사용.
  2. set_upstream, set_downstream 메소드 사용. >>, << 연산자보다 더 명시적이다.
  3.  cross_downstream 메소드 사용
  4. chain 메소드 사용.
# 1
first_task >> [second_task, third_task]
third_task << fourth_task

# 2
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

# 3
from airflow.models.baseoperator import cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])

# 4-1
from airflow.models.baseoperator import chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])

# 4-2 동일한 크기에 대해 쌍별 종속성 수행
from airflow.models.baseoperator import chain

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)

 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow Architecture  (0) 2024.10.25
About Airflow  (0) 2024.06.08

Apache Airflow는 워크플로우 자동화 도구입니다. 워크플로우는 의존성이 있는 작업(Task)들의 집합으로, 이들을 다이렉트 그래프 형태로 관리합니다. 각 작업은 다른 작업의 실행에 의존할 수 있으며, 작업들이 상호작용하는 방식에 따라 워크플로우가 정의됩니다.

Airflow의 주요 개념

  • DAG (Directed Acyclic Graph) : 워크플로의 구조를 나타내며, 방향성 있고 순환하지 않는 그래프를 의미합니다. 각 노드는 작업(Task)이고, 간선은 작업 간의 의존 관계를 나타냅니다. DAG는 워크플로가 어떻게 실행되어야 하는지를 정의하지만, 작업을 실행하는 코드는 아닙니다.
  • Operator : 특정 작업을 정의하는 코드입니다. Operator는 ‘이 작업이 해야 할 일’을 결정하며, 그 자체로 작업의 인스턴스는 아닙니다. 여러 종류의 Operator가 있으며, 예를 들어 BashOperator는 Bash 스크립트를 실행하는 데 사용됩니다.
  • Task : 작업의 인스턴스입니다. DAG 내에서 Operator를 사용해 정의된 작업이 Task로 실행됩니다. 각 Task는 Operator에 의해 정의된 작업을 실행하게 됩니다.
  • Task Instance : 특정 Task가 특정 시점에 실행된 것을 나타내는 객체입니다. Task Instance는 DAG 실행 중에 특정 Task가 언제, 어떻게 실행되었는지를 추적합니다.

Airflow Components

Required Components

  • Scheduler : Airflow에서 DAG와 Task를 관리하는 핵심 역할을 하는 컴포넌트로, Task의 의존성을 확인하고, 실행할 Task를 결정하여 실행시킴.
  • Executer : Task를 실행하는 컴포넌트입니다. Executor는 여러 종류가 있으며, LocalExecutor는 로컬 머신에서 Task를 실행하는 반면, CeleryExecutor는 분산 환경에서 Task를 실행할 수 있습니다. 별도의 구성 요소가 아니며 스케줄러 프로세스 내에서 실행됨.
  • Webserver : DAG 및 작업의 동작을 검사, 트리거, 디버그 할 수 있는 편리한 사용자 인터페이스를 제공.
  • A folder of dag files : 스케줄러가 어떤 작업을 실행할지, 언제 실행할지를 결정.
  • A metadata database : Airflow 구성 요소가 워크플로우와 작업의 상태를 저장하는 데에 사용됨. 이 컴포넌트의 설정은 데이터베이스 백엔드 설정에서 설명되며 Airflow가 작동하는 데에 필요함.

Optional Components

  • Optional worker : 스케줄러가 제공하는 작업을 실행합니다. 기본 설치에서는 작업자가 스케줄러의 일부일 수 있으며 별도의 구성 요소가 아닙니다. CeleryExecutor에서 장기 실행 프로세스로 실행되거나 KubernetesExecutor의 POD로 실행될 수 있습니다.
  • Optional trigger : asyncio 이벤트 루프에서 지연된 작업을 실행합니다. 기본 설치에서는 지연된 작업이 사용되지 않는 경우 트리거러가 필요하지 않습니다. 지연된 작업에 대한 자세한 내용은 지연 가능한 연산자 및 트리거에서 확인할 수 있습니다.
  • Optional dag processor : 선택적 DAG 프로세서로, DAG 파일을 파싱하고 메타데이터 데이터베이스에 직렬화합니다. 기본적으로 DAG 프로세서 프로세스는 스케줄러의 일부이지만, 확장성과 보안상의 이유로 별도의 구성 요소로 실행될 수 있습니다. DAG 프로세서가 존재하는 경우 스케줄러는 DAG 파일을 직접 읽을 필요가 없습니다. DAG 파일 처리에 대한 자세한 내용은 DAG 파일 처리에서 확인할 수 있습니다.
  • Optional folder of plugins : 플러그인의 선택적 폴더입니다. 플러그인은 Airflow의 기능을 확장하는 방법입니다(설치된 패키지와 유사합니다). 플러그인은 스케줄러, DAG 프로세서, 트리거 및 웹 서버에 의해 읽힙니다. 플러그인에 대한 자세한 내용은 플러그인에서 확인할 수 있습니다.

 

Airflow는 이러한 핵심 개념들을 통해 복잡한 워크플로를 정의하고 실행할 수 있게 해주며, 사용자는 Python을 이용해 DAG와 Task를 정의하게 됩니다.

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow DAGs  (1) 2024.10.29
About Airflow  (0) 2024.06.08

Airflow란

Apache Airflow는 배치 지향적 워크플로우의 개발, 스케쥴링, 모니터링을 지원하는 오픈 소스 플랫폼이다.

Airflow 특징

  • Dynamic : 파이썬 코드로 구성되어 있어 동적 파이프라인 생성을 가능하게 합니다.
  • Extensible : 다양한 기술과 연결할 수 있는 operator가 포함되어 있다. 모든 Airflow 컴포넌트들은 확장이 가능하여 다양한 환경에 쉽게 할 수 있습니다.
  • Flexible : 워크플로우 매개변수화는 Jinja 템플릿 엔진을 활용하여 내장되어 있습니다.

 

Airflow 를 사용해야 하는 이유

Airflow®는 배치 워크플로우 오케스트레이션 플랫폼입니다. Airflow 프레임워크는 여러 기술과 연결할 수 있는 연산자를 포함하고 있으며, 새로운 기술과 연결하기 위해 쉽게 확장할 수 있습니다. 워크플로우에 명확한 시작과 끝이 있고 정기적으로 실행된다면, Airflow DAG로 프로그래밍할 수 있습니다.

워크플로우는 파이썬 코드로 정의되며, 이는 다음을 의미합니다:

  • 버전 관리에 저장할 수 있어 롤백이 가능합니다.
  • 여러 사람이 동시에 개발할 수 있습니다.
  • 기능을 검증하기 위해 테스트를 작성할 수 있습니다.
  • 구성 요소는 확장 가능하며 기존 구성 요소의 광범위한 컬렉션을 기반으로 구축할 수 있습니다.

 

Airflow 설치 방법

  • pip 설치 방법
    • 제약사항 추가하는 방법은 일반적으로 사용하는 방법.
      - 제약사항이 필요한 이유는 Airflow가 라이브러리면서 애플리케이션입니다. 보통 버전 의존도는 라이브러리는 열려있고 애플리케이션은 열려있다. Airflow는 열려있는데 이 때문에 때떄로 설치 코드가 작동하지 않거나 사용 불가능한 Airflow가 설치될 수 있습니다. 
    • 제약사항이 없는 방법은 필요에따라 업(다운)그레이드가 가능.
pip install "apache-airflow[celery]==2.10.2" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.8.txt"
# 제약사항 추가
pip install "apache-airflow==2.10.2" apache-airflow-providers-google==10.1.0
# 제약사항 X
  • Docker 설치 방법
    • Docker Hub에서 Apache에서 공식으로 제공되는 Docker Image가 있습니다.
docker pull apache/airflow

 

https://airflow.apache.org/docs/apache-airflow/stable/index.html

 

What is Airflow®? — Airflow Documentation

 

airflow.apache.org

 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow DAGs  (1) 2024.10.29
Airflow Architecture  (0) 2024.10.25

+ Recent posts