::: IT인터넷 :::

Livy로 Spark Application 실행하기 (2) - Spark Application 실행

곰탱이푸우 2023. 10. 26. 08:20
Docker 컨테이너 내부 설정이 완료되면, Livy 서버에 Spark Application을 Submit하는 코드를 작성하고 실행한다.
Jupyter Notebook은 파이썬 REPL 환경을 제공하기 때문에, 코드 작성과 실행은 Jupyter Notebook을 사용한다.
 
AirFlow Jupyter 컨테이너를 구성하는 방법은 아래 문서를 참고한다.
Livy 서버와 Spark의 설정을 변경하는 방법은 아래 문서를 참고한다.
하둡 클러스터에 Livy 서버를 설치하는 방법은 아래 문서를 참고한다.
Livy로 Spark Application을 실행하는 방법은 아래 순서로 진행한다.
 
Spark Application 실행은 Livy 서버의 RestAPI를 사용하며 아래 과정으로 진행한다.
  • Jupyter Notebook의 작업 폴더 권한 설정과 작업할 노트북 생성
  • 생성한 노트북에 Spark Application을 실행하고 모니터링하는 코드 작성
  • 사용할 데이터와 실행할 Jar 파일을 HDFS에 업로드
  • 작성한 Livy 코드를 실행하여 Spark Application Submit
 

Spark Application 실행하기

기존에 작성했던 Spark Applicaion을 Jar로 빌드하고, Livy를 통해 Spark에 제출하여 정상 실행되는지 확인한다.
 
Spark Application과 데이터 파일은 아래 문서를 참고한다.

Livy 클라이언트 설정

Livy 서버에 Spark Application을 Submit 하기 위한 클라이언트 환경을 설정한다.
 

dags 폴더 권한 변경

airflow_jupyter_1 컨테이너는 AirFlow의 dags 디렉토리를 공유 볼륨으로 사용하며, Jupyter Notebook의 기본 작업 폴더로 지정되어 있다.
따라서 Jupyter Notebook으로 해당 디렉토리의 권한을 변경해야 노트북 파일을 생성할 수 있다.
 
아래와 같이  airflow_jupyter_1 컨테이너에 진입하여 dags 디렉토리의 권한을 변경한다.
dags 디렉토리의 경로는 /usr/local/airflow/dags이다.
$ docker exec -it -u root airflow_jupyter_1 /bin/bash
$ chmod -r 777 /usr/local/airflow/dags

 

Jupyter Notebook 접속

폴더 권한 수정이 완료되면 Jupyter Notebook에 접속하고 노트북 파일을 생성한다.
 
http://localhost:8888/ 또는 http://서버주소:8888/ 에 접속하면 아래와 같은 화면이 나타난다.
Notebook 탭의 Python 3 (ipykernel) 버튼을 클릭한다.
 
아래와 같이 Untitled.ipynb 파일이 생성된다.
만약 파일이 정상적으로 생성되지 않으면 dags 디렉토리의 권한을 재확인한다.
 
공동으로 작업하는 경로이므로 파일명을 변경해야 한다.
파일명 탭 우클릭하고 Rename Notebook을 선택하거나  또는 Ctrl + Shift + S를 입력하고 LivyClient.ipynb로 변경한다.
 
 

Spark Application Submit 코드 작성

requests  모듈을 통해 Livy 서버의 REST API를 호출하는 코드를 사용한다.
각 기능별 코드들은 블록을 나눠서 작성하는 것이 편리하다.
 

라이브러리 Import 및 기본 값 설정

코드 실행에 필요한 라이브러리를 임포트하고, 실행에 필요한 공통 변수들을 정의한다.
 
자세한 내용은 아래 코드와 주석을 참고한다.
import time
import json
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from http import HTTPStatus

# Spark Session 실행 상태를 조회하는 시간 간격 조절
SLEEP_TIME = 5

# requests의 요청이 실패한 경우 재시도 조건 정의
# 총 5회 재시도하며, 404, 429, 500, 504인 경우는 제외
retry_strategy = Retry( total=5,
                        status_forcelist=[
                            HTTPStatus.NOT_FOUND.value,  # 404
                            HTTPStatus.TOO_MANY_REQUESTS.value,  # 429
                            HTTPStatus.INTERNAL_SERVER_ERROR.value,  # 500
                            HTTPStatus.BAD_GATEWAY.value],  # 504
                        backoff_factor=1)

adapter = HTTPAdapter(max_retries=retry_strategy)
http = requests.Session()
http.mount("https://", adapter)
http.mount("http://", adapter)

# Livy 서버 정보 정의 (주소, 헤더, 사용자 등)
livy_host = "http://bdp02.bearpooh.com:8998"
livy_headers = {"X-Requested-By": "harbor", "Content-Type": "application/json"}
livy_user = "hadmin"
livy_queue = "default"
livy_batch_url = livy_host + "/batches"

 

Submit Payload 작성

Spark Application을 Submit 하기 위해 Livy 서버에 REST API로 요청하는 Payload를 작성한다.
먼저 개별 변수에 값을 정의하고, Dictionary 타입으로 통합하여 전달한다.
 
변수 정의
실행하고자 하는 Spark Application의 정보와 사용할 자원 정보를 개별 변수에 정의한다.
데이터와 Application의 경로는 앞에서 진행한 HDFS의 경로를 사용해야 한다.
 
실행하는 Spark Application은 Spark 예제 프로그램을 활용한다.
SparkExample Application의 실행 인자는 아래 문서를 참고한다.
자세한 내용은 아래 코드와 주석을 참고한다.
# 실행하려는 Spark Application의 Main 클래스명
className = "com.bearpooh.bdp.sparkexample.SparkExampleApp"
# 실행하려는 Jar 파일의 HDFS 경로
file = "hdfs:///data/jars/sparkexample_2.12-1.0.0.3.jar"

# Spark Application을 실행할때 사용할 Driver와 Executor의 자원 정보
driverCores = 1
driverMemory = "1g"
executorCores = 2
executorMemory = "1g"
numExecutors = 2

# 실행하려는 Spark Application이 사용하는 의존성 라이브러리와 저장소 설정 정보 
conf = {
    'spark.jars.ivy': '/home/livy/.ivy2/',
    'spark.jars.ivySettings': '/home/livy/.ivy2/ivysettings.xml',
    'spark.jars.packages': ','.join(['com.twitter:util-app_2.12:22.12.0',
                                    'org.scalatest:scalatest_2.12:3.2.16'])
}

# 실행하려는 Spark Application에 전달하는 실행 인자 목록
# 사용하는 데이터 경로는 HDFS의 경로를 사용한다.
args = ['-srcDir=hdfs:///data/sparkexample/raw/',
        '-dstDir=hdfs:///data/sparkexample/output/',
        '-saveFlag=True']
 
Payload 정의
Spark Application 실행을 위해 Livy 서버의 REST API에 전달할 Payload를 생성한다.
 
변수에 정의한 Spark Applicatoin과 사용할 자원 정보를 Dictionary 타입으로 재구성한다.
payload = {
    "proxyUser": livy_user,    # Spark Application을 실행하려는 계정 정보
    "queue": livy_queue,    # Spark Application을 실행하려는 YARN의 작업 영역
    "name": className.split(".")[-1],    # 실행하고자 하는 Application 이름
    "className": className,    # Spark Application의 Main 함수가 존재하는 클래스 이름
    "file" : file,    # 실행하려는 Spark Application JAR 파일의 HDFS 경로 
    "driverCores": driverCores,    # Spark Driver의 코어 개수
    "driverMemory": driverMemory,    # Spark Driver의 메모리 크기
    "executorCores": executorCores,    # Spark Executor의 코어 개수
    "executorMemory": executorMemory,    # Spark Executor의 메모리 크기
    "numExecutors": numExecutors,    # Spark Executor의 개수 
    "conf": conf,    # Spark이 사용하는 라이브러리 저장소 정보 
    "args": args    # 실행하려는 Spark Application의 실행 인자 정보
}

payload
 
정의한 Payload는 아래와 같이 출력된다.
{'proxyUser': 'hadmin',
'queue': 'default',
'name': 'SparkExampleApp',
'className': 'com.bearpooh.bdp.sparkexample.SparkExampleApp',
'file': 'hdfs:///data/jars/sparkexample_2.12-1.0.0.3.jar',
'driverCores': 1,
'driverMemory': '1g',
'executorCores': 2,
'executorMemory': '1g',
'numExecutors': 2,
'conf': {'spark.jars.ivy': '/home/livy/.ivy2/',
  'spark.jars.ivySettings': '/home/livy/.ivy2/ivysettings.xml',
  'spark.jars.packages': 'com.twitter:util-app_2.12:22.12.0,org.scalatest:scalatest_2.12:3.2.16'},
'args': ['-srcDir=hdfs:///data/sparkexample/raw/',
  '-dstDir=hdfs:///data/sparkexample/output/',
  '-saveFlag=True']}
 
 

Submit 코드 작성

Livy 서버의 REST API를 호출하여 Spark Application을 제출하는 코드를 작성한다.
 
post 형태로 요청하며 실행하려는 Spark Application의 정보를 포함한 Dictionary 타입의 Payload를 JSON 형태로 변환하여 전달한다.
response = http.post(livy_batch_url, headers=livy_headers, data=json.dumps(payload))

response
 

상태 모니터링 코드 작성

Spark Application이 정상적으로 실행되는지 모니터링 하는 코드를 작성한다.
 
Livy 서버는 Spark Application을 Spark에 전달하는 중계 역할을 수행한다.
또한 Spark과 YARN과 통신하며 Application의 실행 상태를 모니터링 할 수 있다.
 
사용자 입장에서는 Spark이나 YARN에 직접 접근하지 않고, Livy 서버를 통해 Application의 실행 상태를 확인하고 조치할 수 있는 장점이 된다.
Livy 서버에 대한 과도한 확인 요청은 서비스 운영에 영향을 줄 수 있으므로 4~5초에 한번씩 실행 상태를 확인한다.
 
자세한 내용은 아래 코드와 주석을 참고한다.
# Spark Application 실행을 요청한 Batch Session의 응답 확인
batch = response.json()

print("\n==========[JOB STATE]==========")
print(batch)

# Spark에서 실행 중인 Application의 고유한 식별값
app_id = None

# Spark앱이 죽거나, 완료될 때 까지 반복
while batch["state"] not in ("dead", "success"):
    # Livy 서버에 요청하는 시간 간격을 조절하기 위한 sleep
    time.sleep(SLEEP_TIME)

    try:    # Livy 서버의 특정 Batch 작업 ID에 대한 상태 정보 확인
        response = http.get(f"{livy_batch_url}/{batch['id']}/state", headers=livy_headers)
    # Livy 서버 요청이 실패하면 예외 메시지 출력
    except requests.exceptions.ConnectionError as e:    
        print(e)
        raise e

    # Livy 서버에 확인한 Application 실행 상태 정보 출력
    batch = response.json()
    print(response.text)

    # Spark Application이 실행 중이고 Applicaion의 ID 값이 없는 경우
    if not app_id and batch["state"] == "running":
        app_id = http.get(f"{livy_batch_url}/{batch['id']}", 
                          headers=livy_headers).json()["appId"]
        print("appId: " + app_id)
    # Spark Application의 ID 값이 존재하는 경우
    elif app_id:
        print(f"wait for applicationId: {app_id} to finish...")
    else:
        continue

# Spark Application의 실행 상태가 dead인 경우 예외 메시지 출력
if batch["state"] == "dead":
    print("Spark application failed.")
    raise Exception("Spark application failed.")
 

Batch 작업 삭제 코드 작성

Spark Application의 실행이 실패했지만 Livy 서버에 해당 Application 이름의 세션이 남아 있는 경우 재실행 할 수 없다.
 
일정 시간이 지나면 Livy 서버에서 종료 된 세션을 삭제하기 때문에 기다려도 된다.
그러나 빠른 재실행이 필요한 상황에서 무작정 기다리는 것은 적절한 선택이 아니다.
이를 위해서는 Livy 서버의 REST API에서 제공하는 특정 Batch 작업 삭제 기능을 사용하는 것이 좋다.
 
아래와 같이 DELETE 요청으로 삭제하고자 하는 Batch 작업의 ID (식별자)를 호출하면, 해당 Batch 작업을 삭제할 수 있다.
response = http.delete(f"{livy_batch_url}/{batch['id']}", headers=livy_headers)
response.json()

 

Spark Application의 초기 테스트에는 반복적인 실패가 발생할 수 밖에 없기 때문에, 이러한 Batch 작업 삭제 코드는 매우 유용하게 사용된다.
 
HDFS에 데이터와 Application 업로드
HDFS에 테스트 데이터와 Jar 파일을 업로드한다.
 

데이터 파일 업로드

SparkExample 패키지의 테스트 데이터로 활용했던,  logs.json 파일을 HDFS에 업로드한다.
 
테스트 데이터 관련 내용은 아래 문서를 참고한다.
아래와 같이 호스트에서 에지노드로 파일을 전송하고 HDFS에 업로드한다.
# 호스트에서 실행
$ scp -P 2222 /데이터/파일/경로/logs.json hadoop@에지노드호스트IP:/home/hadoop/workspace/
$ ssh -p 2222 hadoop@에지노드호스트IP -t 'bash -cli "hadoop fs -mkdir /data/sparkexample"'
$ ssh -p 2222 hadoop@에지노드호스트IP -t 'bash -cli "hadoop fs -mkdir /data/sparkexample/raw"'
$ ssh -p 2222 hadoop@에지노드호스트IP -t \
        'bash -cli "hadoop fs -copyFromLocal /home/hadoop/workspace/logs.json /data/sparkexample/raw"'
 
에지 노드 구성과 명령어 실행은 아래 문서를 참고한다.
하둡 클러스터의 HDFS에서 아래와 같이 확인되면 정상적으로 업로드 된 것이다.
 
 

Spark Application 업로드

Spark에서 실행하려고 하는 Application도 HDFS에 업로드해야 한다.
 
Spark Application 작성은 아래 문서를 참고한다.
Spark Application을 Jar 형태로 배포하는 것은 아래 문서를 참고한다.
Nexus에 배포 된 sparkexample의 Jar 파일을 다운로드한다.
아래 그림과 같이 배포 된 패키지의 상세 정보에서 Path 부분을 클릭하면 로컬에 다운로드 된다.
 
다운로드 된 파일명이 너무 길면 프로그램명_스칼라버전-버전.jar 형태로 변경한다. (ex. sparkexample_2.12-1.0.0.3.jar)
 
데이터 파일과 동일하게 Jar 파일을 엣지 노드로 복사하고 HDFS에 업로드한다.
# 호스트에서 실행
$ ssh -p 2222 hadoop@에지노드호스트IP -t 'bash -cli "mkdir /home/hadoop/workspace"'
$ ssh -p 2222 hadoop@에지노드호스트IP -t 'bash -cli "hadoop fs -mkdir /data/sparkexample/jars"'
$ scp -P 2222 /어플리케이션/Jar파일/경로/sparkexample_2.12-1.0.0.3.jar \
              hadoop@에지노드호스트IP:/home/hadoop/workspace/
$ ssh -p 2222 hadoop@에지노드호스트IP -t 'bash -cli "hadoop fs -copyFromLocal \
              /home/hadoop/workspace/sparkexample_2.12-1.0.0.3.jar /data/sparkexample/jars/"'
 

코드 실행으로 Spark Application 제출

Jupyter Notebook에서 위에 작성한 코드들을 순차적으로 실행하여 Spark Application을 실행한다.
 
Batch 작업 삭제 코드는 오류가 발생한 경우, 또는 빠른 재시작이 필요한 경우에만 실행한다.
 

Spark Application 실행 결과 확인

Spark Application을 실행하고 나서 실행 상태와 결과를 확인한다.
자세한 방법은 아래 문서를 참고한다.