::: IT인터넷 :::

AirFlow의 DAG을 파이썬 패키지로 구성하기 (3)

곰탱이푸우 2022. 7. 11. 08:20
DAG를 파이썬 패키지로 구성하는 방법에 대해 정리한다.
  1. 기능 정의
  2. 프로젝트 생성
  3. DAG 코드 작성
  4. 기능 코드 작성 (1)
  5. 기능 코드 작성 (2)
  6. 테스트 코드 작성
  7. 패키지 정의
  8. 테스트
  9. 형상 관리, 빌드, 배포
 
기능 정의, 프로젝트 생성, DAG 코드 작성은 아래 포스팅을 참고한다.
기능 코드는 항목이 많아서 나눠서 정리한다.
config.yml, srtest.py, emailclient.py에 대해 살펴본다.
 
먼저 다룬 __init__.py, info.py, initializer.py, callback.py는 아래 포스팅을 참고한다.
 

기능 코드 작성

기본적인 DAG 구성을 마쳤으니 DAG의 설정과 실행에 필요한 기능들을 정의한다.
 

srtest-config/config.yml

AirFlow 예제 코드의 Task에서 사용하는 yml 포맷의 설정 파일이다.
 
파이썬 예제 프로그램에서 다뤘던 config.yml 파일과 동일하다.
아래 포스팅의 srtest-config/config.yml 항목을 참고한다.

srtest.py

DAG 워크플로우에서 수행할 작업의 코드를 정의한다.
크게 3가지 작업을 수행한다.
  • init - 설정 파일이 존재하는지 확인하고 지정한 경로에 파일 복사
  • display - 지정한 yml 파일을 읽고 날짜 값 변경 후 화면에 출력
  • cleanup - 생성한 설정 파일을 삭제하고 폴더가 비어 있으면 폴더도 삭제

 

임포트와 클래스 정의
해당 코드에서 사용할 라이브러리를 임포트하고 클래스를 정의한다.
 
import os 
import shutil 
import arrow
from srtestairflow.emailclient import EmailClient

class SrTest: 
    """ 
    SrTest 는 DAG에서 정의한 워크플로우를 수행하는 기능들을 정의한 클래스이다. 
    """ 
    result = None  # display 기능에서 반환 받은 결과 저장​
 
EmailClient에 email 발송 기능이 없는 것에 주의한다.
yml 설정 파일을 읽고 본문을 수정하여 화면에 출력하는 기능을 수행한다.
파이썬 예제코드에서 정의했던 기능과 동일하다.
 
arrow  라이브러리를 사용하는 것에 주의한다.
해당 라이브러리는 timestamp를 날짜 형태로 변환, 시간대 변경 등 시간과 날짜 정보를 처리하는 라이브러리이다.
pip로 설치해야 사용 가능하므로, requrement.txt에 arrow를 추가해서 의존성을 정의해야 한다.
 
생성자를 사용하지 않고 클래스의 속성으로 result를 정의했다.
 
 
init 기능
지정한 경로에 config.yml 설정 파일이 존재하는지 확인하고, 지정한 경로에 해당 파일을 복사한다.
 
AirFlow의 PythonOperator에서 함수나 클래스를 호출하는 python_callable에서 인스턴스 객체는 인식하지 못한다.
따라서 각 함수들을 classmethod 데코레이터를 사용해서 정의한다.
함수의 첫 번째 전달 인자에 cls를 전달하는 것에 주목한다.
 
os 모듈의 dirname 함수를 사용해서 해당 파일의 폴더 경로를 추출한다.
해당 폴더가 존재하는지 확인하고 없으면 os 모듈의 makedirs 함수로 폴더를 생성한다.
 
os 모듈의 makedirs 는 2단계 이상의 하위 폴더도 생성 가능하다.
폴더 생성 이후 yml 파일을 대상 경로로 복사한다.
 
yml 파일이 존재하지 않으면 이후 과정을 실행할 수 없으므로 예외를 발생시킨다.
코드는 다음과 같다.
class SrTest:
    ... 중략 ...

    @classmethod
    def init(cls, src_file:str, dst_file:str) -> None: 
        """
        config.yml 파일을 임시 경로로 복사한다.
        :param src_file: 원본 config.yml 파일 경로
        :param dst_file: 복사할 config.yml 파일 경로
        :param **context: AirFlow DAG과 Task의 정보
        """
        dst_folder = os.path.dirname(dst_file) 
         
        # config 파일을 복사할 경로 확인 후, 해당 경로에 설정 파일을 복사한다. 
        if os.path.exists(dst_file) is False: 
            if os.path.exists(dst_folder) is False: 
                os.makedirs(dst_folder) 
             
            shutil.copyfile(src_file, dst_file) 
             
            print(f"Copy completed!\r\n") 
        else: 
            raise FileExistsError(f"config file already exists: {dst_folder}!")
 
 
display 기능
config.yml 파일을 읽고 날짜 부분을 AirFlow의 execution_date로 변경하고 화면에 출력한다.
DAG에서 display Task에서 execution_date 인자에 Jinja Templating인 {{ execution_date }}를 전달한 것을 기억하자.
execution_date를 arrow의 get 함수에 전달해서 Arrow 인스턴스로 변환한다.
이어서 to 함수에 Asia/Seoul을 전달해서 기준 시간대를 한국(서울)으로 변경한다.
 
EmailClient 인스턴스를 생성하고 set_curr_time 함수에 execution_date를 전달한다.
set_email_info 함수를 호출해서 yml 파일에 정의 된 날짜를 execution_date로 변경하고 result 변수에 반환한다.
그리고 result의 내용을 화면에 출력한다.
 
코드는 다음과 같다.
class SrTest:
    ... 중략 ...

    @classmethod
    def display(cls, src_file:str, dst_file:str, execution_date:str) -> None: 
        """
        config.yml 내용을 읽고 execution_date로 치환하여 화면에 출력한다.
        :param dst_file: 복사한 config.yml 파일 경로
        :param execution_date: DAG의 start_date
        """
        if not os.path.exists(dst_file): 
            cls.init(src_file, dst_file)
       
        if os.path.exists(dst_file): 
            # 현재 실행 중인 execution_date 확보
            today = arrow.get(execution_date).to("Asia/Seoul")
            # EmailClient 클래스의 인스턴스 생성 
            client = EmailClient() 
         
            # EmailClient 인스턴스의 현재 시간 정보 업데이트 
            client.set_curr_time(today) 
            cls.result = client.set_email_info(dst_file) 
            # 결과 출력 
            print(f"-- result --") 
             
            for key in cls.result.keys(): 
                element = cls.result[key] 
                print(f"{key}: {element}") 
        else: 
            raise FileNotFoundError(f"config file is not exist!:\r\n{dst_file}")
 
cleanup 기능
워크플로우는 DAG의 schedule_interval에 의해 실행되거나 수동으로 직접 실행할 수 있다.
cleanup을 해주는 이유는 워크플로우가 실행 될 때마다 항상 동일한 상태로 시작하기 위함이다.
 
init 기능이 생성한 config.yml 파일을 삭제하고, 삭제 후 폴더가 비어 있으면 폴더도 삭제한다.
init 기능과 거의 유사하지만 반대 순서로 이해하면 좋다.
 
코드는 다음과 같다.
class SrTest:
    ... 중략 ...

  @classmethod
    def cleanup(cls, dst_file:str) -> None: 
        """
        임시 경로에 복사한 config.yml 파일을 삭제한다.
        :param dst_file: 삭제할 config.yml 파일 경로
        :param **context: AirFlow DAG과 Task의 정보
        """
        dst_folder = os.path.dirname(dst_file) 
         
        # config 파일을 삭제할 경로 확인하고 삭제한다. 
        if os.path.exists(dst_folder) is True: 
            if os.path.exists(dst_file) is True: 
                os.remove(dst_file) 
            else: 
                raise FileNotFoundError(f"config file is not exist: {dst_file}!") 
             
            # 폴더에 파일이 없는 경우 해당 폴더 삭제 
            if not os.listdir(dst_folder): 
                try: 
                    os.rmdir(dst_folder) 
                except OSError as e: 
                    print(f"Error: {dst_folder} : {e.strerror}")
                 
            print(f"Remove completed!\r\n") 
        else: 
            raise FileNotFoundError(f"config file is not exist: {dst_folder}!")
 

 

 

emailclient.py

srtest.py에 정의한 display 기능에서 사용한 EmailClient 클래스에 대한 코드이다.
 
전체적인 코드 구조는 기존 파이썬 예제 코드에서 다뤘던 구조와 동일하다.
아래 내용은 변경 사항 중심으로 정리한다.
 
임포트와 클래스 정의
EmailClient에서 사용할 라이브러리를 선언하고 클래스를 정의한다.
import arrow
import textwrap
import yaml


class EmailClient:
    """
    EmailClient 는 config.yml 파일을 읽어서 메시지 형식을 채우는 클래스이다.
    """
 
arrow  라이브러리를 사용하는 것에 주의한다.
해당 라이브러리는 timestamp를 날짜 형태로 변환, 시간대 변경 등 시간과 날짜 정보를 처리하는 라이브러리이다.
pip로 설치해야 사용 가능하므로, requrement.txt에 arrow를 추가해서 의존성을 정의해야 한다.
 
 
생성자
객체 지향 언어에서 사용하는 클래스는 모두 생성자를 가지고 있다.
인스턴스가 생성될 때 기본적으로 가지고 있어야 하는 속성 값을 정의하는 기능이다.
 
EmailClient 인스턴스는 전달 받은 날짜로 현재 날짜를 설정하고, config.yml의 내용을 해당 날짜로 변경하는 기능을 수행한다.
따라서 해당 클래스에서 고정적으로 사용할 curr_time 속성을 정의했다.
class EmailClient:
    ... 중략 ...

    def __init__(self):
        self.curr_time = None
 
클래스의 인스턴스는 Task가 종료되면 삭제된다.
따라서 다른 Task와 공유하려는 경우 Variables 기능 또는 Xcom을 통한 데이터 공유가 필요하다.
 
해당 내용은 아래 포스팅을 참고한다.
set_curr_time 기능
해당 함수가 호출 될 때 전달 받은 날짜와 시간 정보를 curr_time 속성에 할당한다.
 
Arrow 클래스 타입으로 전달 되어야 한다.
해당 타입은 datetime처럼 연월일을 추출하여 사용할 수 있다.
 
코드는 다음과 같다.
class EmailClient:
    ... 중략 ...

    def set_curr_time(self, curr_time: arrow.arrow.Arrow) -> None:
        """
        현재 시간을 기록한다.
        :param curr_time: Arrow 클래스 타입으로 변환 된 DAG의 execution_date
        """
        self.curr_time = curr_time

 

 
set_email_info 기능
YAML 포맷의 설정 파일을 읽어서 Dictionary 타입으로 반환하는  함수이다.
전체 코드는 파이썬 예제코드에서 다뤘던 set_email_info 기능에서 다뤘던 내용과 동일하다.
 
파이썬 예제코드에서는 실행하는 시점의 시스템 시간을 사용했다.
AirFlow 예제 코드는 set_curr_time 함수로 정의한 Arrow 클래스 타입의 시간 정보를 사용한다.
 
연도, 월, 일 값을 추출해서 _apply_value_to_magic_keyword 함수에 전달한다.
해당 함수는 yml 파일에서 읽은 특정 필드의 값을 변경해서 반환해준다.
 
코드는 다음과 같다.
class EmailClient: 
    ... 중략 ...

    def set_email_info(self, config_file_path: str) -> dict:
        """
        YAML 파일을 읽어서 메시지 형식을 채운다.

        :param config_file_path: yaml 파일의 경로
        """       
        with open(config_file_path, "r", encoding="utf-8") as yaml_file:
            yaml_dict = yaml.load(yaml_file, Loader=yaml.Loader)

        # yaml 파일을 dict 형태로 읽어서 변수에 저장
        intermediate_payload = yaml_dict['common']['email']

        # 원하는 값 추출
        curr_time = self.curr_time
        year = str(curr_time.year)
        month = str(curr_time.month).zfill(2)
        day = str(curr_time.day).zfill(2)

        print(f"-- Current Date --")
        print(f"year = {year}")
        print(f"month={month}")
        print(f"day={day}\r\n")

        # 본문의 날짜를 추출한 값으로 치환
        payload = self._apply_value_to_magic_keyword(dictionary=intermediate_payload,
                                                    year=year, month=month, day=day)

        # 본문 템플릿
        message_body = textwrap.dedent(f"""
            {payload["date"]}에 테스트를 수행했습니다.
            From bearpooh.com auto-mailing
            """)

        message = dict()

        # 반환할 값을 dict 형태로 생성
        message['From'] = payload["sender"]
        message['To'] = payload["receiver"]
        message["Subject"] = payload["subject"]
        message["Body"] = message_body

        return message
 
아래 포스팅의 set_email_info 기능 부분을 참고한다.
 
_apply_value_to_magic_keyword 기능
set_email_info 기능에서 본문의 연도, 월, 일을 변경하기 위해 호출한 함수이다.
 
전체 코드는 파이썬 예제코드에서 다뤘던 _apply_value_to_magic_keyword 기능에서 다뤘던 내용과 동일하다.
 
아래 포스팅의 _apply_value_to_magic_keyword  기능 부분을 참고한다.