SW/마이크로서비스

Memphis와 Apache Spark를 활용한 상태 기반 스트림 처리

얇은생각 2024. 8. 7. 07:30
반응형

오늘은 AWS S3에서 대규모 데이터를 처리하고 풍부하게 만드는 방법을 학습하기 위해 Apache Spark Memphis를 활용하는 방법에 대해 알아보겠습니다. 이 튜토리얼에서는 AWS S3 Apache Spark를 활용하여 대규모 데이터를 효율적으로 처리하는 방법과 이를 통해 얻을 수 있는 이점에 대해 자세히 설명하겠습니다.

 

Memphis와 Apache Spark를 활용한 상태 기반 스트림 처리

 

AWS S3 Apache Spark의 소개

Amazon S3

Amazon Simple Storage Service(S3) Amazon Web Services(AWS)에서 제공하는 매우 확장 가능하고, 내구성이 뛰어나며, 안전한 객체 저장 서비스입니다. S3는 기업이 웹 어디서나 데이터의 양에 관계없이 저장하고 검색할 수 있도록 지원합니다. S3는 다른 AWS 서비스 및 타사 도구와 원활하게 통합되어 Amazon S3에 저장된 데이터를 처리할 수 있습니다. 특히 Amazon EMR(Elastic MapReduce) Spark와 같은 오픈 소스 도구를 사용하여 대량의 데이터를 처리할 수 있게 해줍니다.

 

Apache Spark

Apache Spark는 대규모 데이터 처리에 사용되는 오픈 소스 분산 컴퓨팅 시스템입니다. Spark는 빠른 데이터 처리 속도를 지원하며, Amazon S3를 포함한 다양한 데이터 소스를 지원합니다. Spark는 대량의 데이터를 효율적으로 처리하고, 복잡한 계산을 짧은 시간 안에 수행할 수 있는 방법을 제공합니다.

 

Memphis.dev

Memphis.dev는 전통적인 메시지 브로커의 차세대 대안으로, 단순하고 강력하며 내구성이 뛰어난 클라우드 네이티브 메시지 브로커입니다. Memphis는 현대적인 큐 기반 사용 사례의 비용 효율적이고 신속하며 신뢰할 수 있는 개발을 가능하게 하는 전체 생태계를 제공합니다. 메시지 브로커의 일반적인 패턴은 정의된 보존 정책(시간/크기/메시지 수)에 따라 메시지를 삭제하는 것이지만, Memphis는 더 긴, 가능하면 무한한 보존을 위한 2차 저장 계층을 제공합니다. 각 메시지는 AWS S3로 자동으로 이동됩니다.

 

환경 설정

Memphis 설정

  1. Memphis 설치: 먼저 Memphis를 설치합니다.
  2. AWS S3 통합 활성화: Memphis 통합 센터를 통해 AWS S3 통합을 활성화합니다.
  3. 스테이션 생성: 스테이션(토픽)을 생성하고 보존 정책을 선택합니다.
  4. 메시지 오프로드: 설정된 보존 정책을 초과하는 각 메시지는 S3 버킷에 오프로드됩니다.

 

AWS S3 버킷 생성

AWS 계정을 생성한 후 AWS 관리 콘솔, AWS CLI 또는 SDK를 사용하여 데이터를 저장할 S3 버킷을 만듭니다. 이번 튜토리얼에서는 AWS 관리 콘솔을 사용하여 버킷을 생성합니다.

  1. "Create bucket"을 클릭하여 버킷을 생성합니다.
  2. 네이밍 규칙에 따라 버킷 이름을 지정하고, 버킷을 배치할 지역을 선택합니다.
  3. "Object ownership" "Block all public access" 설정을 사용 사례에 맞게 구성합니다.
  4. 버킷 권한을 구성하여 Spark 애플리케이션이 데이터를 액세스할 수 있도록 허용합니다.
  5. "Create bucket" 버튼을 눌러 버킷을 생성합니다.

 

EMR 클러스터 설정 및 Spark 설치

  1. EMR 콘솔을 열고 왼쪽 메뉴에서 "EMR on EC2" 아래의 "Clusters"를 선택합니다.
  2. "Create cluster"를 클릭하고 클러스터에 설명적인 이름을 지정합니다.
  3. "Application bundle"에서 Spark를 선택하여 클러스터에 설치합니다.
  4. "Cluster logs" 섹션에서 Amazon S3로 클러스터별 로그를 게시하는 옵션을 선택합니다.
  5. 보안 구성 및 권한 섹션에서 EC2 키 페어를 입력하거나 새로 생성합니다.
  6. "Create cluster" 버튼을 클릭하여 클러스터를 시작하고 상태를 확인합니다.

 

EMR 클러스터에서 Apache Spark 설치 및 구성

  1. EMR 클러스터를 성공적으로 생성한 후 Apache Spark를 구성합니다.
  2. SSH 보안 연결을 승인하여 클러스터에 연결할 수 있도록 설정합니다.
  3. EMR 클러스터의 마스터 노드에 연결하여 Spark 셸을 실행합니다.
  4. OS 터미널에 다음 명령을 입력하여 연결합니다:
ssh hadoop@ec2-###-##-##-###.compute-1.amazonaws.com -i ~/mykeypair.pem
  1. 마스터 공용 DNS 이름과 .pem 파일 경로를 올바르게 입력하여 SSH 연결을 설정합니다.

 

 

데이터 준비 및 S3 버킷에 업로드

데이터 처리를 시작하기 전에 데이터를 준비하여 Spark가 쉽게 처리할 수 있는 형식으로 변환해야 합니다. CSV, JSON, Parquet 등의 형식이 일반적으로 사용됩니다.

  1. 새로운 Spark 세션을 생성하고 데이터를 Spark로 로드합니다. 예를 들어, CSV 파일을 Spark DataFrame으로 읽기 위해 spark.read.csv() 메소드를 사용할 수 있습니다.
  2. 데이터를 준비한 후, DataFrame.write.format("s3") 메소드를 사용하여 S3 버킷에 데이터를 저장합니다.
  3. S3 버킷과 저장할 경로를 지정합니다.
df.write.format("s3").save("s3://my-bucket/path/to/data")
  1. 데이터가 S3 버킷에 저장되면 다른 Spark 애플리케이션이나 도구에서 액세스하거나, 추가 분석이나 처리를 위해 다운로드할 수 있습니다.

 

 

데이터 포맷과 스키마 이해하기

데이터 포맷과 스키마는 데이터 관리에서 중요한 개념입니다. 데이터 포맷은 데이터베이스 내 데이터의 조직과 구조를 나타내며, CSV, JSON, XML, YAML 등이 있습니다. 데이터 스키마는 데이터베이스 자체의 구조를 정의하며, , 테이블, 인덱스, 타입 등을 포함합니다.

 

S3에서 데이터 정리 및 전처리

데이터를 처리하기 전에 오류를 확인하고 정리하는 것이 중요합니다. S3 버킷의 데이터 폴더에 액세스하여 파일을 다운로드하고, 데이터를 정리하고 전처리합니다. Amazon Athena를 사용하여 S3에 저장된 구조화 및 비구조화 데이터를 분석할 수 있습니다.

  1. AWS 콘솔에서 Amazon Athena로 이동합니다.
  2. 새로운 테이블을 생성하고, 데이터 파일의 경로를 입력하여 스키마를 정의합니다.
  3. 쿼리를 실행하여 데이터가 올바르게 로드되었는지 확인하고, 중복 데이터를 제거하는 등의 정리 작업을 수행합니다.

 

Spark 프레임워크 이해하기

Spark 프레임워크는 오픈 소스이며, 대규모 데이터셋을 빠르게 처리하기 위한 클러스터 컴퓨팅 시스템입니다. Java 프로그래밍 언어를 기반으로 하며, 메모리 내 데이터 처리 기능을 통해 데이터 처리 속도를 높입니다.

 

Spark S3의 통합 구성

  1. Spark 애플리케이션에 Hadoop AWS 종속성을 추가합니다:
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.3.1"

 

  1. AWS 액세스 키 ID와 비밀 액세스 키를 Spark 애플리케이션에 설정합니다:
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY_ID>")
  .set("spark.hadoop.fs.s3a.secret.key", "<SECRET_ACCESS_KEY>")

 

  1. S3 엔드포인트 URL을 설정합니다:
spark.hadoop.fs.s3a.endpoint s3.<REGION>.amazonaws.com

 

  1. Spark 세션을 생성하여 S3와의 연결을 설정합니다:
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY_ID>")
  .config("spark.hadoop.fs.s3a.secret.key", "<SECRET_ACCESS_KEY>")
  .config("spark.hadoop.fs.s3a.endpoint", "s3.<REGION>.amazonaws.com")
  .getOrCreate()

 

  1. S3에서 데이터를 읽어오기 위해 spark.read 메소드를 사용합니다:
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("s3a://<BUCKET_NAME>/<FILE_PATH>")

 

 

Spark를 활용한 데이터 변환

Spark는 데이터를 정리하고, 필터링하며, 집계하고, 결합하는 데 필요한 풍부한 API 세트를 제공합니다. 다음은 Spark에서 데이터 변환 작업의 예시입니다:

  1. 데이터를 정렬합니다:
val sortedData = df.orderBy(col("age").desc)

 

  1. 변환된 데이터를 S3에 다시 저장합니다:
df.write
  .mode(SaveMode.Overwrite)
  .option("compression", "snappy")
  .parquet("s3a://<BUCKET_NAME>/<OUTPUT_DIRECTORY>")

 

 

데이터 파티셔닝 이해하기

데이터 파티셔닝은 데이터셋을 더 작은 부분으로 나누어 클러스터에서 병렬로 처리하는 것입니다. 이를 통해 성능을 최적화하고 확장성을 향상시킬 수 있습니다. RDD(Resilient Distributed Datasets) Spark에서 데이터가 기본적으로 파티셔닝되는 방식입니다.

 

 

결론

AWS S3를 사용하여 Apache Spark로 데이터를 처리하는 것은 대규모 데이터셋을 효율적으로 분석하는 방법입니다. AWS S3 Apache Spark의 클라우드 기반 저장소 및 컴퓨팅 자원을 활용하여 데이터를 빠르고 효과적으로 처리할 수 있습니다. 이번 튜토리얼에서는 S3 버킷과 Apache Spark 클러스터를 설정하고, Spark 애플리케이션을 작성하고 실행하여 데이터를 처리하는 과정을 다루었습니다.

Spark를 사용한 데이터 처리를 통해 복잡한 데이터 분석과 처리 작업을 효율적으로 수행할 수 있으며, 이를 통해 얻은 통찰력을 기반으로 더 나은 비즈니스 결정을 내릴 수 있습니다.

반응형