로그 관리

Apache Airflow를 사용하면 dag가 실행될 때 마다 위와 같이 로그가 남는다.
한 폴더당 20KB 정도 되는데 하루에 10개씩 생기며 다른 대그가 추가될 수도 있기에 조금만 지나도 메모리를 많이 차지할 것이다.
때문에 로그를 14일치만 저장해두고 삭제하는 dag를 따로 작성하였다.

import os
import shutil
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.operators.python import PythonOperator

def arrange_old_logs():
    log_dir = '/opt/airflow/logs'
    # 14일치 로그 파일 계산
    cutoff_date = (datetime.now(timezone.utc) - timedelta(days=14)).date()
    delete_count = 0

    # dag_id : logs 아래 각 폴더명
    # dag_path : logs 아래 각 폴더의 경로
    # run_id : dag_id=~~ 폴더 아래의 각 폴더명
    # run_path : run_id로 되어있는 로그를 담고있는 파일 경로
    # run_time_str : 런타임 시간 문자열
    
    for dag_id in os.listdir(log_dir):
        dag_path = os.path.join(log_dir, dag_id)
        if os.path.isdir(dag_path):
            for run_id in os.listdir(dag_path):
                run_path = os.path.join(dag_path, run_id)
                if os.path.isdir(run_path):
                    run_time_str = run_id.split('__')[-1]
                    try:
                        run_time = datetime.fromisoformat(run_time_str).date()
                        # 지금으로부터 14일 전 보다 더 이전 파일들 제거
                        if run_time < cutoff_date:
                            shutil.rmtree(run_path)
                            delete_count += 1
                            print(f"Deleted logs: {run_path}")
                    except ValueError:
                        print(f"Skipping: {run_path}, unable to parse date from {run_time_str}")
    
    print(f"Total logs deleted: {delete_count}")

default_args = {
    'owner': 'joonghyeon',
    'depends_on_past': False,
    'start_date': datetime(2025, 6, 23),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
with DAG(
    dag_id='arrangeLogs',
    default_args=default_args,
    schedule='0 17 * * 1-5',  # 매일 새벽 2시에 실행
    catchup=False,
    max_active_runs=1,
    tags=['log', 'arrange', 'cleanup']
) as dag:
    arrange_task = PythonOperator(
        task_id='arrange_old_logs',
        python_callable=arrange_old_logs,
    )
    arrange_task


추가로 dag가 많아진다면 실패한 task에 대한 로그는 남겨두는 기능을 추가할 예정이다.
(지금은 매일 Airflow UI로 확인 중이다.)

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (4)  (0) 2025.08.19
노션 - 구글 Sheet 연동 (3)  (2) 2025.08.18
노션 - 구글 Sheet 연동 (2)  (1) 2025.07.23
노션 - 구글 Sheet 연동 (1)  (1) 2025.07.21

시트 생성 및 업데이트 - 시트 업데이트

 

파일 로드 및 저장

def load_file_to_dict(path):
    if os.path.exists(path):
        with open(path, "r") as f:
            return dict(json.load(f))
    return dict()

def save_file_to_dict(path, cache):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(cache, f, ensure_ascii=False, indent=4)

- 시트 생성과 업데이트에 둘 다 쓰이고 원하는 구조가 key : value 이기에 함수화 하였다.

 

시트 업데이트

시트 업데이트 같은 경우 두가지 dictionary 구조의 json 파일을 이용한다.

  • tabs_dict : 프로젝트 명이 변경될 경우 시트 탭 타이틀이 변경되어야 한다. 구글 시트의 각 탭에는 gid라는 값이 존재하는데 이 값이 시트의 각 탭의 유일성을 보장해주기에 이 값을 활용한다. {gid : 탭 타이틀}
  • update_dict : 노션의 데이터베이스의 '최종 편집 일시' 속성을 이용한다. 그리고 노션의 데이터베이스에서 새로 row(페이지)가 생성되면 각 row마다 페이지 링크가 있고 링크마다 id가 존재한다. {페이지 id : 최종 편집 일시} 를 저장해두어 만약 페이지 id 값이 저장되어 있지 않거나 현재 노션에 표기된 '최종 편집 일시'와 저장되어 있는 값이 다르다면 업데이트가 이루어진 것이고 시트에도 반영되어야 할 것이다.
# 시트 업데이트
def update_sheets():
    # === 기존 기록 로드 ===
    tabs_dict = load_file_to_dict(SHEET_TABS_INFO)
    
    # === 업데이트  ===
    update_dict = load_file_to_dict(PROJECT_UPDATE_INFO)
    
    # 시트 연결
    sheets = get_sheets_service()
    
    # === 노션 데이터 불러오기
    results = notion.databases.query(database_id=PROJECT_DB_ID)["results"]
    
    # === 시트 복사 및 데이터 삽입
    for page in results:
        props = page["properties"]
        sheet_url = props["sheet url"]['url']
        last_edited_time = props["최종 편집 일시"]["last_edited_time"]
        page_id = page["id"]
        
        if page_id in update_dict and update_dict[page_id] == last_edited_time:
            continue
    
        # 시트 생성이 되지 않은 프로젝트
        if sheet_url == None:
            continue
        
        update_dict[page_id] = last_edited_time
        project_name = props["프로젝트명"]["title"][0]["plain_text"]
        tab_title = f"{project_name}_결산"
        gid = props["sheet url"]["url"].split('gid=')[-1]
        
        if tabs_dict[gid] != tab_title:
            # 탭 타이틀 최신화
            tabs_dict[gid] = tab_title
            
            # 시트 탭 이름 변경
            sheets.spreadsheets().batchUpdate(
                spreadsheetId=GOOGLE_SHEET_ID,
                body={
                    "requests": [
                        {
                            "updateSheetProperties": {
                                "properties": {
                                    "sheetId": gid,
                                    "title": tab_title
                                },
                                "fields": "title"
                            }
                        }
                    ]
                }
            ).execute()
        
        project_info = {
            "project_name": project_name,
            "project_type": props["프로젝트 형태"]["select"]["name"] if props["프로젝트 형태"]["select"] else '-',
            "business_manager": props["영업 담당자"]["multi_select"][0]["name"] if len(props["영업 담당자"]["multi_select"]) != 0 else '-',
            "release_date": props["납품일"]["date"]["start"] if props["납품일"]["date"] else '',
            "catalog_no": props["Cat No."]["rich_text"][0]["plain_text"] if len(props["Cat No."]["rich_text"]) != 0 else '',
            "unit_quantity": props["unit quantity"]["number"] if props["unit quantity"]["number"] else 0,
            "extra_quantity": props["extra quantity"]["number"] if props["extra quantity"]["number"] else 0,
            "vinyl_set": props["vinyl set"]["select"]["name"] if props["vinyl set"]["select"] else '-',
        }
        # 값 삽입
        sheets.spreadsheets().values().batchUpdate(
            spreadsheetId=GOOGLE_SHEET_ID,
            body={
                "valueInputOption": "USER_ENTERED",
                "data": [
                    {"range": tab_title + "!D4", "values": [[project_info["project_name"]]]},
                    {"range": tab_title + "!D6", "values": [[project_info["catalog_no"]]]},
                    {"range": tab_title + "!D7", "values": [[project_info["project_type"]]]},
                    {"range": tab_title + "!D8", "values": [[project_info["business_manager"]]]},
                    {"range": tab_title + "!F6", "values": [[project_info["release_date"]]]},
                    {"range": tab_title + "!I5", "values": [[project_info["unit_quantity"]]]},
                    {"range": tab_title + "!I6", "values": [[project_info["extra_quantity"]]]},
                    {"range": tab_title + "!I7", "values": [[project_info["vinyl_set"]]]},
                ]
            }
        ).execute()
        time.sleep(3)
    
    # 기록 저장
    save_file_to_dict(SHEET_TABS_INFO, tabs_dict)
    save_file_to_dict(PROJECT_UPDATE_INFO, update_dict)
    
    print("✅ 시트 업데이트 완료")

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (5)  (0) 2025.08.20
노션 - 구글 Sheet 연동 (3)  (2) 2025.08.18
노션 - 구글 Sheet 연동 (2)  (1) 2025.07.23
노션 - 구글 Sheet 연동 (1)  (1) 2025.07.21

시트 생성 및 업데이트 - 시트 생성

시트 생성 및 업데이트를 하나의 DAG에서 Task를 나눠 진행하였다. 처음에는 DAG를 나눠 진행하였는데 그 이유는 다음과 같다.
- 시트 생성이 다 되기 전에 업데이트 Task 진행에 대한 우려.
- 생성 및 업데이트에서 오류가 발생하더라도 다른 Task는 계속 진행되게끔 하기 위하여.

하지만 결산을 위한 시트이기에 생성 및 업데이트에 오류가 있더라도 코드 수정을 최대한 빠르게 한다면 업무에 지장이 없었고 시트 생성 후 time 라이브러리의 sleep을 활용하여 강제적으로 잠시 쉬게 만들어 해결하였다.

노션 프로젝트 관리 탭 및 결산 시트

def create_sheets():
    # === 기존 기록 로드 ===
    tabs_dict = load_file_to_dict(SHEET_TABS_INFO)
    
    # === 구글 시트 연결
    sheets = get_sheets_service()
    
    # === 노션 데이터 불러오기
    results = notion.databases.query(database_id=PROJECT_DB_ID)["results"]
    
    # 신규 프로젝트
    new_pages = []
    
    # 신규 프로젝트 체크
    for page in results:
        sheet_url = page["properties"]["sheet url"]["url"]
        if sheet_url == None:
            new_pages.append(page)
    
    # 최근 생성된 프로젝트일 수록 시트가 마지막에 생성되게끔 정열
    new_pages.reverse()
    # === 시트 복사 및 데이터 삽입
    for page in new_pages:
        props = page["properties"]
        project_name = props["프로젝트명"]["title"][0]["plain_text"]
        new_tab_title = f"{project_name}_결산"
        
        # 탭 복사
        copied_tab = sheets.spreadsheets().sheets().copyTo(
            spreadsheetId=GOOGLE_SHEET_ID,
            sheetId=TEMPLATE_TAB_ID,
            body={"destinationSpreadsheetId": GOOGLE_SHEET_ID}
        ).execute()
        
        # 새로 생긴 결산 탭 gid
        new_tab_id = copied_tab["sheetId"]

        # gid : tab_title 저장
        tabs_dict[new_tab_id] = new_tab_title

        # 탭 이름 변경
        sheets.spreadsheets().batchUpdate(
            spreadsheetId=GOOGLE_SHEET_ID,
            body={
                "requests": [
                    {
                        "updateSheetProperties": {
                            "properties": {
                                "sheetId": new_tab_id,
                                "title": new_tab_title
                            },
                            "fields": "title"
                        }
                    }
                ]
            }
        ).execute()
        
        # 노션에 링크 업데이트
        sheet_url = f"https://docs.google.com/spreadsheets/d/{GOOGLE_SHEET_ID}/edit?gid={new_tab_id}#gid={new_tab_id}"
        
        notion.pages.update(
            page_id=page["id"],
            properties={
                "sheet url": {"url": sheet_url}
            }
        )
        notion.blocks.children.append(
            block_id=page["id"],
            children=[
                {
                    "object": "block",
                    "type": "embed",
                    "embed": {
                        "url": sheet_url  # 시트 URL 입력
                    }
                },
                {
                    "object": "block",
                    "type": "heading_2",
                    "heading_2": {
                        "rich_text": [
                            {
                                "type": "text",
                                "text": {
                                    "content": "공지사항"
                                }
                            }
                        ]
                    }
                }
            ]
        )
        time.sleep(3)
        
    # 기록 저장
    save_file_to_dict(SHEET_TABS_INFO, tabs_dict)

    print("✅ 시트 생성 완료")
    time.sleep(5)

시트 생성 과정

  1. 노션에서 신규 프로젝트 여부 체크.(각 프로젝트 속성으로 시트의 url을 저장하게끔 하여 url이 채워지지 않았다면 신규 프로젝트)
  2. 신규 시트 생성(템플릿 시트 복제) 및 시트 탭 타이틀 변경
  3. 신규 시트 gid : 탭 타이틀 정보 저장(탭 타이틀 변경 사항 체크용)
  4. 신규 시트 url을 노션에 저장

'프로젝트' 카테고리의 다른 글

노션 - 구글 Sheet 연동 (5)  (0) 2025.08.20
노션 - 구글 Sheet 연동 (4)  (0) 2025.08.19
노션 - 구글 Sheet 연동 (2)  (1) 2025.07.23
노션 - 구글 Sheet 연동 (1)  (1) 2025.07.21

+ Recent posts