385 lines
11 KiB
Markdown
385 lines
11 KiB
Markdown
# MongoDB 설계 패턴 (Database Patterns)
|
|
|
|
이 프로젝트의 MongoDB 설계 및 사용 패턴입니다.
|
|
|
|
## 연결 설정
|
|
|
|
### Motor (async driver)
|
|
```python
|
|
from motor.motor_asyncio import AsyncIOMotorClient
|
|
|
|
class WikipediaEnrichmentWorker:
|
|
def __init__(self):
|
|
self.mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
|
|
self.db_name = os.getenv("DB_NAME", "ai_writer_db")
|
|
self.db = None
|
|
|
|
async def start(self):
|
|
client = AsyncIOMotorClient(self.mongodb_url)
|
|
self.db = client[self.db_name]
|
|
```
|
|
|
|
### PyMongo (sync driver)
|
|
```python
|
|
from pymongo import MongoClient
|
|
from pymongo.database import Database
|
|
|
|
client: MongoClient = None
|
|
db: Database = None
|
|
|
|
def connect_to_mongo():
|
|
global client, db
|
|
try:
|
|
client = MongoClient(MONGODB_URL)
|
|
db = client[DATABASE_NAME]
|
|
client.admin.command('ping') # 연결 테스트
|
|
print(f"Successfully connected to MongoDB: {DATABASE_NAME}")
|
|
except Exception as e:
|
|
print(f"Error connecting to MongoDB: {e}")
|
|
raise e
|
|
```
|
|
|
|
## 컬렉션 설계
|
|
|
|
### 기사 컬렉션 (articles_en)
|
|
```json
|
|
{
|
|
"_id": "ObjectId",
|
|
"news_id": "unique_id",
|
|
"title": "Article Title",
|
|
"summary": "One-line summary",
|
|
"subtopics": [
|
|
{
|
|
"title": "Subtopic 1",
|
|
"content": ["Paragraph 1", "Paragraph 2"]
|
|
}
|
|
],
|
|
"categories": ["Category1", "Category2"],
|
|
"entities": {
|
|
"people": [
|
|
{
|
|
"name": "Person Name",
|
|
"context": ["role", "company"],
|
|
"birth_date": "1990-01-15",
|
|
"wikipedia_url": "https://...",
|
|
"image_urls": ["https://..."],
|
|
"verified": true
|
|
}
|
|
],
|
|
"organizations": [
|
|
{
|
|
"name": "Organization Name",
|
|
"context": ["industry", "type"],
|
|
"founding_date": "2004-02-04",
|
|
"wikipedia_url": "https://...",
|
|
"image_urls": ["https://..."],
|
|
"verified": true
|
|
}
|
|
]
|
|
},
|
|
"wikipedia_enriched": true,
|
|
"wikipedia_enriched_at": "2024-01-15T10:30:00",
|
|
"created_at": "2024-01-15T10:00:00",
|
|
"updated_at": "2024-01-15T10:30:00"
|
|
}
|
|
```
|
|
|
|
### 엔티티 캐시 컬렉션 (entity_people)
|
|
```json
|
|
{
|
|
"_id": "ObjectId",
|
|
"name": "Elon Musk",
|
|
"context": ["Tesla", "SpaceX", "CEO"],
|
|
"birth_date": "1971-06-28",
|
|
"wikipedia_url": "https://en.wikipedia.org/wiki/Elon_Musk",
|
|
"image_urls": ["https://..."],
|
|
"verified": true,
|
|
"created_at": "2024-01-10T00:00:00",
|
|
"updated_at": "2024-01-15T10:30:00"
|
|
}
|
|
```
|
|
|
|
## 인덱스 설계
|
|
|
|
### 인덱스 생성 패턴
|
|
```python
|
|
class EntityCache:
|
|
async def ensure_indexes(self):
|
|
"""인덱스 생성 (이미 존재하면 무시)"""
|
|
try:
|
|
# wikipedia_url이 unique key (동명이인 구분)
|
|
try:
|
|
await self.people_collection.create_index(
|
|
"wikipedia_url", unique=True, sparse=True
|
|
)
|
|
except Exception:
|
|
pass # 이미 존재
|
|
|
|
# 이름으로 검색용 (동명이인 가능)
|
|
try:
|
|
await self.people_collection.create_index("name")
|
|
except Exception:
|
|
pass
|
|
|
|
# context 검색용
|
|
try:
|
|
await self.people_collection.create_index("context")
|
|
except Exception:
|
|
pass
|
|
|
|
# TTL 정책용
|
|
try:
|
|
await self.people_collection.create_index("updated_at")
|
|
except Exception:
|
|
pass
|
|
|
|
logger.info("Entity cache indexes ensured")
|
|
except Exception as e:
|
|
logger.warning(f"Error ensuring indexes: {e}")
|
|
```
|
|
|
|
## CRUD 패턴
|
|
|
|
### Create (삽입)
|
|
```python
|
|
async def save_person(self, data: Dict[str, Any]) -> bool:
|
|
"""인물 정보 저장/갱신 (wikipedia_url 기준)"""
|
|
now = datetime.now()
|
|
|
|
update_doc = {
|
|
"name": data.get("name"),
|
|
"context": data.get("context", []),
|
|
"birth_date": data.get("birth_date"),
|
|
"wikipedia_url": data.get("wikipedia_url"),
|
|
"image_urls": data.get("image_urls", []),
|
|
"verified": data.get("verified", False),
|
|
"updated_at": now
|
|
}
|
|
|
|
if data.get("wikipedia_url"):
|
|
# upsert: 있으면 업데이트, 없으면 삽입
|
|
result = await self.people_collection.update_one(
|
|
{"wikipedia_url": data["wikipedia_url"]},
|
|
{
|
|
"$set": update_doc,
|
|
"$setOnInsert": {"created_at": now}
|
|
},
|
|
upsert=True
|
|
)
|
|
return result.modified_count > 0 or result.upserted_id is not None
|
|
```
|
|
|
|
### Read (조회)
|
|
```python
|
|
async def get_person(self, name: str, context: List[str] = None) -> Tuple[Optional[Dict], bool]:
|
|
"""
|
|
인물 정보 조회 (context 기반 최적 매칭)
|
|
|
|
Returns:
|
|
Tuple of (cached_data, needs_refresh)
|
|
"""
|
|
# 이름으로 모든 후보 검색
|
|
cursor = self.people_collection.find({"name": {"$regex": f"^{name}$", "$options": "i"}})
|
|
candidates = await cursor.to_list(length=10)
|
|
|
|
if not candidates:
|
|
return None, True
|
|
|
|
# context가 있으면 최적 후보 선택
|
|
if context:
|
|
best_match = None
|
|
best_score = -1
|
|
|
|
for candidate in candidates:
|
|
score = self._calculate_context_match_score(
|
|
candidate.get("context", []), context
|
|
)
|
|
if score > best_score:
|
|
best_score = score
|
|
best_match = candidate
|
|
|
|
if best_match and best_score >= MIN_CONTEXT_MATCH:
|
|
needs_refresh = not self._is_cache_fresh(best_match)
|
|
return best_match, needs_refresh
|
|
|
|
# context 없으면 첫 번째 후보 반환
|
|
candidate = candidates[0]
|
|
needs_refresh = not self._is_cache_fresh(candidate)
|
|
return candidate, needs_refresh
|
|
```
|
|
|
|
### Update (수정)
|
|
```python
|
|
async def update_article(self, mongodb_id: str, update_data: Dict[str, Any]):
|
|
"""기사 정보 업데이트"""
|
|
result = await self.collection.update_one(
|
|
{"_id": ObjectId(mongodb_id)},
|
|
{
|
|
"$set": {
|
|
"entities.people": update_data.get("people", []),
|
|
"entities.organizations": update_data.get("organizations", []),
|
|
"wikipedia_enriched": True,
|
|
"wikipedia_enriched_at": datetime.now().isoformat()
|
|
}
|
|
}
|
|
)
|
|
return result.modified_count > 0
|
|
```
|
|
|
|
### Delete (삭제)
|
|
```python
|
|
async def delete_old_cache(self, days: int = 30):
|
|
"""오래된 캐시 데이터 삭제"""
|
|
cutoff_date = datetime.now() - timedelta(days=days)
|
|
result = await self.people_collection.delete_many({
|
|
"updated_at": {"$lt": cutoff_date}
|
|
})
|
|
return result.deleted_count
|
|
```
|
|
|
|
## 캐싱 전략
|
|
|
|
### TTL 기반 캐시
|
|
```python
|
|
# 캐시 유효 기간 (7일)
|
|
CACHE_TTL_DAYS = 7
|
|
|
|
def _is_cache_fresh(self, cached_data: Dict[str, Any]) -> bool:
|
|
"""캐시 데이터가 신선한지 확인"""
|
|
if not cached_data:
|
|
return False
|
|
|
|
updated_at = cached_data.get("updated_at")
|
|
if not updated_at:
|
|
return False
|
|
|
|
if isinstance(updated_at, str):
|
|
updated_at = datetime.fromisoformat(updated_at)
|
|
|
|
expiry_date = updated_at + timedelta(days=CACHE_TTL_DAYS)
|
|
return datetime.now() < expiry_date
|
|
```
|
|
|
|
### 갱신 정책
|
|
```python
|
|
# 정책:
|
|
# - 7일이 지나면 갱신 시도 (삭제 아님)
|
|
# - API 호출 실패 시 기존 데이터 유지
|
|
# - 데이터 동일 시 확인 일자만 갱신
|
|
|
|
async def save_person(self, new_data: Dict, existing_data: Dict = None):
|
|
"""기존 데이터와 비교하여 적절히 처리"""
|
|
if existing_data and existing_data.get("verified"):
|
|
# 기존에 검증된 데이터가 있음
|
|
if not new_data.get("birth_date") and existing_data.get("birth_date"):
|
|
# 새 데이터가 덜 완전하면 기존 데이터 유지, 시간만 갱신
|
|
await self.people_collection.update_one(
|
|
{"wikipedia_url": existing_data["wikipedia_url"]},
|
|
{"$set": {"updated_at": datetime.now()}}
|
|
)
|
|
return
|
|
# 새 데이터로 갱신
|
|
await self._upsert_person(new_data)
|
|
```
|
|
|
|
## GridFS (대용량 파일)
|
|
|
|
### 오디오 파일 저장
|
|
```python
|
|
from motor.motor_asyncio import AsyncIOMotorGridFSBucket
|
|
|
|
class AudioStorage:
|
|
def __init__(self, db):
|
|
self.fs = AsyncIOMotorGridFSBucket(db, bucket_name="audio")
|
|
|
|
async def save_audio(self, audio_data: bytes, filename: str) -> str:
|
|
"""오디오 파일 저장"""
|
|
file_id = await self.fs.upload_from_stream(
|
|
filename,
|
|
audio_data,
|
|
metadata={"content_type": "audio/mpeg"}
|
|
)
|
|
return str(file_id)
|
|
|
|
async def get_audio(self, file_id: str) -> bytes:
|
|
"""오디오 파일 조회"""
|
|
grid_out = await self.fs.open_download_stream(ObjectId(file_id))
|
|
return await grid_out.read()
|
|
```
|
|
|
|
## 백업 정책
|
|
|
|
### 규칙
|
|
- **주기**: 하루에 한 번 (daily)
|
|
- **보관 기간**: 최소 7일
|
|
- **백업 위치**: 프로젝트 루트의 `./backups/` 디렉토리
|
|
|
|
### MongoDB 백업
|
|
```bash
|
|
# 백업 실행
|
|
BACKUP_NAME="mongodb_backup_$(date +%Y%m%d_%H%M%S)"
|
|
docker exec {프로젝트}-mongodb mongodump \
|
|
--uri="mongodb://{user}:{password}@localhost:27017" \
|
|
--authenticationDatabase=admin \
|
|
--out="/tmp/$BACKUP_NAME"
|
|
docker cp {프로젝트}-mongodb:/tmp/$BACKUP_NAME ./backups/
|
|
echo "백업 완료: ./backups/$BACKUP_NAME"
|
|
```
|
|
|
|
### MongoDB 복원
|
|
```bash
|
|
docker cp ./backups/$BACKUP_NAME {프로젝트}-mongodb:/tmp/
|
|
docker exec {프로젝트}-mongodb mongorestore \
|
|
--uri="mongodb://{user}:{password}@localhost:27017" \
|
|
--authenticationDatabase=admin \
|
|
"/tmp/$BACKUP_NAME"
|
|
```
|
|
|
|
### 자동화 스크립트 (backup-mongodb.sh)
|
|
```bash
|
|
#!/bin/bash
|
|
PROJECT_NAME="{프로젝트명}"
|
|
BACKUP_DIR="{프로젝트경로}/backups"
|
|
BACKUP_NAME="mongodb_backup_$(date +%Y%m%d_%H%M%S)"
|
|
|
|
# 백업 실행
|
|
docker exec ${PROJECT_NAME}-mongodb mongodump \
|
|
--uri="mongodb://{user}:{password}@localhost:27017" \
|
|
--authenticationDatabase=admin \
|
|
--out="/tmp/$BACKUP_NAME"
|
|
|
|
docker cp ${PROJECT_NAME}-mongodb:/tmp/$BACKUP_NAME $BACKUP_DIR/
|
|
|
|
# 7일 이상 된 백업 삭제
|
|
find $BACKUP_DIR -type d -name "mongodb_backup_*" -mtime +7 -exec rm -rf {} \;
|
|
|
|
echo "$(date): Backup completed - $BACKUP_NAME" >> $BACKUP_DIR/backup.log
|
|
```
|
|
|
|
### cron 설정
|
|
```bash
|
|
# crontab -e
|
|
0 2 * * * /path/to/backup-mongodb.sh # 매일 새벽 2시
|
|
```
|
|
|
|
### 주의사항
|
|
- 새 프로젝트 생성 시 반드시 백업 스크립트 설정
|
|
- 백업 디렉토리는 .gitignore에 추가하여 커밋 제외
|
|
- 중요 데이터는 외부 스토리지에 추가 백업 권장
|
|
|
|
---
|
|
|
|
## 환경 변수
|
|
|
|
```bash
|
|
# .env
|
|
MONGODB_URL=mongodb://admin:password123@mongodb:27017/
|
|
DB_NAME=ai_writer_db
|
|
TARGET_COLLECTION=articles_en
|
|
|
|
# docker-compose.yml
|
|
environment:
|
|
- MONGODB_URL=mongodb://${MONGO_USER}:${MONGO_PASSWORD}@mongodb:27017/
|
|
- DB_NAME=ai_writer_db
|
|
```
|