::: 데이터 분석 :::

Spark Application 패키지 작성하기 (4) - 비즈니스 로직 작성

곰탱이푸우 2023. 9. 14. 08:20
기본적인 Scala 개발환경이 구축되고 나면 목표로 했던 Spark Application을 작성한다.
 
앞서 테스트에 사용한 Spark Application은 main 함수에 기능을 구현한 단순한 예제 프로그램이다.
기존에 Hadoop 기반의 Spark 클러스터를 구축했으므로, Spark Application은 해당 환경에서 실행할 수 있는 형태로 작성해야 한다.
 
따라서 Spark Application 작성을 위한 기본 형태 (Skeleton) 제공을 위한 예제 프로그램을 재작성한다.
참고로 예전에 실무에서 활용했던 Spark Application 코드에서 재사용 또는 공개가 가능한 코드만 활용하여 정리하였다.
 
전체적인 내용은 아래 문서를 참고하고, 변경 사항을 중심으로 정리한다.
전체 내용은 아래와 같은 순서로 작성한다.
  • 전체 구조와 기본 설정
  • 헬퍼 (Helper) 기능 작성
  • 데이터 처리 로직 작성
  • 비즈니스 로직 작성
  • 테스트 작성과 빌드, 배포
 
이전 단계 내용들은 아래 문서를 참고한다.
 

기능 구현

실제 기능을 수행하는 코드를 작성한다.
 

SparkExampleApp.scala

main 함수 기능을 수행하고 비즈니스 로직을 처리하는 코드를 작성한다.
실행 인자로 전달 된 값에 따라 수행할 기능을 선택적으로 수행한다.
 
내부에 구현하는 main 함수는 AppUtil에 정의한 main 함수에 의해 실행 (invoke) 되는 점을 참고한다.
 

라이브러리 임포트

굳이 손으로 타이핑하지 않아도 된다.
아래 설명할 코드들을 작성할 때 IntelliJ에서 Alt + Enter로 손쉽게 추가할 수 있다.
 
SparkExampleApp.scala에서 사용하는 라이브러리들을 선언한다.
  • 데이터 처리 로직을 정의한 SparkExample
  • 실제 main 함수와 Logger를 정의하고 실행 인자를 파싱하는 AppUtil
  • 하둡의 데이터 파일 경로를 처리하는 FileSystemHelper
  • 파싱한 실행 인자를 변환하여 변수에 저장하는 Flags
  • 하둡 경로의 예외 처리를 위한 FileAlreadyExistsException (이미 존재), PathNotFoundException (없음) 
  • FileSystemHelper에 암시적(Implicit)으로 전달하기 위한 SparkContext
  • 데이터 처리를 위한 Spark의 고수준 API를 제공하는 SparkSession
  • 데이터 처리 기능의 성공과 실패를 확인하고 후처리를 위한 Try, Success, Failure
 
코드는 다음과 같다.
package com.bearpooh.bdp.example.sparkexample

import com.sr.ocean.sparkexample.application.SparkExample
import com.sr.ocean.sparkexample.util.{AppUtil, FileSystemHelper}   
import com.twitter.app.Flag
import org.apache.hadoop.fs.{FileAlreadyExistsException, PathNotFoundException}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}
 
 

싱글톤 객체 선언

싱글톤 (Singleton)은 하나 이상의 인스턴스를 가질 수 없는 형태의 클래스를 의미한다.
  • Scala에서는 object로 선언
  • new 키워드 대신 object 이름으로 직접 접근 (정적 클래스와 유사)
  • 실행 중인 JVM에서 최초 접근할 때 자동으로 인스턴스 생성
 
이러한 특징으로 인해 다음과 같은 상황에 주로 사용한다.
  • Application의 main 함수 구현
  • 특정 기능을 보조하는 유틸리티 (Helper) 객체 생성
 
아래 코드와 같이 class나 trait 대신 object로 객체를 선언하면 된다.
object SparkExampleApp extends AppUtil {
  // .. 생략 ..
}
 
앞서 작성한 AppUtil trait를 확장 (상속)하여 선언하며, 사용 가능한 기능은 다음과 같다.
  • main 함수 (Application의 실제 진입점)
  • Logger (Log4J)
  • 실행인자 처리 (Flags)
 

공통 사용 변수 정의

SparkExampleApp 객체에서 사용하는 공통 변수들을 정의한다.
 
예제 프로그램에서는 크게 3가지를 선언했다.
  • 실행 인자로 전달 된 값들을 변수에 저장 (Flag[T] 타입)
  • SparkSession 정의 (Application 이름, 리소스매니저 정의)
  • SparkContext 암시적 정의 (FileSystemHelper에 전달하기 위한 암시적 변수)
 
실제 코드는 다음과 같다. 값을 할당하는 구문이므로 크게 어렵지 않다.
object SparkExampleApp extends AppUtil {
  // Define Flags
  // flag[타입](이름, 기본값, 설명) 형태로 작성 (기본값은 없으면 생략 가능)
  val saveFlag: Flag[Boolean] = flag[Boolean]("saveFlag", false, "Optional. Save json data to parquet file.")
  val srcDir: Flag[String] = flag[String](name="srcDir", "Required. Directory path to json data file.")
  val dstDir: Flag[String] = flag[String](name = "dstDir", "Optional. Directory path to write parquet file.")

  // Define SparkSession
  // 실제로 호출될때 생성하기 위해 lazy (늦은 초기화) 형태로 선언한다.
  lazy val spark: SparkSession = SparkSession.builder()
    .appName(name)
    .master("yarn")
    .getOrCreate()

  // Implicit definition of SparkContext (for FileSystemHelper)
  implicit val sparkContext: SparkContext = spark.sparkContext

  // .. 생략 ..
}
 
 

경로 처리 함수 정의

전달 받은 HDFS 경로가 존재하는지 확인하는 기능을 정의한다.
SparkExample 기능에도 작성한 부분을 굳이 SparkExampleApp에 다시 작성한 이유는 다음과 같다.
  • SparkExampleApp 코드는 Spark 클러스터의 드라이버 (Driver)에서 실행된다.
  • SparkExample 코드는 Spark 클러스터의 익스큐터 (Executer)에서 실행된다.
  • Spark은 드라이버가 실제 실행할 데이터 처리 코드를 익스큐터에 전달해서 실행한다.
  • 실행 조건이 부합하지 않으면 익스큐터에 전달되기 전에 드라이버에서 중단해야 한다.
 
실제 코드는 다음과 같다. 중첩 된 조건문 패턴이므로  크게 어렵지 않다.
object SparkExampleApp extends AppUtil {
  // .. 생략 ..

  // Check data path existence
  def checkDataPath(path: String, mode: String): String = {
    // Scala에서는 단일 값에 대한 if - else if - ... - else 패턴은 match - case 패턴을 사용한다.
    mode match {
      case "write" =>    // 전달된 mode의 값이 write인 경우 (쓰기 모드)
        if (FileSystemHelper.exists(path))    // 경로가 있으면 예외 발생
          throw new FileAlreadyExistsException("The " + mode + " path is already exist! : " + path)
        else path    // 경로가 없으면 전달 된 path 반환
      case "read" =>    // 전달 된 mode의 값이 read인 경우 (읽기 모드)
        if (!FileSystemHelper.exists(path))    // 경로가 없으면 예외 발생
          throw new PathNotFoundException("The " + mode + " path is not exist! : " + path)
        else path    // 경로가 있으면 전달 된 path 반환
      case _ =>    // 그 외의 값이 전달 된 경우
        throw new IllegalArgumentException("mode is not valid! : " + mode)
    }
  }

  // .. 생략 ..
}
 
match 조건은 write (쓰기)와 read (읽기), _ (나머지)로 구분했다.
  • write - 데이터 파일을 저장하려는 경로가 이미 존재하는 경우 예외 발생 (덮어쓰기 방지)
  • read - 데이터 파일을 읽으려는 경로가 존재하지 않는 경우 (읽을 데이터가 없음)
  • _ -  write, read가 아닌 모든 값을 의미한다.
 
 

main 함수 정의

드라이버 노드에서 실행 될 데이터 처리 비즈니스 로직을 수행하는 main 함수를 정의한다.
AppUtil에 정의한 main 함수에 의해 실행 (invoke) 되는 점을 참고한다.
 
데이터 경로 정의
실행 인자로 전달한 데이터의 경로를 검증하고 문자열 변수에 할당한다.
Flags로 파싱한 인자를 경로 처리 함수에 전달하여 하둡의 HDFS에 존재하는지 확인한다.
데이터 저장 여부가 false인 경우 저장 경로(dstPath)는 "PathNotDefined" 문자열로 할당한다.
그리고 실행 인자로 전달되어 변수에 할당 된 값들을 로그로 출력한다.
 
실제 코드는 다음과 같다. 
object SparkExampleApp extends AppUtil {
  // .. 생략 ..

  def main(): Unit = {
    // Assign argument to variable
    val srcPath: String = checkDataPath(srcDir(), "read")    // 사용할 데이터 경로 (read)   
    // 설명 작성을 위한 익명함수 사용 (실제로는 {} 없이 한줄에 작성 가능)
    val dstPath: String = {    
      if(saveFlag()) checkDataPath(dstDir(), "write")    // 저장할 데이터 경로 (write)
      else "PathNotDefined"    // saveFlag가 false인 경우 기본값 할당
    }
   
    // 변수에 할당 된 실행 인자들의 로그 출력
    log.info("saveFlag: " + saveFlag())
    log.info("srcPath: " + srcPath)
    log.info("dstPath: " + dstPath)
   
    // .. 생략 ..
  }
}
 
기능 호출
데이터 처리 로직을 정의한 SparkExample 객체에 SparkSession 인자를 전달하여 인스턴스를 생성한다.
그리고 데이터 저장 여부에 따라 각 기능을 호출한다.
  • true인 경우 convertParquet 함수 호출 (JSON 데이터를 읽고 Parquet 포맷으로 저장)
  • false인 경우 countRecords 함수 호출 (전체 레코드 수와  상위 20개 레코드를 로그로 출력)
 
기능을 호출할 때에는 Scala의 Try로 감싸서 예외 발생 여부를 체크한다.
실행 성공 여부에 따라 아래 값들을 반환하고 result 변수에 할당된다.
  • 오류가 없이 종료되면 Success 반환
  • 실행 중 예외가 발생하면 Failure 반환
 
해당 부분의 코드는 다음과 같다.
object SparkExampleApp extends AppUtil {
  // .. 생략 ..

  def main(): Unit =
    // .. 생략 ..

    // Run Spark Data Job
    val result: Try[Unit] = {    // 조건문 처리를 위한 익명함수 (Success 또는 Failure 반환)
      if(saveFlag())    // 저장 여부 인자가 True인 경우
        Try(new SparkExample(spark).convertParquet(srcPath, dstPath))
      else    // 저장 여부 인자가 False인 경우
        Try(new SparkExample(spark).countRecords(srcPath))
    }

    // .. 생략 ..
  }
}
 
 
후처리
데이터 처리가 모두 끝나면 수행하는 후처리 작업들이다.
먼저 실행 중인 SparkSession을 중단한다.
 
데이터 처리의 실행 결과가 성공 (Success)인 경우 종료 로그를 출력하고 종료한다.
실패 (Failure)이면 다음과 같이 수행한다.
  • 저장 여부 값이 true이고 생성 된 폴더가 있으면 삭제한다. 
  • 오류 로그를 출력한다.
  • 발생한 오류 (예외)를 시스템으로 전달하고 종료한다.
 
자세한 코드는 다음과 같다.
object SparkExampleApp extends AppUtil {
  // .. 생략 ..

  def main(): Unit = {
    // .. 생략 ..

    // Stop SparkSession
    spark.stop()

    // Post-processing
    result match {
      case Success(_) =>
        log.info("App Complete")
      case Failure(e) =>
        if(saveFlag() && FileSystemHelper.exists(dstPath))
          FileSystemHelper.deleteIfExists(dstPath)
        log.error(s"App exit(err=$e.getMessage)", e)
        throw e
    }
  }
}
 

테스트 작성과 빌드, 배포

테스트 작성과 빌드, 배포는 다음 문서를 참고한다.