::: 데이터 분석 :::

Spark Application 패키지 작성하기 (2) - 헬퍼 기능 작성

곰탱이푸우 2023. 9. 7. 08:20
기본적인 Scala 개발환경이 구축되고 나면 목표로 했던 Spark Application을 작성한다.
 
앞서 테스트에 사용한 Spark Application은 main 함수에 기능을 구현한 단순한 예제 프로그램이다.
기존에 Hadoop 기반의 Spark 클러스터를 구축했으므로, Spark Application은 해당 환경에서 실행할 수 있는 형태로 작성해야 한다.
 
따라서 Spark Application 작성을 위한 기본 형태 (Skeleton) 제공을 위한 예제 프로그램을 재작성한다.
참고로 예전에 실무에서 활용했던 Spark Application 코드에서 재사용 또는 공개가 가능한 코드만 활용하여 정리하였다.
 
전체적인 내용은 아래 문서를 참고하고, 변경 사항을 중심으로 정리한다.
전체 내용은 아래와 같은 순서로 작성한다.
  • 전체 구조와 기본 설정
  • 헬퍼 (Helper) 기능 작성
  • 데이터 처리 로직 작성
  • 비즈니스 로직 작성
  • 테스트 작성과 빌드, 배포
 
전체 구조와 기본 설정은 아래 문서를 참고한다.
 

기능 구현

실제 기능을 수행하는 코드를 작성한다.
 

AppUtil.scala

AppUtil.scala의 AppUtil이라는 trait 객체 내부에 Application의 main 함수를 작성한다.
해당 trait는 SparkExampleApp.scala에서 상속 받아 사용한다.
 

라이브러리 임포트

AppUtil.scala에서 사용하는 라이브러리들을 선언한다.
package com.bearpooh.bdp.example.sparkexample.util

import com.twitter.app.{Flag, Flags}
import org.apache.log4j.{ConsoleAppender, LogManager, Logger, PatternLayout}

import java.io.OutputStreamWriter
import java.lang.reflect.{InvocationTargetException, Method}
 
IntelliJ와 같은 IDE (통합개발환경)을 사용하면 해당 항목을 직접 작성하지 않아도 되므로 편리하다.
실제 코드에서 코드를 작성하면서 임포트 되지 않은 라이브러리는 빨간색으로 표시된다.
 
해당 부분에서 Alt + Enter를 누르면 임포트할 라이브러리를 선택할 수 있다.
만약 사용하고자 하는 라이브러리가 목록에 없으면, build.sbt의 libraryDependencies 항목에 패키지를 추가한다.
 
 

AppUtil trait 구현

Spark Application에서 사용할 trait는 다음 기능을 수행한다.
  • Logger 설정 (spark-core에 포함 된 log4j)
  • Application의 실행 인자 처리 (util-app의 Flags)
  • main 함수 정의

 

trait 정의
trait는 Scala에서 상속을 통해 코드를 재사용할 수 있는 기본 코드 단위를 의미한다.
  • 클래스에서 trait를 상속하면 trait의 메소드와 필드를 사용할 수 있다. (extends)
  • trait의 경우 상속보다는 믹스인(Mix-in)이라는 표현을 사용한다.
  • 일반적인 개념의 상속과 다르게 여러 개의 trait를 조합할 수 있다. (extends + with)
  • trait는 인스턴스화 될 수 없다. 따라서 클래스나 오브젝트가 확장할때 사용한다.
 
기능이 많아질 경우 별도의 Jar 패키지로 분리할 수 있다.
  • build.sbt의 libraryDependency에서 해당 Jar 패키지 정보를 추가한다.
  • 사용하고자 하는 trait를 임포트하고 클래스에서 상속 (extends) 한다.
 
아래와 같이 정의한다.
trait AppUtil {    // trait 정의
  // 실행 인자 처리를 위한 Flags 설정
  // .. 생략 ..

  // Log4j Logger 설정
  // .. 생략 ..

  // Main 함수 정의
  // .. 생략 ..
}
 
Flags 설정
Flags는 main 함수로 전달 된 실행 인자를 Scala 코드의 Flag 형태로 변환하는 기능을 제공한다.
twitter에서 제작한 util-app 라이브러리에 포함되어 있다.
 
인스턴스를 생성할 때에는 3개의 인자를 전달 받는다.
  • name - Application의 이름 (String)
  • includeGlobal - 전역적 Flag 포함 (Boolean)
  • failFastUntilParsed - Flag 변환에 실패하면 실행 중단 (Boolean)
 
includeGlobal과 failFastUntilParsed 항목은 다음과 같이 설정한다.
  • includeGlobal - 특정 Application의 실행 인자만 처리할 것이므로 false
  • failFastUntilParsed - Flag 변환에 실패하면 프로그램을 즉시 종료할 것이므로 true
 
아래는 trait 내부에서 실행 중인 클래스 이름을 얻고, Flag 인스턴스를 생성하는 코드이다.
trait AppUtil {    // trait 정의
  val name: String = getClass.getSimpleName    // 실행하는 클래스 이름
  val flag: Flags = new Flags(name, false, true)    // 실행 인자 처리를 위한 Flags 설정

  // .. 생략 ..
}
 

 

Logger 설정
분산 환경에서 실행되는 Spark Application은 println 같은 출력문 대신 logger를 사용하는 것이 권장된다.
각 노드의 로그가 중앙으로 수집되기 때문인데 장점은 다음과 같다.
  • 로그 레벨 (디버그, 정보, 경고, 오류)에 따라 필요한 로그만 출력 가능
  • 편리한 로그 모니터링과 관리
  • 로그 발생 순서 파악 용이 (출력문은 각 노드에서 표준 출력)
  • 분산 환경의 성능 저하 예방과 최소화 (출력문의 노드간 통신이 발생하는 경우)
  • 로그의 파일 저장 (오류 분석에 활용)
 
아래 코드는 trait 내부에 Logger를 정의한 내용이다.
자세한 내용은 주석을 참고한다.
trait AppUtil {    // trait 정의
  val name: String = getClass.getSimpleName    // 실행하는 클래스 이름
  val flag: Flags = new Flags(name, false, true)    // 실행 인자 처리를 위한 Flags 설정

  val log: Logger = {    // Log4j Logger 설정 (초기화)
    val appLogger: Logger = LogManager.getLogger(name)    // Log4j Logger 정의

    // log4j.properties 파일이 없는 경우 로그 출력 대상을 Console로 지정
    if (!appLogger.getAllAppenders.hasMoreElements) {
      // 로그를 Console로 전송하는 인스턴스 생성
      val appender: ConsoleAppender = new ConsoleAppender()    
      // 표준 출력 스트림으로 출력
      appender.setWriter(new OutputStreamWriter(System.out))    
      // 로그 포맷 설정 (연/월/일 시:분:초 로그레벨 로거이름 로그메시지)
      appender.setLayout(new PatternLayout("%-d{YY/MM/dd HH:mm:ss} %-5p%c: %m%n"))   
      appLogger.addAppender(appender)    // 생성한 Console Appender를 Logger에 추가
      appLogger.warn("There is no log4j.properties.")
    }

    appLogger    // 생성 된 Log4j Logger 반환 (log 변수에 할당)
  }

  // Main 함수 정의
  // .. 생략 ..
}
 
 

main 함수

SparkExampleApp이 실행되면 상속 받은 AppUtil.scala의 Trait가 동작하며 main 함수를 실행한다.
해당 main 함수는 아래 3가지 주요 기능을 수행한다.
  • 로그레벨 설정
  • 실행 인자 파싱 (Flags)
  • SparkExampleApp의 main 함수 실행 (invoke)
 
trait의 main 함수 정의
AppUtil trait의 main 함수를 final def로 정의한다.
해당 main 함수는 Spark Application의 진입점 (entry point) 역할을 수행한다.
 
Scala에서 final은 최종을 의미하는데, 더 이상 변경이 불가능하다.
  • 클래스 정의에 적용 - 하위 클래스가 상속 (inherit) 할 수 없다
  • 클래스 멤버에 적용 - 하위 클래스에서 재정의 (override) 할 수 없다
 
정리하면 AppUtil trait를 상속 받은 SparkExampleApp 클래스에서 main 함수를 재정의(override) 할 수 없음을 의미한다.
  • Scala 호출 규칙에 따라 SparkExampleApp 클래스가 실행되면 AppUtil에서 final로 선언된 main 함수가 호출된다.
  • 이 경우 SparkExampleApp의 main 함수는 무시된다.
  • 따라서 trait의 main 함수에서 SparkExampleApp의 main 함수를 실행 (invoke) 해야 한다.
 
아래 코드는 trait 내부에 main 함수를 정의한 내용이다.
자세한 내용은 주석을 참고한다.
trait AppUtil {
  // .. 생략 ..
  
  // args는 실행할 때 전달되는 실행 인자
  final def main(args: Array[String]): Unit = {    
    log.setLevel(org.apache.log4j.Level.INFO)    // 로그레벨 설정 (INFO)
    flag.parseArgs(args)    // 전달 된 실행인자 파싱 (Flags)

    // .. 생략 ..
  }
}
 

 

전달 인자의 로그 출력 (Flags)
이 부분은 필수적인 부분은 아니며 선택적으로 적용 가능한 부분이다.

 

Spark Application은 실행할 때  다양한 실행 인자가 전달되어야 한다.
  • Spark 클러스터에서 사용할 리소스 정보 (Driver와 Executer의 코어, 메모리, 노드 수)
  • 사용할 데이터 경로와 처리 후 저장할 경로 (하둡 클러스터를 사용할 경우 HDFS 경로)
  • 프로그램과 데이터 사용에 필요한 추가 정보 (ex. 데이터 날짜, 덮어쓰기 허용 여부 등)
 
이러한 실행 인자 정보를 로그에 포함하면 다음과 같은 이점이 있다.
  • 실행 인자가 정상적으로 전달 되었는지 확인 가능
  • 전달 된 실행 인자의 설정 값 확인 가능
  • 이후 오류 분석할 때 활용 가능
 
아래는 Flags 인스턴스에 의해 파싱 된 실행 인자 목록을 로그로 출력하는 코드이다.
자세한 내용은 주석을 참고한다.
trait AppUtil {
  // .. 생략 ..

  final def main(args: Array[String]): Unit = {
    // .. 생략 ..

    // print flags
    // 파싱 된 실행인자 목록을 얻고 리스트로 변환
    val flags: Seq[Flag[_]] = flag.getAll().toList    
    if (flags.nonEmpty) {    // 전달 된 실행인자가 있는 경우
      // 리스트의 각 요소들을 하나씩 이터레이트
      flags.foreach { f =>    // log.info 함수 반환 값이 없으므로 foreach 사용
        // 각 실행 인자들의 이름과 값 (또는 기본값) 출력
        log.info(f"flag(${f.name}):" + f.getWithDefault)    
      }
    } else log.info("There is no flags.")    // 전달 된 실행인자가 없으면 로그 출력

    // .. 생략 ..
  }
}
 
Application의 main 함수 실행
앞서 설명한 바와 같이 SparkApplicationApp의 main 함수 대신 AppUtil trait에서 final로 선언된 main 함수가 호출된다.
따라서 trait의 main 함수에서 SparkExampleApp의 main 함수를 실행 (invoke) 해야 한다.
 
아래는 AppUtil trait를 상속한 클래스 내부의 main 메소드를 찾아서 실행 (invoke)하는 코드이다.
자세한 내용은 주석을 참고한다.
trait AppUtil {
  // .. 생략 ..

  final def main(args: Array[String]): Unit = {
    // .. 생략 ..

    // invoke main method if it is exists
    // main 메소드가 없으면 None이 할당되므로 Option[Method] 타입
    val mainMethod: Option[Method] = {    
      // Option[타입]에 None이 아닌 값을 할당하는 경우 Some(값) 형태 전달
      // AppUtil trait를 상속한 클래스 내부의 main 메소드 정보 할당
      try Some(getClass.getMethod("main"))    
      catch {
        // main 메소드가 없으면 NoSuchElementException이 발생하고 None 할당
        case _: NoSuchElementException => None    
      }
    }

    // mainMethod가 Option[Method] 타입이므로 내부의 값을 사용하기 위해 foreach 사용
    mainMethod.foreach { method =>    // Option[Method] 내부의 Method 타입의 값
      try method.invoke(this)    //
      catch {
        case e: InvocationTargetException => throw e.getCause
      }
    }
  }
}
 
 

FileSystemHelper.scala

예제 프로그램은 하둡 클러스터에 구성한 Spark에서 정상적으로 실행하는지 확인하기 위해 작성되었다.
하둡의 HDFS에 저장된 데이터 파일들을 사용하며, 하둡의 경로 처리를 위한 헬퍼 기능을 정의한다.
 

라이브러리 임포트

굳이 손으로 타이핑하지 않아도 된다.
아래 설명할 코드들을 작성할 때 IntelliJ에서 Alt + Enter로 손쉽게 추가할 수 있다.
 
FileSystemHelper.scala에서 사용하는 라이브러리들을 선언한다.
 
HDFS 환경에서는 Java에서 기본 제공되는 경로 라이브러리 대신 하둡 라이브러리를 사용한다.
  • org.apache.hadoop.fs.FileSystem
  • org.apache.hadoop.fs.Path
 
코드는 다음과 같다.
package com.bearpooh.bdp.example.sparkexample.util

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import java.net.URI
 

싱글톤 객체 선언

싱글톤 (Singleton)은 하나 이상의 인스턴스를 가질 수 없는 형태의 클래스를 의미한다.
  • Scala에서는 object로 선언
  • new 키워드 대신 object 이름으로 직접 접근 (정적 클래스와 유사)
  • 실행 중인 JVM에서 최초 접근할 때 자동으로 인스턴스 생성
 
이러한 특징으로 인해 다음과 같은 상황에 주로 사용한다.
  • Application의 main 함수 구현
  • 특정 기능을 보조하는 유틸리티 (Helper) 객체 생성
 
아래 코드와 같이 class나 trait 대신 object로 객체를 선언하면 된다.
object FileSystemHelper {
  // .. 생략 ..
}

 

 

기능 메소드 구현

FileSystemHelper 싱글톤 객체에서 사용할 기능 메소드를 구현한다.
예제 프로그램에서는 exists와 deleteIfExists 함수 두가지를 정의했다.
  • exists - 폴더 경로의 실제 존재 여부 확인
  • deleteIfExists - 폴더 경로가 실제로 존재하면 삭제
 
추가적으로 필요한 기능들을 메소드로 추가할 수 있다.
예를 들면 다음과 같다.
  • renameIfExists - 폴더 경로가 실제로 존재하면 폴더명 변경
  • writeFile - 지정한 경로에 파일 생성
  • getLastDirectoryPath - 특정 경로의 하위 폴더에서 가장 최근에 생성된 폴더명 반환
 
또한 Spark을 통해 Hadoop에 접근해야 하므로 SparkContext 라이브러리를 사용한다.
SparkContext 는 Spark 작업을 수행하는 데 필요한 설정과 자원 관리를 제공한다.
  • 해당 컨텍스트를 통해 Spark이 실행 중인 하둡 클러스터의 환경 정보를 얻어온다.
  • 실제로 메소드를 호출할때 명시적으로 전달하지 않는다.
  • implicit val로 정의 된 SparkContext 변수가 컴파일러에 의해 자동으로 전달된다.
 
deleteIfExists 함수의 실제 코드는 다음과 같고, 자세한 내용은 주석을 참고한다.
object FileSystemHelper {
  // .. 생략 ..

  // 하둡 경로와 sparkContext가 커링 함수 형태로 전달된다.
  // sparkContext는 암시적 파라미터로 전달되므로, 호출할때는 파일 경로만 명시적으로 전달한다.
  // 해당 메소드를 호출하는 곳에 sparkContext가 암시적으로 선언되어 있어야 한다.
  def deleteIfExists(path: String)(implicit sparkContext: SparkContext): Boolean = {
    val hadoopConf = sparkContext.hadoopConfiguration    // sparkContext에 저장된 하둡 클러스터 환경 정보
    val uri = new URI(path)    // 전달 받은 경로는 하둡 URI 형태로 변환
    val fs = FileSystem.get(uri, hadoopConf)    // 하둡 URI와 설정 기반의 HDFS 인스턴스

    if (fs.exists(new Path(path)))  // 전달한 경로(path)가 존재하면
      fs.delete(new Path(path), true)    // recursive가 true이므로 해당 경로와 하위 경로 삭제하고 true 반환
    else false    // 없으면 false 반환     
  }
}
 
 
특정 기능을 보조하는 유틸리티 (Helper)의 경우, 일반적으로 Docstring을 작성하지 않는다.
다만, 유틸리티 기능이 복잡하여 설명이 필요한 경우에는 작성해도 무방하다.
 
참고로 Docstring은 클래스나 함수의 기능을 설명하기 위해 작성한 주석을 의미한다.
  • 코드 사용법, 동작 방식, 입력 및 출력 값, 예제 등을 설명한다.
  • 드의 가독성과 유지 보수 향상이 목적이다.
 

데이터 처리 로직 작성

데이터 처리 로직 작성은 다음 문서를 참고한다.