::: IT인터넷 :::

Jupyter Notebook으로 AirFlow 사용하기

곰탱이푸우 2022. 7. 21. 08:20
Jupyter Notebook으로 AirFlow를 사용하는 방법에 대해 정리한다.

 

Jupyter Notebook은 파이썬 기반의 데이터 과학에서 자주 사용하는 도구이다.
웹페이지 기반의 대화형 파이썬 인터프리터를 제공하기 때문에 간단한 코드 스니펫 검증에 자주 사용된다.
또한 데이터 변환이나 학습, 또는 EDA와 같은 데이터 분석을 진행하기도 한다.
 
AirFlow에서 Jupyter Notebook을 연동하여 사용하는 이유는 다음과 같다.
  • AirFlow DAG 코드를 간단하게 수정
  • 간단한 DAG 테스트를 위한 코드 작성
 
실제로 운영하고 있는 서버에서 테스트하는 것은 좋지 않다.
가급적 로컬 환경에 테스트 환경을 별도로 구성하고 진행하는 것을 추천한다.
 

 

파이썬으로 AirFlow의 DAG 패키지를 작성하는 방법은 아래 포스팅을 참고한다.
Bitnami에서 배포한 AirFlow Docker 이미지를 설정하는 방법은 아래 포스팅을 참고한다.
AirFlow의 DAG과 구성요소에 대한 내용은 아래 포스팅을 참고한다.

Jupyter Notebook 설정

Jupyter Notebook으로 AirFlow를 사용하기 위해서는 몇 가지 설정이 필요하다.
  • AirFlow의 dags 폴더를 공유 볼륨으로 설정한다.
  • Jupyter  Notebook 컨테이너 내부에 airflow 라이브러리를 설치해야 한다.
  • DAG가 외부 라이브러리를 사용하는 경우 Scheduler와 Worker에 설치해야 한다.
 
주의해야 하는 점은 Jupyter Notebook에서는 IDE와 같은 코드 교정 기능을 제공하지 않는다.
따라서 AirFlow에서 테스트할 때 오류가 많이 발생 할 수 있으므로 코드 작성할 때 주의해야 한다.
 

Docker 컨테이너 설정

Jupyter Notebook을 가장 편하게 사용할 수 있는 방법은 역시 Docker 컨테이너를 사용하는 것이다.
 
AirFlow 컨테이너를 설정할 때 같이 진행해도 되고, 별도로 구성할 수도 있다.
 
 
AirFlow 컨테이너와 함께 설정
이전 포스팅에서 Bitnami에서 배포한 AirFlow 컨테이너와 함께 구성하는 방법을 다룬 적이 있다.
 
아래 포스팅을 참고해서 진행한다.
별도 설정
이미 AirFlow 환경 구성이 되어있는 경우 별도 컨테이너를 띄워서 사용할 수도 있다.
 
다음과 같이 실행한다.
AirFlow 2.2.3 버전은 Python 3.8 버전을 사용하므로 가급적 /jupyter/base-notebook:3.8.8 Docker 이미지를 사용한다.
$ docker pull 저장소주소:포트/jupyter/base-notebook:latest

$ docker run -i -t -d -h jupyter -p 8888:8888 \
    --name jupyter --restart always \
    -e TZ=Asia/Seoul \
    -v dags폴더경로:/usr/local/airflow/dags \
    저장소주소:포트/jupyter/base-notebook:latest \
    start-notebook.sh \
    --NotebookApp.password='sha1:b70a055565b1:854e3f96f91c51cc0c91da371ea5dfbf7c1a61a1'  # 비밀번호 1234
    --NotebookApp.root_dir='/usr/local/airflow/dags'
 
실제로 실행하면 아래와 같이 Hash 값이 출력된다.
$ docker run -i -t -d -h jupyter -p 8888:8888 \
    --name jupyter --restart always \
    -e TZ=Asia/Seoul \
    -v /mnt/d/docker/airflow/dags:/usr/local/airflow/dags \
    jupyter/base-notebook:latest \
    start-notebook.sh \
    --NotebookApp.password='sha1:b70a055565b1:854e3f96f91c51cc0c91da371ea5dfbf7c1a61a1' \
    --NotebookApp.root_dir='/usr/local/airflow/dags'

063300daf01788c6086f2fbb09101c3a4f150c4342769388d17206c7b91b2c71

 

잘 실행되고 있는지 Jupyter Notebook 컨테이너의 로그를 출력한다.
$ docker logs jupyter

Entered start.sh with args: jupyter lab --NotebookApp.password=sha1:b70a055565b1:854e3f96f91c51cc0c91da371ea5dfbf7c1a61a1
Executing the command: jupyter lab --NotebookApp.password=sha1:b70a055565b1:854e3f96f91c51cc0c91da371ea5dfbf7c1a61a1
... 생략 ...
[I 2022-04-25 07:48:30.722 ServerApp]  or http://127.0.0.1:8888/lab
[I 2022-04-25 07:48:30.722 ServerApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
 
 
정상적으로 실행 된 것을 확인할 수 있다.
 
전체 로그는 다음과 같다.
 

접속 확인

설정한 8988 포트로 접속해보면 패스워드 입력을 요구한다.
 
Docker 컨테이너를 생성할 때 설정한 비밀번호인 1234를 입력하면 Jupyter Notebook 웹UI로 이동한다.
컨테이너를 생성할때 root_dir을 /usr/local/airflow/dags로 지정했다.
따라서 공유 볼륨으로 지정한 dags 폴더의 내용이 표시되는 것을 확인할 수 있다.
 

pip 설정

일반적인 인터넷 환경에서는 컨테이너 내부에서 pip 사용이 가능하다.
폐쇄망인 경우 공식 저장소인 pypi.org에 접속할 수 없기 때문에 저장소 설정이 필요하다.
 
root 계정으로 로그인해서 /etc/pip.conf 파일을 생성하고 설정을 진행한다.
$ docker exec -it -u 0 jupyter /bin/bash
$ vi /etc/pip.conf
 
또는 Jupyter Notebook 자체적으로 터미널 기능을 제공한다.
아래 방법으로 터미널을 띄워서 pip 설정을 진행해도 된다.
 
 
아래 포스팅의 pip 클라이언트 설정 항목을 참고한다.
vi 명령어가 실행되지 않으면 아래 방법을 사용한다.
  • 로컬의 pip.conf 파일을 dags 폴더에 복사한다.
  • 루트 권한 (-u 0 옵션)으로 컨테이너 내부로 진입한다.
  • 컨테이너 내부에서 dags 폴더의 pip.conf 파일을 /etc 경로에 복사한다.
 

AirFlow 라이브러리 설치

컨테이너 내부에서 AirFlow 라이브러리를 설치한다.
 
운영 중인 AirFlow의 버전과 동일하게 지정한다.
패키지 이름에 apache가 포함되어 있는 것에 주의한다.
$ pip install apache-airflow==2.2.3

 

 

Jupyter Notebook 사용하기

노트북 생성

Launcher 탭의 Other에 있는 Python File 아이콘을 클릭한다.
 
또는 File - New - Python File을 클릭한다.
 

노트북 저장

File - Save Notebook As... 를 클릭한다.

 

저장할 파일명을 지정하고 저장한다.
 
 

DAG 코드 작성

테스트할 DAG 코드를 아래와 같이 작성하고 저장한다.
 
작성한 코드는 다음과 같다.
from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from airflow import DAG 
from datetime import datetime

with DAG(dag_id="testdag", start_date=datetime(2022, 4, 7)) as testdag:
    start = DummyOperator(task_id="start") 
    end = DummyOperator(task_id="end")

    # START TaskGroup Example
    with TaskGroup(group_id="group1", tooltip="Tasks for group1") as group1:
        task1 = DummyOperator(task_id="task1")
        task2 = BashOperator(task_id="task2", bash_command="echo test_group")
        task3 = DummyOperator(task_id="task3")

        task1 >> [task2, task3]

    start >> group1 >> end
 

외부 라이브러리 설정

파이썬과 AirFlow 라이브러리 외에 사용하고자 하는 외부 라이브러리가 있는 경우 설치해야 한다.
 
해당 라이브러리를 설치해야 하는 곳은 다음과 같다.
  • Jupyter Notebook 컨테이너 (Notebook의 DAG 실행에 필요)
  • AirFlow Scheduler 컨테이너 (AirFlow 웹서버의 DAG 등록에 필요)
  • AirFlow Worker 컨테이너 (AirFlow에서 DAG 실행에 필요)
 
AirFlow의 Scheduler와 Worker 컨테이너에 설치하는 방법은 아래 포스팅을 참고한다.
Jupyter Notebook 자체적으로 터미널 기능을 제공한다.
아래 방법으로 터미널을 띄워서 라이브러리를 설치해도 된다.
 
 

AirFlow에서 활용

WebUI 확인

작성한 DAG 코드에 이상이 없다면 아래와 같이 작성한 DAG이 추가 된 것을 확인할 수 있다.
 

DAG 확인

해당 DAG을 클릭하면 아래와 같이 상세 정보를 확인할 수 있다.

 

과거 동일한 이름의 DAG이 있었다면 해당 실행 기록이 연결되어 출력되는 것을 참고한다.
가급적 고유한 이름을 사용하는 것이 중요하다.
(Database에서 삭제하면 되지만 귀찮은 작업이다..)
 

DAG 실행과 확인

우측 상단의 ▶ 버튼을 클릭해서 DAG을 실행시킨다.
 
상단에서 작성한 DAG 코드에서 DummyOperator와 BashOperator를 사용했다.
DummyOperator는 로그가 남지 않으므로 실제 로그가 남은 BashOperator인 group1.task2의 로그를 확인한다.
 
로그를 확인해보면 BashOperator에서 작성한 echo test_group 명령이 정상적으로 수행 된 것을 확인할 수 있다.
 
 

참고사항

작성한 DAG 코드에 오류가 존재하는 경우 아래와 같은 오류 메시지를 출력한다.
 
일반적인 코드 문법 오류인 경우 Jupyter Notebook에서 코드를 수정하고 저장한다.
임포트 오류인 경우 외부 라이브러리 설정 항목을 참고하여 라이브러리를 설치한다.