::: IT인터넷 :::

AirFlow DAG 소개와 기본 구조

곰탱이푸우 2022. 6. 13. 08:20
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
DAG에 대한 개념과 기본 구조에 대해 정리한다.
 
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.
 

AirFlow DAG 개념

DAG이란

AirFlow에서 실행할 작업들을 순서에 맞게 구성한 워크플로우(WorkFlow)를 의미한다.
Directed Acyclic Graph의 약자이며, DAG를 구성하는 각 작업들을 태스크(Task)라고 한다.
DAG는 Task의 관계와 종속성을 반영하여 구조화되어있다.
연결 된 화살표 방향 순서대로 태스크를 실행하고, 분기 실행과 병렬 실행이 가능하다.
 
AirFlow의 소개와 구조에서 다룬 그림을 다시 보면 다음과 같다.
 
일반적인 Python 코드로 정의하며, $AIRFLOW_HOME/dags 폴더에 위치한다.
변경이 필요한 경우 $AIRFLOW_HOME/airflow.cfg 파일에서 변경할 수 있다.
 
dags 폴더의 파이썬 파일(.py) 이름에  "airflow" 또는 "dag" 단어가 포함되어 있으면 AirFlow WebUI에 표시된다.
DAG_DISCOVERY_SAFE_MODE 옵션을 disable로 변경하면 모든 파이썬 파일을 인식하지만 권장사항은 아니다.
 

UI 소개

DAGs
WebUI에 최초 로그인하거나 상단의 DAGs를 클릭하면 DAG 목록이 출력된다.
 
Tree View
DAG 이름을 클릭하고 상단의 Tree View를 클릭하면 작업 목록을 트리 구조로 확인할 수 있다.
우측의 표는 실행 결과를 표시한다. 녹색은 성공, 분홍은 Skip, 연두색은 실행, 빨간색은 실패를 의미한다.
 
 
Graph View
DAG 이름을 클릭하고 상단의 Graph View를 클릭하면 작업 목록을 그래프 구조로 확인할 수 있다.
 
Variables
상단의 Admin - Variables를 클릭하면 DAG에서 공통적으로 사용하는 값을 사용하기 위한 기능이다.
DB URL이나 호스트명, 암호 등을 저장하고 사용하면 된다.
이후 변경이 발생하면 DAG 코드를 수정하지 않고 설정 값만 변경하면 되기 때문에 상당히 유용하다.
 
Task Duration
상단의 Task Duration 탭을 클릭하면 Task들의 실행 시간을 확인할 수 있다.
 
Gantt
상단의 Gantt를 클릭하면 특정 DAG Run의 Task별 실행 시간을 Gantt Chart로 확인할 수 있다.
Details
상단의 Details를 클릭하면 현재 DAG의 상세 정보를 확인할 수 있다.
DAG를 정의할 때 적용한 옵션들이 표시 되는 것을 확인할 수 있다.
 
Code
상단의 Code를 클릭하면 현재 DAG의 전체 코드를 확인할 수 있다.
 
 

DAG 코드 구조

DAG은 파이썬 코드로 정의한다.
 
파이썬 코드의 구조와 순서를 간단히 살펴보면 다음과 같다.
구분
내용
라이브러리 임포트
DAG과 워크플로우 구성에 필요한 라이브러리 선언
공통 변수 정의
DAG 구성에 사용하기 위해 공통으로 사용하는 변수 정의
변경이 자주 발생하는 경우 Variables 기능 활용
DAG 공통 속성값 정의
DAG을 정의하는데 필요한 공통 속성 값 정의
DAG 정의
DAG을 선언하고 공통 속성값 전달
Task 정의
DAG에 포함 될 각 작업(Task) 정의
Operator, Sensor, Hook 등을 사용
Task 배열
각 작업 (Task)들의 순서들을 나열
<<. >> 같은 Shift 연산자 사용
set_upstream, set_downstream 함수도 사용 가능
 
DAG 정의
파이썬 코드에서 DAG은 전역 변수로 정의해야 한다.
from airflow import DAG

dag = DAG(dag_id="dag_global")
 
아래와 같이 로컬 변수로 정의하면 AirFlow에서 인식하지 못한다.
from airflow import DAG

def test_func():
    dag = DAG(dag_id="dag_local")
 
DAG 공통 속성값 정의
DAG을 정의할 때 모든 작업(Task)에 공통적으로 적용할 값들은 default_args로 지정하여 사용할 수 있다.
파이썬의 Dictionary 타입이다.
from airflow import DAG
from datetime import datetime

# default_args 정의
args = {
    "owner": "testuser",
    "start_date": datetime(2022, 4, 7),
    # DAG 실행 중 오류 발생하면 failure callback으로 메일 발송하므로 비활성화
    "email_on_failure": False 
}

dag = DAG(
    dag_id="testdag_default_args",
    max_active_runs=1,  # DAG의 동시실행 방지
    default_args=args,  # default_args 전달
    # 분 시 일 월 요일 순으로 실행 주기 설정
    # 아래의 경우 매일 오전 8시 실행 (AirFlow에 설정 된 타임존 기준)
    schedule_interval="0 8 * * *", 
    # 이전 DAG이 오래 걸려서 다음 DAG 시작 시간을 초과한 경우
    # True면 이전 DAG 종료 이후 다음 DAG 실행, False면 이전 DAG 실행 중 다음 DAG 실행
    catchup=True)
 
catchup 기본 설정은 airflow.cfg 파일의 catchup_by_default 값을 변경한다.
 
 
Task 정의
Task는 Opearator, Sensor, Hook 등을 사용할 수 있다.
  • 지정한 작업을 수행하는 Operator를 대부분 사용한다.
  • Sensor는 어떤 결과를 만족하는지 주기적으로 체크할때 사용한다.
  • Hook은 DB나 서비스 같은 외부 시스템과 통신하기 위한 인터페이스를 제공하며 연결 상태를 유지한다.
 
정의할 때 전달 인자로 어느 DAG에 포함되는지 지정해야 한다.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

dag = DAG(dag_id="test_dag", start_date=datetime(2022, 4, 7))

operator = DummyOperator(
    task_id="dummy_test",
    dag=dag) # 위에 정의한 dag이라는 DAG에 포함
 
Task는 Bash, Python을 포함해서 다양한 작업을 수행할 수 있다.
아래 사이트를 참고한다.
또한 잘 알려진 서비스나 오픈소스 백엔드에 대한 작업을 수행할 수 있도록 다양한 Providers Package를 제공한다.
아래 사이트를 참고한다.
Task 배열
정의한 Task들의 실행 순서를 정의하는 과정이다.
Task 객체의 set_downstream (정방향), set_upstream (반대 방향) 함수를 사용할 수 있다.
 
그러나 일반적으로  >>와 << 연산자를 사용하여 순서를 정의하며 AirFlow 1.8 버전부터 지원한다.
 
다음과 같은 단순한 작업은 task1 >> task2 >> task3 으로 정의하면 된다.
 
다음과 같이 분기 되는 경우 task1 >> [task2, task3] >> task4 로 정의한다.

 

 
task2와 task3 자리에 다수의 작업이 위치하는 경우 다음과 같이 정의해도 된다.
task1 >> task2 >> ... >> task4
task1 >> task3 >> ... >> task4
 
그러면 AirFlow가 예쁘게 그려준다.
 
만약 아래와 같은 작업이 있는 경우 task1 >> task2 >> task4 << task3으로 정의할 수 있다.
반대 방향인 <<를 사용할 수 있지만 가독성이 좋지 않아 권하지 않는다.
 
 
이런 경우 앞뒤에 DummyOperator를 추가해서 정방향으로 만드는 것이 가독성에 도움이 된다.
 
코드로 표현하면 다음과 같다.
start >> task3 >> task4 >> end
start >> task1 >> task2 >> task4 >> end

# 아래와 같이 표현 가능하다.
start >> [task3, [task1, task2]] >> task4 >> end