# MongoDB 설계 패턴 (Database Patterns) 이 프로젝트의 MongoDB 설계 및 사용 패턴입니다. ## 연결 설정 ### Motor (async driver) ```python from motor.motor_asyncio import AsyncIOMotorClient class WikipediaEnrichmentWorker: def __init__(self): self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017") self.db_name = os.getenv("DB_NAME", "ai_writer_db") self.db = None async def start(self): client = AsyncIOMotorClient(self.mongodb_url) self.db = client[self.db_name] ``` ### PyMongo (sync driver) ```python from pymongo import MongoClient from pymongo.database import Database client: MongoClient = None db: Database = None def connect_to_mongo(): global client, db try: client = MongoClient(MONGODB_URL) db = client[DATABASE_NAME] client.admin.command('ping') # 연결 테스트 print(f"Successfully connected to MongoDB: {DATABASE_NAME}") except Exception as e: print(f"Error connecting to MongoDB: {e}") raise e ``` ## 컬렉션 설계 ### 기사 컬렉션 (articles_en) ```json { "_id": "ObjectId", "news_id": "unique_id", "title": "Article Title", "summary": "One-line summary", "subtopics": [ { "title": "Subtopic 1", "content": ["Paragraph 1", "Paragraph 2"] } ], "categories": ["Category1", "Category2"], "entities": { "people": [ { "name": "Person Name", "context": ["role", "company"], "birth_date": "1990-01-15", "wikipedia_url": "https://...", "image_urls": ["https://..."], "verified": true } ], "organizations": [ { "name": "Organization Name", "context": ["industry", "type"], "founding_date": "2004-02-04", "wikipedia_url": "https://...", "image_urls": ["https://..."], "verified": true } ] }, "wikipedia_enriched": true, "wikipedia_enriched_at": "2024-01-15T10:30:00", "created_at": "2024-01-15T10:00:00", "updated_at": "2024-01-15T10:30:00" } ``` ### 엔티티 캐시 컬렉션 (entity_people) ```json { "_id": "ObjectId", "name": "Elon Musk", "context": ["Tesla", "SpaceX", "CEO"], "birth_date": "1971-06-28", "wikipedia_url": "https://en.wikipedia.org/wiki/Elon_Musk", "image_urls": ["https://..."], "verified": true, "created_at": "2024-01-10T00:00:00", "updated_at": "2024-01-15T10:30:00" } ``` ## 인덱스 설계 ### 인덱스 생성 패턴 ```python class EntityCache: async def ensure_indexes(self): """인덱스 생성 (이미 존재하면 무시)""" try: # wikipedia_url이 unique key (동명이인 구분) try: await self.people_collection.create_index( "wikipedia_url", unique=True, sparse=True ) except Exception: pass # 이미 존재 # 이름으로 검색용 (동명이인 가능) try: await self.people_collection.create_index("name") except Exception: pass # context 검색용 try: await self.people_collection.create_index("context") except Exception: pass # TTL 정책용 try: await self.people_collection.create_index("updated_at") except Exception: pass logger.info("Entity cache indexes ensured") except Exception as e: logger.warning(f"Error ensuring indexes: {e}") ``` ## CRUD 패턴 ### Create (삽입) ```python async def save_person(self, data: Dict[str, Any]) -> bool: """인물 정보 저장/갱신 (wikipedia_url 기준)""" now = datetime.now() update_doc = { "name": data.get("name"), "context": data.get("context", []), "birth_date": data.get("birth_date"), "wikipedia_url": data.get("wikipedia_url"), "image_urls": data.get("image_urls", []), "verified": data.get("verified", False), "updated_at": now } if data.get("wikipedia_url"): # upsert: 있으면 업데이트, 없으면 삽입 result = await self.people_collection.update_one( {"wikipedia_url": data["wikipedia_url"]}, { "$set": update_doc, "$setOnInsert": {"created_at": now} }, upsert=True ) return result.modified_count > 0 or result.upserted_id is not None ``` ### Read (조회) ```python async def get_person(self, name: str, context: List[str] = None) -> Tuple[Optional[Dict], bool]: """ 인물 정보 조회 (context 기반 최적 매칭) Returns: Tuple of (cached_data, needs_refresh) """ # 이름으로 모든 후보 검색 cursor = self.people_collection.find({"name": {"$regex": f"^{name}$", "$options": "i"}}) candidates = await cursor.to_list(length=10) if not candidates: return None, True # context가 있으면 최적 후보 선택 if context: best_match = None best_score = -1 for candidate in candidates: score = self._calculate_context_match_score( candidate.get("context", []), context ) if score > best_score: best_score = score best_match = candidate if best_match and best_score >= MIN_CONTEXT_MATCH: needs_refresh = not self._is_cache_fresh(best_match) return best_match, needs_refresh # context 없으면 첫 번째 후보 반환 candidate = candidates[0] needs_refresh = not self._is_cache_fresh(candidate) return candidate, needs_refresh ``` ### Update (수정) ```python async def update_article(self, mongodb_id: str, update_data: Dict[str, Any]): """기사 정보 업데이트""" result = await self.collection.update_one( {"_id": ObjectId(mongodb_id)}, { "$set": { "entities.people": update_data.get("people", []), "entities.organizations": update_data.get("organizations", []), "wikipedia_enriched": True, "wikipedia_enriched_at": datetime.now().isoformat() } } ) return result.modified_count > 0 ``` ### Delete (삭제) ```python async def delete_old_cache(self, days: int = 30): """오래된 캐시 데이터 삭제""" cutoff_date = datetime.now() - timedelta(days=days) result = await self.people_collection.delete_many({ "updated_at": {"$lt": cutoff_date} }) return result.deleted_count ``` ## 캐싱 전략 ### TTL 기반 캐시 ```python # 캐시 유효 기간 (7일) CACHE_TTL_DAYS = 7 def _is_cache_fresh(self, cached_data: Dict[str, Any]) -> bool: """캐시 데이터가 신선한지 확인""" if not cached_data: return False updated_at = cached_data.get("updated_at") if not updated_at: return False if isinstance(updated_at, str): updated_at = datetime.fromisoformat(updated_at) expiry_date = updated_at + timedelta(days=CACHE_TTL_DAYS) return datetime.now() < expiry_date ``` ### 갱신 정책 ```python # 정책: # - 7일이 지나면 갱신 시도 (삭제 아님) # - API 호출 실패 시 기존 데이터 유지 # - 데이터 동일 시 확인 일자만 갱신 async def save_person(self, new_data: Dict, existing_data: Dict = None): """기존 데이터와 비교하여 적절히 처리""" if existing_data and existing_data.get("verified"): # 기존에 검증된 데이터가 있음 if not new_data.get("birth_date") and existing_data.get("birth_date"): # 새 데이터가 덜 완전하면 기존 데이터 유지, 시간만 갱신 await self.people_collection.update_one( {"wikipedia_url": existing_data["wikipedia_url"]}, {"$set": {"updated_at": datetime.now()}} ) return # 새 데이터로 갱신 await self._upsert_person(new_data) ``` ## GridFS (대용량 파일) ### 오디오 파일 저장 ```python from motor.motor_asyncio import AsyncIOMotorGridFSBucket class AudioStorage: def __init__(self, db): self.fs = AsyncIOMotorGridFSBucket(db, bucket_name="audio") async def save_audio(self, audio_data: bytes, filename: str) -> str: """오디오 파일 저장""" file_id = await self.fs.upload_from_stream( filename, audio_data, metadata={"content_type": "audio/mpeg"} ) return str(file_id) async def get_audio(self, file_id: str) -> bytes: """오디오 파일 조회""" grid_out = await self.fs.open_download_stream(ObjectId(file_id)) return await grid_out.read() ``` ## 백업 정책 ### 규칙 - **주기**: 하루에 한 번 (daily) - **보관 기간**: 최소 7일 - **백업 위치**: 프로젝트 루트의 `./backups/` 디렉토리 ### MongoDB 백업 ```bash # 백업 실행 BACKUP_NAME="mongodb_backup_$(date +%Y%m%d_%H%M%S)" docker exec {프로젝트}-mongodb mongodump \ --uri="mongodb://{user}:{password}@localhost:27017" \ --authenticationDatabase=admin \ --out="/tmp/$BACKUP_NAME" docker cp {프로젝트}-mongodb:/tmp/$BACKUP_NAME ./backups/ echo "백업 완료: ./backups/$BACKUP_NAME" ``` ### MongoDB 복원 ```bash docker cp ./backups/$BACKUP_NAME {프로젝트}-mongodb:/tmp/ docker exec {프로젝트}-mongodb mongorestore \ --uri="mongodb://{user}:{password}@localhost:27017" \ --authenticationDatabase=admin \ "/tmp/$BACKUP_NAME" ``` ### 자동화 스크립트 (backup-mongodb.sh) ```bash #!/bin/bash PROJECT_NAME="{프로젝트명}" BACKUP_DIR="{프로젝트경로}/backups" BACKUP_NAME="mongodb_backup_$(date +%Y%m%d_%H%M%S)" # 백업 실행 docker exec ${PROJECT_NAME}-mongodb mongodump \ --uri="mongodb://{user}:{password}@localhost:27017" \ --authenticationDatabase=admin \ --out="/tmp/$BACKUP_NAME" docker cp ${PROJECT_NAME}-mongodb:/tmp/$BACKUP_NAME $BACKUP_DIR/ # 7일 이상 된 백업 삭제 find $BACKUP_DIR -type d -name "mongodb_backup_*" -mtime +7 -exec rm -rf {} \; echo "$(date): Backup completed - $BACKUP_NAME" >> $BACKUP_DIR/backup.log ``` ### cron 설정 ```bash # crontab -e 0 2 * * * /path/to/backup-mongodb.sh # 매일 새벽 2시 ``` ### 주의사항 - 새 프로젝트 생성 시 반드시 백업 스크립트 설정 - 백업 디렉토리는 .gitignore에 추가하여 커밋 제외 - 중요 데이터는 외부 스토리지에 추가 백업 권장 --- ## 환경 변수 ```bash # .env MONGODB_URL=mongodb://admin:password123@mongodb:27017/ DB_NAME=ai_writer_db TARGET_COLLECTION=articles_en # docker-compose.yml environment: - MONGODB_URL=mongodb://${MONGO_USER}:${MONGO_PASSWORD}@mongodb:27017/ - DB_NAME=ai_writer_db ```