out_of_anjoong 2024. 10. 29. 15:55

Airflow의 핵심 개념인 DAG (Directed Acyclic Graph)는 작업(Task)들을 모아 실행 순서와 관계를 설정하여, 작업이 어떤 방식으로 실행되어야 하는지 정의하는 구조입니다. DAG 내의 작업들은 의존 관계와 순서가 있으며, 각 작업이 선행 작업의 완료 후에 실행되도록 구성됩니다. DAG는 Airflow에서 전체 워크플로의 청사진 역할을 하며, 특정한 실행 흐름을 따르게 만들어 줍니다.

DAG 예시

 

DAG 정의

DAG를 정의하는 방법에는 세가지가 있다.

  1. with 명령어를 사용. 대그 안에 암묵적으로 어떤 것이든 코드를 추가할 수 있다.
  2.  표준 생성자를 사용. DAG를 사용하는 모든 연산자에 전달할 수 있음.
  3. @dag 데코레이터 사용. 함수를 DAG 생성자로 변환할 때 사용. 
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")

# 1. with 명령어를 사용하는 방법.

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

my_dag = DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)

# 2. 표준 생성자를 사용하는 방법

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

# 3. @dag 데코레이터를 사용하는 방법

Task 의존성

Task나 Operator은 혼자 존재하지 않는다. Task간의 의존성을 선언하는 것이 DAG의 구조를 형성합니다.
task 의존성을 선언하는 방법은 여러가지 있습니다. 3, 4번 방법은 만약 task의 리스트를 다른 리스트에 의존하게끔 만들고 싶을 때 사용. 

  1. >>, << 연산자 사용.
  2. set_upstream, set_downstream 메소드 사용. >>, << 연산자보다 더 명시적이다.
  3.  cross_downstream 메소드 사용
  4. chain 메소드 사용.
# 1
first_task >> [second_task, third_task]
third_task << fourth_task

# 2
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

# 3
from airflow.models.baseoperator import cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])

# 4-1
from airflow.models.baseoperator import chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])

# 4-2 동일한 크기에 대해 쌍별 종속성 수행
from airflow.models.baseoperator import chain

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)