::: IT인터넷 :::

AirFlow DAG의 Jinja Templating, Xcom, Variable, SubDags

곰탱이푸우 2022. 6. 30. 08:20
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
이번에는 DAG의 구성 요소에 대해 정리한다.
 
DAG 소개와 기본 구조는 아래 포스팅을 참고한다.
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.

DAG 구성 요소

AirFlow에서 가장 중요한 것은 앞에서 다룬 DAG이다.
실제로 AirFlow에서 실행할 작업들을 정의한 것이기 때문이다.
 
DAG에 대한 설명은 아래 포스팅의 DAG이란 항목을 참고한다.
DAG을 구성하는 요소들은 DAG Runs, Tasks, TaskFlow, Hooks, Pools, Branching, Trigger Rules, Jinja Templating, Xcom, Variable, SubDags 등이 있다.
이번 포스팅에서는 Jinja Templating, Xcom, Variable, SubDags에 대해 정리한다.
 

Jinja Templating

Jinja는 파이썬을 위한 templating engine으로, 디자이너 친화적인 템플릿 언어로 알려져 있다. (음??)
AirFlow는 Jinja2 template를 내장하고 있어 Task 인스턴스가 실행 중에도 정보를 동적으로 전달할 수 있다.
 
대표적인 변수들은 다음과 같다.
Variable
Description
{{ dag }}
DAG 오브젝트로 하위 속성 접근 가능
{{ task }}
Task 오브젝트로 하위 속성 접근 가능
{{ ds }}
DAG 실행의 논리적 날짜 (ex. 2022-04-07)로 {{ dag_run.logical_date | ds }} 와 동일
{{ ts }}
{{ dag_run.logical_date | ts }} 와 동일 (ex.  2018-01-01T00:00:00+00:00)
{{ data_interval_start }}
Task가 참고해야 할 데이터의 시작 날짜 범위
{{ data_interval_end }}
Task가 참고해야 할 데이터의 종료 날짜 범위
{{ prev_start_date_success }}
이전의 DAG 실행이 성공한 start_date (ds)
{{ macros }}
Macro로서 하위 속성으로 일부 표준 라이브러리 접근 가능
datetime, timedelta, dateutil, time, uuid, random 등
{{ task_instance }} or {{ ti }}
task_instance 오브젝트로 하위 속성 접근 가능
{{ params }}
Dictionary 타입으로 전달 된 사용자 정의 매개변수 사전에 대한 참조
{{ var.value.my_var }}
Dictionary 타입의 전역 정의 변수 (Variables에 정의)
{{ var.json.my_var.path }}
Dictionary를 deserialize한 JSON 객체 타입의 전역 정의 변수
{{ conn.my_conn_id }}
Connection represented as a dictionary.
{{ conf }}
airflow.cfg에 정의 된 airflow.configuration.conf 개체
{{ run_id }}
현재 DAG 실행의 run_id
{{ dag_run }}
DagRun에 대한 참조
 
Jinja Templating에 대한 자세한 내용은 아래 페이지를 참고한다.
 

Templates reference — Airflow Documentation

 

airflow.apache.org

 

 

대표적인 사용 예제는 다음과 같다.
# The execution date as YYYY-MM-DD
date = "{{ ds }}"

t = BashOperator(
    task_id = 'test_env',
    bash_command = '/tmp/test.sh ',
    dag=dag,
    env={'EXECUTION_DATE': date})
 
env에 전달 되는 EXECUTION_DATE의 환경 변수에 {{ ds }}가 전달된다.
ds는 AirFlow의 DAG이 실행 된  논리적 날짜로, YYYY-MM-DD 형태이다.
만약 2022년 4월 7일이라면, EXECUTION_DATE에는 2022-04-07이 전달 된다.
 

Xcom

Airflow의 task간에 변수나 데이터 전달을 위해 통신하는 기능이다.
해당 기능을 통해 다른 Task에서 이전 Task의 값을 사용할 수 있다.
 
Work가 한 개인 단독 환경에서는 DAG 코드 내부에서 공유가 가능하다.
그러나 Worker가 여러 개인 AirFlow의 분산 환경에서는 서로 다른 Worker에서 Task가 실행 될 수 있기 때문에 Xcom을 사용한다.
 
Variable과 비슷하지만 Xcom은 특정 DAG 내부에서만 공유되는 특징이 있다.
여러 DAG에서 공유해서 사용하려면 Variable을 사용해야 한다.
 
Operator를 생성할 때 provide_context 옵션이 True로 되어 있어야 한다.
Operator에 대한 내용은 아래 포스팅을 참고한다.

PythonOperator 사용

PythonOperator에서는 return 값이 자동으로 Xcom에 push 된다.
def return_xcom():
    return "xcom!"
   
return_xcom = PythonOperator(
    task_id = 'return_xcom',
    python_callable = return_xcom,
    dag = dag
)
 
 

Push와 Pull 사용

Xcom에 값을 넣는 Push (xcom_push)와 가져오는 Pull (xcom_pull)을 사용한다.
PythonOperator가 아니거나 PythonOperator에서 리턴 값이 아닌 다른 값을 전달할 때 사용한다.
 
Operator 코드 내부에서 아래와 같은 방법으로 사용한다.
일반적으로 별도의 함수에 정의해서 사용한다.
# xcom_push
def push_function(**context):
    context['task_instance'].xcom_push(key=변수명, value=전달할값)

# xcom_pull
def pull_function(**context): 
    value = context['ti'].xcom_pull(key=변수명, task_ids=대상Task이름)
 
Xcom의 사용 예제는 다음과 같다.
BashOperator의 경우 쉘 스크립트 (.sh) 파일 내부에서도 Jinja Templating으로 사용 가능하다.
## xcom_push Example
def test_xcom_push(**context):
    # 'task_instance'는 'ti'로도 표현 가능
    context['task_instance'].xcom_push(key='pushed_value', value='xcom_push_test_message!')

push_by_xcom = PythonOperator(
    task_id='push_by_xcom',
    python_callable=test_xcom_push,
    dag=dag,
)


## xcom_pull Example
def pull_function(**context):
    value = context['ti'].xcom_pull(key='pushed_value', task_ids='push_by_xcom')
    print(value)

# Python Operator인 경우
pull_example_1 = PythonOperator(
    task_id='pull_example_1',
    python_callable=pull_function,
    dag=dag,
)

# Python Operator가 아닌 경우 Jinja Templating 사용
pull_example_2 = BashOperator(
    task_id='pull_example_2',
    bash_command='echo "{{ ti.xcom_pull(key="pushed_value") }}"', 
    dag=dag,
)
 
 

Variable

DAG에서 공통적으로 사용하는 값을 사용하기 위한 기능이다.
Key - Value 형태이며 다른 DAG과도 공유할 수 있다.
 
AirFlow의 Variables에 값을 정의하는 방법은 아래 포스팅의 Variables 항목을 참고한다.
Variables의 값을 가져와서 사용하는 방법은 다음과 같다.
# DAG내에서 환경변수 사용하기
from airflow.models import Variable

# Variables에서 값 가져오기
foo = Variable.get("foo")

# Variables에서 값을 JSON 형태로 가져오기
foo_json = Variable.get("foo_baz", deserialize_json=True)
 

SubDags

DAG의 하위 개념으로 다음과 같은 경우에 활용하면 유용하다.
  • DAG 내부의 특정 부분이 반복되어 재정의를 피하고 싶은 경우
  • 특정 단계의 Task들을 하위 DAG으로 의존성 설정을 하고 싶은 경우
 
 
SubDAG은 SubDagOperator를 사용하며, 별도 정의한 SubDAG 함수를 호출하는 형태로 구성한다.
하지만 SubDagOperator가 2.2.0 버전부터 Defrecated 되었다.
 
대신 TaskGroup을 사용하도록 변경되었다.
TaskGroup 사용법은 아래 포스팅의 DummyOperator 부분을 참고한다.