로그 관리

Apache Airflow를 사용하면 dag가 실행될 때 마다 위와 같이 로그가 남는다.
한 폴더당 20KB 정도 되는데 하루에 10개씩 생기며 다른 대그가 추가될 수도 있기에 조금만 지나도 메모리를 많이 차지할 것이다.
때문에 로그를 14일치만 저장해두고 삭제하는 dag를 따로 작성하였다.

import os
import shutil
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.operators.python import PythonOperator

def arrange_old_logs():
    log_dir = '/opt/airflow/logs'
    # 14일치 로그 파일 계산
    cutoff_date = (datetime.now(timezone.utc) - timedelta(days=14)).date()
    delete_count = 0

    # dag_id : logs 아래 각 폴더명
    # dag_path : logs 아래 각 폴더의 경로
    # run_id : dag_id=~~ 폴더 아래의 각 폴더명
    # run_path : run_id로 되어있는 로그를 담고있는 파일 경로
    # run_time_str : 런타임 시간 문자열
    
    for dag_id in os.listdir(log_dir):
        dag_path = os.path.join(log_dir, dag_id)
        if os.path.isdir(dag_path):
            for run_id in os.listdir(dag_path):
                run_path = os.path.join(dag_path, run_id)
                if os.path.isdir(run_path):
                    run_time_str = run_id.split('__')[-1]
                    try:
                        run_time = datetime.fromisoformat(run_time_str).date()
                        # 지금으로부터 14일 전 보다 더 이전 파일들 제거
                        if run_time < cutoff_date:
                            shutil.rmtree(run_path)
                            delete_count += 1
                            print(f"Deleted logs: {run_path}")
                    except ValueError:
                        print(f"Skipping: {run_path}, unable to parse date from {run_time_str}")
    
    print(f"Total logs deleted: {delete_count}")

default_args = {
    'owner': 'joonghyeon',
    'depends_on_past': False,
    'start_date': datetime(2025, 6, 23),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    dag_id='arrangeLogs',
    default_args=default_args,
    schedule='0 17 * * 1-5',  # 매일 새벽 2시에 실행
    catchup=False,
    max_active_runs=1,
    tags=['log', 'arrange', 'cleanup']
) as dag:
    arrange_task = PythonOperator(
        task_id='arrange_old_logs',
        python_callable=arrange_old_logs,
    )
    arrange_task


추가로 dag가 많아진다면 실패한 task에 대한 로그는 남겨두는 기능을 추가할 예정이다.
(지금은 매일 Airflow UI로 확인 중이다.)

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (4)  (0) 2025.08.19
노션 - 구글 Sheet 연동 (3)  (2) 2025.08.18
노션 - 구글 Sheet 연동 (2)  (1) 2025.07.23
노션 - 구글 Sheet 연동 (1)  (1) 2025.07.21

시트 생성 및 업데이트 - 시트 업데이트

 

파일 로드 및 저장

def load_file_to_dict(path):
    if os.path.exists(path):
        with open(path, "r") as f:
            return dict(json.load(f))
    return dict()

def save_file_to_dict(path, cache):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(cache, f, ensure_ascii=False, indent=4)

- 시트 생성과 업데이트에 둘 다 쓰이고 원하는 구조가 key : value 이기에 함수화 하였다.

 

시트 업데이트

시트 업데이트 같은 경우 두가지 dictionary 구조의 json 파일을 이용한다.

  • tabs_dict : 프로젝트 명이 변경될 경우 시트 탭 타이틀이 변경되어야 한다. 구글 시트의 각 탭에는 gid라는 값이 존재하는데 이 값이 시트의 각 탭의 유일성을 보장해주기에 이 값을 활용한다. {gid : 탭 타이틀}
  • update_dict : 노션의 데이터베이스의 '최종 편집 일시' 속성을 이용한다. 그리고 노션의 데이터베이스에서 새로 row(페이지)가 생성되면 각 row마다 페이지 링크가 있고 링크마다 id가 존재한다. {페이지 id : 최종 편집 일시} 를 저장해두어 만약 페이지 id 값이 저장되어 있지 않거나 현재 노션에 표기된 '최종 편집 일시'와 저장되어 있는 값이 다르다면 업데이트가 이루어진 것이고 시트에도 반영되어야 할 것이다.
# 시트 업데이트
def update_sheets():
    # === 기존 기록 로드 ===
    tabs_dict = load_file_to_dict(SHEET_TABS_INFO)
    
    # === 업데이트  ===
    update_dict = load_file_to_dict(PROJECT_UPDATE_INFO)
    
    # 시트 연결
    sheets = get_sheets_service()
    
    # === 노션 데이터 불러오기
    results = notion.databases.query(database_id=PROJECT_DB_ID)["results"]
    
    # === 시트 복사 및 데이터 삽입
    for page in results:
        props = page["properties"]
        sheet_url = props["sheet url"]['url']
        last_edited_time = props["최종 편집 일시"]["last_edited_time"]
        page_id = page["id"]
        
        if page_id in update_dict and update_dict[page_id] == last_edited_time:
            continue
    
        # 시트 생성이 되지 않은 프로젝트
        if sheet_url == None:
            continue
        
        update_dict[page_id] = last_edited_time
        project_name = props["프로젝트명"]["title"][0]["plain_text"]
        tab_title = f"{project_name}_결산"
        gid = props["sheet url"]["url"].split('gid=')[-1]
        
        if tabs_dict[gid] != tab_title:
            # 탭 타이틀 최신화
            tabs_dict[gid] = tab_title
            
            # 시트 탭 이름 변경
            sheets.spreadsheets().batchUpdate(
                spreadsheetId=GOOGLE_SHEET_ID,
                body={
                    "requests": [
                        {
                            "updateSheetProperties": {
                                "properties": {
                                    "sheetId": gid,
                                    "title": tab_title
                                },
                                "fields": "title"
                            }
                        }
                    ]
                }
            ).execute()
        
        project_info = {
            "project_name": project_name,
            "project_type": props["프로젝트 형태"]["select"]["name"] if props["프로젝트 형태"]["select"] else '-',
            "business_manager": props["영업 담당자"]["multi_select"][0]["name"] if len(props["영업 담당자"]["multi_select"]) != 0 else '-',
            "release_date": props["납품일"]["date"]["start"] if props["납품일"]["date"] else '',
            "catalog_no": props["Cat No."]["rich_text"][0]["plain_text"] if len(props["Cat No."]["rich_text"]) != 0 else '',
            "unit_quantity": props["unit quantity"]["number"] if props["unit quantity"]["number"] else 0,
            "extra_quantity": props["extra quantity"]["number"] if props["extra quantity"]["number"] else 0,
            "vinyl_set": props["vinyl set"]["select"]["name"] if props["vinyl set"]["select"] else '-',
        }
        # 값 삽입
        sheets.spreadsheets().values().batchUpdate(
            spreadsheetId=GOOGLE_SHEET_ID,
            body={
                "valueInputOption": "USER_ENTERED",
                "data": [
                    {"range": tab_title + "!D4", "values": [[project_info["project_name"]]]},
                    {"range": tab_title + "!D6", "values": [[project_info["catalog_no"]]]},
                    {"range": tab_title + "!D7", "values": [[project_info["project_type"]]]},
                    {"range": tab_title + "!D8", "values": [[project_info["business_manager"]]]},
                    {"range": tab_title + "!F6", "values": [[project_info["release_date"]]]},
                    {"range": tab_title + "!I5", "values": [[project_info["unit_quantity"]]]},
                    {"range": tab_title + "!I6", "values": [[project_info["extra_quantity"]]]},
                    {"range": tab_title + "!I7", "values": [[project_info["vinyl_set"]]]},
                ]
            }
        ).execute()
        time.sleep(3)
    
    # 기록 저장
    save_file_to_dict(SHEET_TABS_INFO, tabs_dict)
    save_file_to_dict(PROJECT_UPDATE_INFO, update_dict)
    
    print("✅ 시트 업데이트 완료")

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (5)  (0) 2025.08.20
노션 - 구글 Sheet 연동 (3)  (2) 2025.08.18
노션 - 구글 Sheet 연동 (2)  (1) 2025.07.23
노션 - 구글 Sheet 연동 (1)  (1) 2025.07.21

시트 생성 및 업데이트 - 시트 생성

시트 생성 및 업데이트를 하나의 DAG에서 Task를 나눠 진행하였다. 처음에는 DAG를 나눠 진행하였는데 그 이유는 다음과 같다.
- 시트 생성이 다 되기 전에 업데이트 Task 진행에 대한 우려.
- 생성 및 업데이트에서 오류가 발생하더라도 다른 Task는 계속 진행되게끔 하기 위하여.

하지만 결산을 위한 시트이기에 생성 및 업데이트에 오류가 있더라도 코드 수정을 최대한 빠르게 한다면 업무에 지장이 없었고 시트 생성 후 time 라이브러리의 sleep을 활용하여 강제적으로 잠시 쉬게 만들어 해결하였다.

노션 프로젝트 관리 탭 및 결산 시트

def create_sheets():
    # === 기존 기록 로드 ===
    tabs_dict = load_file_to_dict(SHEET_TABS_INFO)
    
    # === 구글 시트 연결
    sheets = get_sheets_service()
    
    # === 노션 데이터 불러오기
    results = notion.databases.query(database_id=PROJECT_DB_ID)["results"]
    
    # 신규 프로젝트
    new_pages = []
    
    # 신규 프로젝트 체크
    for page in results:
        sheet_url = page["properties"]["sheet url"]["url"]
        if sheet_url == None:
            new_pages.append(page)
    
    # 최근 생성된 프로젝트일 수록 시트가 마지막에 생성되게끔 정열
    new_pages.reverse()
    # === 시트 복사 및 데이터 삽입
    for page in new_pages:
        props = page["properties"]
        project_name = props["프로젝트명"]["title"][0]["plain_text"]
        new_tab_title = f"{project_name}_결산"
        
        # 탭 복사
        copied_tab = sheets.spreadsheets().sheets().copyTo(
            spreadsheetId=GOOGLE_SHEET_ID,
            sheetId=TEMPLATE_TAB_ID,
            body={"destinationSpreadsheetId": GOOGLE_SHEET_ID}
        ).execute()
        
        # 새로 생긴 결산 탭 gid
        new_tab_id = copied_tab["sheetId"]

        # gid : tab_title 저장
        tabs_dict[new_tab_id] = new_tab_title

        # 탭 이름 변경
        sheets.spreadsheets().batchUpdate(
            spreadsheetId=GOOGLE_SHEET_ID,
            body={
                "requests": [
                    {
                        "updateSheetProperties": {
                            "properties": {
                                "sheetId": new_tab_id,
                                "title": new_tab_title
                            },
                            "fields": "title"
                        }
                    }
                ]
            }
        ).execute()
        
        # 노션에 링크 업데이트
        sheet_url = f"https://docs.google.com/spreadsheets/d/{GOOGLE_SHEET_ID}/edit?gid={new_tab_id}#gid={new_tab_id}"
        
        notion.pages.update(
            page_id=page["id"],
            properties={
                "sheet url": {"url": sheet_url}
            }
        )
        notion.blocks.children.append(
            block_id=page["id"],
            children=[
                {
                    "object": "block",
                    "type": "embed",
                    "embed": {
                        "url": sheet_url  # 시트 URL 입력
                    }
                },
                {
                    "object": "block",
                    "type": "heading_2",
                    "heading_2": {
                        "rich_text": [
                            {
                                "type": "text",
                                "text": {
                                    "content": "공지사항"
                                }
                            }
                        ]
                    }
                }
            ]
        )
        time.sleep(3)
        
    # 기록 저장
    save_file_to_dict(SHEET_TABS_INFO, tabs_dict)

    print("✅ 시트 생성 완료")
    time.sleep(5)

시트 생성 과정

  1. 노션에서 신규 프로젝트 여부 체크.(각 프로젝트 속성으로 시트의 url을 저장하게끔 하여 url이 채워지지 않았다면 신규 프로젝트)
  2. 신규 시트 생성(템플릿 시트 복제) 및 시트 탭 타이틀 변경
  3. 신규 시트 gid : 탭 타이틀 정보 저장(탭 타이틀 변경 사항 체크용)
  4. 신규 시트 url을 노션에 저장

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (5)  (0) 2025.08.20
노션 - 구글 Sheet 연동 (4)  (0) 2025.08.19
노션 - 구글 Sheet 연동 (2)  (1) 2025.07.23
노션 - 구글 Sheet 연동 (1)  (1) 2025.07.21

시트 생성 및 업데이트 - 인증

결산 시트는 실시간으로 작성을 할 필요는 없기에 배치 처리하는 방식을 택했고 Airflow를 활용하여 자동화 및 스케쥴링을 하였습니다.

구글, 노션 API 엔드포인트 연결

# === 구글 시트 연결 ===
def get_sheets_service():
    creds = None
    
    # 1
    if os.path.exists(TOKEN_PICKLE):
        with open(TOKEN_PICKLE, 'rb') as token:
            creds = pickle.load(token)
    if not creds or not creds.valid:
    	# 2
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        # 3
        else:
            flow = InstalledAppFlow.from_client_secrets_file(CREDENTIALS_FILE, SCOPES)
            creds = flow.run_local_server(port=0)
        with open(TOKEN_PICKLE, 'wb') as token:
            pickle.dump(creds, token)
	# 4
    return build('sheets', 'v4', credentials=creds)


# === 노션 연결 ===
# 5
notion = NotionClient(auth=NOTION_TOKEN)

구글 API를 사용하려면 Oauth 2.0으로 인증을 해야하는데 새로운 창이 열리고 로그인 후 인증을 해야합니다.
그렇기에 토큰을 피클 파일로 저장하는 과정을 거치면 로그인 후 인증하는 과정이 생략됩니다.

# 1 : 로그인 토큰이 있으면 불러오기
# 2 : 토큰이 없거나 만료됐으면 refresh
# 3 : 처음 로그인일 때. 브라우저로 사용자 인증 받은 후 토큰을 파일로 저장
# 4 : 인증된 Google Sheets API 객체 반환.
# 5 : 노션 API 연결. 전역 변수로 둔 이유는 시트 생성과 업데이트에 모두 사용되기 때문.

* NotionClient는 JS Notion SDK를 기준으로 작성된 모듈. Notion API의 사용을 좀 더 편리하게 해줍니다. 특히 Notion API를 사용할 때는 아래와 같이 토큰 인증과 버전도 명시해주어야 해서 상당히 귀찮지만 NotionClient에서는 Notion API 관리할 수 있기에 Default 값으로 지정한 "2022-06-28" 사용하거나 필요에 따라 버전을 변경해서 사용할 수 있습니다.
노션 API에 대한 자세한 설명은 따로 포스트를 작성할 예정입니다.

curl https://api.notion.com/v1/users/01da9b00-e400-4959-91ce-af55307647e5 \
  -H "Authorization: Bearer secret_t1CdN9S8yicG5eWLUOfhcWaOscVnFXns"
  -H "Notion-Version: 2022-06-28"

 

 

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (5)  (0) 2025.08.20
노션 - 구글 Sheet 연동 (4)  (0) 2025.08.19
노션 - 구글 Sheet 연동 (3)  (2) 2025.08.18
노션 - 구글 Sheet 연동 (1)  (1) 2025.07.21

노션 사용 이유

  • 회사 프로젝트와 발주, 문서, 재고 등을 관리하고 공유하기 위해 사용하기로 결정.
  • API를 활용한 Google Sheets와의 연동.
  • 효율적인 자동화.
  • 관리에 있어 가시성이 좋고 편리함.

Airflow 사용 이유

  • 실시간 처리하기에는 Notion에서 제공하는 웹훅이 오류가 많음.
  • 배치 처리로도 충분하기에.

대략적인 아키텍처
노션 프로젝트
노션 프로젝트 생성하면 생기는 결산 시트

 

코드에 대한 회고는 다음 포스트부터 해보겠습니다..!

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (5)  (0) 2025.08.20
노션 - 구글 Sheet 연동 (4)  (0) 2025.08.19
노션 - 구글 Sheet 연동 (3)  (2) 2025.08.18
노션 - 구글 Sheet 연동 (2)  (1) 2025.07.23

먼저 객체 지향 프로그래밍이란 인간 중심적 프로그래밍 패러다임이라고 할 수 있습니다. 현실 세계의 사물들을 객체라고 보고 그 객체로부터 개발하고자 하는 애플리케이션에 필요한 특징들을 프로그래밍 하는 것이다. 객체는 속성과 기능으로 이루어져 있다.

장점과 단점

💡장점
- 모듈화, 캡슐화로 인해 유지보수에 용이하다.
- 객체 자체가 하나의 프로그램이기에 재사용에 용이하다.
- 에러를 컴파일 단계에서 잡아낼 수 있으므로 버그 발생이 줄어듦.
- 데이터 모델링을 할 때 객체와 매핑하는 것이 수월하기 때문에 요구사항을 보다 명확하게 파악 가능하다.

💡단점
- 대부분의 객체지향 프로그램은 속도가 상대적으로 느려지고, 메모리를 많이 사용하는 경향이 있다.
- 객체 간의 정보 교환이 모두 메시지 교환을 통해 일어나므로 실행 시스템에 많은 overhead가 발생한다.
- 코드를 설계하고 작성하는데에 많은 시간이 소요된다.

 

4가지 특징

  • 추상화
    - 어떤 대상/집단의 공통적이면서 본질적인 특징을 추출하여 정의하는 것
  • 상속
    - 상위 클래스의 모든 것을 하위 클래스가 이어받는 것.
    - 기존에 구현한 클래스를 재활용 함으로써 코드 재사용성을 높일 수 있음.
  • 다형성
    - 어떤 객체의 속성이나 기능이 상황에 따라 여러 형태로 변할 수 있다는 것을 의미
    - 메서드 오버라이딩/오버로딩이 있다.
    - 오버로딩 : 같은 이름의 메서드 여러개를 가지면서 매개변수의 유형과 개수를 다르게 하여 다양한 유형의 호출에 응답할 수 있게 하는 기술
    - 오버라이딩 : 상위 클래스가 가지고 있는 메서드를 하위 클래스가 재정의해서 사용
  • 캡슐화 : 데이터와 코드의 형태를 외부로부터 알 수 없게하고, 데이터의 구조와 역할, 기능을 하나의 캡슐 형태로 만드는 방법

 

객체 지향 설계의 5원칙 S.O.L.I.D

  • SRP(Single Responsibility Principle) : 단일 책임 원칙
    - 클래스는 단 하나의 책임만 가져야 한다는 원칙
    - 책임은 하나의 '기능 담당' 으로 보면 됨.
    - 한 책임의 변경으로부터 다른 책임이 변경이 되는 연쇄작용 방지.
  • OCP(Open Closed Principle) : 개방 폐쇄 원칙
    - 확장에 열려있어야 하며 수정에는 닫혀있어야 한다는 원칙
    - 새로운 변경 사항이 발생했을 때
        - 유연하게 코드를 추가함으로써 애플리케이션 기능을 확장. (확장에 열려있음)
        - 객체를 직접적으로 수정을 제한함. (수정에 닫혀있음)
  • LSP(Listov Substitution Principle) : 리스코프 치환 원칙
    서브 타입은 언제나 부모 타입으로 교체할 수 있어야 한다는 원칙
    - 다형성 원리를 이용하기 위한 원칙
  • ISP(Interface Segregation Principle) : 인터페이스 분리 원칙
    - 인터페이스를 각각 사용에 맞게 잘게 분리해야 한다는 설계 원칙
    - 인터페이스를 사용하는 클라이언트를 기준으로 분리함으로써 클라이언트의 목적과 용도에 적합한 인터페이스를 제공하는 것이 목표
  • DIP(Dependency Inversion Principle) : 의존 역전 원칙
    - 어떤 클래스를 참조해서 사용해야하는 상황에 그 클래스를 직접 참조하는 것이 아니라 그 대상의 상위 요소(추상 클래스 or 인터페이스)를 참조하라는 원칙
    - 클래스간의 결합도(coupling)을 낮추는 것

사용법

1. 문자열 합치기
SELECT CONCAT('문자열', '합치기');
-> 문자열합치기
2. 숫자 합치기
SELECT CONCAT(123, 456, 789);
-> 123456789
3. 숫자와 문자 합치기
SELECT CONCAT('문자열', 1004)
-> 문자열1004

대화형 셸

pyspark
#1 어디 폴더 아래서든 터미널에서 입력

./pyspark
#2 bin 폴더에서 입력
  1. pip install pyspark로 설치.
  2. apache spark 다운로드 페이지에서 다운.

대화형 셸 화면(로컬 모드)

 

스파크 애플리케이션

  • 애플리케이션
    API를 써서 스파크 위에서 돌아가는 사용자 프로그램. 드라이버 프로그램과 클러스터의 실행기로 이루어져 있습니다.
  • SparkSession
    스파크 코어 기능들과 상호 작용할 수 있는 진입점을 제공하며 그 API로 프로그래밍을 할 수 있게 해주는 객체. 스파크 셸에서 스파크 드라이버는 기본적으로 SparkSession을 제공하지만 스파크 애플리케이션에서는 사용자가 객체를 생성해서 써야 합니다.
  • 잡(job)
    스파크 액션(save(), collect() 등)에 대한 응답으로 생성되는 여러 태스크로 이루어진 병렬 연산.
  • 스테이지(stage)
    각 잡은 스테이지라 불리는 서로 의존성을 가지는 다수의 태스크 모음으로 나뉩니다.
  • 태스크(task)
    스파크 이그제큐터로 보내지는 작업 실행의 가장 기본적인 단위.

  • 애플리케이션 안의 드라이버가 스파크세션 객체 생성.
  • 스파크 셸을 사용할 땐 셸에 포함되어있는 형태이며 SparkSession 객체가 미리 만들어진다.
  • 스파크 셸로 상호 작용하는 작업 동안 드라이버는 스파크 애플리케이션을 하나 이상의 스파크 잡으로 변환한다. 그리고 각 잡은 DAG로 변환된다. 또한 DAG에서 각각의 노드는 하나 이상의 스파크 스테이지에 해당한다.
  • 어떤 작업이 연속적 또는 병렬적으로 수행되는지에 맞춰 스테이지에 해당하는 DAG 노드가 생성된다. 종종 스파크 이그제큐터끼리의 데이터 전송이 이루어지는 연산 범위 경계 위에서 스테이지가 결정되기도 한다.(셔플이라 불리는 노드끼리의 데이터 교환이 스테이지의 경계가 되는 경우)
  • 각 스테이지는 최소 실행 단위이며 스파크 이그제큐터들 위에서 연합 실행되는 스파크 태스크들로 이루어진다. 각 태스크는 개별 CPU 코어에 할당되고 데이터의 개별 파티션을 갖고 작업한다. 이렇게 병렬처리가 되는 것.


트랜스포메이션, 액션, 지연 평가

  • 트랜스포메이션
    • 이미 불변성의 특징을 가진 원본 데이터를 수정하지 않고 하나의 스파크 데이터 프레임을 새로운 데이터 프레임으로 변형.
    • 트랜스포메이션의 결과는 즉시 계산되는 것이 아니라 계보(lineage)라 불리는 형태로 기록된다. 기록된 리니지는 실행 계획에서 후반쯤에 스파크가 확실한 트랜스포메이션들끼리 재배열하거나 합치거나 해서 더 효육적으로 실행할 수 있도록 최적화.
    • 스파크는 리니지에 각 트랜스포메이션을 기록해 놓고 데이터 프레임들은 과정 중 변하지 않기 때문에 단순히 기록된 리니지를 재실행하는 것만으로도 복원할 수 있으며 덕분에 장애 상황에도 유연성을 확보할 수 있다.
    • 의존성의 정도에 따라 두가지로 분류된다.
      • 좁은 의존성 : 하나의 입력 파티션을 연산하여 하나의 결과 파티션을 내놓는 트랜스포메이션
        예) filter(), contains()
      • 넓은 의존성 : 하나의 입력 파티션을 연산하여 여러 결과 파티션을 내놓는 트랜스포메이션
        예) orderBy(), groupBy()
    • 예 : orderBy(), groupBy(), filter(), select(), join() 
  • 액션
    • 하나의 액션은 모든 기록된 트랜스포메이션의 지연 연산을 발동시킨다.
    • 쿼리 계획 안의 어떤 것도 액션이 호출되기 전에는 실행되지 않음.
    • 쿼리 실행 계획의 일부로서 기록된 모든 트랜스포메이션들의 실행을 시작하게 한다.
    • 예 : show(), take(), count(), collect(), save()
  • 지연평가
    • 액션이 실행되는 시점이나 데이터에 실제 접근하는 시점까지 실제 실행을 미루는 스파크의 전략.
    • 스파크가 사용자의 연계된 트랜스포메이션들을 살펴봄으로써 쿼리 최적화를 가능하게 하지만 리니지와 데이터의 불변성은 장애에 대한 데이터 내구성을 제공.

 

스파크 UI

스파크가 배포된 방식에 맞춰 드라이버가 웹 UI를 띄우게 되며 기본적으로 4040 포트를 사용한다.(http://localhost:4040)
스파크 UI에서는 다음과 같은 다양한 수치와 내용을 볼 수 있다.

  • 스케줄러의 스테이지와 태스크 목록
  • RDD 크기와 메모리 사용의 요약
  • 환경 정보
  • 실행 중인 이그제큐터 정보
  • 모든 스파크 SQL 쿼리

'Data Engineering > Spark' 카테고리의 다른 글

Spark Start(1)  (1) 2024.10.30
Spark Components  (0) 2024.10.25
About Spark  (2) 2024.10.11
Spark 용어 정리  (0) 2024.06.21

러닝 스파크에서 나오는 실습들은 스파크셸에서의 모든 처리가 하나의 머신에서 이루어지는 로컬 모드를 사용한다.
분산 처리의 이득을 보기 원하거나 실무 작업에서는 YARN이나 Kubernetes 배포 모드를 사용해야 한다.
로컬 모드는 프레임워크를 익히면서 반복적인 스파크 수행이 가능하여 빠른 피드백을 얻을 수 있다.

1. 설치

기본 설치 방법

! macOS를 사용중이기에 이 운영체제를 기준으로 설치 방법을 작성하겠습니다.

  1. Spark release : 최신 버전
  2. package type : pre-built for Apache Hadoop x.x for later 선택
  3. Download Spark : 링크 클릭!
  4. 자바 버전 8/11/17 설치 후 JAVA_HOME 환경변수 설정 필요. 해당 사이트에서 자신이 사용하는 Spark 버전과 호환되는 자바 설치 필요. https://spark.apache.org/docs/latest/

    4-1 어떤 터미널을 사용하냐에 따라 다르지만 ohmyzsh를 사용중이라면 'vi ~/.zshrc'를 입력, 'vi ~/.bash_profile'나 'vi ~/.bashrc' 등 다 다르니 환경에 맞게 입력.
    4-2 vi 편집기로 들어가면 여러 환경변수들이 있을 것입니다. 맨 아래 줄에 아래 코드를 추가해줍니다.(i를 누르면 입력가능. esc를 누른 후 wq하면 저장 후 나가기)
    4-3 source ~/.zshrc를 입력하여 변경된 환경변수를 적용시켜 줍니다.
JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.jdk/Contents/Home
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export PATH

PyPI 설치 방법

터미널에서 'pip install pyspark' 입력
추가 라이브러리는 'pip install pyspark[sql,ml,mllib]' 이렇게 설치

https://spark.apache.org/downloads.html

 

Downloads | Apache Spark

Download Apache Spark™ Choose a Spark release: Choose a package type: Download Spark: Verify this release using the and project release KEYS by following these procedures. Note that Spark 3 is pre-built with Scala 2.12 in general and Spark 3.2+ provides

spark.apache.org

 

2. Spark을 설치한 후 폴더 살펴보기

spark-x.x.x-bin-hadoopx.x.tgz를 압축 해제하고 나면 아래처럼 파일들이 있을 것입니다.

  • README.md
    스파크에 관련된 상세한 설명들을 담고있습니다.
  • bin
    스파크 셸들(spark-sql, pyspark, spark-shell, sparkR)을 포함해서 스파크와 상호 작용할 수 있는 대부분의 스크립트를 갖고 있습니다. 셸과 실행 파일은 나중에 spark-submit을 사용해서 단독 스파크 애플리케이션을 제출하거나 쿠버네티스로 스파크를 실행할 때 도커 이미지를 만들고 푸시하는 스크립트 작성을 위해 사용됩니다.
  • sbin
    이 폴더의 대부분의 스크립트는 다양한 배포 모드에서 클러스터의 스파크 컴포넌트들을 시작하고 중지하기 위한 관리 목적입니다.
  • kubernetes
    쿠버네티스 클러스터에서 쓰는 스파크를 도커 이미지 제작을 위한 Dockerfile들을 담고 있습니다. 그리고 도커 이미지를 빌드하기 전 스파크 배포본을 어떻게 만들지에 대한 가이드를 제공하는 파일도 포함하고 있습니다.
  • examples
    Spark를 배울 때 참고할만한 예제 코드들이 담겨있습니다.

'Data Engineering > Spark' 카테고리의 다른 글

Spark Start(2)  (3) 2024.10.31
Spark Components  (0) 2024.10.25
About Spark  (2) 2024.10.11
Spark 용어 정리  (0) 2024.06.21

Airflow의 핵심 개념인 DAG (Directed Acyclic Graph)는 작업(Task)들을 모아 실행 순서와 관계를 설정하여, 작업이 어떤 방식으로 실행되어야 하는지 정의하는 구조입니다. DAG 내의 작업들은 의존 관계와 순서가 있으며, 각 작업이 선행 작업의 완료 후에 실행되도록 구성됩니다. DAG는 Airflow에서 전체 워크플로의 청사진 역할을 하며, 특정한 실행 흐름을 따르게 만들어 줍니다.

DAG 예시

 

DAG 정의

DAG를 정의하는 방법에는 세가지가 있다.

  1. with 명령어를 사용. 대그 안에 암묵적으로 어떤 것이든 코드를 추가할 수 있다.
  2.  표준 생성자를 사용. DAG를 사용하는 모든 연산자에 전달할 수 있음.
  3. @dag 데코레이터 사용. 함수를 DAG 생성자로 변환할 때 사용. 
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")

# 1. with 명령어를 사용하는 방법.

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

my_dag = DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)

# 2. 표준 생성자를 사용하는 방법

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

# 3. @dag 데코레이터를 사용하는 방법

Task 의존성

Task나 Operator은 혼자 존재하지 않는다. Task간의 의존성을 선언하는 것이 DAG의 구조를 형성합니다.
task 의존성을 선언하는 방법은 여러가지 있습니다. 3, 4번 방법은 만약 task의 리스트를 다른 리스트에 의존하게끔 만들고 싶을 때 사용. 

  1. >>, << 연산자 사용.
  2. set_upstream, set_downstream 메소드 사용. >>, << 연산자보다 더 명시적이다.
  3.  cross_downstream 메소드 사용
  4. chain 메소드 사용.
# 1
first_task >> [second_task, third_task]
third_task << fourth_task

# 2
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

# 3
from airflow.models.baseoperator import cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])

# 4-1
from airflow.models.baseoperator import chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])

# 4-2 동일한 크기에 대해 쌍별 종속성 수행
from airflow.models.baseoperator import chain

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)

 

'Data Engineering > Airflow' 카테고리의 다른 글

Airflow Architecture  (0) 2024.10.25
About Airflow  (0) 2024.06.08

+ Recent posts