File: site11/services/search/backend/indexer.py
jungwoo choi dd165454f0 feat: Add Step 13 - Search System with Apache Solr and Data Persistence
- Implemented search service with Apache Solr instead of Elasticsearch
- Added full-text search, faceted search, and autocomplete capabilities
- Created data indexer for synchronizing data from MongoDB/Kafka to Solr
- Configured external volume mounts for all data services:
  - MongoDB, Redis, Kafka, Zookeeper, MinIO, Solr
  - All data now persists in ./data/ directory
- Added comprehensive search API endpoints
- Created documentation for data persistence and backup strategies

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-11 20:27:02 +09:00
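The search service that exposes these capabilities is not part of this file. As a rough illustration of the full-text, faceted, and autocomplete-style queries the commit message describes, here is a minimal sketch using the pysolr client; the Solr URL, core name, and field names are assumptions, not values taken from this commit.

```python
# Illustrative only: the Solr URL, core name, and field names are assumed.
import pysolr

solr = pysolr.Solr("http://solr:8983/solr/search_core", timeout=10)

# Full-text search with facet counts on document type and tags
results = solr.search(
    "content:report OR title:report",
    **{"facet": "true", "facet.field": ["doc_type", "tags"], "rows": 20},
)
print(results.hits, results.facets.get("facet_fields", {}))

# Autocomplete-style prefix query against usernames
for doc in solr.search("username:jun*", fl="username", rows=5):
    print(doc.get("username"))
```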

"""
Data indexer for synchronizing data from other services to Solr
"""
import asyncio
import logging
from typing import Dict, Any, List
from motor.motor_asyncio import AsyncIOMotorClient
from aiokafka import AIOKafkaConsumer
import json
from solr_client import SolrClient
from datetime import datetime
logger = logging.getLogger(__name__)

class DataIndexer:
    def __init__(self, solr_client: SolrClient, mongodb_url: str, kafka_servers: str):
        self.solr = solr_client
        self.mongodb_url = mongodb_url
        self.kafka_servers = kafka_servers
        self.mongo_client = None
        self.kafka_consumer = None
        self.running = False
        # Keep references to background tasks so they are not garbage-collected
        self._tasks = []

    async def start(self):
        """Start the indexer"""
        try:
            # Connect to MongoDB
            self.mongo_client = AsyncIOMotorClient(self.mongodb_url)
            # Initialize Kafka consumer
            await self._init_kafka_consumer()
            # Start background tasks
            self.running = True
            self._tasks.append(asyncio.create_task(self._consume_kafka_events()))
            self._tasks.append(asyncio.create_task(self._periodic_sync()))
            logger.info("Data indexer started")
        except Exception as e:
            logger.error(f"Failed to start indexer: {e}")

    async def stop(self):
        """Stop the indexer"""
        self.running = False
        for task in self._tasks:
            task.cancel()
        if self.kafka_consumer:
            await self.kafka_consumer.stop()
        if self.mongo_client:
            self.mongo_client.close()
        logger.info("Data indexer stopped")

    async def _init_kafka_consumer(self):
        """Initialize Kafka consumer"""
        try:
            self.kafka_consumer = AIOKafkaConsumer(
                'user_events',
                'file_events',
                'content_events',
                bootstrap_servers=self.kafka_servers,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                group_id='search_indexer',
                auto_offset_reset='latest'
            )
            await self.kafka_consumer.start()
            logger.info("Kafka consumer initialized")
        except Exception as e:
            logger.warning(f"Kafka consumer initialization failed: {e}")
            self.kafka_consumer = None

    async def _consume_kafka_events(self):
        """Consume events from Kafka and index them"""
        if not self.kafka_consumer:
            return
        while self.running:
            try:
                async for msg in self.kafka_consumer:
                    await self._handle_kafka_event(msg.topic, msg.value)
            except Exception as e:
                logger.error(f"Kafka consumption error: {e}")
                await asyncio.sleep(5)

    async def _handle_kafka_event(self, topic: str, event: Dict[str, Any]):
        """Handle a Kafka event"""
        try:
            event_type = event.get('type')
            data = event.get('data', {})
            if topic == 'user_events':
                await self._index_user_event(event_type, data)
            elif topic == 'file_events':
                await self._index_file_event(event_type, data)
            elif topic == 'content_events':
                await self._index_content_event(event_type, data)
        except Exception as e:
            logger.error(f"Failed to handle event: {e}")

    async def _index_user_event(self, event_type: str, data: Dict):
        """Index user-related events"""
        if event_type in ('user_created', 'user_updated'):
            user_doc = {
                'id': f"user_{data.get('user_id')}",
                'doc_type': 'user',
                'user_id': data.get('user_id'),
                'username': data.get('username'),
                'email': data.get('email'),
                'name': data.get('name', ''),
                'bio': data.get('bio', ''),
                'tags': data.get('tags', []),
                'created_at': data.get('created_at'),
                'updated_at': datetime.utcnow().isoformat()
            }
            self.solr.index_document(user_doc)
        elif event_type == 'user_deleted':
            self.solr.delete_document(f"user_{data.get('user_id')}")

    async def _index_file_event(self, event_type: str, data: Dict):
        """Index file-related events"""
        if event_type == 'file_uploaded':
            file_doc = {
                'id': f"file_{data.get('file_id')}",
                'doc_type': 'file',
                'file_id': data.get('file_id'),
                'filename': data.get('filename'),
                'content_type': data.get('content_type'),
                'size': data.get('size'),
                'user_id': data.get('user_id'),
                'tags': data.get('tags', []),
                'description': data.get('description', ''),
                'created_at': data.get('created_at'),
                'updated_at': datetime.utcnow().isoformat()
            }
            self.solr.index_document(file_doc)
        elif event_type == 'file_deleted':
            self.solr.delete_document(f"file_{data.get('file_id')}")

    async def _index_content_event(self, event_type: str, data: Dict):
        """Index content-related events"""
        if event_type in ('content_created', 'content_updated'):
            content_doc = {
                'id': f"content_{data.get('content_id')}",
                'doc_type': 'content',
                'content_id': data.get('content_id'),
                'title': data.get('title'),
                'content': data.get('content', ''),
                'summary': data.get('summary', ''),
                'author_id': data.get('author_id'),
                'tags': data.get('tags', []),
                'category': data.get('category'),
                'status': data.get('status', 'draft'),
                'created_at': data.get('created_at'),
                'updated_at': datetime.utcnow().isoformat()
            }
            self.solr.index_document(content_doc)
        elif event_type == 'content_deleted':
            self.solr.delete_document(f"content_{data.get('content_id')}")

    async def _periodic_sync(self):
        """Periodically sync data from MongoDB"""
        while self.running:
            try:
                # Sync every 5 minutes
                await asyncio.sleep(300)
                await self.sync_all_data()
            except Exception as e:
                logger.error(f"Periodic sync error: {e}")

    async def sync_all_data(self):
        """Sync all data from MongoDB to Solr"""
        try:
            logger.info("Starting full data sync")
            # Sync users
            await self._sync_users()
            # Sync files
            await self._sync_files()
            # Optimize index after bulk sync
            self.solr.optimize_index()
            logger.info("Full data sync completed")
        except Exception as e:
            logger.error(f"Full sync failed: {e}")

    async def _sync_users(self):
        """Sync users from MongoDB"""
        try:
            db = self.mongo_client['users_db']
            collection = db['users']
            users = []
            total = 0
            async for user in collection.find({'deleted_at': None}):
                user_doc = {
                    'id': f"user_{str(user['_id'])}",
                    'doc_type': 'user',
                    'user_id': str(user['_id']),
                    'username': user.get('username'),
                    'email': user.get('email'),
                    'name': user.get('name', ''),
                    'bio': user.get('bio', ''),
                    'tags': user.get('tags', []),
                    'created_at': user.get('created_at').isoformat() if user.get('created_at') else None,
                    'updated_at': datetime.utcnow().isoformat()
                }
                users.append(user_doc)
                total += 1
                # Bulk index every 100 documents
                if len(users) >= 100:
                    self.solr.bulk_index(users, 'user')
                    users = []
            # Index remaining users
            if users:
                self.solr.bulk_index(users, 'user')
            logger.info(f"Synced {total} users to Solr")
        except Exception as e:
            logger.error(f"Failed to sync users: {e}")

    async def _sync_files(self):
        """Sync files from MongoDB"""
        try:
            db = self.mongo_client['files_db']
            collection = db['file_metadata']
            files = []
            total = 0
            async for file in collection.find({'deleted_at': None}):
                file_doc = {
                    'id': f"file_{str(file['_id'])}",
                    'doc_type': 'file',
                    'file_id': str(file['_id']),
                    'filename': file.get('filename'),
                    'original_name': file.get('original_name'),
                    'content_type': file.get('content_type'),
                    'size': file.get('size'),
                    'user_id': file.get('user_id'),
                    'tags': list(file.get('tags', {}).keys()),
                    'description': file.get('metadata', {}).get('description', ''),
                    'created_at': file.get('created_at').isoformat() if file.get('created_at') else None,
                    'updated_at': datetime.utcnow().isoformat()
                }
                files.append(file_doc)
                total += 1
                # Bulk index every 100 documents
                if len(files) >= 100:
                    self.solr.bulk_index(files, 'file')
                    files = []
            # Index remaining files
            if files:
                self.solr.bulk_index(files, 'file')
            logger.info(f"Synced {total} files to Solr")
        except Exception as e:
            logger.error(f"Failed to sync files: {e}")

    async def reindex_collection(self, collection_name: str, doc_type: str):
        """Reindex a specific collection"""
        try:
            # Delete existing documents of this type
            self.solr.delete_by_query(f'doc_type:{doc_type}')
            # Sync the collection
            if collection_name == 'users':
                await self._sync_users()
            elif collection_name == 'files':
                await self._sync_files()
            logger.info(f"Reindexed {collection_name}")
        except Exception as e:
            logger.error(f"Failed to reindex {collection_name}: {e}")
```