::: IT인터넷 :::

AirFlow의 DAG을 파이썬 패키지로 구성하기 (1)

곰탱이푸우 2022. 7. 4. 08:20
AirFlow의 환경 구성과 DAG에 대한 구조와 개념에 대해 다뤘다.
아래 포스팅들을 참고한다.
DAG 파이프라인을 스크립트 형태로 .py로 작성해도 문제는 없다.
그러나 이력 관리(형상, 버전)와 부가 기능을 구현해서 사용하기 위해서는 패키지 형태로 구성하는 것이 좋다.
 
 
DAG를 파이썬 패키지로 구성하는 방법에 대해 정리한다.
  1. 기능 정의
  2. 프로젝트 생성
  3. DAG 코드 작성
  4. 기능 코드 작성 (1)
  5. 기능 코드 작성 (2)
  6. 테스트 코드 작성
  7. 패키지 정의
  8. 테스트
  9. 형상 관리, 빌드, 배포
 
먼저 DAG 워크플로우가 수행할 기능을 정의하고, 프로젝트 골격과 DAG 코드를 작성한다.
 

기능 정의

DAG을 파이썬 패키지로 작성하는 예제 코드이므로 복잡한 기능을 정의하지는 않는다.
그러나 Hello World와 같이 너무 간단한 코드를 작성하면, 예제 작성 목적에 적합하지 않다.
 
따라서 외부 라이브러리를 가져다 쓰면서 최대한 단순한 패키지를 작성하는데 중점을 두었다.
예제 코드는 init과 display, cleanup이라는 3개의 간단한 기능을 가지고 있다.
 
이메일 전송 관련 설정 정보를 읽어와서 화면에 출력하는 패키지로, 테스트를 위해 최대한 간단하게 구성했다.
실제로 이메일을 보내는 기능은 Task 실패 또는 전체 DAG 실행이 성공하면 호출되는 콜백 함수에만 구현했다.
기존 파이썬 빌드 테스트를 위한 예제 코드와 동일하다. 다음 포스팅을 참고한다.
기존 예제 코드는 init, display 기능을 별도로 실행했다.
DAG 예제 코드는 DAG을 사용하여 init, display, cleanup 기능을 순차적으로 실행하도록 구성한다.
 
 

제약 사항

AirFlow의 Worker가 두 개 이상인 경우 각 Task가 서로 다른 Worker에서 실행 될 수 있다.
이러한 경우 서로 다른 Worker의 로컬 경로에 생성 된 파일에 접근 할 수 없다.
따라서 예제 코드는 AirFlow의 Worker가 한 개인 경우로 가정한다.
 
Worker가 여러 개인 경우 모든 Worker가 사용 가능해야 한다.
Docker로 구성했다면 모든 Worker 컨테이너에 대해  호스트의 특정 작업 경로를 공유 볼륨으로 설정한다.
MinIO, S3와 같은 AirFlow 외부의 파일 스토리지를 사용하는 것도 좋은 방법이다.
 

init

해당 패키지의 설정 파일이 지정한 경로에 있는지 확인하고 파일을 생성하는 기능을 수행한다.
파일 생성은 임시 경로 (/tmp) 하위에서 진행한다.
 

display

지정한 yml 파일 내부의 값을 읽어와서 화면에 출력하는 기능이다.
날짜 값은 DAG의 execution_date (locigal_date)에 맞게 변경해서 출력한다.
 
execution_date와 logical_date에 대한 자세한 내용은 아래 사이트를 참고한다.

cleanup

init 과정에서 생성한 설정 파일을 삭제하는 기능을 수행한다.
init 기능에 예외 처리로 구현해도 되지만, 예제 목적에 따라 추가한다.
 
 

프로젝트 생성

위의 기능에 맞춰 코드 작성을 진행한다.
먼저 프로젝트 폴더를 구성하고, 각 기능별로 코드를 작성한다.
 

프로젝트 폴더 구성

작성하는 파이썬 패키지의 전체 구성은 다음과 같다.
root
├─ .gitingnore    # git의 commit 제외 파일
├─ README.md    # git의 프로젝트 첫페이지 안내문
├─ setup.py    # 빌드, 배포를 위한 설정
├─ dags    # DAG 워크플로우를 정의한 소스코드 폴더
│    ├─ __init__.py    # 해당 경로가 파이썬 패키지에 포함됨을 알리는 역할
│    └─ srtest_pipeline.py    # DAG 워크플로우를 정의한 소스코드
├─ srtestairflow    # 해당 프로그램이 수행할 기능을 정의한 소스코드 폴더
│    ├─ __init__.py    # 해당 경로가 파이썬 패키지에 포함됨을 알리는 역할
│    ├─ srtest.py    # DAG에서 PythonOpeartor로 수행할 Task의 소스코드
│    ├─ emailclient.py    # Task 코드에 필요한 부가 기능을 정의한 소스코드
│    ├─ callback.py    # DAG이 성공하거나 Task가 실패할때 호출하는 콜백 함수
│    ├─ info.py    # 해당 패키지의 이름과 버전 정보 (setup.py에서 사용)
│    ├─ initializer.py    # 파이썬 패키지의 DAG를 AirFlow의 dags 폴더에 복사
│    └─ srtest-config    # 프로그램 실행에 필요한 설정 파일 
│        ├─ __init__.py    # 해당 경로가 파이썬 패키지에 포함됨을 알리는 역할
│        └─ config.yml    # 해당 프로그램에서 사용하는 설정 파일
├─ requirements.txt    # 설치할때 필요한 라이브러리 정의
├─ tests    # 구현한 기능들의 단위(유닛) 테스트를 정의한 소스코드 폴더
│    ├─ __init__.py    # 해당 경로가 파이썬 패키지에 포함됨을 알리는 역할
│    ├─ test_emailclient.py    # emailclient.py의 단위(유닛) 테스트
│    ├─ test_srtest.py    # srtest.py의 단위(유닛) 테스트
│    └─ resources    # 테스트 수행에 필요한 설정 파일
│        └─ config.yml    # 테스트에서 사용하는 설정 파일
└─ test_requirements.txt    # 테스트할때 필요한 라이브러리 정의
 
각 구성 요소들은 다음과 같다.
  • dags 폴더 - DAG과 워크플로우 정의
  • srtestairflow 폴더 - 해당 프로그램이 수행할 기능 정의
  • tests   - 구현한 기능들의 단위(유닛) 테스트 정의
 
나머지 setup.py, requirements.txt, test_requirements.txt는 파이썬 패키지 생성을 위한 파일들이다.
.gitignore, README.md, setup.py에 대한 설명은 생략한다.
 
파이썬 프로젝트 폴더 구성은 아래 포스팅을 참고한다.

setup.py

setup.py 내부의 entry_points 항목은 패키지를 실행할 때 가장 먼저 실행 될 부분을 가리킨다.
 
AirFlow에서 해당 DAG이 인식되기 위해서는 DAG을 정의한 py 파일이 AIrFlow의 dags 폴더에 위치해야 한다.
해당 기능은 srtestairflow폴더의 initializer.py 내부에 정의한 initialzier 함수에 정의했다.
 
따라서 setup.py의 entry_points는 initialzier 함수를 호출해야 한다.
setup.py 의 코드는 다음과 같다. entry_point의 console_scripts 항목을 참고한다.
from setuptools import setup, find_packages 
from srtestairflow.info import __package_name__, __version__

with open('README.md', 'r', encoding='utf-8') as f:
    readme = f.read()
with open("requirements.txt", "r", encoding="utf-8") as f: 
    requires = f.read().splitlines()
with open("test_requirements.txt", "r", encoding="utf-8") as f:
    test_requires = f.read().splitlines()

setup(
    name=__package_name__,
    version=__version__,
    long_description=readme,
    packages=find_packages(exclude=["contrib", "docs", "tests"]),
    package_data={'': ["*.yaml", "*.yml"]},
    install_requires=list(filter(lambda x: not x.startswith("apache-airflow"),requires)),
    setup_requires=[ "pytest-runner" ],
    tests_require=test_requires,
    python_requires=">3.8",
    entry_points={ "console_scripts": ["srtest-airflow=srtestairflow.initializer:initializer"] }
)
 

iinstall_requires 항목에 apache-airflow로 시작하는 부분은 제외한 것을 확인할 수 있다.

기본적으로 AirFlow Docker 컨테이너 내부에서 설치하는데 아래와 같은 이유로 제외해야 한다.

  • 이미 AirFlow가 설치되어 있어 설치할 필요가 없다.
  • 컨테이너 내부에서 동작 중인 AirFlow의 버전과 다를 경우 컨테이너의 오동작이 발생할 수 있다.

따라서 pip install로 설치할 때 필요한 패키지만 설치하기 위해 apache-airflow의 설치는 하지 않는다.

 

DAG 코드 작성

AirFlow에서 실행할 DAG과 워크플로우를 정의하는 코드를 작성한다.
해당 파일은 dags 폴더의 srtest_pipeline.py에 위치한다.
 
아래 포스팅에서 다룬 파이썬 패키지의 main 코드를 DAG로 구성한다고 이해하면 좋다.

__init__.py

해당 디렉터리가 파이썬 패키지의 일부임을 알려주는 역할을 한다.
해당 파일이 없다면 패키지의 구성 요소로 인식되지 않는다.
보통 파일 내용은 비워두는 경우가 많지만, import 할 항목을 지정할 수도 있다.
자세한 내용은 다음을 참고한다.

srtest_pipeline.py

DAG과 워크플로우를 정의하는 부분이다.
확장명이 .py이고 내부에 AirFlow의 DAG 클래스가 정의 되어 있으면 AirFlow가 인식해서 DAG으로 등록된다.
 
파일명에 특별한 규칙은 없어서 패키지이름_pipeline.py 형태로 명명했다.
이름을 정하는 것이 가장 어려운 편인데 접미사로 pipeline을 추가했다.
 
DAG 코드는 아래 포스팅에서 정의한 코드 구조에 따라 작성한다.
라이브러리 임포트
DAG과 워크플로우 구성에 필요한 라이브러리를 선언한다.
from airflow.operators.dummy import DummyOperator 
from airflow.operators.python import PythonOperator 
from airflow import DAG 
import os 
from datetime import datetime  # DAG 공통 속성 정의에서 시작 날짜 지정 
from tempfile import gettempdir  # 임시 경로 (/tmp) 사용 
from srtestairflow.srtest import SrTest  # DAG에서 실행할 기능 정의
from srtestairflow.callback import SrTestCallback  # DAG 성공 또는 Task 실패시 호출하는 콜백 함수

 

참고로 AirFlow의 DAG 코드의 import 부분에서는 절대 경로로 지정할 것이 권장된다.
아래 문서의 Best practices for module loading 항목을 참고한다.
 

Modules Management — Airflow Documentation

 

airflow.apache.org

 

 

공통 변수 정의
DAG 구성에 사용하기 위해 공통으로 사용하는 변수들을 정의한다.
변경이 자주 발생하거나 다른 DAG에서 공통적으로 사용하는 경우 AirFlow 상단의 Variables 기능을 활용한다.
## 공통 사용 변수 정의
app_name = "srtestairflow" 
config_folder = "srtest-config" 
config_file = "config.yml" 

# 원본 경로 /opt/bitnami/airflow/dags/srtest/srtest-config/config.yml 
src_file = os.path.join(os.environ.get("BITNAMI_PKG_EXTRA_DIRS"), app_name, config_folder, config_file) 

# 대상 경로 /tmp/srtestairflow/srtest-config/config.yml 
dst_file = os.path.join(gettempdir(), app_name, config_folder, config_file) 

# DAG의 id 
MAIN_DAG_ID = 'srtest_airflow'
 
테스트를 위한 예제이므로 /tmp 경로와 특정 폴더명, yml 파일명을 결합한 경로를 사용한다.
임시로 생성한 폴더와 yml 파일은 DAG의 마지막 작업에서 삭제한다.
 
DAG 공통 속성값 정의
DAG을 정의하는데 필요한 공통 속성 값을 정의한다.
## DAG 공통 속성 정의
args = { 
    'owner': 'airflow', 
    # start_date와 execution_date는 아래 문서를 참고한다.
    # https://airflow.apache.org/docs/2.2.3/scheduler.html 
    'start_date': datetime(2022, 4, 7),  # 연, 월, 일, 시, 분, 초 
    'email': ['your@email.address'], 
    # DAG 실행 중 에러가 발생하면, 
    # on_failure_callback으로 메일 발송하므로 해당 옵션은 False로 지정 
    'email_on_failure': False 
}
 
start_date 속성에 날짜를 전달하기 위해 datetime 함수를 사용했다.
코드 상단에 datetime을 임포트한 것을 기억하자.
 
DAG 정의
DAG을 정의하고 DAG의 공통 속성 값을 전달한다.
## DAG 정의
dag = DAG( 
    dag_id=MAIN_DAG_ID, 
    # DAG의 동시 실행을 방지한다. 
    max_active_runs=1, 
    default_args=args, 
    # 매일 오전 09:00(한국시간 기준)에 실행 
    # 분 시 일 월 요일 
    schedule_interval='0 9 * * *', 
    catchup=True 
)
 
DAG 변수는 전역 변수로 지정해야 하는 것을 명심한다.
필요한 경우 with 구문으로 작성할 수도 있다.  최근에는 이 방법을 더욱 권장하기도 한다.
 
여기서 가장 중요한 부분은 schedule_interval이다.
주기적으로 실행할 스케줄링 조건을 지정하면 된다.
이 설정으로 매번 수작업으로 실행하는 번거로움에서 해방 될 수 있다.
 
catchup은 아래 포스팅을 참고한다.
 
Task 정의
DAG에 포함 될 각 작업(Task)들을 정의한다.
 
Operator, Sensor, Hook 등을 사용할 수 있고, 예제에서는 Operator만 사용한다.
코드 상단에 DummyOperator와 PythonOperator를 임포트한 것을 기억하자.
 
display Task의 경우 Jinja Templating을 사용해서 DAG의 exectution_date를 함수의 인자로 전달했다.
SrTest 클래스의 display 함수에서는 해당 값을 사용해서 config.yml의 날짜 부분을 치환하고 출력한다.
## Task 정의
start = DummyOperator(task_id="start", dag=dag) 
end = DummyOperator(task_id="end",
    on_success_callback=SrTestCallback.send_finish_mail,
    dag=dag) 

# init 기능 (config.yml 파일을 임시 경로로 복사)
init = PythonOperator(task_id="init", 
    python_callable=SrTest.init, 
    op_kwargs={"src_file": src_file, "dst_file": dst_file}, 
    provide_context=True, 
    # 테스트 도중에는 실패 콜백 함수를 비활성화 (정식 운영할때만 주석 제거)
    # on_failure_callback=SrTestCallback.send_fail_mail, 
    dag=dag) 

# display 기능 (config.yml 내용을 읽고 execution_date로 치환하여 화면 출력)
display = PythonOperator(task_id="display", 
    python_callable=SrTest.display, 
    op_kwargs={"src_file": src_file, "dst_file": dst_file, "execution_date": "{{ execution_date }}"}, 
    provide_context=True, 
    # 테스트 도중에는 실패 콜백 함수를 비활성화 (정식 운영할때만 주석 제거)
    # on_failure_callback=SrTestCallback.send_fail_mail, 
    dag=dag) 

# cleanup 기능 (임시 경로에 복사한 config.yml 삭제)
cleanup = PythonOperator(task_id="cleanup", 
    python_callable=SrTest.cleanup,   
    op_kwargs={"dst_file": dst_file}, 
    provide_context=True, 
    # 테스트 도중에는 실패 콜백 함수를 비활성화 (정식 운영할때만 주석 제거)
    # on_failure_callback=SrTestCallback.send_fail_mail, 
    dag=dag)
 
on_failure_callback 옵션은 해당 Operator 작업이 실패한 경우 호출할 callback 함수를 지정한다.
본 예제에서는 callback.py에 정의한 SrTestCallback 클래스의 send_fail_mail 메소드를 호출했다.
 
on_success_callback 옵션은 on_failure_callback 옵션과 동일하다.
해당 Task가 성공한 경우에 지정한 함수를 callback 형태로 호출한다.
DummyOperator인 end Task가 성공하면 호출하도록 정의했다.
 
provide_context는 True로 설정했는데, 해당 DAG과 Task의 속성들을 kwargs로 사용 가능하다.
해당 옵션은 BaseOperator에 정의 되어 있다.

 

 
Task 배열
정의한 Task 들을 실행할 순서에 맞게 배열한다.
<<. >> 같은 Shift 연산자를 사용하고, set_upstream, set_downstream 함수도 사용할 수 있다.
## Task 배열
start >> init >> display >> cleanup >> end