::: IT인터넷 :::

AirFlow DAG의 Hooks, Pools, Branching, Trigger Rules

곰탱이푸우 2022. 6. 27. 08:20
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
이번에는 DAG의 구성 요소에 대해 정리한다.
 
DAG 소개와 기본 구조는 아래 포스팅을 참고한다.
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.
 

DAG 구성 요소

AirFlow에서 가장 중요한 것은 앞에서 다룬 DAG이다.
실제로 AirFlow에서 실행할 작업들을 정의한 것이기 때문이다.
 
DAG에 대한 설명은 아래 포스팅의 DAG이란 항목을 참고한다.
DAG을 구성하는 요소들은 DAG Runs, Tasks, TaskFlow, Hooks, Pools, Branching, Trigger Rules, Jinja Templating, Xcom, Variable, SubDags 등이 있다.
본 포스팅에서는 Hooks, Pools, Branching, Trigger Rules에 대해 정리한다.
 

Hooks

Hook은 DB나 서비스 같은 외부 시스템과 통신하기 위한 인터페이스를 제공하며 연결 상태를 유지한다.
 
일반적으로 데이터베이스나 클라우드에 데이터가 저장되어 있는 경우가 많다.
DAG에서 이러한 서비스에 연결해서 작업을 수행하기 위해서는 연결 상태를 유지해야 한다.
 
단순히 쿼리만 하는 경우에는 Opeartor로 해결 가능하다.
그러나 쿼리 결과를 가공하거나 추가 쿼리가 필요한 경우 Hook을 사용하면 유용하다.
Hook은 Operator 내부에서 사용하며, Jupyter Notebook 같은 대화형 환경에서 상당히 편리하다.
자세한 사항은 아래 문서를 참고한다.
 

Operators and Hooks Reference — Airflow Documentation

 

airflow.apache.org

 

 

아래 포스팅의 Sensor에서 다룬 Connection을 참고하여 Connection을 설정한다.
PostgreSQL을 사용하는 경우 아래와 같이 Operator와 Hook을 작성할 수 있다.
Connection에 Postgres 연결 설정을 했다고 가정한다.
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime

dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))

start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

def test_query():
    hook = PostgresHook(postgres_conn_id='postgres_test')
    rows = hook.get_records("SELECT * FROM table_name LIMIT 10;")
    for row in rows:
        print(row)

hook_test = PythonOperator(
    task_id='run_query_with_python',
    python_callable=test_query,
    dag=dag)

start >> hook_test >> end
 
PostgresHook와 같은 Proviers Package를 설치해야 한다.
아래 사이트를 참고한다.
 

Pools

AirFlow는 스케줄러에 의해 설정 된 일정에 따라 동작하며, 동시에 여러 DAG이 실행될 수 있다.
Pools는 병렬 실행되는 태스크의 수를 제한하는 역할을 한다.
 
WebUI에서 추가하거나 개수 변경이 가능하다.
최대 지정된 슬롯 숫자만큼 동시에 실행이 가능하다.
 
AirFlow의 Admin - Pools 메뉴를 클릭한다.
 
기본적으로 default_pool은 128로 설정되어 있고, 소규모 조직에서는 충분히 사용 가능한 숫자이다.
최대 슬롯 개수를 초과하면 큐(Queue)에 등록되고 순차적으로 처리된다.
 
지정된 Pool에 속한 task의 실행 개수를 제한하려면, Pool의 이름과 동시에 실행가능한 개수를 설정하여 생성한다.
그리고 동시에 실행하는 것이 불가능한 경우, priority_weight 옵션으로 먼저 수행할 Task의 우선 순위를 지정할 수 있다.
 

Branching

AirFlow는 아래 그림과 같이 워크플로우가 2개 이상으로 나눠지는 분기점 역할을 하는 Branch 기능을 제공한다.
 
자세한 내용은 아래 기술 문서를 참고한다.
 

DAGs — Airflow Documentation

 

airflow.apache.org

 
Branch는 크게 아래와 같이 두 가지로 구분된다.
 
 

분기 되는 작업을 모두 수행

아래 포스팅의 Dummy Operator에서 TaskGroup 설명 과정에서 다룬 적이 있다.
DAG에서 수행할 Task의 순서를 지정할 때 대괄호 ([])로 묶으면 분기할 수 있1다.
 
자세한 내용은 아래 포스팅의 Task 배열 부분을 참고한다.
또한 TaskGroup은 아래 포스팅의 Dummy Operator 부분을 참고한다.

특정 조건을 만족하는 경우

특정 조건의 만족 여부에 따라 분기 방향을 결정하는 일종의 조건문 역할을 한다.
조건을 확인하고 방향을 결정하는 Operator가 필요한데, 대부분 BranchPythonOperator를 사용한다.
 
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.python

class BranchPythonOperator(
    python_callable: Callable, 
    op_args: Optional[Collection[Any]] = None, 
    op_kwargs: Optional[Mapping[str, Any]] = None, 
    templates_dict: Optional[Dict] = None, 
    templates_exts: Optional[List[str]] = None, 
    **kwargs
)
 
전달 인자는 다음과 같다.
구분
타입
기본값
내용
python_callable
Callable
 
호출할 파이썬 개체 (함수, 클래스)
op_args
Optional[Collection[Any]]
None
파이썬 개체에 전달할 명시적 (explict) 인자
op_kwargs
Optional[Mapping[str, Any]]
None
파이썬 개체에 전달할 키워드 인자 (krawgs)
templates_dict
Optional[Dict]
None
AirFlow 엔진이 생성한 템플릿으로 전달되는
Dictionary 타입의 값 목록
templates_exts
Optional[List[str]]
None
템플릿 필드를 처리하는 동안 확인할 파일 확장자 목록
provide_context
bool
False
True로 설정하면
Task 인스턴스의 attribute들을 kwargs로 사용 가능
BaseOperator에 정의 되어 있다
 
python_callable에서 DAG 코드 외부의 함수나 클래스를 사용하는 경우, DAG 코드 상단에 import 되어 있어야 한다.
 
자세한 사항은 아래 페이지를 참고한다.
 

airflow.operators.python — Airflow Documentation

 

airflow.apache.org

 

 

Python Operator, Python Sensor와 사용법이 거의 비슷하다.
Python Callable의 반환(리턴) 값이 다음에 수행할 Task 의 이름 (task_id)라는 점이 다르다.
 
Python Operator와 Python Sensor는 아래 포스팅을 참고한다.
다음과 같이 사용할 수 있다.
from airflow.operators.python import BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
import random
from datetime import datetime

default_args= { 
    "owner": "testuser", 
    "start_date": datetime(2022, 4, 7), 
    # DAG 실행 중 오류 발생하면 failure callback으로 메일 발송하므로 비활성화 
    "email_on_failure": False
}

dag = DAG( 
        dag_id = "test_branch", 
        default_args = default_args, 
        schedule_interval = "@once", 
)

start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

### BranchPythonOperator 예제 
def check_condition():  # PythonSensor의 callback 함수
    num = random.randint(0, 10)
    print(num)
    if num > 6:
        return "print_greater"
    else:
        return "print_smaller"

python_check = BranchPythonOperator(
    task_id = "branch_test",
    python_callable = check_condition,
    dag = dag)

print_greater = BashOperator(
    task_id = "print_greater",
    bash_command = "echo value is greater than 6",
    dag = dag)

print_smaller = BashOperator(
    task_id = "print_smaller", 
    bash_command = "echo value is smaller than 6", 
    dag = dag)

start >> python_check >> [print_greater, print_smaller] >> end
 
실행하면 랜덤 값이 5가 나와서 print_smaller를 반환하고 print_greater를 생략한다.
 
DAG를 확인해보면 print_smaller를 실행하고 print_greater를 생략한 것을 확인할 수 있다.

 

 

Trigger Rules

일반적으로 Task는 이전 Task들이 성공할 때만 실행된다.
다르게 설명하면 앞서 실행 된 Task들의 결과에 대한 의존성이 존재한다.
보다 복잡한 의존성 설정을 위한 다양한 Trigger Rule 들이 존재한다.
 
Trigger Rules의 종류는 다음과 같다.
구분
내용
all_success (Default)
이전 Task 전체가 성공한 경우
all_failed
이전 Task 전체 또는 직전 Task가 실패한 경우
all_done
이전 Task 실행이 모두 완료된 경우
one_failed
이전 Task가 한 개 이상 실패한 경우
(이전 Task가 모두 완료될 때까지 대기하지 않음)
one_success
이전 Task가 한 개 이상 성공한 경우
(이전 Task가 모두 완료될 때까지 대기하지 않음)
none_failed
이전 Task에 실패가 없는 경우 (성공 또는 스킵)
none_failed_min_one_success
이전 Task에 실패가 없고 최소한 한 개 이상 성공한 경우
none_skipped
이전 Task에 스킵 상태가 없는 경우 (성공 또는 실패)
always
항상 실행 (종속성 없음)
 
자세한 내용은 아래 기술 문서를 참고한다.
 

DAGs — Airflow Documentation

 

airflow.apache.org

 

 

예를 들어 앞서 살펴 본 BranchPythonOperator에서 DummyOperator인 마지막 end Task는 실행되지 않고 생략된다.
기본 설정이 all_success이기 때문이다.
 
이러한 경우 end Task의 Trigger Rule을 none_failed_min_one_success로 변경하면 항상 실행된다.
end Task의 코드에 다음과 같이 Trigger Rule을 추가한다.
# 변경 전
# end = DummyOperator(task_id="end", dag=dag)

# 변경 후
end = DummyOperator(task_id="end", dag=dag,
    trigger_rule='none_failed_min_one_success')
 
end Task가 녹색으로 변한 것을 확인할 수 있다.