사용법

1. 문자열 합치기
SELECT CONCAT('문자열', '합치기');
-> 문자열합치기
2. 숫자 합치기
SELECT CONCAT(123, 456, 789);
-> 123456789
3. 숫자와 문자 합치기
SELECT CONCAT('문자열', 1004)
-> 문자열1004

대화형 셸

pyspark
#1 어디 폴더 아래서든 터미널에서 입력

./pyspark
#2 bin 폴더에서 입력
  1. pip install pyspark로 설치.
  2. apache spark 다운로드 페이지에서 다운.

대화형 셸 화면(로컬 모드)

 

스파크 애플리케이션

  • 애플리케이션
    API를 써서 스파크 위에서 돌아가는 사용자 프로그램. 드라이버 프로그램과 클러스터의 실행기로 이루어져 있습니다.
  • SparkSession
    스파크 코어 기능들과 상호 작용할 수 있는 진입점을 제공하며 그 API로 프로그래밍을 할 수 있게 해주는 객체. 스파크 셸에서 스파크 드라이버는 기본적으로 SparkSession을 제공하지만 스파크 애플리케이션에서는 사용자가 객체를 생성해서 써야 합니다.
  • 잡(job)
    스파크 액션(save(), collect() 등)에 대한 응답으로 생성되는 여러 태스크로 이루어진 병렬 연산.
  • 스테이지(stage)
    각 잡은 스테이지라 불리는 서로 의존성을 가지는 다수의 태스크 모음으로 나뉩니다.
  • 태스크(task)
    스파크 이그제큐터로 보내지는 작업 실행의 가장 기본적인 단위.

  • 애플리케이션 안의 드라이버가 스파크세션 객체 생성.
  • 스파크 셸을 사용할 땐 셸에 포함되어있는 형태이며 SparkSession 객체가 미리 만들어진다.
  • 스파크 셸로 상호 작용하는 작업 동안 드라이버는 스파크 애플리케이션을 하나 이상의 스파크 잡으로 변환한다. 그리고 각 잡은 DAG로 변환된다. 또한 DAG에서 각각의 노드는 하나 이상의 스파크 스테이지에 해당한다.
  • 어떤 작업이 연속적 또는 병렬적으로 수행되는지에 맞춰 스테이지에 해당하는 DAG 노드가 생성된다. 종종 스파크 이그제큐터끼리의 데이터 전송이 이루어지는 연산 범위 경계 위에서 스테이지가 결정되기도 한다.(셔플이라 불리는 노드끼리의 데이터 교환이 스테이지의 경계가 되는 경우)
  • 각 스테이지는 최소 실행 단위이며 스파크 이그제큐터들 위에서 연합 실행되는 스파크 태스크들로 이루어진다. 각 태스크는 개별 CPU 코어에 할당되고 데이터의 개별 파티션을 갖고 작업한다. 이렇게 병렬처리가 되는 것.


트랜스포메이션, 액션, 지연 평가

  • 트랜스포메이션
    • 이미 불변성의 특징을 가진 원본 데이터를 수정하지 않고 하나의 스파크 데이터 프레임을 새로운 데이터 프레임으로 변형.
    • 트랜스포메이션의 결과는 즉시 계산되는 것이 아니라 계보(lineage)라 불리는 형태로 기록된다. 기록된 리니지는 실행 계획에서 후반쯤에 스파크가 확실한 트랜스포메이션들끼리 재배열하거나 합치거나 해서 더 효육적으로 실행할 수 있도록 최적화.
    • 스파크는 리니지에 각 트랜스포메이션을 기록해 놓고 데이터 프레임들은 과정 중 변하지 않기 때문에 단순히 기록된 리니지를 재실행하는 것만으로도 복원할 수 있으며 덕분에 장애 상황에도 유연성을 확보할 수 있다.
    • 의존성의 정도에 따라 두가지로 분류된다.
      • 좁은 의존성 : 하나의 입력 파티션을 연산하여 하나의 결과 파티션을 내놓는 트랜스포메이션
        예) filter(), contains()
      • 넓은 의존성 : 하나의 입력 파티션을 연산하여 여러 결과 파티션을 내놓는 트랜스포메이션
        예) orderBy(), groupBy()
    • 예 : orderBy(), groupBy(), filter(), select(), join() 
  • 액션
    • 하나의 액션은 모든 기록된 트랜스포메이션의 지연 연산을 발동시킨다.
    • 쿼리 계획 안의 어떤 것도 액션이 호출되기 전에는 실행되지 않음.
    • 쿼리 실행 계획의 일부로서 기록된 모든 트랜스포메이션들의 실행을 시작하게 한다.
    • 예 : show(), take(), count(), collect(), save()
  • 지연평가
    • 액션이 실행되는 시점이나 데이터에 실제 접근하는 시점까지 실제 실행을 미루는 스파크의 전략.
    • 스파크가 사용자의 연계된 트랜스포메이션들을 살펴봄으로써 쿼리 최적화를 가능하게 하지만 리니지와 데이터의 불변성은 장애에 대한 데이터 내구성을 제공.

 

스파크 UI

스파크가 배포된 방식에 맞춰 드라이버가 웹 UI를 띄우게 되며 기본적으로 4040 포트를 사용한다.(http://localhost:4040)
스파크 UI에서는 다음과 같은 다양한 수치와 내용을 볼 수 있다.

  • 스케줄러의 스테이지와 태스크 목록
  • RDD 크기와 메모리 사용의 요약
  • 환경 정보
  • 실행 중인 이그제큐터 정보
  • 모든 스파크 SQL 쿼리

'Data Engineering > Spark' 카테고리의 다른 글

Spark Start(1)  (1) 2024.10.30
Spark Components  (0) 2024.10.25
About Spark  (2) 2024.10.11
Spark 용어 정리  (0) 2024.06.21

러닝 스파크에서 나오는 실습들은 스파크셸에서의 모든 처리가 하나의 머신에서 이루어지는 로컬 모드를 사용한다.
분산 처리의 이득을 보기 원하거나 실무 작업에서는 YARN이나 Kubernetes 배포 모드를 사용해야 한다.
로컬 모드는 프레임워크를 익히면서 반복적인 스파크 수행이 가능하여 빠른 피드백을 얻을 수 있다.

1. 설치

기본 설치 방법

! macOS를 사용중이기에 이 운영체제를 기준으로 설치 방법을 작성하겠습니다.

  1. Spark release : 최신 버전
  2. package type : pre-built for Apache Hadoop x.x for later 선택
  3. Download Spark : 링크 클릭!
  4. 자바 버전 8/11/17 설치 후 JAVA_HOME 환경변수 설정 필요. 해당 사이트에서 자신이 사용하는 Spark 버전과 호환되는 자바 설치 필요. https://spark.apache.org/docs/latest/

    4-1 어떤 터미널을 사용하냐에 따라 다르지만 ohmyzsh를 사용중이라면 'vi ~/.zshrc'를 입력, 'vi ~/.bash_profile'나 'vi ~/.bashrc' 등 다 다르니 환경에 맞게 입력.
    4-2 vi 편집기로 들어가면 여러 환경변수들이 있을 것입니다. 맨 아래 줄에 아래 코드를 추가해줍니다.(i를 누르면 입력가능. esc를 누른 후 wq하면 저장 후 나가기)
    4-3 source ~/.zshrc를 입력하여 변경된 환경변수를 적용시켜 줍니다.
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.jdk/Contents/Home
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export PATH

PyPI 설치 방법

터미널에서 'pip install pyspark' 입력
추가 라이브러리는 'pip install pyspark[sql,ml,mllib]' 이렇게 설치

https://spark.apache.org/downloads.html

 

Downloads | Apache Spark

Download Apache Spark™ Choose a Spark release: Choose a package type: Download Spark: Verify this release using the and project release KEYS by following these procedures. Note that Spark 3 is pre-built with Scala 2.12 in general and Spark 3.2+ provides

spark.apache.org

 

2. Spark을 설치한 후 폴더 살펴보기

spark-x.x.x-bin-hadoopx.x.tgz를 압축 해제하고 나면 아래처럼 파일들이 있을 것입니다.

  • README.md
    스파크에 관련된 상세한 설명들을 담고있습니다.
  • bin
    스파크 셸들(spark-sql, pyspark, spark-shell, sparkR)을 포함해서 스파크와 상호 작용할 수 있는 대부분의 스크립트를 갖고 있습니다. 셸과 실행 파일은 나중에 spark-submit을 사용해서 단독 스파크 애플리케이션을 제출하거나 쿠버네티스로 스파크를 실행할 때 도커 이미지를 만들고 푸시하는 스크립트 작성을 위해 사용됩니다.
  • sbin
    이 폴더의 대부분의 스크립트는 다양한 배포 모드에서 클러스터의 스파크 컴포넌트들을 시작하고 중지하기 위한 관리 목적입니다.
  • kubernetes
    쿠버네티스 클러스터에서 쓰는 스파크를 도커 이미지 제작을 위한 Dockerfile들을 담고 있습니다. 그리고 도커 이미지를 빌드하기 전 스파크 배포본을 어떻게 만들지에 대한 가이드를 제공하는 파일도 포함하고 있습니다.
  • examples
    Spark를 배울 때 참고할만한 예제 코드들이 담겨있습니다.

'Data Engineering > Spark' 카테고리의 다른 글

Spark Start(2)  (3) 2024.10.31
Spark Components  (0) 2024.10.25
About Spark  (2) 2024.10.11
Spark 용어 정리  (0) 2024.06.21

Airflow의 핵심 개념인 DAG (Directed Acyclic Graph)는 작업(Task)들을 모아 실행 순서와 관계를 설정하여, 작업이 어떤 방식으로 실행되어야 하는지 정의하는 구조입니다. DAG 내의 작업들은 의존 관계와 순서가 있으며, 각 작업이 선행 작업의 완료 후에 실행되도록 구성됩니다. DAG는 Airflow에서 전체 워크플로의 청사진 역할을 하며, 특정한 실행 흐름을 따르게 만들어 줍니다.

DAG 예시

 

DAG 정의

DAG를 정의하는 방법에는 세가지가 있다.

  1. with 명령어를 사용. 대그 안에 암묵적으로 어떤 것이든 코드를 추가할 수 있다.
  2.  표준 생성자를 사용. DAG를 사용하는 모든 연산자에 전달할 수 있음.
  3. @dag 데코레이터 사용. 함수를 DAG 생성자로 변환할 때 사용. 
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")

# 1. with 명령어를 사용하는 방법.

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

my_dag = DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)

# 2. 표준 생성자를 사용하는 방법

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

# 3. @dag 데코레이터를 사용하는 방법

Task 의존성

Task나 Operator은 혼자 존재하지 않는다. Task간의 의존성을 선언하는 것이 DAG의 구조를 형성합니다.
task 의존성을 선언하는 방법은 여러가지 있습니다. 3, 4번 방법은 만약 task의 리스트를 다른 리스트에 의존하게끔 만들고 싶을 때 사용. 

  1. >>, << 연산자 사용.
  2. set_upstream, set_downstream 메소드 사용. >>, << 연산자보다 더 명시적이다.
  3.  cross_downstream 메소드 사용
  4. chain 메소드 사용.
# 1
first_task >> [second_task, third_task]
third_task << fourth_task

# 2
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

# 3
from airflow.models.baseoperator import cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])

# 4-1
from airflow.models.baseoperator import chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])

# 4-2 동일한 크기에 대해 쌍별 종속성 수행
from airflow.models.baseoperator import chain

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)

 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow Architecture  (0) 2024.10.25
About Airflow  (0) 2024.06.08

단일화된 스택으로의 아파치 컴포넌트

아래 그림처럼 네 개의 다양한 워크로드를 위한 라이브러리로 각각의 컴포넌트는 스파크 중심 장애 대응 엔진과는 별도로 존재하며, API를 사용해서 스파크 애플리케이션을 만들면 스파크 코어 엔진이 적절한 DAG로 변환해 실행하게 된다. 그러므로 Java, Scala, Python, R, SQL 중 어느 것으로 스파크 코드를 작성해 정형화 API를 사용하더라도 실제 코드는 경량화된 바이트코드로 변환되어 클러스터 전체에 나눠져 워커 노드의 JVM에서 실행된다.


Spark SQL

이 모듈은 구조화된 데이터와 잘 동작한다. RDBMS 테이블이나 구조화된 데이터의 파일 포멧(CSV, text, JSON, Avro, 파케이<Parquet> 등)에서 데이터를 읽어 들일 수 있으며 그 데이터로 스파크에서 영구적이거나 임시적인 테이블을 만들 수 있다.
또한 스파크 정형화 API를 사용하여 SQL 계통의 질의를 써서 데이터를 바로 데이터 프레임으로 읽어 들일 수 있다.

Spark MLlib

모델을 구축하기 위한 고수준 데이터 프레임 기반 API 기반으로 여러 인기 있는 머신러닝 알고리즘을 제공함.

Spark Streaming

Apache Kafka, 키네시스 등 다른 데이터 소스에서 들어오는 스트리밍 데이터, 정적 데이터를 실시간으로 연결하고 반응하는 모델. 정형화 스트리밍 모델(Spark Streaming)의 하부에는 스파크 SQL 엔진이 장애 복구와 지연 데이터의 모든 측면을 관리해줌.

GraphX

그래프를 조작하고 병렬 연산을 수행하기 위한 라이브러리

 

Spark의 분산 실행

하나의 스파크 애플리케이션은 스파크 클러스터의 병렬 작업들을 조율하는 하나의 드라이브 프로그램으로 이루어진다. 드라이버는 스파크 세션 객체를 통해 클러스터의 분산 컴포넌트들에 접근한다.

 

스파크 드라이버

스파크 세션 객체를 초기화하는 책임을 가진 스파크 애플리케이션의 일부로서, 클러스터 매니저와 통신하며 스파크 이그제큐터들을 위해 필요한 자원(CPU, 메모리 등)을 요청하고 모든 스파크 작업을 DAG 연산 형태로 변환하고 스케줄링하며 각 실행 단위를 태스크로 나누어 스파크 이그제큐터들에게 분배해준다. 자원이 할당된 후 드라이버와 스파크 이그제큐터는 직접 통신한다.

SparkSession

모든 스파크 연산과 데이터에 대한 통합 연결 채널. SparkContext, SQLContext, HiveContext, SparkConf, StreamingContext 등을 합쳐놓았을 뿐 아니라 스파크 작업을 편하게 해줌. 이 일원화된 연결 채널을 통해 JVM 실행 파라미터들을 만들고 데이터 프레임이나 데이터세트를 정의하고, 데이터 소스에서 데이터를 읽고 메타데이터에 접근하여 스파크 SQL 질의를 실행할 수 있음.

클러스터 매니저

스파크 애플리케이션이 실행되는 클러스터에서 자원을 관리 및 할당. 내장 단독(standalone) 클러스터 매니저, Apache Hadoop YARN, Apache Mesos, 쿠버네티스 총 네종류의 클러스터 매니저 지원.

스파크 이그제큐터

클러스터의 각 노워커 노드에서 동작한다. 이그제큐터는 드라이버 프로그램과 통신하며 워커에서 태스크를 실행하는 역할을 함.

배포모드

분산 데이터와 파티션

실제 물리 데이터는 HDFS나 클라우드 저장소에 존재하는 파티션이 되어 저장소 전체에 분산된다. 데이터가 파티션으로 되어 물리적으로 분산되면서 스파크는 각 파티션을 메모리의 데이터 프레임 객체로 바라본다. 각 스파크 이그제큐터는 가급적이면 데이터 지역성을 고려하여 네트워크에서 가장 가까운 파티션을 읽어 들이도록 태스크를 할당한다.(항상은 아님)
이러한 파티셔닝을 통해 효과적인 병렬 처리를 가능하게 해준다. 각 이그제큐터가 쓰는 CPU 코어는 작업해야 하는 데이터의 파티션에 할당됨.

'Data Engineering > Spark' 카테고리의 다른 글

Spark Start(2)  (3) 2024.10.31
Spark Start(1)  (1) 2024.10.30
About Spark  (2) 2024.10.11
Spark 용어 정리  (0) 2024.06.21

Apache Airflow는 워크플로우 자동화 도구입니다. 워크플로우는 의존성이 있는 작업(Task)들의 집합으로, 이들을 다이렉트 그래프 형태로 관리합니다. 각 작업은 다른 작업의 실행에 의존할 수 있으며, 작업들이 상호작용하는 방식에 따라 워크플로우가 정의됩니다.

Airflow의 주요 개념

  • DAG (Directed Acyclic Graph) : 워크플로의 구조를 나타내며, 방향성 있고 순환하지 않는 그래프를 의미합니다. 각 노드는 작업(Task)이고, 간선은 작업 간의 의존 관계를 나타냅니다. DAG는 워크플로가 어떻게 실행되어야 하는지를 정의하지만, 작업을 실행하는 코드는 아닙니다.
  • Operator : 특정 작업을 정의하는 코드입니다. Operator는 ‘이 작업이 해야 할 일’을 결정하며, 그 자체로 작업의 인스턴스는 아닙니다. 여러 종류의 Operator가 있으며, 예를 들어 BashOperator는 Bash 스크립트를 실행하는 데 사용됩니다.
  • Task : 작업의 인스턴스입니다. DAG 내에서 Operator를 사용해 정의된 작업이 Task로 실행됩니다. 각 Task는 Operator에 의해 정의된 작업을 실행하게 됩니다.
  • Task Instance : 특정 Task가 특정 시점에 실행된 것을 나타내는 객체입니다. Task Instance는 DAG 실행 중에 특정 Task가 언제, 어떻게 실행되었는지를 추적합니다.

Airflow Components

Required Components

  • Scheduler : Airflow에서 DAG와 Task를 관리하는 핵심 역할을 하는 컴포넌트로, Task의 의존성을 확인하고, 실행할 Task를 결정하여 실행시킴.
  • Executer : Task를 실행하는 컴포넌트입니다. Executor는 여러 종류가 있으며, LocalExecutor는 로컬 머신에서 Task를 실행하는 반면, CeleryExecutor는 분산 환경에서 Task를 실행할 수 있습니다. 별도의 구성 요소가 아니며 스케줄러 프로세스 내에서 실행됨.
  • Webserver : DAG 및 작업의 동작을 검사, 트리거, 디버그 할 수 있는 편리한 사용자 인터페이스를 제공.
  • A folder of dag files : 스케줄러가 어떤 작업을 실행할지, 언제 실행할지를 결정.
  • A metadata database : Airflow 구성 요소가 워크플로우와 작업의 상태를 저장하는 데에 사용됨. 이 컴포넌트의 설정은 데이터베이스 백엔드 설정에서 설명되며 Airflow가 작동하는 데에 필요함.

Optional Components

  • Optional worker : 스케줄러가 제공하는 작업을 실행합니다. 기본 설치에서는 작업자가 스케줄러의 일부일 수 있으며 별도의 구성 요소가 아닙니다. CeleryExecutor에서 장기 실행 프로세스로 실행되거나 KubernetesExecutor의 POD로 실행될 수 있습니다.
  • Optional trigger : asyncio 이벤트 루프에서 지연된 작업을 실행합니다. 기본 설치에서는 지연된 작업이 사용되지 않는 경우 트리거러가 필요하지 않습니다. 지연된 작업에 대한 자세한 내용은 지연 가능한 연산자 및 트리거에서 확인할 수 있습니다.
  • Optional dag processor : 선택적 DAG 프로세서로, DAG 파일을 파싱하고 메타데이터 데이터베이스에 직렬화합니다. 기본적으로 DAG 프로세서 프로세스는 스케줄러의 일부이지만, 확장성과 보안상의 이유로 별도의 구성 요소로 실행될 수 있습니다. DAG 프로세서가 존재하는 경우 스케줄러는 DAG 파일을 직접 읽을 필요가 없습니다. DAG 파일 처리에 대한 자세한 내용은 DAG 파일 처리에서 확인할 수 있습니다.
  • Optional folder of plugins : 플러그인의 선택적 폴더입니다. 플러그인은 Airflow의 기능을 확장하는 방법입니다(설치된 패키지와 유사합니다). 플러그인은 스케줄러, DAG 프로세서, 트리거 및 웹 서버에 의해 읽힙니다. 플러그인에 대한 자세한 내용은 플러그인에서 확인할 수 있습니다.

 

Airflow는 이러한 핵심 개념들을 통해 복잡한 워크플로를 정의하고 실행할 수 있게 해주며, 사용자는 Python을 이용해 DAG와 Task를 정의하게 됩니다.

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow DAGs  (1) 2024.10.29
About Airflow  (0) 2024.06.08

태어난 배경

RDBMS와 같은 전통적인 저장 시스템 및 어떠한 프로그래밍 수단으로도 Google이 색인을 만들고 검색할 규모의 인터넷 문서를 다루는 것은 불가능했다. 새로운 접근 방식이 필요하여 필연적으로 GFS(Google File System), 맵리듀스, 빅테이블 등이 생겨났다.

GFS : 클러스터 안에서 상용 서버에 장애 내구성이 있는 분산 파일시스템 제공
빅테이블 : GFS 기반으로 구조화된 대규모 데이터의 저장 수단 제공
맵리듀스 : 함수형 프로그래밍을 기반으로 GFS와 빅테이블 위에서 대규모 데이터 분산 처리가 가능한 병령 프로그래밍의 패러다임 제시

2006년 4월 아파치로 이관되면서 관련 모듈은 하둡 프레임워크의 일부가 되었다.
이관된 후 HDFS(Hadoop File System)에서 돌아가는 맵리듀스 프레임워크에 몇가지 단점이 있었다.

  • 번거로운 운영 복잡도로 인한 관리의 어려움
    -> Spark은 직관적인 API와 함께 다양한 언어(Python, Scala, Java, R)를 지원하며, 기존의 하둡보다 간결한 코드 작성이 가능합니다. Spark의 RDD(Resilient Distributed Dataset)는 데이터에 대한 추상화 레이어를 제공해 사용자가 복잡한 작업을 쉽게 병렬화하고 분산 처리할 수 있게 돕습니다. 특히 DataFrameDataset API는 데이터를 다루는 것을 더욱 직관적으로 만들어 관리의 복잡도를 크게 줄였습니다.
  • 일반적인 배치 처리를 위한 MR API는 많은 양의 기본 셋업 코드를 필요로 했고 장애 대응은 불안정했음.
    ->Spark의 API는 고수준의 추상화를 제공하여 복잡한 셋업 코드를 최소화합니다. Spark는 작업이 실패할 경우 자동으로 재실행하는 내장 장애 복구 메커니즘을 가지고 있으며, 이러한 안정성 덕분에 더 많은 작업을 안정적으로 처리할 수 있습니다. 특히 RDD는 데이터의 불변성(immutable)을 보장하며, 장애가 발생하면 자동으로 해당 데이터 셋의 손실된 파티션만 재연산하는 방식으로 복구를 진행합니다.
  • 방대한 배치 데이터 작업을 수행하면서 많은 MR 태스크가 필요했고 각 태스크는 이후 단계들을 위해 중간 과정의 데이터를 로컬 디스크에 써야 했음. 이는 디스크 I/O의 반복적 수행을 야기했고 거대한 MR을 처리하는 데에 많은 시간을 소비하게 됨.
    ->Spark은 메모리 중심의 처리를 지원합니다. 하둡의 맵리듀스는 각 태스크 사이에 중간 데이터를 디스크에 저장해야 하지만, Spark은 RDD를 통해 중간 데이터를 메모리에 유지할 수 있어 디스크 I/O를 최소화합니다. 이로 인해 Spark은 대용량 데이터를 처리할 때도 더 빠르게 수행될 수 있습니다. 또한, 필요할 경우에만 디스크에 데이터를 저장하기 때문에 더 적은 디스크 I/O로 고성능을 달성할 수 있습니다.
  • 하둡 MR은 머신러닝이나 스트리밍, 상호 반응하는 SQL 계통의 질의 등 다른 워크로드와 연계해서 쓰기에 한계가 있었음.
    -> Spark은 다양한 워크로드(머신러닝, 스트리밍, SQL 질의 등)를 하나의 프레임워크에서 통합적으로 처리할 수 있도록 설계되었습니다.
    Spark Streaming은 실시간 데이터 스트리밍 처리를 가능하게 하여, 실시간 데이터 흐름에 대해 반응하는 애플리케이션을 쉽게 구축할 수 있습니다.
    MLlib은 분산 머신러닝 라이브러리로, 하둡의 맵리듀스보다 머신러닝 작업을 훨씬 더 쉽게 실행할 수 있게 해줍니다.
    Spark SQL은 SQL 기반의 데이터 처리와 분석을 지원하며, 이는 맵리듀스보다 훨씬 더 상호작용적이고 빠르게 데이터를 다룰 수 있습니다.

이러한 단점들이 있었기에 하둡 맵리듀스 작업에 참여했던 UC 버클리의 연구원들이 Spark 프로젝트에서 단점들을 개선한 분산 처치 프레임워크를 개발함. 지금은 훨씬 빠르지만 초기에도 특정 작업에 대해서 10~20배 빨랐음.

 

맵리듀스

맵리듀스 애플리케이션은 데이터를 맵리듀스 시스템과 연계하여 데이터의 지역성과 랙의 근접성 등을 고려해 데이터가 존재하는 곳으로 연산 코드를 보냄. 클러스터의 워커 노드들은 중간 연산을 통해 집계하고, 결과를 합쳐 리듀스 함수에서 최종 결과를 생산해서 이를 애플리케이션이 접근 가능한 분산 저장소에 기록한다. 이러한 접근 방식은 네트워크 트래픽을 크게 감소시키면서 네트워크로 데이터를 분산시키는 것을 지양하고, 로컬 디스크에 대한 IO를 극대화한다.

맵리듀스 과정

 

Spark 특성

  • 속도
    - 하드웨어의 발전으로 가격 및 CPU, 메모리의 성능이 향상되어 Spark 내부 구현이 용이해짐.
    - 질의 연산을 방향성 비순환 그래프(DAG)로 구성. 스케줄러와 질의 최적화 모듈은 효율적인 연산 그래프를 만들어서 각각의 태스크로 분해하여 클러스터의 워커 노드 위에서 병렬 수행될 수 있도록 해줌.
    - 물리적 실행 엔진인 텅스텐은 전체적 코드 생성이라는 기법을 사용하여 실행을 위한 간결한 코드를 생성해냄.
    - 모든 중간 결과는 메모리에 유지되며, 디스크 I/O를 제한적으로 사용하므로 성능이 크게 향상됨
  • 사용 편리성
    데이터 프레임이나 데이터세트 같은 고수준 데이터 추상화 계층 아래에 유연한 분산 데이터세트(RDD)라 불리는 핵심적이면서도 단순한 논리 자료구조를 구축하여 단순성을 실현. 연산의 한 종류로서 트랜스포메이션, 액션의 집합과 단순한 프로그래밍 모델을 제공함으로써 사용자들이 각자 편한 언어로 빅데이터 애플리케이션 구현 가능.
  • 모듈성
    스파크 연산은 다양한 타입의 워크로드에 적용 가능하며, 지원하는 프로그래밍 언어로 표현할 수 있다.(스칼라, 자바, 파이썬, SQL, R)
    문서화가 잘된 API들로 이루어진 통합 라이브러리를 제공하며 컴포넌트 들은 다 같이 하나의 엔진 안에서 연동된 상태로 사용할 수 있다. 하나의 스파크 애플리케이션을 작성함으로써 전혀 다른 작업을 위해 별도의 엔진을 돌릴 필요도, 별도의 API를 배울 필요도 없게 됨.
  • 확장성
    저장보다는 빠른 병렬 연산 엔진에 초점이 맞추어져 있다. 저장과 연산을 분리하면서 수많은 데이터 소스에서 데이터를 읽어 들여 메모리에서 처리 가능하다는 의미.

'Data Engineering > Spark' 카테고리의 다른 글

Spark Start(2)  (3) 2024.10.31
Spark Start(1)  (1) 2024.10.30
Spark Components  (0) 2024.10.25
Spark 용어 정리  (0) 2024.06.21

aws ec2 위에서 작업하는 경우나 좀 더 디테일한 도커 작업을 위해서는 CLI 명령어를 알 필요가 있습니다.
ec2에 도커를 올리면서 필요했던 명령어들을 정리해보겠습니다.

시스템 관련

# 도커 자원들이 얼마나 메모리 차지하고 있는지
$ docker system df

# 도커 자원들 상세히
$ docker system df -v

compose 명령어

# 컨테이너 실행
# -f를 통해 파일 경로를 설정.
# -d : 백그라운드 실행
$ docker compose -f ./docker-compose.prod.yml up

# 모든 이미지를 새로 빌드해서 컨테이너 생성
$ docker compose up --build

# 컨테이너 중단 및 제거
$ docker compose down

# 컨테이너 중단
$ docker compose stop

# 컨테이너 실행
$ docker compose ㄴㅅㅁㄱㅅ

이미지 관련

# 현재 이미지 확인
$ docker images

# 이미지 삭제
# 컨테이너 삭제 전 강제로 삭제하고 싶다면 -f 옵션 추가
$ docker rmi [이미지id]

컨테이너 관련

# 동작중인 컨테이너 확인
$ docker ps

# 정지된 컨테이너 확인
$ docker ps -a

# 컨테이너 삭제
$ docker rm [컨테이너id1], [컨테이너id2]

# 컨테이너 모두 삭제
$ docker rm `docker ps -a -q`

# 실행중인 컨테이너 자원 할당 정보 확인
$ docker stats

# 메모리 제한 변경
$ docker update --memory 1024m [컨테이너id]

# 메모리 제한보다 꼭 커야함
docker update --memory-swap 2048m [컨테이너id]

# 확인
docker inspect [컨테이너id] | grep -i memory

'Data Engineering > Docker' 카테고리의 다른 글

Docker 명령어 정리  (0) 2024.05.30

boto3

boto3는 Python용 AWS SDK로, S3, EC2와 같은 AWS 서비스에 접근하는 프로그램을 개발할 수 있도록 해준다.
AWS에서 생성된 Credantial을 환경변수 혹은 boto3 config에 입력해 주면 해당 AWS 계정의 권한을 획득할 수 있다.
※ SDK(software development kit)란 개발자를 위한 플랫폼별 구축 도구 세트이다. 특정 플랫폼, 운영 체제 또는 프로그래밍 언어에서 실행되는 코드를 만들려면 디버거, 컴파일러 및 라이브러리와 같은 구성요소가 필요하고 이를 제공해주는 것이 SDK이다.

 

  • 클라이언트 연결
import boto3

# s3 클라이언트 연결
s3_client = boto3.client(
    service_name='s3' # s3에 연결
    region_name="사용하는 AWS 서비스 지역" 
    aws_access_key_id="액세스 키"
    aws_secret_access_key="시크릿 키"
)
  • S3 업로드
# file_name, Body : 업로드할 파일명
# Bucket : S3에 적재될 bucket 이름
# key : 키의 이름. 저장될 데이터 이름

s3_client.upload_file(file_name, bucket_name, key) # AWS S3로 파일 업로드
s3_client.put_object(Body, Bucket, Key) # AWS S3로 파일 업로드
s3_client.get_object(Bucket=bucket_name, Key=file_name) # AWS S3에서 로컬로 파일 읽기

 

이 밖에도 boto3를 이용해 사용할 수 있는 메소드들이 많고 ec2도 제어할 수 있는데 필요할 때 추가해보겠습니다.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html

 

S3 - Boto3 1.34.145 documentation

Previous ListRumMetricsDestinations

boto3.amazonaws.com

 

 

셔플링 : 파티션간에 데이터 이동이 필요한 경우 발생
셔플링이 발생하는 경우
- 명시적 파티션을 새롭게 하는 경우
- 시스템에 의해 이뤄지는 셔플링

hasing partition

range partition

 

데이터 구조

- RDD
로우레벨 데이터로 클러스터내의 서버에 분산된 데이터를 지칭
레코드별로 존재하지만 스키마가 존재하지 않음
    - 구조화된 데이터나 비구조화된 데이터 모두 지원
변경이 불가능한 분산 저장된 데이터
    - 다수의 파티션으로 구성
    - 로우레벨의 함수형 변환 지원(map, filter, flatMap 등등)
일반 파이썬 데이터는 parallelize 함수로 RDD로 변환
    - 반대는 collect로 파이썬 데이터로 변환 가능

- DataFrame

변경이 불가한 분산 저장된 데이터(RDD 위에서 돌기 때문)
RDD와는 다르게 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
- Pandas의 데이터 프레임 혹은 RDB의 테이블과 거의 흡사
- 다양한 데이터 소스 지원: HDFS, Hive, 외부 DB, RDD 등
스칼라, 자바, 파이썬과 같은 언어에서 지원

- Dataset
RDD위에 만들어지는 RDD와는 달리 필드 정보를 갖고 있음(테이블)
Dataset은 타입 정보가 존재하며 컴파일 언어에서 사용 가능
    컴파일 언어 : Scala/Java에서 사용 가능
PySpark에서는 DataFrame을 사용

 

Spark Session 생성

Spark 프로그램의 시작은 Spark Session을 생성하는 것

'Data Engineering > Spark' 카테고리의 다른 글

Spark Start(2)  (3) 2024.10.31
Spark Start(1)  (1) 2024.10.30
Spark Components  (0) 2024.10.25
About Spark  (2) 2024.10.11

+ Recent posts