::: IT인터넷 :::

AirFlow의 Sensor 이해하기

곰탱이푸우 2022. 6. 20. 08:20
실행할 작업들의 순서를 구성한 워크플로우(WorkFlow)는 AirFlow에서 DAG 이라는 형태로 사용한다.
이번에는 DAG에서 수행하는 작업을 의미하는 Sensor에 대해 정리한다.
 
DAG 소개와 기본 구조는 아래 포스팅을 참고한다.
AirFlow의 소개와 구조는 아래 포스팅을 참고한다.
Binami에서 배포한 AirFlow Docker 이미지를 사용하는 방법은 아래 포스팅을 참고한다.
 

AirFlow에서 Task

DAG을 구성하는 작업 단위를 Task라고 하며, DAG이 수행할 작업들을 의미한다.
하나 또는 여러 개의 Task를 연결해서 DAG을 생성하며, Task에는 Operator, Sensor, Hook이 있다.
 
Task는 Bash, Python을 포함해서 다양한 작업을 수행할 수 있다.
아래 사이트를 참고한다.
또한 잘 알려진 서비스나 오픈소스 백엔드에 대한 작업을 수행할 수 있도록 다양한 Providers Package를 제공한다.
아래 사이트를 참고한다.
Providers Package의 경우 pip를 사용해서 설치 가능하다.
아래 기술 문서를 참고한다.

Sensor

Sensor는 어떤 결과를 만족하는지 외부 이벤트를 주기적으로 체크할때 사용한다.
특정 조건을 만족할 때까지 기다리고, 해당 조건을 만족하면 이후 Task를 진행한다.
 
Operator와 마찬가지로 Sensor의 종류도 상당히 많다.
다양한 Sensor들은 아래 기술 문서를 참고한다.
참고로 _sensor로 끝나는 Sensor들은 대부분 Defrecated 되었다.
대신 이름에서 _sensor를 제외한 Sensor를 사용하면 된다.
 
Sensor에 공통적으로 **kwargs라는 Keyword Arguments 를 전달하는 부분이 있다.
해당 부분은 DAG을 정의할 때 정의했던 default_args가 전달된다고 이해하면 된다.

 

자세한 내용은 아래 포스팅을 참고한다.
종류는 상당히 많지만 일반적으로 많이 사용하는 주요 Sensor만 살펴본다.
 

실행 모드

Sensor Task에서 주기적으로 체크하면 다음 단계로 진행하지 못하는 상태가 유지된다.
DAG는 실행 중인 상태이기 때문에 AirFlow의 DAG을 실행하는 Worker의 슬롯 한 개를 점유한다.
조건을 만족하지 할 때까지 무기한 점유하므로 주의해서 사용해야 한다.

 

Sensor들은 BaseSensorOperator 클래스를 상속하여 구현되었다.
부모 클래스인 BaseSensorOperator 클래스에는 이러한 문제를 해결하기 위한 옵션이 존재한다.
구분
타입
기본값
내용
poke_interval
float
60
조건 확인의 재시도 주기이며 단위는 초이다.
기본 값은 60으로 1분마다 재시도한다.
timeout
float
60 * 60 * 24 * 7
Sensor의 조건 확인 대기 시간이며 단위는 초 (Second)이다
기본 값은 60 * 60 * 24 * 7로 일주일을 의미한다.
mode
str
"poke"
poke는 특정 조건을 만족할 때까지 Worker 슬롯 점유
reschedule은 조건을 확인할 때만 Worker 슬롯 점유
 
BaseSensorOperator에 대한 자세한 내용은 기술 문서를 참고한다.
AirFlow의 Worker는 한정 된 자원이다.
따라서 poke_interval, timeout, mode 설정을 적절히 조합해서 사용해야 한다.
 
 

FileSensor

어떤 파일이 존재해야 실행되는 Task라면 FileSensor를 이용해서 해당 파일이 있는지 확인할 수 있다.
 
클래스 정의
다음과 같이 정의되어 있다.
# 클래스 경로
# airflow.sensors.filesystem
class FileSensor(
    filepath: str,
    fs_conn_id: str = "fs_default",
    recursive: bool = False,
    **kwargs
)
 
전달 인자는 다음과 같다.
구분
타입
기본값
내용
filepath
str
 
파일이나 폴더명 (glob 형태도 가능)
connection에 등록 된 경로 기준으로 상대 경로도 가능
fs_conn_id
str
"fs_default"
connection에 등록 된 File(path) 경로의 ID
recursive
bool
False
True로 설정되면 하위 경로에 대한 탐색 진행
 
기본적으로 1분에 한번씩 체크하며 파일이 존재하면 다음 Task를 수행한다.
 
자세한 사항은 아래 기술 문서를 참고한다.
 

airflow.sensors.filesystem — Airflow Documentation

 

airflow.apache.org

 

 

Connection 등록
FileSensor를 사용하기 위해서는 Connection 등록이 필요하다.
 
AirFlow 상단의 Admin 메뉴에 있는 Connections에서 + 버튼을 클릭한다.
 
상단의 Conn id, Conn Type, Description에 값을 입력한다.
 
아래 쪽의 extra 항목에 Key-Value 형태로 path를 입력하고 Save 버튼을 클릭한다.
해당 path는 FileSensor에서 기준 경로로 사용한다.

 

중간에 Host, Schema, Login, Password, Port 등의 항목은 비워둔다.
해당 항목은 네트워크 통신과 계정 인증이 필요한 다른 Connection Type에서 사용한다.
 
실제로 Connection Type을 클릭해보면 다양한 타입을 지원한다.
  • 기본 : HTTP, FTP, SSH, IMAP 등
  • 외부 : Amazon, Azure, Hadoop 생태계, Docker, Elastic Search, GCP (Google Cloud), Kubernetis 등
 
외부 타입들은 Package Providers 설치가 필요하다.
아래 사이트를 참고한다.
 
사용 예제
FileSensor가 포함 된 DAG 코드를 작성한다.
from airflow.sensors.filesystem import FileSensor
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
from datetime import datetime

default_args= {
    "owner": "testuser",
    "start_date": datetime(2022, 4, 7),
    # DAG 실행 중 오류 발생하면 failure callback으로 메일 발송하므로 비활성화
    "email_on_failure": False
}

dag = DAG(dag_id = "test_sensor", 
        default_args = default_args, 
        schedule_interval = "@once")

start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

### FileSensor 예제

file_check = FileSensor(
    task_id = "file_check",
    fs_conn_id = "test_file",
    filepath = "test.py",
    dag = dag)

print_file = BashOperator(
    task_id = "print_path",
    bash_command = "cat /opt/bitnami/airflow/sensor/test.py",
    dag = dag)

start >> file_check >> print_file >> end

 

위의 코드를 도식화 하면 다음과 같다.
 
DAG이 실행되면 test.py 파일이 생성될 때까지 file_check 단계에서 체크한다.
파일이 생성되면 print_file 단계를 실행하여 파일 내용을 출력한다.
 
파일의 위치는 webserver나 scheduler가 아닌 worker 기준이다.
AirFlow의 Worker 내부에 해당 파일이 생성되어야 정상적으로 실행된다.
 
 

PythonSensor

PythonSensor는 파이썬 함수나 클래스를 콜백(Callback) 형태로 호출한다.
조건을 만족할 때까지 주기적으로 실행하고, 조건을 만족하면 다음 작업을 수행한다.
 
FileSensor가 단순하게 파일의 존재 여부만 체크하는 것에 비해 차이가 있다.
FileSensor 기능도 PythonSensor로 구현할 수 있는데, Connection 등록이 필요 없는 장점이 있다.
 
파이썬 코드를 이용하여 구체적이고 상세한 조건 확인이 필요할 때 사용하면 유용하다.
자동차 운전에 비유하면 BranchPythonOperator는 이정표, PythonSensor는 신호등이다.
 
따지고 보면 AirFlow에서 파이썬 Operator와 Sensor는 거의 만능이다.
 
클래스 정의
다음과 같이 정의되어 있다. 파이썬 Operator와 거의 유사하다.
# 클래스 경로
# airflow.sensors.python

class PythonSensor(
    python_callable: Callable, 
    op_args: Optional[Collection[Any]] = None, 
    op_kwargs: Optional[Mapping[str, Any]] = None, 
    templates_dict: Optional[Dict] = None, 
    **kwargs
)
 
전달 인자는 다음과 같다.
구분
타입
기본값
내용
python_callable
Callable
 
호출할 파이썬 개체 (함수, 클래스)
op_args
Optional[Collection[Any]]
None
파이썬 개체에 전달할 명시적 (explict) 인자
op_kwargs
Optional[Mapping[str, Any]]
None
파이썬 개체에 전달할 키워드 인자 (krawgs)
templates_dict
Optional[Dict]
None
AirFlow 엔진에 의해 생성 된 Dictionary 타입의 값 목록
templates_exts
Optional[List[str]]
None
템플릿 필드를 처리하는 동안 확인할 파일 확장자 목록
 
python_callable에서 DAG 코드 외부의 함수나 클래스를 사용하는 경우, DAG 코드 상단에 import 되어 있어야 한다.
 
자세한 사항은 아래 기술 문서를 참고한다.
참고로 template_dict 값은 Dictionary 타입의 Jinja Template으로 전달 된다.
DAG과 Task 실행에 관한 정보들이 전달된다.
자세한 사항은 아래 페이지를 참고한다.
 
사용 예제
PythonSensor가 포함 된 DAG 코드를 작성한다.
from airflow.sensors.python import PythonSensor
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow import DAG
import random
from datetime import datetime

default_args= {
    "owner": "testuser",
    "start_date": datetime(2022, 4, 7),
    # DAG 실행 중 오류 발생하면 failure callback으로 메일 발송하므로 비활성화
    "email_on_failure": False
}

dag = DAG(
        dag_id = "test_sensor", 
        default_args = default_args, 
        schedule_interval = "@once",
)

start = DummyOperator(task_id="start", dag=dag)
end = DummyOperator(task_id="end", dag=dag)

### PythonSensor 예제

def check_condition():  # PythonSensor의 callback 함수
    num = random.randint(0, 10)
    print(num)

    if num > 6:
        return True
    else:
        return False

python_check = PythonSensor(
    task_id = "python_check",
    python_callable = check_condition,
    poke_interval=15,
    dag = dag)

print_message = BashOperator(
    task_id = "print_message",
    bash_command = "echo Test for PythonSensor",
    dag = dag)

start >> python_check >> print_message >> end
 
위 코드를 도식화 하면 다음과 같다.
 
PythonSensor에서 check_condition 함수를 콜백으로 호출한다.
무작위로 생성한 0~9 사이의 값이 6보다 작으면 False를 반환하고 15초 뒤에 재시도한다.
6보다 크면 True를 반환하고 다음 작업을 수행한다.
 
로그를 확인해보면 15초 뒤에 재실행했고, 9가 나오자 True를 반환하고 성공 한 것을 확인할 수 있다.
 

그 외 주요 Sensor

제공하는 Sensor 가 많아서 특징만 살펴본다.
구분
클래스 경로
내용
BashSensor
airflow.sensors.bash
Bash 명령 또는 스크립트를 실행
리턴 코드가 0인 경우만 True 반환
DateTimeSensor
DateTimeSensorAsync
airflow.sensors.date_time
지정한 날짜까지 대기
Async는 비동기로 처리하여 Worker 비점유
ExternalTaskSensor
ExternalTaskSensorLink
ExternalTaskMarker
airflow.sensors.external_task
다른 DAG이나 다른 DAG 작업 종료까지 대기
ExternalTaskSensor로 연결한 외부 DAG에 연결
다른 DAG의 Task에 대해 종속성(연결) 지정
 
다양한 Sensor들의 사용법은 아래 기술 문서를 참고한다.