사용법
1. 문자열 합치기
SELECT CONCAT('문자열', '합치기');
-> 문자열합치기
2. 숫자 합치기
SELECT CONCAT(123, 456, 789);
-> 123456789
3. 숫자와 문자 합치기
SELECT CONCAT('문자열', 1004)
-> 문자열1004
1. 문자열 합치기
SELECT CONCAT('문자열', '합치기');
-> 문자열합치기
2. 숫자 합치기
SELECT CONCAT(123, 456, 789);
-> 123456789
3. 숫자와 문자 합치기
SELECT CONCAT('문자열', 1004)
-> 문자열1004
pyspark
#1 어디 폴더 아래서든 터미널에서 입력
./pyspark
#2 bin 폴더에서 입력
스파크가 배포된 방식에 맞춰 드라이버가 웹 UI를 띄우게 되며 기본적으로 4040 포트를 사용한다.(http://localhost:4040)
스파크 UI에서는 다음과 같은 다양한 수치와 내용을 볼 수 있다.
Spark Start(1) (1) | 2024.10.30 |
---|---|
Spark Components (0) | 2024.10.25 |
About Spark (2) | 2024.10.11 |
Spark 용어 정리 (0) | 2024.06.21 |
러닝 스파크에서 나오는 실습들은 스파크셸에서의 모든 처리가 하나의 머신에서 이루어지는 로컬 모드를 사용한다.
분산 처리의 이득을 보기 원하거나 실무 작업에서는 YARN이나 Kubernetes 배포 모드를 사용해야 한다.
로컬 모드는 프레임워크를 익히면서 반복적인 스파크 수행이 가능하여 빠른 피드백을 얻을 수 있다.
기본 설치 방법
! macOS를 사용중이기에 이 운영체제를 기준으로 설치 방법을 작성하겠습니다.
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.jdk/Contents/Home
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export PATH
PyPI 설치 방법
터미널에서 'pip install pyspark' 입력
추가 라이브러리는 'pip install pyspark[sql,ml,mllib]' 이렇게 설치
https://spark.apache.org/downloads.html
Downloads | Apache Spark
Download Apache Spark™ Choose a Spark release: Choose a package type: Download Spark: Verify this release using the and project release KEYS by following these procedures. Note that Spark 3 is pre-built with Scala 2.12 in general and Spark 3.2+ provides
spark.apache.org
spark-x.x.x-bin-hadoopx.x.tgz를 압축 해제하고 나면 아래처럼 파일들이 있을 것입니다.
Spark Start(2) (3) | 2024.10.31 |
---|---|
Spark Components (0) | 2024.10.25 |
About Spark (2) | 2024.10.11 |
Spark 용어 정리 (0) | 2024.06.21 |
Airflow의 핵심 개념인 DAG (Directed Acyclic Graph)는 작업(Task)들을 모아 실행 순서와 관계를 설정하여, 작업이 어떤 방식으로 실행되어야 하는지 정의하는 구조입니다. DAG 내의 작업들은 의존 관계와 순서가 있으며, 각 작업이 선행 작업의 완료 후에 실행되도록 구성됩니다. DAG는 Airflow에서 전체 워크플로의 청사진 역할을 하며, 특정한 실행 흐름을 따르게 만들어 줍니다.
DAG를 정의하는 방법에는 세가지가 있다.
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
):
EmptyOperator(task_id="task")
# 1. with 명령어를 사용하는 방법.
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
# 2. 표준 생성자를 사용하는 방법
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
# 3. @dag 데코레이터를 사용하는 방법
Task나 Operator은 혼자 존재하지 않는다. Task간의 의존성을 선언하는 것이 DAG의 구조를 형성합니다.
task 의존성을 선언하는 방법은 여러가지 있습니다. 3, 4번 방법은 만약 task의 리스트를 다른 리스트에 의존하게끔 만들고 싶을 때 사용.
# 1
first_task >> [second_task, third_task]
third_task << fourth_task
# 2
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)
# 3
from airflow.models.baseoperator import cross_downstream
# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
# 4-1
from airflow.models.baseoperator import chain
# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)
# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])
# 4-2 동일한 크기에 대해 쌍별 종속성 수행
from airflow.models.baseoperator import chain
# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
Airflow Architecture (0) | 2024.10.25 |
---|---|
About Airflow (0) | 2024.06.08 |
아래 그림처럼 네 개의 다양한 워크로드를 위한 라이브러리로 각각의 컴포넌트는 스파크 중심 장애 대응 엔진과는 별도로 존재하며, API를 사용해서 스파크 애플리케이션을 만들면 스파크 코어 엔진이 적절한 DAG로 변환해 실행하게 된다. 그러므로 Java, Scala, Python, R, SQL 중 어느 것으로 스파크 코드를 작성해 정형화 API를 사용하더라도 실제 코드는 경량화된 바이트코드로 변환되어 클러스터 전체에 나눠져 워커 노드의 JVM에서 실행된다.
이 모듈은 구조화된 데이터와 잘 동작한다. RDBMS 테이블이나 구조화된 데이터의 파일 포멧(CSV, text, JSON, Avro, 파케이<Parquet> 등)에서 데이터를 읽어 들일 수 있으며 그 데이터로 스파크에서 영구적이거나 임시적인 테이블을 만들 수 있다.
또한 스파크 정형화 API를 사용하여 SQL 계통의 질의를 써서 데이터를 바로 데이터 프레임으로 읽어 들일 수 있다.
모델을 구축하기 위한 고수준 데이터 프레임 기반 API 기반으로 여러 인기 있는 머신러닝 알고리즘을 제공함.
Apache Kafka, 키네시스 등 다른 데이터 소스에서 들어오는 스트리밍 데이터, 정적 데이터를 실시간으로 연결하고 반응하는 모델. 정형화 스트리밍 모델(Spark Streaming)의 하부에는 스파크 SQL 엔진이 장애 복구와 지연 데이터의 모든 측면을 관리해줌.
그래프를 조작하고 병렬 연산을 수행하기 위한 라이브러리
하나의 스파크 애플리케이션은 스파크 클러스터의 병렬 작업들을 조율하는 하나의 드라이브 프로그램으로 이루어진다. 드라이버는 스파크 세션 객체를 통해 클러스터의 분산 컴포넌트들에 접근한다.
스파크 세션 객체를 초기화하는 책임을 가진 스파크 애플리케이션의 일부로서, 클러스터 매니저와 통신하며 스파크 이그제큐터들을 위해 필요한 자원(CPU, 메모리 등)을 요청하고 모든 스파크 작업을 DAG 연산 형태로 변환하고 스케줄링하며 각 실행 단위를 태스크로 나누어 스파크 이그제큐터들에게 분배해준다. 자원이 할당된 후 드라이버와 스파크 이그제큐터는 직접 통신한다.
모든 스파크 연산과 데이터에 대한 통합 연결 채널. SparkContext, SQLContext, HiveContext, SparkConf, StreamingContext 등을 합쳐놓았을 뿐 아니라 스파크 작업을 편하게 해줌. 이 일원화된 연결 채널을 통해 JVM 실행 파라미터들을 만들고 데이터 프레임이나 데이터세트를 정의하고, 데이터 소스에서 데이터를 읽고 메타데이터에 접근하여 스파크 SQL 질의를 실행할 수 있음.
스파크 애플리케이션이 실행되는 클러스터에서 자원을 관리 및 할당. 내장 단독(standalone) 클러스터 매니저, Apache Hadoop YARN, Apache Mesos, 쿠버네티스 총 네종류의 클러스터 매니저 지원.
클러스터의 각 노워커 노드에서 동작한다. 이그제큐터는 드라이버 프로그램과 통신하며 워커에서 태스크를 실행하는 역할을 함.
실제 물리 데이터는 HDFS나 클라우드 저장소에 존재하는 파티션이 되어 저장소 전체에 분산된다. 데이터가 파티션으로 되어 물리적으로 분산되면서 스파크는 각 파티션을 메모리의 데이터 프레임 객체로 바라본다. 각 스파크 이그제큐터는 가급적이면 데이터 지역성을 고려하여 네트워크에서 가장 가까운 파티션을 읽어 들이도록 태스크를 할당한다.(항상은 아님)
이러한 파티셔닝을 통해 효과적인 병렬 처리를 가능하게 해준다. 각 이그제큐터가 쓰는 CPU 코어는 작업해야 하는 데이터의 파티션에 할당됨.
Spark Start(2) (3) | 2024.10.31 |
---|---|
Spark Start(1) (1) | 2024.10.30 |
About Spark (2) | 2024.10.11 |
Spark 용어 정리 (0) | 2024.06.21 |
Apache Airflow는 워크플로우 자동화 도구입니다. 워크플로우는 의존성이 있는 작업(Task)들의 집합으로, 이들을 다이렉트 그래프 형태로 관리합니다. 각 작업은 다른 작업의 실행에 의존할 수 있으며, 작업들이 상호작용하는 방식에 따라 워크플로우가 정의됩니다.
Airflow는 이러한 핵심 개념들을 통해 복잡한 워크플로를 정의하고 실행할 수 있게 해주며, 사용자는 Python을 이용해 DAG와 Task를 정의하게 됩니다.
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html
Airflow DAGs (1) | 2024.10.29 |
---|---|
About Airflow (0) | 2024.06.08 |
RDBMS와 같은 전통적인 저장 시스템 및 어떠한 프로그래밍 수단으로도 Google이 색인을 만들고 검색할 규모의 인터넷 문서를 다루는 것은 불가능했다. 새로운 접근 방식이 필요하여 필연적으로 GFS(Google File System), 맵리듀스, 빅테이블 등이 생겨났다.
GFS : 클러스터 안에서 상용 서버에 장애 내구성이 있는 분산 파일시스템 제공
빅테이블 : GFS 기반으로 구조화된 대규모 데이터의 저장 수단 제공
맵리듀스 : 함수형 프로그래밍을 기반으로 GFS와 빅테이블 위에서 대규모 데이터 분산 처리가 가능한 병령 프로그래밍의 패러다임 제시
2006년 4월 아파치로 이관되면서 관련 모듈은 하둡 프레임워크의 일부가 되었다.
이관된 후 HDFS(Hadoop File System)에서 돌아가는 맵리듀스 프레임워크에 몇가지 단점이 있었다.
이러한 단점들이 있었기에 하둡 맵리듀스 작업에 참여했던 UC 버클리의 연구원들이 Spark 프로젝트에서 단점들을 개선한 분산 처치 프레임워크를 개발함. 지금은 훨씬 빠르지만 초기에도 특정 작업에 대해서 10~20배 빨랐음.
맵리듀스 애플리케이션은 데이터를 맵리듀스 시스템과 연계하여 데이터의 지역성과 랙의 근접성 등을 고려해 데이터가 존재하는 곳으로 연산 코드를 보냄. 클러스터의 워커 노드들은 중간 연산을 통해 집계하고, 결과를 합쳐 리듀스 함수에서 최종 결과를 생산해서 이를 애플리케이션이 접근 가능한 분산 저장소에 기록한다. 이러한 접근 방식은 네트워크 트래픽을 크게 감소시키면서 네트워크로 데이터를 분산시키는 것을 지양하고, 로컬 디스크에 대한 IO를 극대화한다.
Spark Start(2) (3) | 2024.10.31 |
---|---|
Spark Start(1) (1) | 2024.10.30 |
Spark Components (0) | 2024.10.25 |
Spark 용어 정리 (0) | 2024.06.21 |
aws ec2 위에서 작업하는 경우나 좀 더 디테일한 도커 작업을 위해서는 CLI 명령어를 알 필요가 있습니다.
ec2에 도커를 올리면서 필요했던 명령어들을 정리해보겠습니다.
# 도커 자원들이 얼마나 메모리 차지하고 있는지
$ docker system df
# 도커 자원들 상세히
$ docker system df -v
# 컨테이너 실행
# -f를 통해 파일 경로를 설정.
# -d : 백그라운드 실행
$ docker compose -f ./docker-compose.prod.yml up
# 모든 이미지를 새로 빌드해서 컨테이너 생성
$ docker compose up --build
# 컨테이너 중단 및 제거
$ docker compose down
# 컨테이너 중단
$ docker compose stop
# 컨테이너 실행
$ docker compose ㄴㅅㅁㄱㅅ
# 현재 이미지 확인
$ docker images
# 이미지 삭제
# 컨테이너 삭제 전 강제로 삭제하고 싶다면 -f 옵션 추가
$ docker rmi [이미지id]
# 동작중인 컨테이너 확인
$ docker ps
# 정지된 컨테이너 확인
$ docker ps -a
# 컨테이너 삭제
$ docker rm [컨테이너id1], [컨테이너id2]
# 컨테이너 모두 삭제
$ docker rm `docker ps -a -q`
# 실행중인 컨테이너 자원 할당 정보 확인
$ docker stats
# 메모리 제한 변경
$ docker update --memory 1024m [컨테이너id]
# 메모리 제한보다 꼭 커야함
docker update --memory-swap 2048m [컨테이너id]
# 확인
docker inspect [컨테이너id] | grep -i memory
Docker 명령어 정리 (0) | 2024.05.30 |
---|
boto3는 Python용 AWS SDK로, S3, EC2와 같은 AWS 서비스에 접근하는 프로그램을 개발할 수 있도록 해준다.
AWS에서 생성된 Credantial을 환경변수 혹은 boto3 config에 입력해 주면 해당 AWS 계정의 권한을 획득할 수 있다.
※ SDK(software development kit)란 개발자를 위한 플랫폼별 구축 도구 세트이다. 특정 플랫폼, 운영 체제 또는 프로그래밍 언어에서 실행되는 코드를 만들려면 디버거, 컴파일러 및 라이브러리와 같은 구성요소가 필요하고 이를 제공해주는 것이 SDK이다.
import boto3
# s3 클라이언트 연결
s3_client = boto3.client(
service_name='s3' # s3에 연결
region_name="사용하는 AWS 서비스 지역"
aws_access_key_id="액세스 키"
aws_secret_access_key="시크릿 키"
)
# file_name, Body : 업로드할 파일명
# Bucket : S3에 적재될 bucket 이름
# key : 키의 이름. 저장될 데이터 이름
s3_client.upload_file(file_name, bucket_name, key) # AWS S3로 파일 업로드
s3_client.put_object(Body, Bucket, Key) # AWS S3로 파일 업로드
s3_client.get_object(Bucket=bucket_name, Key=file_name) # AWS S3에서 로컬로 파일 읽기
이 밖에도 boto3를 이용해 사용할 수 있는 메소드들이 많고 ec2도 제어할 수 있는데 필요할 때 추가해보겠습니다.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
S3 - Boto3 1.34.145 documentation
Previous ListRumMetricsDestinations
boto3.amazonaws.com
셔플링 : 파티션간에 데이터 이동이 필요한 경우 발생
셔플링이 발생하는 경우
- 명시적 파티션을 새롭게 하는 경우
- 시스템에 의해 이뤄지는 셔플링
hasing partition
range partition
데이터 구조
- RDD
로우레벨 데이터로 클러스터내의 서버에 분산된 데이터를 지칭
레코드별로 존재하지만 스키마가 존재하지 않음
- 구조화된 데이터나 비구조화된 데이터 모두 지원
변경이 불가능한 분산 저장된 데이터
- 다수의 파티션으로 구성
- 로우레벨의 함수형 변환 지원(map, filter, flatMap 등등)
일반 파이썬 데이터는 parallelize 함수로 RDD로 변환
- 반대는 collect로 파이썬 데이터로 변환 가능
- DataFrame
변경이 불가한 분산 저장된 데이터(RDD 위에서 돌기 때문)
RDD와는 다르게 관계형 데이터베이스 테이블처럼 컬럼으로 나눠 저장
- Pandas의 데이터 프레임 혹은 RDB의 테이블과 거의 흡사
- 다양한 데이터 소스 지원: HDFS, Hive, 외부 DB, RDD 등
스칼라, 자바, 파이썬과 같은 언어에서 지원
- Dataset
RDD위에 만들어지는 RDD와는 달리 필드 정보를 갖고 있음(테이블)
Dataset은 타입 정보가 존재하며 컴파일 언어에서 사용 가능
컴파일 언어 : Scala/Java에서 사용 가능
PySpark에서는 DataFrame을 사용
Spark Session 생성
Spark 프로그램의 시작은 Spark Session을 생성하는 것
Spark Start(2) (3) | 2024.10.31 |
---|---|
Spark Start(1) (1) | 2024.10.30 |
Spark Components (0) | 2024.10.25 |
About Spark (2) | 2024.10.11 |