실행 파일 (auto_organizer.py) 구성

 

  • 감시할 폴더 경로 설정
  • 감지 핸들러와 연결
  • 감시 시작 및 종료 처리
  • 터미널 + 로그 파일에 모든 정보 출력 (logging)
  • 기존에는 print()로만 출력되던 정리 결과를
    logging 모듈을 활용해 파일(log_로그 저장 날짜.log)에 저장되도록 설정했다.
  • 덕분에 프로그램이 백그라운드에서 실행되더라도
    실제로 어떤 파일이 어떻게 정리되었는지 추적 가능하다.
# auto_organizer.py

import time
import logging
from datetime import datetime
from watchdog.observers import Observer
from handler import FileEventHandler

# 로그 설정 (터미널 + 파일 기록)
log_filename = datetime.now().strftime("log_%Y-%m-%d.log")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(log_filename),
        logging.StreamHandler()
    ]
)


if __name__ == "__main__":
    # 감시할 폴더 경로 (변경 가능)
    path_to_watch = "폴더 경로"

    # 이벤트 핸들러와 Observer 생성
    event_handler = FileEventHandler()
    observer = Observer()
    observer.schedule(event_handler, path=path_to_watch, recursive=False)

    # 감시 시작
    observer.start()
    logging.info(f"[Started] Watching folder: {path_to_watch}")

    try:
        while True:
            time.sleep(1) # 계속 감시 유지
    except KeyboardInterrupt: # Ctrl + C로 종료
        # 종료 시 처리
        observer.stop()
        logging.info("\n[Stopped] File watcher terminated.")
    
    observer.join()

 

 

안정성 보완 - wait_until_ready()

한꺼번에 수십 ~ 수백 개의 파일이 들어오는 경우, 파일이 복사 완료되기 전에 이동이 시도될 수 있어 오류가 발생할 수 있다.

이를 해결하기 위해 wait_until_ready() 함수를 handler_utils.py에 따로 정의했다.
이 함수는 일정 시간 동안 파일 크기가 변하지 않으면 복사 완료로 간주하고 이후 처리를 진행한다.

이 함수는 handler.py의 on_created() 안에서 사용된다.

# handler_utils.py

import time
import os

def wait_until_ready(filepath, timeout=5, check_interval=1):
    # 파일 크기가 일정 시간 동안 변하지 않으면 준비 완료로 간주
    prev_size = -1

    for _ in range(timeout):
        try:
            curr_size = os.path.getsize(filepath)
        except FileNotFoundError:
            # 아직 복사 중이라 파일이 잠깐 사라진 경우
            time.sleep(check_interval)
            continue

        if curr_size == prev_size:
            return True
        
        prev_size = curr_size
        time.sleep(check_interval)
    
    return False # 시간 초과: 여전히 파일 크기 변화 중

 

handler.py 수정 - wait_until_ready 함수 호출

# handler.py

import os
import logging
import shutil
from watchdog.events import FileSystemEventHandler
from config import EXTENSION_MAP, DEFAULT_FOLDER
from handler_utils import wait_until_ready

class FileEventHandler(FileSystemEventHandler):
    def on_created(self, event):
        # event가 디렉토리일 시 무시 처리
        if event.is_directory:
            return
        
        if not wait_until_ready(event.src_path):
            logging.info(f"[Skipped] File not ready: {event.src_path}")
            return
            
        # 이하 동일

 

전체 프로젝트 흐름

1. config.py -> 확장자 기준 정의

2. handler.py -> 이벤트 감지 시 정리 로직 실행

3. handler_utils.py -> 복사 중인 파일 안정성 확보

4. auto_organizer.py -> 모든 구성 연결 및 실행

5. logging -> 로그 파일 저장 + 터미널 출력

# 실행
python auto_organizer.py

 

이 프로그램을 통해 단순한 자동화 스크립트를 넘어서, 실시간 감지, 구조화된 설계, 그리고 로그 추적까지 가능한 파일 정리 시스템을 구현할 수 있었다.

환경 설정

먼저 프로그램을 만들 디렉토리에 필요한 환경을 구성하기 위한 준비를 했다.

# 가상환경 생성
python3 -m venv venv

# 가상환경 활성화
source venv/bin/activate

# 필요한 외부 라이브러리 설치
pip install watchdog

 

config.py - 확장자에 따른 정리 기준

자동 정리기의 핵심은 파일 확장자별로 어떤 폴더로 이동시킬지 기준을 세우는 것이다.
이를 위해 딕셔너리 형태로 규칙을 정의해두었다.

  • 키는 파일 확장자, 값은 해당 확장자를 이동시킬 폴더 이름
  • 딕셔너리로 구성했기 때문에 나중에 유지보수가 매우 용이
  • DEFAULT_FOLDER는 확장자가 등록되어 있지 않은 파일이 생겼을 때의 기본 정리 폴더
  • 확장자 분류 예외를 방지하기 위해, 모든 확장자명을 '.확장자명' 형태로작성
# config.py

EXTENSION_MAP = {
    '.hwp': 'Docs',
    '.doc': 'Docs',
    '.docx': 'Docs',
    '.pdf': 'Docs',
    '.ppt': 'Presentations',
    '.pptx': 'Presentations',
    '.jpg': 'Imgs',
    '.jpeg': 'Imgs',
    '.png': 'Imgs',
    '.gif': 'Imgs',
    '.txt': 'Texts',
    '.zip': 'Archives',
    '.mp3': 'MV',
    '.mp4': 'MV',
}

DEFAULT_FOLDER = 'Others'

 

 

handler.py - 감지된 파일을 분류하여 이동시키는 로직

watchdog이 감지한 이벤트에 따라 어떤 작업을 수행할지를 정의하는 부분이다.
FileSystemEventHandler를 상속받아, 파일이 생성될 때(on_created) 작동하도록 구현했다.

import os
import shutil
from watchdog.events import FileSystemEventHandler
from config import EXTENSION_MAP, DEFAULT_FOLDER

class FileEventHandler(FileSystemEventHandler):
    def on_created(self, event):
        # event가 디렉토리일 시 무시 처리
        if event.is_directory:
            return
        
        # 확장자명 추출 (.jpg, .pdf 등등)
        _, ext = os.path.splitext(event.src_path) # event.src_path: 새로 생성된 파일의 전체 경로
        ext = ext.lower()

        # config.py에서 지정한 이동할 폴더명 결정
        folder_name = EXTENSION_MAP.get(ext, DEFAULT_FOLDER)

        base_dir = os.path.dirname(event.src_path) # 파일이 생성된 상위 폴더의 경로
        target_dir = os.path.join(base_dir, folder_name) # 해당 확장자에 맞는 정리 대상 폴더의 전체 경로
        # 예시: 확장자가 .pdf 면 folder_name = 'Docs' -> target_dir = ".../Docs"

        # 폴더가 없으면 생성
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        
        # 파일 이동
        try:
            shutil.move(
                event.src_path,
                os.path.join(target_dir, os.path.basename(event.src_path))
            )
            print(f"[Moved] {event.src_path} -> {target_dir}/")
        except Exception as e:
            print(f"[Error] Failed to move file: {e}")

 

흐름 설명 예시

감지된 파일 (event.src_path) /Users/username/Documents/test.pdf
확장자 (ext) .pdf
폴더명 (folder_name) Docs (config에서 매핑)
base_dir /Users/username/Documents
target_dir /Users/username/Documents/Docs
최종 이동 경로 /Users/username/Documents/Docs/test.pdf

 

핵심 포인트

 

  • 감지된 확장자에 따라 정리 기준을 찾고
  • 그 폴더가 없다면 생성하고
  • shutil.move()로 해당 위치로 이동시킴

 

공식 문서 참고

이미 shutil과 watchdog을 사용하는 예제들은 블로그나 커뮤니티에 많이 나와 있지만,
각 라이브러리가 어떤 구조로 설계되어 있고, 어떤 동작 흐름을 따르는지 정확히 이해하기 위해
공식 문서를 먼저 참고했다.

 

shutil 공식 문서 요약

https://docs.python.org/ko/3.13/library/shutil.html

 

shutil — High-level file operations

Source code: Lib/shutil.py The shutil module offers a number of high-level operations on files and collections of files. In particular, functions are provided which support file copying and removal...

docs.python.org

 

  • 표준 라이브러리이며, 파일 및 디렉토리를 복사, 이동, 삭제할 수 있는 고수준 함수를 제공
  • 주요 함수: move(), copy(), rmtree() 등
  • move()는 내부적으로 상황에 따라 os.rename() 또는 복사 후 삭제 방식으로 동작
  • 따라서 파일 이동 시 예외 처리(파일 충돌, 경로 존재 여부 등)를 고려해야 함

 

watchdog 공식 문서 요약

https://python-watchdog.readthedocs.io/en/stable/

 

  • 외부 라이브러리로, 파일 시스템 상의 변화를 감지하는 데 사용
  • 기본 구조:
    1. Observer 객체를 통해 감시할 폴더를 지정
    2. FileSystemEventHandler를 상속한 클래스를 만들고 on_created() 등 이벤트 메서드 구현
    3. observer.schedule()을 통해 감시 대상과 핸들러를 연결
  • 감지 가능한 이벤트 종류: 파일 생성, 수정, 삭제, 이동 등
  • 문서에서는 Observer 패턴에 따라 클래스를 분리해서 설계할 것을 권장

 

세부 설계 방향

 

  • watchdog → 실시간 이벤트 감지 (on_created 사용 예정)
  • shutil → 감지된 파일을 자동으로 분류/이동 처리이를 기반으로 코드 구조를 아래와 같이 나눌 예정:
    • handler.py → 감지된 이벤트에 따라 동작하는 핸들러 정의
    • config.py → 확장자별 분류 규칙을 설정하는 설정 파일

 

 

 

파일 자동 정리 스크립트란?

폴더 내에 파일을 저장할 때, 확장자명 (jpg, png, txt, pdf ... ) 에 따라 자동으로 분류하여 저장하도록 하는 프로그램.

이력서, 증명 사진 등등이 늘어나 폴더 내에 파일들이 너무 중구난방식으로 저장되어 정리가 필요한데, 앞으로 늘어날 때마다 자동으로 정리하기 위해 프로그램 생성

 

계획

대표적으로 Python에서 파일 자동 정리를 위한 라이브러리로 shutil이라는 라이브러리를 사용하지만, 이는 명백한 단점이 존재.

파일을 저장할 때마다 수동으로 프로그램을 실행시켜야 파일이 정리됌. 이는 자동 정리라는 목표에 부합하지 않음.

그러면 파일이 들어왔을 때 이를 감지하는 라이브러리는 없을까? 알아본 결과, watchdog이라는 라이브러리를 발견.

이는 폴더에 파일이 들어왔을 때, 이를 감지하고 알림을 주는 기능을 한다.

두 라이브러리를 사용하기 위한 것들을 정리해봤다.

shutil 파일 이동 ❌ 표준 라이브러리
os 경로 생성, 파일 검사 등 ❌ 표준 라이브러리
watchdog 파일 시스템 감시 ✅ pip install watchdog 필요
time 루프 유지를 위한 대기 등 ❌ 표준 라이브러리
logging 콘솔 메시지 출력용  ❌ 표준 라이브러리

 

예상 폴더 구조

project_folder/
│
├── auto_organizer.py     ← 메인 실행 파일
├── config.py             ← 확장자별 정리 규칙 정의
├── handler.py            ← 이벤트 핸들러 정의
└── README.md             ← 프로젝트 설명 및 사용법

 

예상 시나리오

 

  • 프로그램이 실행되면 Observer가 해당 폴더 감시 시작
  • 새 파일이 생성되면 이벤트 발생
  • 핸들러가 해당 파일의 확장자 확인
  • shutil을 이용해 해당하는 폴더로 이동
  • 이동이 완료되면 로그 출력

 

해당 문제를 접근하기 전에, 누적합에 대해 먼저 서술해보자.

첫번째로, 문제의 유형이 "구간의 합" 혹은 "구간의 평균/최댓값" 등이 반복된다면 누적합을 검토한다.

위 문제를 예시로 보면, 정사각형의 배열에서 구간의 합을 구하라는 문제이므로, 누적합을 생각할 수 있다.

 

두번째로, 문제에서 주어진 구간의 합을 구하기 전에 전체 배열을 한 번 읽을 수 있는가를 확인해야 한다.

누적합을 적용하려면, 전체 배열의 합을 구할 수 있어야 유의미하기 때문에 전체 배열을 읽을 수 있는 지 확인해야 한다.

 

위 문제는 2차원 배열이지만, 우선 1차원 배열에서 접근하는 방식을 알고 있어야 한다.

다음과 같은 문제가 있다고 하자.

더보기

"길이가 N 인 배열 A 가 주어지고, M 개의 입력값을 받는다. 각 입력값 마다 구간 [L, R]의 합을 구해 출력한다."

 이 문제를 직관적으로 풀어보면, 각 입력값마다 L ~ R 까지를 계산해서 출력하는 방법이 있다.

N, M = map(int, input().split())

A = list(map(int, input().split()))

for _ in range(M):
	L, R = map(int, input().split())
    cnt = 0
    for i in range(L, R + 1):
    	cnt += A[i]
   	print(cnt)

이런 식으로 풀 수 있는데, 이렇게 되면 N 과 M 이 커질수록 반복 계산이 늘어나 시간이 오래 걸리는 문제가 생긴다.

그러면 이 반복 계산을 없애는 방법에 대해 고민해야 하는데, 이 때 누적합을 사용하면된다.

 

누적합의 핵심은 "미리 한번 계산한다" 이다.

배열의 첫 i 개 원소 합을 저장하는 보조 배열 P 를 하나 만든다면, 아래와 같고,

더보기

P[i] = A[1] + A[2] + ... + A[i]

이렇게 됬을 때, 구간 [L, R]의 합은

더보기

A[L] + A[L + 1] + ... + A[R] = (A[1] + ... + A[R]) - (A[1] + ... + A[L - 1]) = P[R] - P[L - 1]

이렇게 계산할 수 있다. 그러면 우리는 한 번 P 를 채우는 데에만 시간을 쓰고, 각 입력값에 대한 계산은 한번에 처리 가능한 구조가 된다.

 

이제 P 배열을 만드는 방법에 대해 생각해보자, 간단하게 앞에서부터 각각의 합을 구하는 점화식을 생각해보면, 아래와 같다.

더보기

P[i] = P[i - 1] + A[i]

작은 예제로 간단하게 계산해 보겠다.

- 배열 A = [3, 1, 4, 1, 5] (실제 코드에선 A[0] = 3 이지만, 이 문제에서는 예시이므로 A[1] = 3 이라 칭하겠다.)

- 누적합 P 계산

  1. P[0] = 0

  2. P[1] = P[0] + A[1] = 0 + 3 = 3

  3. P[2] = P[1] + A[2] = 3 + 1 = 4

  4. P[3] = P[2] + A[3] = 4 + 4 = 8

  5. P[4] = P[3] + A[4] = 8 + 1 = 9

  4. P[3] = P[4] + A[5] = 9 + 5 = 14

- 완성된 P = [0, 3, 4, 8, 9, 14]

- 입력값 예시

  1. 구간 [2, 4] 합: A[2] + A[3] + A[4] = 1 + 4 + 1 = 6

     -> P[4] - P[1] = 9 - 3 = 6

  2. 구간 [1, 5] 합: P[5] - P[0] = 14 - 0 = 14

 

이제  이 1차원 누적합을 2차원으로 확장시켜 위 문제에 적용시켜보자.

우선 2차원 배열을 아래와 같이 표로 표현한다.

  1 2 3 4
1 1 2 3 4
2 2 3 4 5
3 3 4 5 6
4 4 5 6 7

그 후, 원본 배열과 같은 크기의 누적합 배열을 만든다. 이 때 누적합 배열에는 0번째 행과 0번째 열을 추가하여 모두 0으로 채운다.

  0 1 2 3 4
0 0 0 0 0 0
1 0        
2 0        
3 0        
4 0        

이제 이 누적합 배열을 합쳐야 하는데, 그 원리는 다음과 같다.

(1, 1) ~ (i, j) 까지의 합은 다음 4 가지를 이용하여 계산할 수 있다.

A. (1, 1) ~ (i - 1, j) 까지 합

  예를 들어, (1, 1) ~ (3, 3) 까지를 더한다면, (3, 3)의 위쪽까지의 합을 의미한다.

B. (1, 1) ~ (i, j - 1) 까지 합

  마찬가지로, (1, 1) ~ (3, 3) 까지 더할 때, (3, 3)의 왼쪽까지의 합을 의미한다.

C. (1, 1) ~ (i - 1, j - 1) 중복된 영역의 합

  A와 B 계산에서 중복적으로 계산된 합을 의미한다.

D. 현재 위치(i, j)의 원본 배열 값

  예시에서의 (3, 3)을 의미한다.

(이 원리는 표를 함께 보면 더 이해하기 쉬울 것이다.)

최종적으로는 아래와 같은 식이 완성된다.

더보기

S[i][j] = A + B - C + D

= S[i - 1][j] + S[i][j - 1] - S[i - 1][j - 1] + A[i][j]

자, 이제 이 식을 근거로 위 누적합 배열을 원본 배열을 이용하여 채워보면, 아래 표와 같이 된다.

  0 1 2 3 4
0 0 0 0 0 0
1 0 1 3 6 10
2 0 3 8 15 24
3 0 6 15 27 42
4 0 10 24 42 64

예) S[2][3] = 15 는 (1, 1) ~ (2, 3) 영역의 합이다.

더보기

S[1][3] + S[2][2] - S[1][2] + A[2][3] = 6 + 8 - 3 + 4 = 15

이제 이를 이용해서 우리가 원하는 구간의 합을 구해보자. 이 원리를 이해했다면, 구간의 합을 구하는 공식도 쉽게 이해할 수 있다.

더보기

S[x2][y2] - S[x1 -1][y2] - S[x2][y1 - 1] + S[x1 - 1][y1 - 1] = 전체 - 위쪽 - 왼쪽 + 중복 제거된 것

예시로 (2, 2) ~ (3, 4) 까지의 합을 구한다고 하면, 42 - 10 - 6 + 1 = 27 이 나오게 된다.

(이 또한 표를 보고 왜 위쪽과 왼쪽을 빼줘야 하는 지 보면 이해할 수 있다.)

 

결론 코드

import sys

input = sys.stdin.readline

N, M = map(int, input().split())

A = [list(map(int, input().split())) for _ in range(N)]

S = [[0] * (N + 1) for _ in range(N + 1)]

for i in range(1, N + 1):
    for j in range(1, N + 1):
        S[i][j] = S[i - 1][j] + S[i][j - 1] - S[i - 1][j - 1] + A[i - 1][j - 1]

out = []

for _ in range(M):
    x1, y1, x2, y2 = map(int, input().split())
    ans = S[x2][y2] - S[x1 - 1][y2] - S[x2][y1 - 1] + S[x1 - 1][y1 - 1]
    out.append(str(ans))

sys.stdout.write("\n".join(out))

'BaekJoon' 카테고리의 다른 글

[Python] 17103. 골드바흐 파티션  (0) 2025.04.15
[Python] 1929.소수 구하기  (0) 2025.04.12
[Python] 4134. 다음 소수  (0) 2025.04.12
[Python] 2485. 가로수  (0) 2025.04.10
[Python] 1735. 분수 합  (0) 2025.04.09

User

기능 method url description
userinfo GET /accounts/api/user/ 로그인 유저 확인
signup POST /accounts/api/signup/ 회원가입
login POST /accounts/api/login/ 로그인
logout POST /accounts/api/logout/ 로그아웃
refresh POST /accounts/api/refresh/ Access Token 재발급

 

Profile

기능 method url description
profile GET /accounts/api/profile/ 프로필 조회
password_change POST /accounts/api/password_change/ 비밀번호 변경
delete DELETE /accounts/api/delete/ 회원 탈퇴

 

Chat

기능 method url description
list_session GET /chat/api/sessions/ 로그인 사용자의 모든 대화방 목록 조회
create_session POST /chat/api/sessions/ 새 대화방 생성
retrieve_session GET /chat/api/sessions/{session_id}/ 대화방의 메세지 목록 조회
send_message POST /chat/api/sessions/{session_id}/messages/ 대화방에 메세지 전송 -> LLM 응답 반환
leave_session DELETE /chat/api/sessions/{session_id}/delete/ 대화방 프로필에서 삭제

 

문제 설명

 

문제


풀이(답)

WITH RECURSIVE generations AS (  # 1. 재귀함수 정의
	SELECT				# 2. Anchor member: 부모가 없는(최초) 1 세대
    	id,
        parent_id,
        1 AS generation		# 3. 1 세대를 의미
    FROM ECOLI_DATA
    WHERE parent_id IS NULL 	# 4. 부모가 NULL이면 최상위 세대
    
    UNION ALL					# 5. Anchor 결과와 Recursive 결과를 합침
    
    SELECT				# 6. Recursive member: 이미 찾은 세대의 자식들을 찾아 세대를 +1 해가며 확장
    	child.id,
        child.parent_id
        parent.generation + 1 as generation	# 7. 부모 세대 +1 을 자식 세대로 지정
    FROM ECOLI_DATA AS child JOIN generations as parent ON child.parent_id = parent.id
    # 8. 전체 데이터(child)에서 기존의 계산된 generations(parent)의 id와 child.parent_id가 
    # 일치하는 자식만 선택
)
# 9. 재귀 완성 후, generations를 이용해 결과 조회
SELECT
	COUNT(*) AS COUNT
    g.generation AS GENERATION
FROM generations AS g
WHERE NOT EXISTS (	# 10. NOT EXISTS를 사용해 '자식이 없는' 레코드만 걸러냄
	SELECT GENERATION
    FROM ECOLI_DATA AS e2
    WHERE e2.parent_id = g.id	# 11. 이 ID를 부모로 가진 레코드가 하나도 없는 경우
)
GROUP BY g.generation
ORDER BY g.generation

📌 문제에 대한 접근 방법

이 문제는 ECOLI_DATA 테이블에서 계층 구조(부모-자식 관계)를 다뤄야 하는 문제이다. 특히, 특정 개체의 자식 세대가 여러 단계로 확장되어 있어서 몇 세대까지 존재할지 모르는 경우, 단순 JOIN만으로는 처리하기 어렵다.

이런 상황에서는 다음의 두 가지를 활용한다:

  • WITH RECURSIVE
    깊이를 알 수 없는 트리 구조(부모-자식 관계)를 SQL만으로 해결할 때 사용하는 방법이다. 최초 기준(부모가 없는 개체)을 세우고, 자기 자신과 JOIN을 반복(재귀 호출)하면서 다음 세대, 그 다음 세대까지 무한히 확장해 나갈 수 있다.
  • UNION ALL
    위에서 만든 최초 기준(Anchor member)의 결과와 재귀 호출(Recursive member)의 결과를 하나로 합치는 역할을 한다.
    UNION ALL은 단순히 결과를 붙이는 연산으로 중복 제거가 없기 때문에, 각 세대가 정확히 한번씩만 등장하는 이 문제에서 적합하다.

📌 코드 한 줄씩 상세히 복기하기 

🔍 WITH RECURSIVE 재귀함수 정의

  • 2~4번째 줄 (Anchor Member)
    parent_id가 NULL인 개체들은 최초의 개체, 즉 1세대임을 의미한다. 최초 개체의 generation을 1로 지정하고, 이들을 재귀의 기준(Anchor member)으로 삼는다.
  • 5번째 줄 (UNION ALL)
    Anchor에서 나온 1세대 결과와 아래에서 계속 찾을 자식 세대들을 모두 합치기 위해 사용한다.
  • 6~8번째 줄 (Recursive Member)
    이미 찾은 세대의 결과(generations)와 원본 데이터를 JOIN 하면서 다음 세대를 찾아가는 과정이다. JOIN 조건은 원본 테이블의 child.parent_id와 이미 찾은 부모 세대의 parent.id가 같아야 한다. 즉, “부모 세대와 자식 세대를 연결”해가는 과정이며, 세대를 하나씩 증가(parent.generation + 1)시키며 확장해간다.

이렇게 Anchor member(최초 세대)와 Recursive member(이후 모든 세대)가 계속 반복되면서, 전체 세대를 모두 찾아 generations 라는 임시 테이블이 만들어진다.


🔍 결과 조회 및 NOT EXISTS로 자식 없는(leaf) 개체 필터링

  • 9번째 줄 (재귀 결과 활용)
    재귀 호출로 완성된 전체 세대 데이터를 이제 실제 필요한 형식으로 가공하여 출력한다.
  • 10~11번째 줄 (NOT EXISTS)
    이 문제에서는 “자식이 없는 개체”를 찾는 게 핵심이다.
    여기서 NOT EXISTS는 하위 쿼리 결과가 하나도 없는 경우에만 참(True)을 반환한다. 즉,
SELECT 1 FROM ECOLI_DATA AS e2 WHERE e2.parent_id = g.id
위 서브쿼리가 의미하는 건, 현재 개체(g.id)를 부모로 가진 자식 개체가 존재하는지를 검사하는 것이다.

 

g.id e2.parent_id 존재 여부 EXISTS 결과 NOT EXISTS 결과
1 X (자식 없음) FALSE TRUE
2 O (자식 있음: 3,4,5) TRUE FALSE ❌
3 X (자식 없음) FALSE TRUE
4 O (자식 있음: 6,7) TRUE FALSE ❌
5 X (자식 없음) FALSE TRUE
6 O (자식 있음: 8) TRUE FALSE ❌
7 X (자식 없음) FALSE TRUE
8 X (자식 없음) FALSE TRUE

위 표를 보면 명확히 알 수 있듯, NOT EXISTS를 통해 자식이 없는(leaf) 개체만 걸러서 선택된다.


🔍 최종 집계와 정렬

  • 위에서 걸러낸 자식이 없는 개체들을 세대별로(GROUP BY g.generation) 몇 개인지 카운트하고(COUNT(*)),
  • 세대를 오름차순으로(ORDER BY g.generation) 정렬하여 최종 결과를 얻는다.

📌 언제 이 방법을 써야 하나?

데이터베이스 테이블이 자기 자신을 참조하는 계층 구조를 가질 때, 특히 깊이를 알 수 없는 경우에 자주 사용한다.

대표적 예시:

  • 회사의 조직도(CEO → 부장 → 과장 → 사원)
  • 게시글의 댓글과 답글 관계
  • 대장균처럼 분화되어 계속 이어지는 세대 관계(이 문제와 동일한 경우)

이러한 상황에서 WITH RECURSIVE + UNION ALL + NOT EXISTS를 사용하면 효율적이고 직관적인 SQL 쿼리를 작성할 수 있다.


결론적으로 다시 요약하면

  • 계층 구조 데이터를 다룰 때 → WITH RECURSIVE를 사용.
  • 최초 기준(Anchor)을 세우고, 식 세대(Recursive)를 찾아 내려가며 반복 확장.
  • 두 결과를 UNION ALL로 합쳐 모든 세대를 동적 생성.
  • 최종적으로 자식 없는 leaf 개체는 NOT EXISTS로 필터링하여 최종 결과를 도출.

문제

  • 골드바흐의 추측: 2보다 큰 짝수는 두 소수의 합으로 나타낼 수 있다.

짝수 N을 두 소수의 합으로 나타내는 표현을 골드바흐 파티션이라고 한다. 짝수 N이 주어졌을 때, 골드바흐 파티션의 개수를 구해보자. 두 소수의 순서만 다른 것은 같은 파티션이다.

입력

첫째 줄에 테스트 케이스의 개수 T (1 ≤ T ≤ 100)가 주어진다. 각 테스트 케이스는 한 줄로 이루어져 있고, 정수 N은 짝수이고, 2 < N ≤ 1,000,000을 만족한다.

출력

각각의 테스트 케이스마다 골드바흐 파티션의 수를 출력한다.

예제 입력 1 

5
6
8
10
12
100

예제 출력 1 

1
1
2
1
6

 

예제를 보고 어떤 식으로 이루어지는 지 추측해보자.

6 = 3 + 3

8 = 3 + 5

10 = 3 + 7 = 5 + 5

12 = 5 + 7

이런 식으로 순서와 상관 없이 이루어지는 걸 구하면 된다. -> 3 + 7과 7 + 3은 하나로 본다.

 

먼저, 그 숫자까지의 소수가 어떤 수가 있는 지를 구한다. 이 때, 테스트케이스를 여러 개 받는데, 받을 때마다 소수를 찾는 것은 불필요하니,

테스트케이스 중 가장 큰 숫자를 기준으로 소수를 구하고, 처리하는 것이 시간적으로 이득이다.

data = sys.stdin.read().split()
T = int(data[0])
nums = list(map(int, data[1:]))
max_n = max(nums)
sys.stdin.read().split()
read() : EOF가 들어올 때까지 읽는다. 즉, 테스트케이스를 한번에 입력해서 받아올 수 있게 함
split() : 공백(줄바꿈도 포함)을 기준으로 나누어 파싱해줌
예제를 이 방식으로 입력했다면 아래와 같이 저장됌
data = [5, 6, 8, 10, 12, 100]

 

가장 큰 수를 이용하여 소수를 구하는 함수에 넣고 소수가 들어있는 배열을 가져온다.

def sieve(limit):
    is_prime = [True] * (limit + 1)
    is_prime[0] = is_prime[1] = False
    end = int(limit ** 0.5) + 1

    for i in range(2, end):
        if is_prime[i]:
            for multiple in range(i * i, limit + 1, i):
                is_prime[multiple] = False
    
    primes = [idx for idx, is_p in enumerate(is_prime) if is_p]
    return is_prime, primes
    
 `````` 위 로직들 ````````
_, primes = sieve(max_n)
enumerate
배열에 인덱스와, 값이 동시에 필요한 상황에 사용하는 내장 함수
for idx, val in enumerate(arr, start=0) ....
start 옵션을 통해 인덱스 시작 위치를 정할 수 있다.
위의 적힌 코드를 풀어서 보자면,
primes[]
for idx in range(len(is_prime)):
     if is_prime[idx]:
          primes.append[idx]
인데, 이는 인덱스(idx)와 값(is_prime[idx])을 모두 사용하기 때문에 이렇게 풀어 쓰지 않고, enumerate를 쓰는 것이 코드의 가독성과
리스트를 순회하며 동시에 인덱스와 값을 꺼내는 성질로 인해 시간적으로도 이득이다.

_, primes = sieve(max_n)
위 함수에서 나온 만들어져 나온 배열은 is_prime 과 primes 두 개로, 실제로 문제에서 사용하는 것은 primes 하나 이기 때문에 primes만 return 해도 되지만, 모든 배열을 return(언젠가 쓰일 수도 있다는 가정)한다면, _ 값으로 받는다.(여기에 들어오는 값을 의도적으로 무시한다.)

 

이제 구한 소수들을 통해, 마지막으로 소수의 합이 n 이 되는 개수를 찾는 함수를 만든다.

def count_pairs(n, primes):
    l, r = 0, len(primes) - 1
    while r >=0 and primes[r] > n: # n 보다 큰 소수들을 배제한다.
        r -= 1
    
    ans = 0
    while l <= r:
        s = primes[l] + primes[r]
        if s == n:
            l += 1
            r -= 1
            ans += 1

        elif s < n:
            l += 1 
        
        else:
            r -= 1
    return ans

 

최종적인 코드를 보면,

import sys

def sieve(limit):
    is_prime = [True] * (limit + 1)
    is_prime[0] = is_prime[1] = False
    end = int(limit ** 0.5) + 1

    for i in range(2, end):
        if is_prime[i]:
            for multiple in range(i * i, limit + 1, i):
                is_prime[multiple] = False
    
    # primes = []
    # for j in range(limit + 1):
    #     if is_prime[j]:
    #         primes.append(j)
    primes = [idx for idx, is_p in enumerate(is_prime) if is_p]
    return is_prime, primes

def count_pairs(n, primes):
    l, r = 0, len(primes) - 1
    while r >=0 and primes[r] > n:
        r -= 1
    
    ans = 0
    while l <= r:
        s = primes[l] + primes[r]
        if s == n:
            l += 1
            r -= 1
            ans += 1

        elif s < n:
            l += 1 
        
        else:
            r -= 1
    return ans

data = sys.stdin.read().split()
T = int(data[0])
nums = list(map(int, data[1:]))
max_n = max(nums) 
_, primes = sieve(max_n)

out = []
for n in nums:
    out.append(str(count_pairs(n, primes)))
sys.stdout.write("\n".join(out))

 

'BaekJoon' 카테고리의 다른 글

[Python] 11660. 구간 합 구하기 5  (0) 2025.07.02
[Python] 1929.소수 구하기  (0) 2025.04.12
[Python] 4134. 다음 소수  (0) 2025.04.12
[Python] 2485. 가로수  (0) 2025.04.10
[Python] 1735. 분수 합  (0) 2025.04.09

+ Recent posts