Files
2025-09-28 20:41:57 +09:00

362 lines
10 KiB
Python

"""
Search Service with Apache Solr
"""
import asyncio
import logging
import os
import time
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse

from indexer import DataIndexer
from solr_client import SolrClient
# Configure logging: timestamped, per-module logger names at INFO level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Global instances, populated during application startup (see lifespan()).
# Both stay None if startup fails; endpoints check for that and return 503.
solr_client: Optional[SolrClient] = None
data_indexer: Optional[DataIndexer] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the application lifecycle.

    Startup: wait for Solr (up to ``max_retries`` attempts, 2s apart),
    then start the MongoDB/Kafka data indexer and launch an initial full
    sync in the background. Shutdown: stop the indexer.
    """
    global solr_client, data_indexer

    logger.info("Starting Search Service...")
    solr_url = os.getenv("SOLR_URL", "http://solr:8983/solr")
    max_retries = 30
    for attempt in range(1, max_retries + 1):
        try:
            solr_client = SolrClient(solr_url=solr_url, core_name="site11")
            logger.info("Connected to Solr")
            break
        except Exception as e:
            # Include the actual error so connection problems are diagnosable.
            logger.warning("Waiting for Solr... (%d/%d): %s", attempt, max_retries, e)
            if attempt < max_retries:
                # No point sleeping after the final failed attempt.
                await asyncio.sleep(2)

    if solr_client:
        # Initialize data indexer
        mongodb_url = os.getenv("MONGODB_URL", "mongodb://mongodb:27017")
        kafka_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
        data_indexer = DataIndexer(solr_client, mongodb_url, kafka_servers)
        await data_indexer.start()
        # Initial data sync in the background. Keep a strong reference on
        # app.state: asyncio holds only weak refs to running tasks, so a
        # discarded task can be garbage-collected before it completes.
        app.state.initial_sync_task = asyncio.create_task(data_indexer.sync_all_data())

    yield

    # Shutdown
    if data_indexer:
        await data_indexer.stop()
    logger.info("Search Service stopped")
# FastAPI application; `lifespan` wires up Solr connection and the data
# indexer on startup and tears them down on shutdown.
app = FastAPI(
    title="Search Service",
    description="Full-text search with Apache Solr",
    version="1.0.0",
    lifespan=lifespan
)
@app.get("/health")
async def health_check():
    """Liveness probe: report service status and whether Solr connected.

    Returns a JSON object with a timezone-aware UTC timestamp.
    """
    return {
        "status": "healthy",
        "service": "search",
        # datetime.utcnow() is deprecated (and naive); use aware UTC time.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "solr_connected": solr_client is not None
    }
@app.get("/api/search")
async def search(
    q: str = Query(..., description="Search query"),
    doc_type: Optional[str] = Query(None, description="Filter by document type"),
    start: int = Query(0, ge=0, description="Starting offset"),
    rows: int = Query(10, ge=1, le=100, description="Number of results"),
    sort: Optional[str] = Query(None, description="Sort order (e.g., 'created_at desc')"),
    facet: bool = Query(False, description="Enable faceting"),
    facet_field: Optional[List[str]] = Query(None, description="Fields to facet on")
):
    """Search documents across all indexed content.

    Builds an optional filter query from ``doc_type``, forwards paging,
    sorting and faceting options to Solr, and returns the matched
    documents plus any facets/highlighting Solr produced.

    Raises 503 when Solr is not connected, 500 on search failure.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        # Build filter query. doc_type is user-controlled: escape quote/backslash
        # and wrap in quotes so Solr query syntax can't be injected through it.
        fq = []
        if doc_type:
            escaped = doc_type.replace('\\', '\\\\').replace('"', '\\"')
            fq.append(f'doc_type:"{escaped}"')

        # Prepare search parameters; only include optional ones when supplied.
        search_params = {
            'start': start,
            'rows': rows,
            'facet': facet
        }
        if fq:
            search_params['fq'] = fq
        if sort:
            search_params['sort'] = sort
        if facet_field:
            search_params['facet_field'] = facet_field

        # Execute search
        results = solr_client.search(q, **search_params)
        return {
            "query": q,
            "total": results['total'],
            "start": start,
            "rows": rows,
            "documents": results['documents'],
            "facets": results.get('facets', {}),
            "highlighting": results.get('highlighting', {})
        }
    except Exception as e:
        logger.error(f"Search failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/search/suggest")
async def suggest(
    q: str = Query(..., min_length=1, description="Query prefix"),
    field: str = Query("title", description="Field to search in"),
    limit: int = Query(10, ge=1, le=50, description="Maximum suggestions")
):
    """Return autocomplete suggestions for a query prefix.

    Delegates to the Solr suggester on the given field; 503 if Solr is
    down, 500 if the suggest call fails.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        matches = solr_client.suggest(q, field, limit)
        response = {
            "query": q,
            "suggestions": matches,
            "count": len(matches),
        }
        return response
    except Exception as e:
        logger.error(f"Suggest failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/search/similar/{doc_id}")
async def find_similar(
    doc_id: str,
    rows: int = Query(5, ge=1, le=20, description="Number of similar documents")
):
    """Find documents similar to the given document.

    Uses Solr's More Like This against ``doc_id``; 503 if Solr is down,
    500 if the lookup fails.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        matches = solr_client.more_like_this(doc_id, rows=rows)
        payload = {
            "source_document": doc_id,
            "similar_documents": matches,
            "count": len(matches),
        }
        return payload
    except Exception as e:
        logger.error(f"Similar search failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/search/index")
async def index_document(document: Dict[str, Any]):
    """Index a single document.

    ``doc_type`` defaults to ``'general'`` when absent from the payload.
    Raises 503 when Solr is not connected, 500 on indexing failure.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        doc_type = document.get('doc_type', 'general')
        success = solr_client.index_document(document, doc_type)
        if success:
            return {
                "status": "success",
                "message": "Document indexed",
                "document_id": document.get('id')
            }
        raise HTTPException(status_code=500, detail="Failed to index document")
    except HTTPException:
        # Re-raise our own HTTP error untouched; otherwise the broad handler
        # below rewraps it and mangles the intended detail message.
        raise
    except Exception as e:
        logger.error(f"Indexing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/search/bulk-index")
async def bulk_index(documents: List[Dict[str, Any]]):
    """Bulk index a batch of documents.

    Returns how many documents Solr accepted; 503 if Solr is down,
    500 if the bulk operation fails.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        accepted = solr_client.bulk_index(documents)
        result = {
            "status": "success",
            "message": f"Indexed {accepted} documents",
            "count": accepted,
        }
        return result
    except Exception as e:
        logger.error(f"Bulk indexing failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/search/document/{doc_id}")
async def delete_document(doc_id: str):
    """Delete a document from the index by id.

    Raises 503 when Solr is not connected, 500 on deletion failure.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        success = solr_client.delete_document(doc_id)
        if success:
            return {
                "status": "success",
                "message": "Document deleted",
                "document_id": doc_id
            }
        raise HTTPException(status_code=500, detail="Failed to delete document")
    except HTTPException:
        # Re-raise our own HTTP error untouched; otherwise the broad handler
        # below rewraps it and mangles the intended detail message.
        raise
    except Exception as e:
        logger.error(f"Deletion failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/search/stats")
async def get_stats():
    """Return search index statistics with a UTC timestamp.

    Raises 503 when Solr is not connected, 500 if the stats call fails.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        stats = solr_client.get_stats()
        return {
            "status": "success",
            "statistics": stats,
            # datetime.utcnow() is deprecated (and naive); use aware UTC time.
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    except Exception as e:
        logger.error(f"Failed to get stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/search/reindex/{collection}")
async def reindex_collection(
    collection: str,
    doc_type: Optional[str] = Query(None, description="Document type for the collection")
):
    """Kick off a background reindex of one MongoDB collection.

    When ``doc_type`` is omitted, known collections map to singular doc
    types; anything unrecognized falls back to the collection name.
    Returns immediately; the reindex runs as a background task.
    Raises 503 when the indexer is unavailable.
    """
    if not data_indexer:
        raise HTTPException(status_code=503, detail="Indexer service unavailable")
    try:
        if not doc_type:
            # Map collection to doc_type
            doc_type_map = {
                'users': 'user',
                'files': 'file',
                'content': 'content'
            }
            doc_type = doc_type_map.get(collection, collection)
        # Keep a strong reference on app.state: asyncio holds only weak refs
        # to running tasks, so a discarded task can be garbage-collected
        # before the reindex finishes.
        app.state.reindex_task = asyncio.create_task(
            data_indexer.reindex_collection(collection, doc_type)
        )
        return {
            "status": "success",
            "message": f"Reindexing {collection} started",
            "collection": collection,
            "doc_type": doc_type
        }
    except Exception as e:
        logger.error(f"Reindex failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/search/optimize")
async def optimize_index():
    """Trigger a Solr index optimization.

    Raises 503 when Solr is not connected, 500 on failure.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        success = solr_client.optimize_index()
        if success:
            return {
                "status": "success",
                "message": "Index optimization started"
            }
        raise HTTPException(status_code=500, detail="Failed to optimize index")
    except HTTPException:
        # Re-raise our own HTTP error untouched; otherwise the broad handler
        # below rewraps it and mangles the intended detail message.
        raise
    except Exception as e:
        logger.error(f"Optimization failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/search/clear")
async def clear_index():
    """Clear ALL documents from the index (destructive!).

    NOTE(review): this endpoint has no authentication/confirmation guard
    visible here — confirm access is restricted upstream.
    Raises 503 when Solr is not connected, 500 on failure.
    """
    if not solr_client:
        raise HTTPException(status_code=503, detail="Search service unavailable")
    try:
        success = solr_client.clear_index()
        if success:
            return {
                "status": "success",
                "message": "Index cleared",
                "warning": "All documents have been deleted!"
            }
        raise HTTPException(status_code=500, detail="Failed to clear index")
    except HTTPException:
        # Re-raise our own HTTP error untouched; otherwise the broad handler
        # below rewraps it and mangles the intended detail message.
        raise
    except Exception as e:
        logger.error(f"Clear index failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Local/dev entry point; in deployment the app is typically served by an
    # external ASGI server (e.g. via docker-compose, given the service URLs above).
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)