::: IT인터넷 :::

AirFlow DAG의 DAG Runs, Task, TaskFlow

곰탱이푸우 2022. 6. 23. 08:20
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
이번에는 DAG의 구성 요소에 대해 정리한다.
 
DAG 소개와 기본 구조는 아래 포스팅을 참고한다.
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.
 

DAG 구성 요소

AirFlow에서 가장 중요한 것은 앞에서 다룬 DAG이다.
실제로 AirFlow에서 실행할 작업들을 정의한 것이기 때문이다.
 
DAG에 대한 설명은 아래 포스팅의 DAG이란 항목을 참고한다.
DAG을 구성하는 요소들은 DAG Runs, Tasks, TaskFlow, Hooks, Pools, Branching, Trigger Rules, Jinja Templating, Xcom, Variable, SubDags 등이 있다.
본 포스팅에서는 DagRuns와 Tasks, TaskFlow에 대해 알아본다.
 

DAG Runs

DAG Run은 Task 인스턴스들을 DAG에 정의 된 특정 execution_date 에 실행하는 DAG의 인스턴스이다.
단순하게 표현하면 DAG의 실행 이력을 의미한다. 실행 중이거나 종료된 상태를 의미한다.
 
execution_date라는 용어는 2.2 버전부터 logical_date로 변경되었다.
그러나 본문에서는 편의상 기존에 사용하던 용어인 execution_date를 계속 사용한다.
 
DAG는 AirFlow 스케줄러 또는 외부 Trigger에 의해 실행될 수 있다.
또한 execution_date가 다른 여러 개의 DAG가 동시에 실행 될 수 있다.
 
DAG Runs는 WebUI의 Browse - DAG Runs 메뉴에서 확인 가능하다.
 
아래와 같이 실행 중이거나 종료 된 DAG 실행 이력이 표시된다.
 
각 컬럼별 의미는 다음과 같다.
구분
내용
Status
DAG 실행 결과
Dag id
DAG의 이름 (식별자)
Logical Date
DAG이 실행 된 자체적인 논리적 시간 (기존의 execution_date)
Run id
DAG을 실행한 인스턴스의 이름 (식별자), 수동 실행인 경우 manual Prefix 추가
Run Type
스케줄러에 의한 자동 실행 여부, 수동 실행은 Manual 표시
Queued At
AirFlow Pools에 추가 된 시간
Start Date / End Date
DAG이 실행되고 종료 된 실제 시간 (Real-world)
External Trigger
DAG UI에서 ▶ 를 클릭하여 수동으로 실행했는지 여부
Conf
DAG UI에서 ▶ 버튼 아래의 Trigger DAG w/ config으로 실행할 때 전달한 변수 값
 
자세한 내용은 아래 페이지를 참고한다.
 

Cron Presets

DAG을 실행하기 위한 스케줄링에 사용하며, DAG을 정의할때 schedule_interval 인자로 전달한다.
Unix/Linux의 작업 스케줄러에서 사용하는 Crontab의 스케줄링 포맷을 사용할 수 있다.
 
분 시 일 월 요일 순으로 실행 주기를 설정한다.
preset
meaning
cron
data_interval (example)
None
스케줄링 없이 직접 DAG 실행
 
 
@once
한번만 실행
 
 
@hourly
한 시간에 한 번 실행
0 * * * *
00:00:00 ~ 00:59:00 (매시간)
@daily
하루에 한 번 실행 (00시)
0 0 * * *
00:00:00 ~ 23:59:00 (매일)
@weekly
일주일에 한 번 실행 (일요일 00시)
0 0 * * 0
2022-04-10 00:00:00 ~ 2022-04-16 23:59:00
@monthly
한 달에 한번 실행 (매월 1일 00시)
0 0 1 * *
2022-04-01 00:00:00 ~ 2022-04-30 23:59:00
@quarterly
분기 별로 한번 실행 (분기 첫 번째 날 00시)
0 0 1 */3 *
2022-04-01 00:00:00 ~ 2022-06-30 23:59:00
@yearly
1년에 한번 실행 (1월 1일 00시)
0 0 1 1 *
2022-01-01 00:00:00 ~ 2022-12-31 23:59:00
 
또는 위의 설정들을 조합하여 다양하게 표현 가능하다.
 
참고로 Cron Presets는 분 부터 설정 가능하므로 초 단위는 표시되지 않는다.
분은 0~59, 시는 0~23, 일은 1~31, 월은 1~12, 요일은 0~6까지 설정 할 수 있다.
 
Cron Expression은 아래 페이지를 참고한다.
data_interval은 execution_date (Logical Date)에 의해 DAG이 작동하는 시간 범위를 나타낸다.
Daily인 경우 execution_date 시작 시간부터 24시간 동안이 data_interval에 해당한다.
 

Catchup

DAG이 오래 걸려서 다음 DAG 시작 시간보다 오래 걸릴 경우, 예정대로 다음 DAG을 실행할지 결정하는 옵션이다.
True면 이전 DAG이 종료 된 이후 다음 DAG을 실행한다. 기본 값으로 설정되어 있다.
False면 이전 DAG이 실행 중이더라도 예정대로 다음 DAG을 실행한다.
 
DAG을 정의할 때 catchup 인자를 전달하면 된다.
from airflow import DAG 
from datetime import datetime

args= { 
    "owner": "testuser", 
    "start_date": datetime(2022, 4, 7), 
    # DAG 실행 중 오류 발생하면 failure callback으로 메일 발송하므로 비활성화 
    "email_on_failure": False 
}

dag = DAG(dag_id="catchup_test",
    default_args=args,
    start_date=datetime(2022, 4, 7),
    description="test DAG for catchup",
    schedule_interval="@daily",
    catchup=False)
 
자세한 내용은 아래 기술 문서의 DAG 클래스 정의 부분을 참고한다.
 

Backfill

AirFlow의 start_date (logical_date)에 관계 없이 지정한 과거 기간에 대해 DAG을 실행할 수 있다.
과거의 데이터를 생성해서 채워 넣거나, 과거 특정 시점의 데이터 생성을 요구 받은 경우 유용하다.
 
이러한 경우 Jinja Template의 ds를 통해 AirFlow에서 실행되는 작업들의 날짜를 제어해야 한다.
예를 들어 SQL 쿼리에 CURRENT_DATE()를 사용하면 모든 과거 작업에 현재 날짜로 쿼리 될 것이라서 의미가 없다.
 
WebUI에서는 지원하지 않고 CLI를 통해 사용할 수 있다.
backfill 명령어 사용법은 다음과 같다.
$ airflow dags backfill
usage: airflow dags backfill [-h] [-c CONF] [--delay-on-limit DELAY_ON_LIMIT] [-x] [-n]
                             [-e END_DATE] [-i] [-I] [-l] [-m] [--pool POOL] 
                             [--rerun-failed-tasks] [--reset-dagruns] [-B] [-s START_DATE]
                             [-S SUBDIR] [-t TASK_REGEX] [-v] [-y] dag_id
                             
Run subsections of a DAG for a specified date range.
If reset_dag_run option is used, backfill will first prompt users whether airflow should clear
all the previous dag_run and task_instances within the backfill date range.
If rerun_failed_tasks is used, backfill will auto re-run the previous failed task instances
within the backfill date range
 
backfill은 기본적으로 DAG에서 실행되지 않은 날짜들의 작업만 실행한다.
만약 특정 기간의 작업들을 모두 실행하고자 한다면 --reset_dagruns 옵션을 사용해야 한다.
 
다음과 같이 사용한다.
# 지정한 기간 동안 backfill 수행 (수행하지 않은 날짜만)
$ airflow dags backfill --start-date START_DATE --end-date END_DATE dag_id

# 지정한 기간 동안 backfill 수행 (모든 날짜)
$ airflow dags backfill --start-date START_DATE --end-date END_DATE --reset-dagruns dag_id

# 지정한 기간 동안 backfill로 실패한 날짜의 작업들만 재실행
$ airflow dags backfill --start-date START_DATE --end-date END_DATE --rerun-failed-tasks dag_id
 

Re-run Tasks

예약 된 실행 중 일부 작업이 실패했거나, 데이터에 변경이 발생해서 특정 시점 작업을 다시 수행해야 할 수 있다.
기간이 길면 backfill로 수행하면 되지만, 특정 Task만 실행하고자 하는 경우에는 간단하게 WebUI에서 해결 할 수 있다.
 
DAG 실행 화면의 Tree 보기 또는 Graph 보기에서 실패한 Task를 클릭하면 Task의 세부 동작을 설정할 수 있는 팝업이 출력된다.
Clear 버튼을 클릭하여 실패한 Task를 삭제하면 AirFlow Scheduler에 의해 해당 Task가 재실행된다.
 
선택 가능한 삭제 대상 Task의 범위는 다음과 같다.
구분
내용
Past 
선택한 Task가 포함 된 DAG Run과 이전 DAG Run의 동일한 모든 Task
Future 
선택한 Task가 포함 된 DAG Run과 이후 DAG Run의 동일한 모든 Task
Upstream 
선택한 Task가 포함 된 DAG Run에서 선택한 Task 앞에 실행 된 모든 Task
Downstream 
선택한 Task가 포함 된 DAG Run에서 선택한 Task 뒤에 실행 될 모든 Task
Recursive 
DAG간의 종속성 설정이 되어 있는 경우 부모 DAG과 자식 DAG의 모든 Task
Failed 
선택한 Task 인스턴스의 실패한 Task만 삭제
 
Past, Future, Upstream, Downastream을 그림으로 표현하면 다음과 같다.
 
Task 재실행은 CLI에서도 제공하지만 WebUI에서 충분히 사용 가능하므로 생략한다.
 
 

External Triggers

AirFlow의 Scheduler에 의해 실행되는 것을 Internal Trigger라 하고, 그 외에 수동으로 DAG을 실행하는 것을 의미한다.
 
WebUI에서는  아래와 같이 ▶ 버튼을 눌러 Trigger를 실행할 수 있다.
 
Trigger w/ config 버튼은 Dictionary 포맷으로 매개 변수를 전달할 때 사용한다.
아래 포스팅의 Dummy Operator 부분을 참고한다.
CLI에서 수동으로 실행하는 경우 airflow dags trigger 명령을 사용한다.
$ airflow dags trigger
usage: airflow dags trigger [-h] [-c CONF] [-e EXEC_DATE] [-r RUN_ID] [-S SUBDIR] dag_id

 

다음과 같이 실행하면 된다.
# backfill이 아닌 강제 trigger 하려는 경우 
# (이 경우 external_trigger로 DAG Runs에 기록)
$ airflow dags trigger --conf '{"conf1": "value1"} -e EXEC_DATE dag_id
 

Tasks

DAG을 구성하는 작업 단위를 Task라고 하며, DAG이 수행할 작업들을 의미한다.
하나 또는 여러 개의 Task를 연결해서 DAG을 생성하며, Task에는 Operator, Sensor, Hook이 있다.
 
Tasks, Operator, Sensor에 대한 자세한 내용은 아래 포스팅을 참고한다.
 

TaskFlow

TaskFlow는 DAG을 간단하게 작성할 수 있도록 해주는 기능이다.
 
일반적인 DAG 코드는 아래와 같은 구조를 갖는다.
이러한 구조로 코드를 작성하다 보면 간단한 워크플로우라도 코드는 길어진다.
 
간단한 워크플로우를 쉽게 작성하기 위해 AirFlow에서는 2.0.0 버전부터 TaskFlow를 제공한다.
파이썬 데코레이터를 사용해서 DAG과 Task 구성을 쉽게 작성할 수 있다.
 
예제 코드는 다음과 같다.
from airflow import decorators 
from airflow import DAG 
from datetime import datetime
import json

@decorators.task()
def read_json():
    data = '{"apple": 135, "meta": 35, "tesla": 100}'
    return json.loads(data)

@decorators.task()
def sum_result(data):
    print(sum(data.values()))

args = {
    "owner": "testuser",   
    "catchup": False
}

with DAG(dag_id="taskflow_test", 
    default_args=args, 
    start_date=datetime(2022, 4, 7),
    schedule_interval = "@once") as dag:
    sum_result(read_json())

 

AirFlow에서 생성 된 DAG을 확인해보면 다음과 같다.