Files
web-inspector/.claude/skills/database-patterns.md
jungwoo choi c37cda5b13 Initial commit: 프로젝트 초기 구성
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 16:10:57 +09:00

11 KiB

MongoDB 설계 패턴 (Database Patterns)

이 프로젝트의 MongoDB 설계 및 사용 패턴입니다.

연결 설정

Motor (async driver)

from motor.motor_asyncio import AsyncIOMotorClient

class WikipediaEnrichmentWorker:
    def __init__(self):
        self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
        self.db_name = os.getenv("DB_NAME", "ai_writer_db")
        self.db = None

    async def start(self):
        client = AsyncIOMotorClient(self.mongodb_url)
        self.db = client[self.db_name]

PyMongo (sync driver)

from pymongo import MongoClient
from pymongo.database import Database

client: MongoClient = None
db: Database = None

def connect_to_mongo():
    global client, db
    try:
        client = MongoClient(MONGODB_URL)
        db = client[DATABASE_NAME]
        client.admin.command('ping')  # 연결 테스트
        print(f"Successfully connected to MongoDB: {DATABASE_NAME}")
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        raise e

컬렉션 설계

기사 컬렉션 (articles_en)

{
    "_id": "ObjectId",
    "news_id": "unique_id",
    "title": "Article Title",
    "summary": "One-line summary",
    "subtopics": [
        {
            "title": "Subtopic 1",
            "content": ["Paragraph 1", "Paragraph 2"]
        }
    ],
    "categories": ["Category1", "Category2"],
    "entities": {
        "people": [
            {
                "name": "Person Name",
                "context": ["role", "company"],
                "birth_date": "1990-01-15",
                "wikipedia_url": "https://...",
                "image_urls": ["https://..."],
                "verified": true
            }
        ],
        "organizations": [
            {
                "name": "Organization Name",
                "context": ["industry", "type"],
                "founding_date": "2004-02-04",
                "wikipedia_url": "https://...",
                "image_urls": ["https://..."],
                "verified": true
            }
        ]
    },
    "wikipedia_enriched": true,
    "wikipedia_enriched_at": "2024-01-15T10:30:00",
    "created_at": "2024-01-15T10:00:00",
    "updated_at": "2024-01-15T10:30:00"
}

엔티티 캐시 컬렉션 (entity_people)

{
    "_id": "ObjectId",
    "name": "Elon Musk",
    "context": ["Tesla", "SpaceX", "CEO"],
    "birth_date": "1971-06-28",
    "wikipedia_url": "https://en.wikipedia.org/wiki/Elon_Musk",
    "image_urls": ["https://..."],
    "verified": true,
    "created_at": "2024-01-10T00:00:00",
    "updated_at": "2024-01-15T10:30:00"
}

인덱스 설계

인덱스 생성 패턴

class EntityCache:
    async def ensure_indexes(self):
        """인덱스 생성 (이미 존재하면 무시)"""
        try:
            # wikipedia_url이 unique key (동명이인 구분)
            try:
                await self.people_collection.create_index(
                    "wikipedia_url", unique=True, sparse=True
                )
            except Exception:
                pass  # 이미 존재

            # 이름으로 검색용 (동명이인 가능)
            try:
                await self.people_collection.create_index("name")
            except Exception:
                pass

            # context 검색용
            try:
                await self.people_collection.create_index("context")
            except Exception:
                pass

            # TTL 정책용
            try:
                await self.people_collection.create_index("updated_at")
            except Exception:
                pass

            logger.info("Entity cache indexes ensured")
        except Exception as e:
            logger.warning(f"Error ensuring indexes: {e}")

CRUD 패턴

Create (삽입)

async def save_person(self, data: Dict[str, Any]) -> bool:
    """인물 정보 저장/갱신 (wikipedia_url 기준)"""
    now = datetime.now()

    update_doc = {
        "name": data.get("name"),
        "context": data.get("context", []),
        "birth_date": data.get("birth_date"),
        "wikipedia_url": data.get("wikipedia_url"),
        "image_urls": data.get("image_urls", []),
        "verified": data.get("verified", False),
        "updated_at": now
    }

    if data.get("wikipedia_url"):
        # upsert: 있으면 업데이트, 없으면 삽입
        result = await self.people_collection.update_one(
            {"wikipedia_url": data["wikipedia_url"]},
            {
                "$set": update_doc,
                "$setOnInsert": {"created_at": now}
            },
            upsert=True
        )
        return result.modified_count > 0 or result.upserted_id is not None

Read (조회)

async def get_person(self, name: str, context: List[str] = None) -> Tuple[Optional[Dict], bool]:
    """
    인물 정보 조회 (context 기반 최적 매칭)

    Returns:
        Tuple of (cached_data, needs_refresh)
    """
    # 이름으로 모든 후보 검색
    cursor = self.people_collection.find({"name": {"$regex": f"^{name}$", "$options": "i"}})
    candidates = await cursor.to_list(length=10)

    if not candidates:
        return None, True

    # context가 있으면 최적 후보 선택
    if context:
        best_match = None
        best_score = -1

        for candidate in candidates:
            score = self._calculate_context_match_score(
                candidate.get("context", []), context
            )
            if score > best_score:
                best_score = score
                best_match = candidate

        if best_match and best_score >= MIN_CONTEXT_MATCH:
            needs_refresh = not self._is_cache_fresh(best_match)
            return best_match, needs_refresh

    # context 없으면 첫 번째 후보 반환
    candidate = candidates[0]
    needs_refresh = not self._is_cache_fresh(candidate)
    return candidate, needs_refresh

Update (수정)

async def update_article(self, mongodb_id: str, update_data: Dict[str, Any]):
    """기사 정보 업데이트"""
    result = await self.collection.update_one(
        {"_id": ObjectId(mongodb_id)},
        {
            "$set": {
                "entities.people": update_data.get("people", []),
                "entities.organizations": update_data.get("organizations", []),
                "wikipedia_enriched": True,
                "wikipedia_enriched_at": datetime.now().isoformat()
            }
        }
    )
    return result.modified_count > 0

Delete (삭제)

async def delete_old_cache(self, days: int = 30):
    """오래된 캐시 데이터 삭제"""
    cutoff_date = datetime.now() - timedelta(days=days)
    result = await self.people_collection.delete_many({
        "updated_at": {"$lt": cutoff_date}
    })
    return result.deleted_count

캐싱 전략

TTL 기반 캐시

# 캐시 유효 기간 (7일)
CACHE_TTL_DAYS = 7

def _is_cache_fresh(self, cached_data: Dict[str, Any]) -> bool:
    """캐시 데이터가 신선한지 확인"""
    if not cached_data:
        return False

    updated_at = cached_data.get("updated_at")
    if not updated_at:
        return False

    if isinstance(updated_at, str):
        updated_at = datetime.fromisoformat(updated_at)

    expiry_date = updated_at + timedelta(days=CACHE_TTL_DAYS)
    return datetime.now() < expiry_date

갱신 정책

# 정책:
# - 7일이 지나면 갱신 시도 (삭제 아님)
# - API 호출 실패 시 기존 데이터 유지
# - 데이터 동일 시 확인 일자만 갱신

async def save_person(self, new_data: Dict, existing_data: Dict = None):
    """기존 데이터와 비교하여 적절히 처리"""
    if existing_data and existing_data.get("verified"):
        # 기존에 검증된 데이터가 있음
        if not new_data.get("birth_date") and existing_data.get("birth_date"):
            # 새 데이터가 덜 완전하면 기존 데이터 유지, 시간만 갱신
            await self.people_collection.update_one(
                {"wikipedia_url": existing_data["wikipedia_url"]},
                {"$set": {"updated_at": datetime.now()}}
            )
            return
    # 새 데이터로 갱신
    await self._upsert_person(new_data)

GridFS (대용량 파일)

오디오 파일 저장

from motor.motor_asyncio import AsyncIOMotorGridFSBucket

class AudioStorage:
    def __init__(self, db):
        self.fs = AsyncIOMotorGridFSBucket(db, bucket_name="audio")

    async def save_audio(self, audio_data: bytes, filename: str) -> str:
        """오디오 파일 저장"""
        file_id = await self.fs.upload_from_stream(
            filename,
            audio_data,
            metadata={"content_type": "audio/mpeg"}
        )
        return str(file_id)

    async def get_audio(self, file_id: str) -> bytes:
        """오디오 파일 조회"""
        grid_out = await self.fs.open_download_stream(ObjectId(file_id))
        return await grid_out.read()

백업 정책

규칙

  • 주기: 하루에 한 번 (daily)
  • 보관 기간: 최소 7일
  • 백업 위치: 프로젝트 루트의 ./backups/ 디렉토리

MongoDB 백업

# 백업 실행
BACKUP_NAME="mongodb_backup_$(date +%Y%m%d_%H%M%S)"
docker exec {프로젝트}-mongodb mongodump \
    --uri="mongodb://{user}:{password}@localhost:27017" \
    --authenticationDatabase=admin \
    --out="/tmp/$BACKUP_NAME"
docker cp {프로젝트}-mongodb:/tmp/$BACKUP_NAME ./backups/
echo "백업 완료: ./backups/$BACKUP_NAME"

MongoDB 복원

docker cp ./backups/$BACKUP_NAME {프로젝트}-mongodb:/tmp/
docker exec {프로젝트}-mongodb mongorestore \
    --uri="mongodb://{user}:{password}@localhost:27017" \
    --authenticationDatabase=admin \
    "/tmp/$BACKUP_NAME"

자동화 스크립트 (backup-mongodb.sh)

#!/bin/bash
PROJECT_NAME="{프로젝트명}"
BACKUP_DIR="{프로젝트경로}/backups"
BACKUP_NAME="mongodb_backup_$(date +%Y%m%d_%H%M%S)"

# 백업 실행
docker exec ${PROJECT_NAME}-mongodb mongodump \
    --uri="mongodb://{user}:{password}@localhost:27017" \
    --authenticationDatabase=admin \
    --out="/tmp/$BACKUP_NAME"

docker cp ${PROJECT_NAME}-mongodb:/tmp/$BACKUP_NAME $BACKUP_DIR/

# 7일 이상 된 백업 삭제
find $BACKUP_DIR -type d -name "mongodb_backup_*" -mtime +7 -exec rm -rf {} \;

echo "$(date): Backup completed - $BACKUP_NAME" >> $BACKUP_DIR/backup.log

cron 설정

# crontab -e
0 2 * * * /path/to/backup-mongodb.sh  # 매일 새벽 2시

주의사항

  • 새 프로젝트 생성 시 반드시 백업 스크립트 설정
  • 백업 디렉토리는 .gitignore에 추가하여 커밋 제외
  • 중요 데이터는 외부 스토리지에 추가 백업 권장

환경 변수

# .env
MONGODB_URL=mongodb://admin:password123@mongodb:27017/
DB_NAME=ai_writer_db
TARGET_COLLECTION=articles_en

# docker-compose.yml
environment:
  - MONGODB_URL=mongodb://${MONGO_USER}:${MONGO_PASSWORD}@mongodb:27017/
  - DB_NAME=ai_writer_db