diff --git a/.gitignore b/.gitignore index 0e44b5b..468a283 100644 --- a/.gitignore +++ b/.gitignore @@ -64,4 +64,4 @@ temp/ *.pem *.key *.crt -secrets/ \ No newline at end of file +secrets/data/ diff --git a/docker-compose.yml b/docker-compose.yml index 01c70b5..858e099 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -76,7 +76,7 @@ services: - CONVERT_TO_WEBP=true volumes: - ./services/images/backend:/app - - images_cache:/app/cache + - ./data/images-cache:/app/cache networks: - site11_network restart: unless-stopped @@ -118,8 +118,8 @@ services: ports: - "${MONGODB_PORT}:27017" volumes: - - mongodb_data:/data/db - - mongodb_config:/data/configdb + - ./data/mongodb:/data/db + - ./data/mongodb/configdb:/data/configdb networks: - site11_network restart: unless-stopped @@ -135,7 +135,7 @@ services: ports: - "${REDIS_PORT}:6379" volumes: - - redis_data:/data + - ./data/redis:/data networks: - site11_network restart: unless-stopped @@ -154,8 +154,8 @@ services: ports: - "${KAFKA_ZOOKEEPER_PORT}:2181" volumes: - - zookeeper_data:/var/lib/zookeeper/data - - zookeeper_logs:/var/lib/zookeeper/log + - ./data/zookeeper/data:/var/lib/zookeeper/data + - ./data/zookeeper/logs:/var/lib/zookeeper/log networks: - site11_network restart: unless-stopped @@ -181,7 +181,7 @@ services: KAFKA_JMX_HOSTNAME: localhost KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' volumes: - - kafka_data:/var/lib/kafka/data + - ./data/kafka:/var/lib/kafka/data networks: - site11_network restart: unless-stopped @@ -234,7 +234,7 @@ services: - MINIO_ROOT_USER=${MINIO_ROOT_USER:-minioadmin} - MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD:-minioadmin} volumes: - - minio_data:/data + - ./data/minio:/data command: server /data --console-address ":9001" networks: - site11_network @@ -264,7 +264,7 @@ services: - MINIO_SECURE=false volumes: - ./services/files/backend:/app - - files_temp:/tmp + - ./data/files-temp:/tmp networks: - site11_network restart: unless-stopped @@ -277,6 +277,57 @@ services: timeout: 10s 
retries: 3 + # Apache Solr Search Engine + solr: + image: solr:9.4 + container_name: ${COMPOSE_PROJECT_NAME}_solr + ports: + - "8983:8983" + volumes: + - ./data/solr:/var/solr + - ./services/search/solr-config:/opt/solr/server/solr/configsets/site11_config + command: + - solr-precreate + - site11 + - /opt/solr/server/solr/configsets/site11_config + networks: + - site11_network + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8983/solr/site11/admin/ping"] + interval: 30s + timeout: 10s + retries: 3 + + # Search Service + search-backend: + build: + context: ./services/search/backend + dockerfile: Dockerfile + container_name: ${COMPOSE_PROJECT_NAME}_search_backend + ports: + - "8015:8000" + environment: + - ENV=${ENV} + - PORT=8000 + - SOLR_URL=http://solr:8983/solr + - MONGODB_URL=${MONGODB_URL} + - KAFKA_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS} + volumes: + - ./services/search/backend:/app + networks: + - site11_network + restart: unless-stopped + depends_on: + - solr + - mongodb + - kafka + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + # Statistics Service statistics-backend: build: @@ -308,13 +359,15 @@ networks: driver: bridge name: site11_network -volumes: - mongodb_data: - mongodb_config: - redis_data: - images_cache: - zookeeper_data: - zookeeper_logs: - kafka_data: - minio_data: - files_temp: \ No newline at end of file +# Named volumes are replaced with bind mounts in ./data/ directory +# volumes: +# mongodb_data: +# mongodb_config: +# redis_data: +# images_cache: +# zookeeper_data: +# zookeeper_logs: +# kafka_data: +# minio_data: +# files_temp: +# solr_data: \ No newline at end of file diff --git a/docs/DATA_PERSISTENCE.md b/docs/DATA_PERSISTENCE.md new file mode 100644 index 0000000..eccb185 --- /dev/null +++ b/docs/DATA_PERSISTENCE.md @@ -0,0 +1,140 @@ +# Data Persistence Configuration + +## Overview +All data services are configured to 
use bind mounts to local directories for data persistence. This ensures data survives container restarts and rebuilds. + +## Directory Structure +``` +data/ +├── mongodb/ # MongoDB database files +├── redis/ # Redis persistence files +├── kafka/ # Kafka log data +├── zookeeper/ # Zookeeper data and logs +│ ├── data/ +│ └── logs/ +├── minio/ # MinIO object storage +├── solr/ # Solr search index +├── files-temp/ # Temporary file storage +└── images-cache/ # Image processing cache +``` + +## Volume Mappings + +### MongoDB +- `./data/mongodb:/data/db` - Database files +- `./data/mongodb/configdb:/data/configdb` - Configuration database + +### Redis +- `./data/redis:/data` - RDB snapshots and AOF logs + +### Kafka +- `./data/kafka:/var/lib/kafka/data` - Message logs + +### Zookeeper +- `./data/zookeeper/data:/var/lib/zookeeper/data` - Coordination data +- `./data/zookeeper/logs:/var/lib/zookeeper/log` - Transaction logs + +### MinIO +- `./data/minio:/data` - Object storage buckets + +### Solr +- `./data/solr:/var/solr` - Search index and configuration + +### Application Caches +- `./data/files-temp:/tmp` - Temporary file processing +- `./data/images-cache:/app/cache` - Processed image cache + +## Backup and Restore + +### Backup All Data +```bash +# Stop services +docker-compose down + +# Create backup +tar -czf backup-$(date +%Y%m%d).tar.gz data/ + +# Restart services +docker-compose up -d +``` + +### Restore Data +```bash +# Stop services +docker-compose down + +# Extract backup +tar -xzf backup-YYYYMMDD.tar.gz + +# Restart services +docker-compose up -d +``` + +### Individual Service Backups + +#### MongoDB Backup +```bash +docker exec site11_mongodb mongodump --out /data/db/backup +tar -czf mongodb-backup.tar.gz data/mongodb/backup/ +``` + +#### Redis Backup +```bash +docker exec site11_redis redis-cli BGSAVE +# Wait for completion +cp data/redis/dump.rdb redis-backup-$(date +%Y%m%d).rdb +``` + +## Permissions +Ensure proper permissions for data directories: 
+ +```bash +# Set appropriate permissions (database containers may also need matching ownership, e.g. chown to the container's uid) +chmod -R 755 data/ +``` + +## Disk Space Monitoring +Monitor disk usage regularly: +```bash +# Check data directory size +du -sh data/* + +# Check individual services +du -sh data/mongodb +du -sh data/minio +du -sh data/kafka +``` + +## Clean Up Old Data + +### Inspect Kafka Log Usage (Kafka's retention settings purge old segments automatically) +```bash +docker exec site11_kafka kafka-log-dirs.sh --describe --bootstrap-server localhost:9092 +``` + +### Clear Image Cache +```bash +rm -rf data/images-cache/* +``` + +### Clear Temporary Files +```bash +rm -rf data/files-temp/* +``` + +## Migration from Docker Volumes +If migrating from named Docker volumes to bind mounts: + +1. Export data from Docker volumes: +```bash +docker run --rm -v site11_mongodb_data:/source -v $(pwd)/data/mongodb:/dest alpine cp -av /source/. /dest/ +``` + +2. Update docker-compose.yml (already done) + +3. Restart services with new configuration + +## Notes +- Ensure the `data/` directory is excluded from git via .gitignore (add a `data/` entry if only `secrets/data/` is listed) +- Ensure sufficient disk space for data growth +- Consider setting up automated backups for production +- Monitor disk I/O performance for database services \ No newline at end of file diff --git a/services/search/backend/Dockerfile b/services/search/backend/Dockerfile new file mode 100644 index 0000000..f1904f4 --- /dev/null +++ b/services/search/backend/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . 
+ +# Create necessary directories +RUN mkdir -p /app/logs + +# Run the application +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] \ No newline at end of file diff --git a/services/search/backend/indexer.py b/services/search/backend/indexer.py new file mode 100644 index 0000000..f989c0b --- /dev/null +++ b/services/search/backend/indexer.py @@ -0,0 +1,286 @@ +""" +Data indexer for synchronizing data from other services to Solr +""" +import asyncio +import logging +from typing import Dict, Any, List +from motor.motor_asyncio import AsyncIOMotorClient +from aiokafka import AIOKafkaConsumer +import json +from solr_client import SolrClient +from datetime import datetime + +logger = logging.getLogger(__name__) + +class DataIndexer: + def __init__(self, solr_client: SolrClient, mongodb_url: str, kafka_servers: str): + self.solr = solr_client + self.mongodb_url = mongodb_url + self.kafka_servers = kafka_servers + self.mongo_client = None + self.kafka_consumer = None + self.running = False + + async def start(self): + """Start the indexer""" + try: + # Connect to MongoDB + self.mongo_client = AsyncIOMotorClient(self.mongodb_url) + + # Initialize Kafka consumer + await self._init_kafka_consumer() + + # Start background tasks + self.running = True + asyncio.create_task(self._consume_kafka_events()) + asyncio.create_task(self._periodic_sync()) + + logger.info("Data indexer started") + + except Exception as e: + logger.error(f"Failed to start indexer: {e}") + + async def stop(self): + """Stop the indexer""" + self.running = False + + if self.kafka_consumer: + await self.kafka_consumer.stop() + + if self.mongo_client: + self.mongo_client.close() + + logger.info("Data indexer stopped") + + async def _init_kafka_consumer(self): + """Initialize Kafka consumer""" + try: + self.kafka_consumer = AIOKafkaConsumer( + 'user_events', + 'file_events', + 'content_events', + bootstrap_servers=self.kafka_servers, + value_deserializer=lambda m: 
json.loads(m.decode('utf-8')), + group_id='search_indexer', + auto_offset_reset='latest' + ) + await self.kafka_consumer.start() + logger.info("Kafka consumer initialized") + + except Exception as e: + logger.warning(f"Kafka consumer initialization failed: {e}") + self.kafka_consumer = None + + async def _consume_kafka_events(self): + """Consume events from Kafka and index them""" + if not self.kafka_consumer: + return + + while self.running: + try: + async for msg in self.kafka_consumer: + await self._handle_kafka_event(msg.topic, msg.value) + + except Exception as e: + logger.error(f"Kafka consumption error: {e}") + await asyncio.sleep(5) + + async def _handle_kafka_event(self, topic: str, event: Dict[str, Any]): + """Handle a Kafka event""" + try: + event_type = event.get('type') + data = event.get('data', {}) + + if topic == 'user_events': + await self._index_user_event(event_type, data) + elif topic == 'file_events': + await self._index_file_event(event_type, data) + elif topic == 'content_events': + await self._index_content_event(event_type, data) + + except Exception as e: + logger.error(f"Failed to handle event: {e}") + + async def _index_user_event(self, event_type: str, data: Dict): + """Index user-related events""" + if event_type == 'user_created' or event_type == 'user_updated': + user_doc = { + 'id': f"user_{data.get('user_id')}", + 'doc_type': 'user', + 'user_id': data.get('user_id'), + 'username': data.get('username'), + 'email': data.get('email'), + 'name': data.get('name', ''), + 'bio': data.get('bio', ''), + 'tags': data.get('tags', []), + 'created_at': data.get('created_at'), + 'updated_at': datetime.utcnow().isoformat() + } + self.solr.index_document(user_doc) + + elif event_type == 'user_deleted': + self.solr.delete_document(f"user_{data.get('user_id')}") + + async def _index_file_event(self, event_type: str, data: Dict): + """Index file-related events""" + if event_type == 'file_uploaded': + file_doc = { + 'id': 
f"file_{data.get('file_id')}", + 'doc_type': 'file', + 'file_id': data.get('file_id'), + 'filename': data.get('filename'), + 'content_type': data.get('content_type'), + 'size': data.get('size'), + 'user_id': data.get('user_id'), + 'tags': data.get('tags', []), + 'description': data.get('description', ''), + 'created_at': data.get('created_at'), + 'updated_at': datetime.utcnow().isoformat() + } + self.solr.index_document(file_doc) + + elif event_type == 'file_deleted': + self.solr.delete_document(f"file_{data.get('file_id')}") + + async def _index_content_event(self, event_type: str, data: Dict): + """Index content-related events""" + if event_type in ['content_created', 'content_updated']: + content_doc = { + 'id': f"content_{data.get('content_id')}", + 'doc_type': 'content', + 'content_id': data.get('content_id'), + 'title': data.get('title'), + 'content': data.get('content', ''), + 'summary': data.get('summary', ''), + 'author_id': data.get('author_id'), + 'tags': data.get('tags', []), + 'category': data.get('category'), + 'status': data.get('status', 'draft'), + 'created_at': data.get('created_at'), + 'updated_at': datetime.utcnow().isoformat() + } + self.solr.index_document(content_doc) + + elif event_type == 'content_deleted': + self.solr.delete_document(f"content_{data.get('content_id')}") + + async def _periodic_sync(self): + """Periodically sync data from MongoDB""" + while self.running: + try: + # Sync every 5 minutes + await asyncio.sleep(300) + await self.sync_all_data() + + except Exception as e: + logger.error(f"Periodic sync error: {e}") + + async def sync_all_data(self): + """Sync all data from MongoDB to Solr""" + try: + logger.info("Starting full data sync") + + # Sync users + await self._sync_users() + + # Sync files + await self._sync_files() + + # Optimize index after bulk sync + self.solr.optimize_index() + + logger.info("Full data sync completed") + + except Exception as e: + logger.error(f"Full sync failed: {e}") + + async def 
_sync_users(self): + """Sync users from MongoDB""" + try: + db = self.mongo_client['users_db'] + collection = db['users'] + + users = [] + async for user in collection.find({'deleted_at': None}): + user_doc = { + 'id': f"user_{str(user['_id'])}", + 'doc_type': 'user', + 'user_id': str(user['_id']), + 'username': user.get('username'), + 'email': user.get('email'), + 'name': user.get('name', ''), + 'bio': user.get('bio', ''), + 'tags': user.get('tags', []), + 'created_at': user.get('created_at').isoformat() if user.get('created_at') else None, + 'updated_at': datetime.utcnow().isoformat() + } + users.append(user_doc) + + # Bulk index every 100 documents + if len(users) >= 100: + self.solr.bulk_index(users, 'user') + users = [] + + # Index remaining users + if users: + self.solr.bulk_index(users, 'user') + + logger.info(f"Synced users to Solr") + + except Exception as e: + logger.error(f"Failed to sync users: {e}") + + async def _sync_files(self): + """Sync files from MongoDB""" + try: + db = self.mongo_client['files_db'] + collection = db['file_metadata'] + + files = [] + async for file in collection.find({'deleted_at': None}): + file_doc = { + 'id': f"file_{str(file['_id'])}", + 'doc_type': 'file', + 'file_id': str(file['_id']), + 'filename': file.get('filename'), + 'original_name': file.get('original_name'), + 'content_type': file.get('content_type'), + 'size': file.get('size'), + 'user_id': file.get('user_id'), + 'tags': list(file.get('tags', {}).keys()), + 'description': file.get('metadata', {}).get('description', ''), + 'created_at': file.get('created_at').isoformat() if file.get('created_at') else None, + 'updated_at': datetime.utcnow().isoformat() + } + files.append(file_doc) + + # Bulk index every 100 documents + if len(files) >= 100: + self.solr.bulk_index(files, 'file') + files = [] + + # Index remaining files + if files: + self.solr.bulk_index(files, 'file') + + logger.info(f"Synced files to Solr") + + except Exception as e: + logger.error(f"Failed to sync 
files: {e}") + + async def reindex_collection(self, collection_name: str, doc_type: str): + """Reindex a specific collection""" + try: + # Delete existing documents of this type + self.solr.delete_by_query(f'doc_type:{doc_type}') + + # Sync the collection + if collection_name == 'users': + await self._sync_users() + elif collection_name == 'files': + await self._sync_files() + + logger.info(f"Reindexed {collection_name}") + + except Exception as e: + logger.error(f"Failed to reindex {collection_name}: {e}") \ No newline at end of file diff --git a/services/search/backend/main.py b/services/search/backend/main.py new file mode 100644 index 0000000..db4e25a --- /dev/null +++ b/services/search/backend/main.py @@ -0,0 +1,362 @@ +""" +Search Service with Apache Solr +""" +from fastapi import FastAPI, Query, HTTPException +from fastapi.responses import JSONResponse +from contextlib import asynccontextmanager +import logging +import os +from typing import Optional, List, Dict, Any +from datetime import datetime +from solr_client import SolrClient +from indexer import DataIndexer +import asyncio +import time + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Global instances +solr_client = None +data_indexer = None + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage application lifecycle""" + global solr_client, data_indexer + + # Startup + logger.info("Starting Search Service...") + + # Wait for Solr to be ready + solr_url = os.getenv("SOLR_URL", "http://solr:8983/solr") + max_retries = 30 + + for i in range(max_retries): + try: + solr_client = SolrClient(solr_url=solr_url, core_name="site11") + logger.info("Connected to Solr") + break + except Exception as e: + logger.warning(f"Waiting for Solr... 
({i+1}/{max_retries})") + await asyncio.sleep(2) + + if solr_client: + # Initialize data indexer + mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017") + kafka_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092") + + data_indexer = DataIndexer(solr_client, mongodb_url, kafka_servers) + await data_indexer.start() + + # Initial data sync + asyncio.create_task(data_indexer.sync_all_data()) + + yield + + # Shutdown + if data_indexer: + await data_indexer.stop() + + logger.info("Search Service stopped") + +app = FastAPI( + title="Search Service", + description="Full-text search with Apache Solr", + version="1.0.0", + lifespan=lifespan +) + +@app.get("/health") +async def health_check(): + """Health check endpoint""" + return { + "status": "healthy", + "service": "search", + "timestamp": datetime.utcnow().isoformat(), + "solr_connected": solr_client is not None + } + +@app.get("/api/search") +async def search( + q: str = Query(..., description="Search query"), + doc_type: Optional[str] = Query(None, description="Filter by document type"), + start: int = Query(0, ge=0, description="Starting offset"), + rows: int = Query(10, ge=1, le=100, description="Number of results"), + sort: Optional[str] = Query(None, description="Sort order (e.g., 'created_at desc')"), + facet: bool = Query(False, description="Enable faceting"), + facet_field: Optional[List[str]] = Query(None, description="Fields to facet on") +): + """ + Search documents across all indexed content + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + # Build filter query + fq = [] + if doc_type: + fq.append(f"doc_type:{doc_type}") + + # Prepare search parameters + search_params = { + 'start': start, + 'rows': rows, + 'facet': facet + } + + if fq: + search_params['fq'] = fq + + if sort: + search_params['sort'] = sort + + if facet_field: + search_params['facet_field'] = facet_field + + # Execute search + results = solr_client.search(q, 
**search_params) + + return { + "query": q, + "total": results['total'], + "start": start, + "rows": rows, + "documents": results['documents'], + "facets": results.get('facets', {}), + "highlighting": results.get('highlighting', {}) + } + + except Exception as e: + logger.error(f"Search failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/search/suggest") +async def suggest( + q: str = Query(..., min_length=1, description="Query prefix"), + field: str = Query("title", description="Field to search in"), + limit: int = Query(10, ge=1, le=50, description="Maximum suggestions") +): + """ + Get autocomplete suggestions + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + suggestions = solr_client.suggest(q, field, limit) + + return { + "query": q, + "suggestions": suggestions, + "count": len(suggestions) + } + + except Exception as e: + logger.error(f"Suggest failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/search/similar/{doc_id}") +async def find_similar( + doc_id: str, + rows: int = Query(5, ge=1, le=20, description="Number of similar documents") +): + """ + Find documents similar to the given document + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + similar_docs = solr_client.more_like_this(doc_id, rows=rows) + + return { + "source_document": doc_id, + "similar_documents": similar_docs, + "count": len(similar_docs) + } + + except Exception as e: + logger.error(f"Similar search failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/search/index") +async def index_document(document: Dict[str, Any]): + """ + Index a single document + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + doc_type = document.get('doc_type', 'general') + success = solr_client.index_document(document, 
doc_type) + + if success: + return { + "status": "success", + "message": "Document indexed", + "document_id": document.get('id') + } + else: + raise HTTPException(status_code=500, detail="Failed to index document") + + except Exception as e: + logger.error(f"Indexing failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/search/bulk-index") +async def bulk_index(documents: List[Dict[str, Any]]): + """ + Bulk index multiple documents + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + indexed = solr_client.bulk_index(documents) + + return { + "status": "success", + "message": f"Indexed {indexed} documents", + "count": indexed + } + + except Exception as e: + logger.error(f"Bulk indexing failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.delete("/api/search/document/{doc_id}") +async def delete_document(doc_id: str): + """ + Delete a document from the index + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + success = solr_client.delete_document(doc_id) + + if success: + return { + "status": "success", + "message": "Document deleted", + "document_id": doc_id + } + else: + raise HTTPException(status_code=500, detail="Failed to delete document") + + except Exception as e: + logger.error(f"Deletion failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/api/search/stats") +async def get_stats(): + """ + Get search index statistics + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + stats = solr_client.get_stats() + + return { + "status": "success", + "statistics": stats, + "timestamp": datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Failed to get stats: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/search/reindex/{collection}") +async def 
reindex_collection( + collection: str, + doc_type: Optional[str] = Query(None, description="Document type for the collection") +): + """ + Reindex a specific collection + """ + if not data_indexer: + raise HTTPException(status_code=503, detail="Indexer service unavailable") + + try: + if not doc_type: + # Map collection to doc_type + doc_type_map = { + 'users': 'user', + 'files': 'file', + 'content': 'content' + } + doc_type = doc_type_map.get(collection, collection) + + asyncio.create_task(data_indexer.reindex_collection(collection, doc_type)) + + return { + "status": "success", + "message": f"Reindexing {collection} started", + "collection": collection, + "doc_type": doc_type + } + + except Exception as e: + logger.error(f"Reindex failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/search/optimize") +async def optimize_index(): + """ + Optimize the search index + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + success = solr_client.optimize_index() + + if success: + return { + "status": "success", + "message": "Index optimization started" + } + else: + raise HTTPException(status_code=500, detail="Failed to optimize index") + + except Exception as e: + logger.error(f"Optimization failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/search/clear") +async def clear_index(): + """ + Clear all documents from the index (DANGER!) + """ + if not solr_client: + raise HTTPException(status_code=503, detail="Search service unavailable") + + try: + success = solr_client.clear_index() + + if success: + return { + "status": "success", + "message": "Index cleared", + "warning": "All documents have been deleted!" 
+ } + else: + raise HTTPException(status_code=500, detail="Failed to clear index") + + except Exception as e: + logger.error(f"Clear index failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/services/search/backend/requirements.txt b/services/search/backend/requirements.txt new file mode 100644 index 0000000..08411d3 --- /dev/null +++ b/services/search/backend/requirements.txt @@ -0,0 +1,10 @@ +fastapi==0.109.0 +uvicorn[standard]==0.27.0 +pydantic==2.5.3 +python-dotenv==1.0.0 +pysolr==3.9.0 +httpx==0.25.2 +motor==3.5.1 +pymongo==4.6.1 +aiokafka==0.10.0 +redis==5.0.1 \ No newline at end of file diff --git a/services/search/backend/solr_client.py b/services/search/backend/solr_client.py new file mode 100644 index 0000000..4c555b0 --- /dev/null +++ b/services/search/backend/solr_client.py @@ -0,0 +1,303 @@ +""" +Apache Solr client for search operations +""" +import pysolr +import logging +from typing import Dict, List, Any, Optional +from datetime import datetime +import json + +logger = logging.getLogger(__name__) + +class SolrClient: + def __init__(self, solr_url: str = "http://solr:8983/solr", core_name: str = "site11"): + self.solr_url = f"{solr_url}/{core_name}" + self.core_name = core_name + self.solr = None + self.connect() + + def connect(self): + """Connect to Solr instance""" + try: + self.solr = pysolr.Solr( + self.solr_url, + always_commit=True, + timeout=10 + ) + # Test connection + self.solr.ping() + logger.info(f"Connected to Solr at {self.solr_url}") + except Exception as e: + logger.error(f"Failed to connect to Solr: {e}") + raise + + def index_document(self, document: Dict[str, Any], doc_type: str = None) -> bool: + """Index a single document""" + try: + # Add metadata + if doc_type: + document["doc_type"] = doc_type + + if "id" not in document: + document["id"] = f"{doc_type}_{document.get('_id', '')}" 
+ + # Add indexing timestamp + document["indexed_at"] = datetime.utcnow().isoformat() + + # Index the document + self.solr.add([document]) + logger.info(f"Indexed document: {document.get('id')}") + return True + + except Exception as e: + logger.error(f"Failed to index document: {e}") + return False + + def bulk_index(self, documents: List[Dict[str, Any]], doc_type: str = None) -> int: + """Bulk index multiple documents""" + try: + indexed = 0 + for doc in documents: + if doc_type: + doc["doc_type"] = doc_type + + if "id" not in doc: + doc["id"] = f"{doc_type}_{doc.get('_id', '')}" + + doc["indexed_at"] = datetime.utcnow().isoformat() + + self.solr.add(documents) + indexed = len(documents) + logger.info(f"Bulk indexed {indexed} documents") + return indexed + + except Exception as e: + logger.error(f"Failed to bulk index: {e}") + return 0 + + def search(self, query: str, **kwargs) -> Dict[str, Any]: + """ + Search documents + + Args: + query: Search query string + **kwargs: Additional search parameters + - fq: Filter queries + - fl: Fields to return + - start: Starting offset + - rows: Number of rows + - sort: Sort order + - facet: Enable faceting + - facet.field: Fields to facet on + """ + try: + # Default parameters + params = { + 'q': query, + 'start': kwargs.get('start', 0), + 'rows': kwargs.get('rows', 10), + 'fl': kwargs.get('fl', '*,score'), + 'defType': 'edismax', + 'qf': 'title^3 content^2 tags description name', # Boost fields + 'mm': '2<-25%', # Minimum match + 'hl': 'true', # Highlighting + 'hl.fl': 'title,content,description', + 'hl.simple.pre': '', + 'hl.simple.post': '' + } + + # Add filter queries + if 'fq' in kwargs: + params['fq'] = kwargs['fq'] + + # Add sorting + if 'sort' in kwargs: + params['sort'] = kwargs['sort'] + + # Add faceting + if kwargs.get('facet'): + params.update({ + 'facet': 'true', + 'facet.field': kwargs.get('facet_field', ['doc_type', 'tags', 'status']), + 'facet.mincount': 1 + }) + + # Execute search + results = 
self.solr.search(**params) + + # Format response + response = { + 'total': results.hits, + 'documents': [], + 'facets': {}, + 'highlighting': {} + } + + # Add documents + for doc in results.docs: + response['documents'].append(doc) + + # Add facets if available + if hasattr(results, 'facets') and results.facets: + if 'facet_fields' in results.facets: + for field, values in results.facets['facet_fields'].items(): + response['facets'][field] = [ + {'value': values[i], 'count': values[i+1]} + for i in range(0, len(values), 2) + ] + + # Add highlighting if available + if hasattr(results, 'highlighting'): + response['highlighting'] = results.highlighting + + return response + + except Exception as e: + logger.error(f"Search failed: {e}") + return {'total': 0, 'documents': [], 'error': str(e)} + + def suggest(self, prefix: str, field: str = "suggest", limit: int = 10) -> List[str]: + """Get autocomplete suggestions""" + try: + params = { + 'q': f'{field}:{prefix}*', + 'fl': field, + 'rows': limit, + 'start': 0 + } + + results = self.solr.search(**params) + suggestions = [] + + for doc in results.docs: + if field in doc: + value = doc[field] + if isinstance(value, list): + suggestions.extend(value) + else: + suggestions.append(value) + + # Remove duplicates and limit + seen = set() + unique_suggestions = [] + for s in suggestions: + if s not in seen: + seen.add(s) + unique_suggestions.append(s) + if len(unique_suggestions) >= limit: + break + + return unique_suggestions + + except Exception as e: + logger.error(f"Suggest failed: {e}") + return [] + + def more_like_this(self, doc_id: str, mlt_fields: List[str] = None, rows: int = 5) -> List[Dict]: + """Find similar documents""" + try: + if not mlt_fields: + mlt_fields = ['title', 'content', 'tags', 'description'] + + params = { + 'q': f'id:{doc_id}', + 'mlt': 'true', + 'mlt.fl': ','.join(mlt_fields), + 'mlt.mindf': 1, + 'mlt.mintf': 1, + 'mlt.count': rows, + 'fl': '*,score' + } + + results = self.solr.search(**params) + + 
if results.docs: + # The MLT results are in the moreLikeThis section + if hasattr(results, 'moreLikeThis'): + mlt_results = results.moreLikeThis.get(doc_id, {}) + if 'docs' in mlt_results: + return mlt_results['docs'] + + return [] + + except Exception as e: + logger.error(f"More like this failed: {e}") + return [] + + def delete_document(self, doc_id: str) -> bool: + """Delete a document by ID""" + try: + self.solr.delete(id=doc_id) + logger.info(f"Deleted document: {doc_id}") + return True + except Exception as e: + logger.error(f"Failed to delete document: {e}") + return False + + def delete_by_query(self, query: str) -> bool: + """Delete documents matching a query""" + try: + self.solr.delete(q=query) + logger.info(f"Deleted documents matching: {query}") + return True + except Exception as e: + logger.error(f"Failed to delete by query: {e}") + return False + + def clear_index(self) -> bool: + """Clear all documents from index""" + try: + self.solr.delete(q='*:*') + logger.info("Cleared all documents from index") + return True + except Exception as e: + logger.error(f"Failed to clear index: {e}") + return False + + def get_stats(self) -> Dict[str, Any]: + """Get index statistics""" + try: + # Get document count + results = self.solr.search(q='*:*', rows=0) + + # Get facet counts for doc_type + facet_results = self.solr.search( + q='*:*', + rows=0, + facet='true', + **{'facet.field': ['doc_type', 'status']} + ) + + stats = { + 'total_documents': results.hits, + 'doc_types': {}, + 'status_counts': {} + } + + if hasattr(facet_results, 'facets') and facet_results.facets: + if 'facet_fields' in facet_results.facets: + # Parse doc_type facets + doc_type_facets = facet_results.facets['facet_fields'].get('doc_type', []) + for i in range(0, len(doc_type_facets), 2): + stats['doc_types'][doc_type_facets[i]] = doc_type_facets[i+1] + + # Parse status facets + status_facets = facet_results.facets['facet_fields'].get('status', []) + for i in range(0, len(status_facets), 2): + 
stats['status_counts'][status_facets[i]] = status_facets[i+1] + + return stats + + except Exception as e: + logger.error(f"Failed to get stats: {e}") + return {'error': str(e)} + + def optimize_index(self) -> bool: + """Optimize the Solr index""" + try: + self.solr.optimize() + logger.info("Index optimized") + return True + except Exception as e: + logger.error(f"Failed to optimize index: {e}") + return False \ No newline at end of file diff --git a/services/search/backend/test_search.py b/services/search/backend/test_search.py new file mode 100644 index 0000000..095a2b5 --- /dev/null +++ b/services/search/backend/test_search.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +""" +Test script for Search Service with Apache Solr +""" +import asyncio +import httpx +import json +from datetime import datetime + +BASE_URL = "http://localhost:8015" + +async def test_search_api(): + """Test search API endpoints""" + async with httpx.AsyncClient() as client: + print("\n🔍 Testing Search Service API...") + + # Test health check + print("\n1. Testing health check...") + response = await client.get(f"{BASE_URL}/health") + print(f"Health check: {response.json()}") + + # Test index sample documents + print("\n2. 
Indexing sample documents...") + + # Index user document + user_doc = { + "id": "user_test_001", + "doc_type": "user", + "user_id": "test_001", + "username": "john_doe", + "email": "john@example.com", + "name": "John Doe", + "bio": "Software developer passionate about Python and microservices", + "tags": ["python", "developer", "backend"], + "created_at": datetime.utcnow().isoformat() + } + + response = await client.post(f"{BASE_URL}/api/search/index", json=user_doc) + print(f"Indexed user: {response.json()}") + + # Index file documents + file_docs = [ + { + "id": "file_test_001", + "doc_type": "file", + "file_id": "test_file_001", + "filename": "architecture_diagram.png", + "content_type": "image/png", + "size": 1024000, + "user_id": "test_001", + "tags": ["architecture", "design", "documentation"], + "description": "System architecture diagram showing microservices", + "created_at": datetime.utcnow().isoformat() + }, + { + "id": "file_test_002", + "doc_type": "file", + "file_id": "test_file_002", + "filename": "user_manual.pdf", + "content_type": "application/pdf", + "size": 2048000, + "user_id": "test_001", + "tags": ["documentation", "manual", "guide"], + "description": "Complete user manual for the application", + "created_at": datetime.utcnow().isoformat() + } + ] + + response = await client.post(f"{BASE_URL}/api/search/bulk-index", json=file_docs) + print(f"Bulk indexed files: {response.json()}") + + # Index content documents + content_docs = [ + { + "id": "content_test_001", + "doc_type": "content", + "content_id": "test_content_001", + "title": "Getting Started with Microservices", + "content": "Microservices architecture is a method of developing software applications as a suite of independently deployable services.", + "summary": "Introduction to microservices architecture patterns", + "author_id": "test_001", + "tags": ["microservices", "architecture", "tutorial"], + "category": "technology", + "status": "published", + "created_at": 
datetime.utcnow().isoformat() + }, + { + "id": "content_test_002", + "doc_type": "content", + "content_id": "test_content_002", + "title": "Python Best Practices", + "content": "Learn the best practices for writing clean, maintainable Python code including PEP 8 style guide.", + "summary": "Essential Python coding standards and practices", + "author_id": "test_001", + "tags": ["python", "programming", "best-practices"], + "category": "programming", + "status": "published", + "created_at": datetime.utcnow().isoformat() + } + ] + + response = await client.post(f"{BASE_URL}/api/search/bulk-index", json=content_docs) + print(f"Bulk indexed content: {response.json()}") + + # Wait for indexing + await asyncio.sleep(2) + + # Test basic search + print("\n3. Testing basic search...") + response = await client.get( + f"{BASE_URL}/api/search", + params={"q": "microservices"} + ) + results = response.json() + print(f"Search for 'microservices': Found {results['total']} results") + if results['documents']: + print(f"First result: {results['documents'][0].get('title', results['documents'][0].get('filename', 'N/A'))}") + + # Test search with filters + print("\n4. Testing filtered search...") + response = await client.get( + f"{BASE_URL}/api/search", + params={ + "q": "*:*", + "doc_type": "file", + "rows": 5 + } + ) + results = response.json() + print(f"Files search: Found {results['total']} files") + + # Test faceted search + print("\n5. Testing faceted search...") + response = await client.get( + f"{BASE_URL}/api/search", + params={ + "q": "*:*", + "facet": "true", + "facet_field": ["doc_type", "tags", "category", "status"] + } + ) + results = response.json() + print(f"Facets: {json.dumps(results['facets'], indent=2)}") + + # Test autocomplete/suggest + print("\n6. 
Testing autocomplete...") + response = await client.get( + f"{BASE_URL}/api/search/suggest", + params={ + "q": "micro", + "field": "title", + "limit": 5 + } + ) + suggestions = response.json() + print(f"Suggestions for 'micro': {suggestions['suggestions']}") + + # Test similar documents + print("\n7. Testing similar documents...") + response = await client.get(f"{BASE_URL}/api/search/similar/content_test_001") + if response.status_code == 200: + similar = response.json() + print(f"Found {similar['count']} similar documents") + else: + print(f"Similar search: {response.status_code}") + + # Test search with highlighting + print("\n8. Testing search with highlighting...") + response = await client.get( + f"{BASE_URL}/api/search", + params={"q": "Python"} + ) + results = response.json() + if results['highlighting']: + print(f"Highlighting results: {len(results['highlighting'])} documents highlighted") + + # Test search statistics + print("\n9. Testing search statistics...") + response = await client.get(f"{BASE_URL}/api/search/stats") + if response.status_code == 200: + stats = response.json() + print(f"Index stats: {stats['statistics']}") + + # Test complex query + print("\n10. Testing complex query...") + response = await client.get( + f"{BASE_URL}/api/search", + params={ + "q": "architecture OR python", + "doc_type": "content", + "sort": "created_at desc", + "rows": 10 + } + ) + results = response.json() + print(f"Complex query: Found {results['total']} results") + + # Test delete document + print("\n11. 
Testing document deletion...") + response = await client.delete(f"{BASE_URL}/api/search/document/content_test_002") + if response.status_code == 200: + print(f"Deleted document: {response.json()}") + + # Verify deletion + await asyncio.sleep(1) + response = await client.get( + f"{BASE_URL}/api/search", + params={"q": "id:content_test_002"} + ) + results = response.json() + print(f"Verify deletion: Found {results['total']} results (should be 0)") + +async def test_performance(): + """Test search performance""" + print("\n\n⚡ Testing Search Performance...") + + async with httpx.AsyncClient(timeout=30.0) as client: + # Index many documents + print("Indexing 100 test documents...") + docs = [] + for i in range(100): + docs.append({ + "id": f"perf_test_{i}", + "doc_type": "content", + "title": f"Test Document {i}", + "content": f"This is test content for document {i} with various keywords like search, Solr, Python, microservices", + "tags": [f"tag{i%10}", f"category{i%5}"], + "created_at": datetime.utcnow().isoformat() + }) + + response = await client.post(f"{BASE_URL}/api/search/bulk-index", json=docs) + print(f"Indexed {response.json().get('count', 0)} documents") + + # Wait for indexing + await asyncio.sleep(2) + + # Test search speed + print("\nTesting search response times...") + import time + + queries = ["search", "Python", "document", "test", "microservices"] + for query in queries: + start = time.time() + response = await client.get( + f"{BASE_URL}/api/search", + params={"q": query, "rows": 20} + ) + elapsed = time.time() - start + results = response.json() + print(f"Query '{query}': {results['total']} results in {elapsed:.3f}s") + +async def test_reindex(): + """Test reindexing from MongoDB""" + print("\n\n🔄 Testing Reindex Functionality...") + + async with httpx.AsyncClient() as client: + # Trigger reindex for users collection + print("Triggering reindex for users collection...") + response = await client.post( + f"{BASE_URL}/api/search/reindex/users", + 
params={"doc_type": "user"} + ) + if response.status_code == 200: + print(f"Reindex started: {response.json()}") + else: + print(f"Reindex failed: {response.status_code}") + + # Test index optimization + print("\nTesting index optimization...") + response = await client.post(f"{BASE_URL}/api/search/optimize") + if response.status_code == 200: + print(f"Optimization: {response.json()}") + +async def main(): + """Run all tests""" + print("=" * 60) + print("SEARCH SERVICE TEST SUITE (Apache Solr)") + print("=" * 60) + print(f"Started at: {datetime.now().isoformat()}") + + # Run tests + await test_search_api() + await test_performance() + await test_reindex() + + print("\n" + "=" * 60) + print("✅ All search tests completed!") + print(f"Finished at: {datetime.now().isoformat()}") + print("=" * 60) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/services/search/solr-config/conf/managed-schema.xml b/services/search/solr-config/conf/managed-schema.xml new file mode 100644 index 0000000..e3a02ef --- /dev/null +++ b/services/search/solr-config/conf/managed-schema.xml @@ -0,0 +1,105 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + id + \ No newline at end of file diff --git a/services/search/solr-config/conf/solrconfig.xml b/services/search/solr-config/conf/solrconfig.xml new file mode 100644 index 0000000..e16dcb7 --- /dev/null +++ b/services/search/solr-config/conf/solrconfig.xml @@ -0,0 +1,154 @@ + + + 9.4.0 + + + ${solr.data.dir:} + + + + 100 + 1000 + + 10 + 10 + + + + + + + ${solr.ulog.dir:} + ${solr.ulog.numVersionBuckets:65536} + + + ${solr.autoCommit.maxTime:15000} + false + + + ${solr.autoSoftCommit.maxTime:1000} + + + + + + 1024 + + + + true + 20 + 200 + + + + + + + + + + + + + + explicit + 10 + content + OR + edismax + + title^3.0 name^2.5 content^2.0 
description^1.5 summary^1.5 + filename^1.5 tags^1.2 category username email bio + + + title^4.0 name^3.0 content^2.5 description^2.0 + + 2<-25% + true + title,content,description,summary + <mark> + </mark> + true + 1 + + + + + + + + + + true + + + + + + + solrpingquery + + + all + + + + + + + true + 10 + suggest + + + suggest + + + + + + text_general + + default + content + solr.DirectSolrSpellChecker + internal + 0.5 + 2 + 1 + 5 + 4 + 0.01 + + + + + + + suggest + FuzzyLookupFactory + DocumentDictionaryFactory + suggest + text_suggest + false + + + + + + + title,content,description,tags + 1 + 1 + 10 + + + + + + + + + \ No newline at end of file