::: IT인터넷 :::

AirFlow DAG 패키지를 AirFlow에서 사용하기

곰탱이푸우 2022. 7. 18. 08:20
파이썬 패키지로 작성하여 배포한 AirFlow의 DAG을 실제로 AirFlow에 적용하는 방법에 대해 정리한다.
 
다음 과정으로 진행한다.
  • AirFlow DAG 패키지 설치
  • AirFlow WebUI 확인
  • DAG 동작 상태 확인
 
 

AirFlow DAG 패키지 설치

파이썬 패키지로 작성하여 배포한 AirFlow의 DAG 패키지를 AirFlow에 설치한다.
해당 작업은 AirFlow의 Scheduler와 Worker 컨테이너 내부에서 진행해야 한다.
 

DAG 패키지 작성과 배포

AirFlow의 DAG을 파이썬 패키지로 작성하는 방법은 아래 포스팅을 참고한다.

Docker 컨테이너 진입

AirFlow에 DAG을 등록하는 코드이므로 pip로 설치하고 터미널에서 명령어로 실행해야 한다.
 
다음 명령으로 진입한다.
# airflow_웹서버_컨테이너_이름을 실행 중인 airflow Scheduler와 Worker 컨테이너 이름으로 수정한다.

# Scheduler 컨테이너인 경우
$ docker exec -it -u 0 airflow_Scheduler_컨테이너_이름 /bin/bash
# Workder 컨테이너인 경우
$ docker exec -it -u 0 airflow_Worker_컨테이너_이름 /bin/bash

# -u 0는 root 계정으로 접속한다는 의미이다.
# 해당 옵션이 없으면 존재하지 않는 계정인 I have no name으로 출력된다.

 

 

pip 설정 확인

일반적인 인터넷 환경에서는 컨테이너 내부에서 pip 사용이 가능하다.
폐쇄망인 경우 공식 저장소인 pypi.org에 접속할 수 없기 때문에 저장소 설정이 필요하다.
 
root 계정으로 로그인해서 /etc/pip.conf 파일을 생성하고 설정을 진행한다.
아래 포스팅의 pip 클라이언트 설정 항목을 참고한다.

패키지 설치

pip 명령을 사용해서 배포한 패키지를 설치한다.
 
아래 포스팅에서 배포한 DAG 패키지 이름으로 진행한다.
컨테이너 내부에서 AirFlow의 패키지들은 파이썬 가상 환경으로 관리되고 있다.
따라서 파이썬 가상 환경에 있는 pip 명령어를 사용해서 설치해야 한다.
해당 pip 파일은 /opt/bitnami/airflow/venv/bin 폴더에 있다.
 
아래와 같이 실행한다.
$ cd /opt/bitnami/airflow/venv/bin

# DAG 패키지 최신 버전 설치
$ ./pip install srtest-airflow

# 여러 버전 중 특정 버전을 지정하고 싶은 경우
$ ./pip install srtest-airflow==1.0.0
 
DAG 패키지를 작성할 때 AirFlow의 DAG과 Operator 등을 사용하기 위해 requirement.txt에 apache-airflow를 지정했다.
그러나 AirFlow Docker 컨테이너에는 이미 설치되어 있어 굳이 설치할 필요가 없다.
 
DAG 패키지를 작성할 때 install_requires에서 apache-airflow로 시작하는 패키지를 제외한 이유가 바로 여기에 있다.
해당 내용은 아래 포스팅의 setup.py 항목을 참고한다.
 

패키지 실행

DAG 패키지 설치가 완료되면 해당 패키지를 실행하여 dags 폴더에 DAG과 구성 파일들을 복사한다.
 
initialize 옵션을 지정해서 실행하면 된다.
해당 내용은 아래 포스팅의 initializer.py 항목을 참고한다.
아래와 같이 실행한다. 굉장히 빠르게 실행된다.
$ srtest-airflow initialize
Initialize is completed!
 
설치 된 파일 목록을 확인해보면 아래와 같이 생성 된 것을 확인할 수 있다.
 

주의 사항

앞에서 언급한대로 AirFlow의 Scheduler와 Worker 컨테이너 모두 설치해야 한다.
 
AirFlow의 Scheduler에 설치하지 않은 경우
Scheduler에 설치하지 않으면 아래와 같은 오류 메시지가 출력된다.
 
AirFlow의 Worker에 설치하지 않은 경우
Worker에 설치하지 않으면 DAG은 인식하지만 Worker에서 필요한 라이브러리가 없어서 동작하지 않는다.
 
Worker에서 동작 자체를 수행하지 못하고 AirFlow의 스케줄러는 해당 작업이 실패한 것으로 처리한다.
 
 

AirFlow WebUI 확인

AirFlow의 Scheduler와 Worker 컨테이너에 DAG 패키지 설치가 완료되면 아래처럼 DAG이 정상적으로 등록된다.
 

DAG 동작 상태 확인

시스템의 현재 날짜가 start_date 이후인 경우, start_date부터 현재 날짜까지 작업을 수행한다.
DAG를 정의할 때 catchup 옵션이 True로 되어 있는 경우만 해당 된다.
 
catchup 옵션은 아래 포스팅을 참고한다.

on_failure_callback 주석 처리

테스트 환경에서 잘 동작해도 실환경에서 고려하지 못한 부분으로 인한 오류가 발생할 수 있다.
따라서 처음 DAG을 셋팅할때는 Task 실패가 생각보다 많이 발생한다.
 
DAG 코드에서 Task가 실패하면 메일을 보내는 on_failure_callback 부분에 주석 처리한 이유가 바로 여기에 있다.
# init 기능 (config.yml 파일을 임시 경로로 복사)
init = PythonOperator(task_id="init", 
    python_callable=SrTest.init,
    op_kwargs={"src_file": src_file, "dst_file": dst_file},
    provide_context=True,
    # 테스트 도중에는 실패 콜백 함수를 비활성화 (정식 운영할때만 주석 제거)
    # on_failure_callback=SrTestCallback.send_fail_mail,
    dag=dag)
 
처음 DAG을 셋팅할 때 발생하는 오류는 DAG 코드의 버그가 원인인 경우가 많다.
따라서 초반에는 해당 부분에 주석으로 처리해서 과도한 이메일 발송을 차단해야 한다.
 
버그 수정이 완료되고 정상적으로 동작하기 시작하면 해당 부분의 주석을 제거한다.
Task 실패의 원인이 DAG이 아닌 다른 요소에 발생했기 때문이다.
 
알림 메일을 설정하는 이유는 드물게 발생하는 이벤트를 빠르게 인식하기 위함이다.
너무 많은 알림 메일은 스팸 메일이고 노이즈일 뿐 아무 도움이 되지 못한다는 것을 참고한다.
 
 

Task 실행

DAG에 정의한 Task들이 정상적으로 실행되는지 확인하기 위해 수동으로 실행한다.
우측 상단의 ▶ 의 Trigger DAG을 클릭해서 실행하면 현재 시간을 start_date로 하여 DAG이 실행된다.
 

Task 실행 결과 확인

Task의 실행 결과를 확인한다.
우측 상단의 Auto-refresh를 활성화하거나 화살표 버튼을 클릭하면 스스로 업데이트 한다.
굳이 F5 버튼이나 웹페이지 새로고침을 할 필요는 없다.
 
DAG의 버그 수정이 완료되어 정상적으로 실행되면 모두 성공을 의미하는 녹색으로 표시된다.
 
앞 부분에 보면 실패를 의미하는 빨간색이 많은 것을 알 수 있다.
초반 DAG 코드에 있던 버그로 인한 것이다.
 
이러한 부분을 사전에 줄이기 위해서는 DAG 패키지의 테스트 코드에 DAG 자체에 대한 테스트도 추가하는 것이 좋다.
 
 

init 로그 확인

init Task 부분의 녹색을 클릭하면 해당 Task에 대한 정보가 출력된다.
상단의 Log를 클릭하면 해당 Task가 실행되면서 생성 된 로그를 확인할 수 있다.
 
아래와 같은 로그가 확인된다.
특별한 오류나 예외 발생 없이 정상적으로 실행 된 것을 확인할 수 있다.
 

display 로그 확인

display 로그도 동일한 방법으로 확인한다.
특별한 오류나 예외 발생 없이 정상적으로 실행 된 것을 확인할 수 있다.

 

로그 중에 Current Date와 Body: 부분이 중요하다.
Task가 실행 중인 2022-04-22 날짜에 맞게 변환 된 것을 확인할 수 있다.
의도한 기능이 정상 동작한다.
 

cleanup 로그 확인

cleanup 로그도 동일한 방법으로 확인한다.
특별한 오류나 예외 발생 없이 정상적으로 실행 된 것을 확인할 수 있다.

 

 

on_success_callback 동작 확인

cleanup Task를 정의할 때 성공하면 on_success_callback으로 이메일을 전송하도록 정의했다.
 
해당 코드는 아래와 같다.
# cleanup 기능 (임시 경로에 복사한 config.yml 삭제)
cleanup = PythonOperator(task_id="cleanup",
    python_callable=SrTest.cleanup, 
    op_kwargs={"dst_file": dst_file}, 
    provide_context=True, 
    # 테스트 도중에는 실패 콜백 함수를 비활성화 (정식 운영할때만 주석 제거)
    # on_failure_callback=SrTestCallback.send_fail_mail,
    on_success_callback=SrTestCallback.send_finish_mail, 
    dag=dag)
 
DAG 워크플로우가 성공하면 실제로 메일을 보내는지 확인한다.
아래와 같이 제목과 본문의 날짜가 start_date 값으로 변경되어, 정상적으로 메일이 온 것을 확인할 수 있다.
 

schedule_interval

DAG을 정의할 때 실행 주기를 설정하는 schedule_interval을 매일 9시에 동작하도록 설정했다.
schedule_interval='0 9 * * *',
 
실제로 해당 시간에 정상 동작한 것을 확인할 수 있다. 
 
 
on_success_callback의 send_finish_mail 함수도 정상 동작해서 이메일이 발송되었다.