실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
이번에는 DAG의 구성 요소에 대해 정리한다.
DAG 소개와 기본 구조는 아래 포스팅을 참고한다.
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.
DAG 구성 요소
AirFlow에서 가장 중요한 것은 앞에서 다룬 DAG이다.
실제로 AirFlow에서 실행할 작업들을 정의한 것이기 때문이다.
DAG에 대한 설명은 아래 포스팅의 DAG이란 항목을 참고한다.
DAG을 구성하는 요소들은 DAG Runs, Tasks, TaskFlow, Hooks, Pools, Branching, Trigger Rules, Jinja Templating, Xcom, Variable, SubDags 등이 있다.
본 포스팅에서는 Hooks, Pools, Branching, Trigger Rules에 대해 정리한다.
Hooks
Hook은 DB나 서비스 같은 외부 시스템과 통신하기 위한 인터페이스를 제공하며 연결 상태를 유지한다.
일반적으로 데이터베이스나 클라우드에 데이터가 저장되어 있는 경우가 많다.
DAG에서 이러한 서비스에 연결해서 작업을 수행하기 위해서는 연결 상태를 유지해야 한다.
단순히 쿼리만 하는 경우에는 Opeartor로 해결 가능하다.
그러나 쿼리 결과를 가공하거나 추가 쿼리가 필요한 경우 Hook을 사용하면 유용하다.
Hook은 Operator 내부에서 사용하며, Jupyter Notebook 같은 대화형 환경에서 상당히 편리하다.
자세한 사항은 아래 문서를 참고한다.
아래 포스팅의 Sensor에서 다룬 Connection을 참고하여 Connection을 설정한다.
PostgreSQL을 사용하는 경우 아래와 같이 Operator와 Hook을 작성할 수 있다.
Connection에 Postgres 연결 설정을 했다고 가정한다.
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime
dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)
def test_query():
hook = PostgresHook(postgres_conn_id='postgres_test')
rows = hook.get_records("SELECT * FROM table_name LIMIT 10;")
for row in rows:
print(row)
hook_test = PythonOperator(
task_id='run_query_with_python',
python_callable=test_query,
dag=dag)
start >> hook_test >> end
PostgresHook와 같은 Proviers Package를 설치해야 한다.
아래 사이트를 참고한다.
Pools
AirFlow는 스케줄러에 의해 설정 된 일정에 따라 동작하며, 동시에 여러 DAG이 실행될 수 있다.
Pools는 병렬 실행되는 태스크의 수를 제한하는 역할을 한다.
WebUI에서 추가하거나 개수 변경이 가능하다.
최대 지정된 슬롯 숫자만큼 동시에 실행이 가능하다.
AirFlow의 Admin - Pools 메뉴를 클릭한다.

기본적으로 default_pool은 128로 설정되어 있고, 소규모 조직에서는 충분히 사용 가능한 숫자이다.
최대 슬롯 개수를 초과하면 큐(Queue)에 등록되고 순차적으로 처리된다.

지정된 Pool에 속한 task의 실행 개수를 제한하려면, Pool의 이름과 동시에 실행가능한 개수를 설정하여 생성한다.
그리고 동시에 실행하는 것이 불가능한 경우, priority_weight 옵션으로 먼저 수행할 Task의 우선 순위를 지정할 수 있다.
Branching
AirFlow는 아래 그림과 같이 워크플로우가 2개 이상으로 나눠지는 분기점 역할을 하는 Branch 기능을 제공한다.

자세한 내용은 아래 기술 문서를 참고한다.
DAGs — Airflow Documentation
airflow.apache.org
Branch는 크게 아래와 같이 두 가지로 구분된다.
분기 되는 작업을 모두 수행
아래 포스팅의 Dummy Operator에서 TaskGroup 설명 과정에서 다룬 적이 있다.
DAG에서 수행할 Task의 순서를 지정할 때 대괄호 ([])로 묶으면 분기할 수 있1다.
자세한 내용은 아래 포스팅의 Task 배열 부분을 참고한다.
또한 TaskGroup은 아래 포스팅의 Dummy Operator 부분을 참고한다.
특정 조건을 만족하는 경우
특정 조건의 만족 여부에 따라 분기 방향을 결정하는 일종의 조건문 역할을 한다.
조건을 확인하고 방향을 결정하는 Operator가 필요한데, 대부분 BranchPythonOperator를 사용한다.
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.python
class BranchPythonOperator(
python_callable: Callable,
op_args: Optional[Collection[Any]] = None,
op_kwargs: Optional[Mapping[str, Any]] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[List[str]] = None,
**kwargs
)
전달 인자는 다음과 같다.
구분
|
타입
|
기본값
|
내용
|
python_callable
|
Callable
|
|
호출할 파이썬 개체 (함수, 클래스)
|
op_args
|
Optional[Collection[Any]]
|
None
|
파이썬 개체에 전달할 명시적 (explict) 인자
|
op_kwargs
|
Optional[Mapping[str, Any]]
|
None
|
파이썬 개체에 전달할 키워드 인자 (krawgs)
|
templates_dict
|
Optional[Dict]
|
None
|
AirFlow 엔진이 생성한 템플릿으로 전달되는
Dictionary 타입의 값 목록 |
templates_exts
|
Optional[List[str]]
|
None
|
템플릿 필드를 처리하는 동안 확인할 파일 확장자 목록
|
provide_context
|
bool
|
False
|
True로 설정하면
Task 인스턴스의 attribute들을 kwargs로 사용 가능 BaseOperator에 정의 되어 있다
|
python_callable에서 DAG 코드 외부의 함수나 클래스를 사용하는 경우, DAG 코드 상단에 import 되어 있어야 한다.
자세한 사항은 아래 페이지를 참고한다.
Python Operator, Python Sensor와 사용법이 거의 비슷하다.
Python Callable의 반환(리턴) 값이 다음에 수행할 Task 의 이름 (task_id)라는 점이 다르다.
Python Operator와 Python Sensor는 아래 포스팅을 참고한다.
AirFlow의 Operator 이해하기
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다. 이번에는 DAG에서 수행하는 작업을 의미하는 Operator에 대해 정리한다. DAG 소개와 기본 구조는 아래
www.bearpooh.com
AirFlow의 Sensor 이해하기
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다. 이번에는 DAG에서 수행하는 작업을 의미하는 Sensor에 대해 정리한다. DAG 소개와 기본 구조는 아래 포
www.bearpooh.com
다음과 같이 사용할 수 있다.
from airflow.operators.python import BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
import random
from datetime import datetime
default_args= {
"owner": "testuser",
"start_date": datetime(2022, 4, 7),
# DAG 실행 중 오류 발생하면 failure callback으로 메일 발송하므로 비활성화
"email_on_failure": False
}
dag = DAG(
dag_id = "test_branch",
default_args = default_args,
schedule_interval = "@once",
)
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)
### BranchPythonOperator 예제
def check_condition(): # PythonSensor의 callback 함수
num = random.randint(0, 10)
print(num)
if num > 6:
return "print_greater"
else:
return "print_smaller"
python_check = BranchPythonOperator(
task_id = "branch_test",
python_callable = check_condition,
dag = dag)
print_greater = BashOperator(
task_id = "print_greater",
bash_command = "echo value is greater than 6",
dag = dag)
print_smaller = BashOperator(
task_id = "print_smaller",
bash_command = "echo value is smaller than 6",
dag = dag)
start >> python_check >> [print_greater, print_smaller] >> end
실행하면 랜덤 값이 5가 나와서 print_smaller를 반환하고 print_greater를 생략한다.

DAG를 확인해보면 print_smaller를 실행하고 print_greater를 생략한 것을 확인할 수 있다.

Trigger Rules
일반적으로 Task는 이전 Task들이 성공할 때만 실행된다.
다르게 설명하면 앞서 실행 된 Task들의 결과에 대한 의존성이 존재한다.
보다 복잡한 의존성 설정을 위한 다양한 Trigger Rule 들이 존재한다.
Trigger Rules의 종류는 다음과 같다.
구분
|
내용
|
all_success (Default)
|
이전 Task 전체가 성공한 경우
|
all_failed
|
이전 Task 전체 또는 직전 Task가 실패한 경우
|
all_done
|
이전 Task 실행이 모두 완료된 경우
|
one_failed
|
이전 Task가 한 개 이상 실패한 경우
(이전 Task가 모두 완료될 때까지 대기하지 않음)
|
one_success
|
이전 Task가 한 개 이상 성공한 경우
(이전 Task가 모두 완료될 때까지 대기하지 않음)
|
none_failed
|
이전 Task에 실패가 없는 경우 (성공 또는 스킵)
|
none_failed_min_one_success
|
이전 Task에 실패가 없고 최소한 한 개 이상 성공한 경우
|
none_skipped
|
이전 Task에 스킵 상태가 없는 경우 (성공 또는 실패)
|
always
|
항상 실행 (종속성 없음)
|
자세한 내용은 아래 기술 문서를 참고한다.
예를 들어 앞서 살펴 본 BranchPythonOperator에서 DummyOperator인 마지막 end Task는 실행되지 않고 생략된다.
기본 설정이 all_success이기 때문이다.
이러한 경우 end Task의 Trigger Rule을 none_failed_min_one_success로 변경하면 항상 실행된다.
end Task의 코드에 다음과 같이 Trigger Rule을 추가한다.
# 변경 전
# end = DummyOperator(task_id="end", dag=dag)
# 변경 후
end = DummyOperator(task_id="end", dag=dag,
trigger_rule='none_failed_min_one_success')
end Task가 녹색으로 변한 것을 확인할 수 있다.

'::: IT인터넷 :::' 카테고리의 다른 글
AirFlow의 DAG을 파이썬 패키지로 구성하기 (1) (0) | 2022.07.04 |
---|---|
AirFlow DAG의 Jinja Templating, Xcom, Variable, SubDags (0) | 2022.06.30 |
AirFlow DAG의 DAG Runs, Task, TaskFlow (0) | 2022.06.23 |
AirFlow의 Sensor 이해하기 (0) | 2022.06.20 |
AirFlow의 Operator 이해하기 (26) | 2022.06.16 |