::: IT인터넷 :::

AirFlow의 Operator 이해하기

곰탱이푸우 2022. 6. 16. 08:20
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
이번에는 DAG에서 수행하는 작업을 의미하는 Operator에 대해 정리한다.
 
DAG 소개와 기본 구조는 아래 포스팅을 참고한다.
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.
 

AirFlow에서 Task

DAG을 구성하는 작업 단위를 Task라고 하며, DAG이 수행할 작업들을 의미한다.
하나 또는 여러 개의 Task를 연결해서 DAG을 생성하며, Task에는 Operator, Sensor, Hook이 있다.
 
Task는 Bash, Python을 포함해서 다양한 작업을 수행할 수 있다.
아래 사이트를 참고한다.
또한 잘 알려진 서비스나 오픈소스 백엔드에 대한 작업을 수행할 수 있도록 다양한 Providers Package를 제공한다.
아래 사이트를 참고한다.
Providers Package의 경우 pip를 사용해서 설치 가능하다.
아래 기술 문서를 참고한다.
 

Operator

Operator는 task를 어떻게 실행시킬지를 나타낸다.
하나의 워크플로우 안에서 하나의 테스크를 나타낸다.
Sensor와 Hook도 있지만 일반적으로 Operator를 대부분 사용한다.
 
Operator는 Action Operator와 Transfer Operator로 구분된다.
  • Action Operator는 작업을 수행하거나 다른 시스템에 작업을 수행하도록 지시한다.
  • Transfer Operator는 특정 시스템에서 다른 시스템으로 데이터를 이동시킨다.

 

AirFlow에서 제공하는 기본 Operator의 종류는 Bash와 Python을 포함해서 상당히 많다.
자세한 사항은 아래 기술 문서를 참고한다.
참고로 _operator로 끝나는 Operator들은 대부분 Defrecated 되었다.
대신 이름에서 _operator를 제외한 Operator를 사용하면 된다.
 
Operator에 공통적으로 **kwargs라는 Keyword Arguments 를 전달하는 부분이 있다.
해당 부분은 DAG을 정의할 때 정의했던 default_args가 전달된다고 이해하면 된다.
 
자세한 내용은 아래 포스팅을 참고한다.
일반적으로 많이 사용하는 주요 Operator만 살펴본다.

 

 

Dummy Operator

아무 작업을 하지 않는 Operator이다.
보통 시작과 종료를 나타내거나, 다른 작업들을 그룹화하는데 사용한다.
 
클래스 정의
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.dummy

class DummyOperator(**kwargs)
 
자세한 사항은 아래 페이지를 참고한다.
사용 예제
다음과 같이 사용한다.
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime

dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))

task1 = BashOperator(task_id="task2", bash_command="echo task1", dag=dag)

### DummyOperator 예제

start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

start >> task1 >> end
 
다른 작업을 그룹화 하는 경우 다음과 같이 사용한다.
DAG과 TaskGroup을 with와 함께 사용하면 계층적 구성이 가능하다.
각 Operator에 dag를 지정하는 전달 인자를 사용하지 않은 것에 주목한다.
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from airflow import DAG
from datetime import datetime

with DAG(dag_id="testdag", start_date=datetime(2022, 4, 7)) as testdag:
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    # START TaskGroup Example
    with TaskGroup(group_id="group1", tooltip="Tasks for group1") as group1:
        task1 = DummyOperator(task_id="task1")
        task2 = BashOperator(task_id="task2", bash_command="echo test_group")
        task3 = DummyOperator(task_id="task3")

        task1 >> [task2, task3]

    start >> group1 >> end
 
위의 코드를 도식화하면 아래와 같다.
 

Bash Operator

Bash Shell 스크립트를 실행하는 Operator이다.
리눅스 명령어 실행도 가능하며 프로그램 실행도 가능하다.
 
실행하려는 작업을 프로그램으로 작성한 경우, Bash Opearator로 실행할 수 있기 때문에 중요하다.
 
클래스 정의
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.bash

class BashOperator(
    bash_command: str,
    env: Optional[Dict[str, str]] = None,
    output_encoding: str = "utf-8",
    skip_exit_code: int = 99,
    cwd: str = None,
    **kwargs)
 
전달 인자는 다음과 같다.
구분
타입
기본값
내용
bash_command
str
 
실행할 Bash 명령어
env
Optional[Dict[str, str]]
None
Dict 형태의 환경 변수 전달
output_encoding
str
"utf-8"
Bash 명령어 출력값 인코딩 방식
skip_exit_code
int
99
명령어 결과인 exit_code로 성공 여부 확인 여부
None이면 0이 아닌 모든 값은 실패로 간주
cwd
str
None
명령어가 실행 될 경로 지정
None이면 임시 경로에서 실행
provide_context
bool
False
True로 설정하면 Task 인스턴스의 attribute들을 kwargs로 사용 가능
BaseOperator에 정의 되어 있다
 
명령어 실행 이후 exit_code로 명령어의 실행이 정상으로 종료되었는지 확인하려면 skit_exit_code를 사용한다.
특정 경로에서 명령어 실행을 원하는 경우 cwd를 사용한다.
 
자세한 사항은 아래 페이지를 참고한다.
 
사용 예제
다음과 같이 사용한다.
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime

dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

### BashOperator 예제

test_bash = BashOperator(
    task_id = "test_bash",
    bash_command = "echo 'This is the message'",
    output_encoding="utf-8",
    dag=dag)

start >> test_bash >> end
 
환경 변수를 전달하고자 하는 경우에는 다음과 같이 사용한다.
from airflow.operators.bash import BashOperator 
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime

dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))
start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

### BashOperator 예제

test_bash = BashOperator(
    task_id = "test_bash",
    bash_command = "echo 'This is the ds: \'$msg\''",
    env = { "msg": '{{ dag_run.conf.ds | d("ds not found!") }}' },
    output_encoding="utf-8",
    dag=dag)

start >> test_bash >> end
 
그냥 실행하면 ds라는 환경 변수가 존재하지 않아 ds not found! 메시지가 출력된다.
문자열을 d()로 감싼 것에 주의한다.
 
dag_run의 ds가 존재하는 경우 해당 값을 출력한다.
실행할 때 해당 값을 전달해야 한다. 우측의 ▶ 버튼을 클릭하고 Trigger DAG w/ config 버튼을 클릭한다.

 

아래와 같이 Dictionary 포맷으로 ds 값을 정의하고, 아래의 Trigger 버튼을 클릭한다.
 
로그를 확인해보면 전달한 값이 정상적으로 출력되는 것을 확인할 수 있다.
 
 

Python Operator

파이썬 코드 (.py)를 실행하기 위한 Operator이다.
일반적으로 자동화 스크립트를 파이썬으로 작성하는 경우가 많다.
이러한 파이썬 코드들은 해당 Operator를 통해 실행할 수 있다.
 
whl로 배포한 프로그램의 경우 BashOperator를 통해 실행이 가능하다.
 
클래스 정의
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.operators.python

class PythonOperator(
    python_callable: Callable,
    op_args: Optional[Collection[Any]] = None,
    op_kwargs: Optional[Mapping[str, Any]] = None,
    templates_dict: Optional[Dict] = None,
    templates_exts: Optional[List[str]] = None,
    **kwargs
)
 
전달 인자는 다음과 같다.
구분
타입
기본값
내용
python_callable
Callable
 
호출할 파이썬 개체 (함수, 클래스)
op_args
Optional[Collection[Any]]
None
파이썬 개체에 전달할 명시적 (explict) 인자
op_kwargs
Optional[Mapping[str, Any]]
None
파이썬 개체에 전달할 키워드 인자 (krawgs)
templates_dict
Optional[Dict]
None
AirFlow 엔진이 생성한 템플릿으로 전달되는
Dictionary 타입의 값 목록
templates_exts
Optional[List[str]]
None
템플릿 필드를 처리하는 동안 확인할 파일 확장자 목록
provide_context
bool
False
True로 설정하면
Task 인스턴스의 attribute들을 kwargs로 사용 가능
BaseOperator에 정의 되어 있다
 
python_callable에서 DAG 코드 외부의 함수나 클래스를 사용하는 경우, DAG 코드 상단에 import 되어 있어야 한다.
 
자세한 사항은 아래 페이지를 참고한다.
 

airflow.operators.python — Airflow Documentation

 

airflow.apache.org

 

 

참고로 template_dict 값은 Dictionary 타입의 Jinja Template으로 전달 된다.
DAG과 Task 실행에 관한 정보들이 전달된다.
자세한 사항은 아래 페이지를 참고한다.
사용 예제
다음과 같이 사용한다.
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime
import time

dag = DAG(dag_id="testdag", start_date=datetime(2022, 4, 7))

start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

### PythonOperator 예제

def test_function(sleep_time):
    """This is a function that will run within the DAG execution"""
    print("Start sleep {sleep_time} seconds!".format(sleep_time=sleep_time))
    time.sleep(sleep_time)
    print("End sleep {sleep_time} seconds!".format(sleep_time=sleep_time))

test_bash = PythonOperator(
    task_id = "test_python",
    python_callable=test_function,
    op_kwargs={"sleep_time": 10},
    dag=dag)

start >> test_bash >> end
 
 

그 외 주요 Operator

제공하는 Operator가 많아서 특징만 살펴본다.
구분
클래스 경로
내용
EmailOperator
airflow.operators.email
이메일 발송
BranchPythonOperator
airflow.operators.branch
파이썬 실행 결과에 따른 분기를 설정하는 Operator
설정과 실행 Method로 구분
ShortCircuitOperator
airflow.operators.python
bool 조건에 맞을 때만 실행
bool 연산 로직은 python_callable로 전달
PythonVirtualenvOperator
airflow.operators.python
Python 가상 환경에서 실행
GenericTransfer
airflow.operators.generic_transfer
Database의 데이터를 다른 Database로 복사
SubDagOperator
airflow.operators.subdag
특정 DAG을 SubDag으로 실행
TriggerDagRunOperator
airflow.operators.trigger_dagrun
지정한 DAG 실행

 

각 Operator의 사용법은 아래 기술 문서를 참고한다.