""" Data indexer for synchronizing data from other services to Solr """ import asyncio import logging from typing import Dict, Any, List from motor.motor_asyncio import AsyncIOMotorClient from aiokafka import AIOKafkaConsumer import json from solr_client import SolrClient from datetime import datetime logger = logging.getLogger(__name__) class DataIndexer: def __init__(self, solr_client: SolrClient, mongodb_url: str, kafka_servers: str): self.solr = solr_client self.mongodb_url = mongodb_url self.kafka_servers = kafka_servers self.mongo_client = None self.kafka_consumer = None self.running = False async def start(self): """Start the indexer""" try: # Connect to MongoDB self.mongo_client = AsyncIOMotorClient(self.mongodb_url) # Initialize Kafka consumer await self._init_kafka_consumer() # Start background tasks self.running = True asyncio.create_task(self._consume_kafka_events()) asyncio.create_task(self._periodic_sync()) logger.info("Data indexer started") except Exception as e: logger.error(f"Failed to start indexer: {e}") async def stop(self): """Stop the indexer""" self.running = False if self.kafka_consumer: await self.kafka_consumer.stop() if self.mongo_client: self.mongo_client.close() logger.info("Data indexer stopped") async def _init_kafka_consumer(self): """Initialize Kafka consumer""" try: self.kafka_consumer = AIOKafkaConsumer( 'user_events', 'file_events', 'content_events', bootstrap_servers=self.kafka_servers, value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='search_indexer', auto_offset_reset='latest' ) await self.kafka_consumer.start() logger.info("Kafka consumer initialized") except Exception as e: logger.warning(f"Kafka consumer initialization failed: {e}") self.kafka_consumer = None async def _consume_kafka_events(self): """Consume events from Kafka and index them""" if not self.kafka_consumer: return while self.running: try: async for msg in self.kafka_consumer: await self._handle_kafka_event(msg.topic, msg.value) except Exception as e: logger.error(f"Kafka consumption error: {e}") await asyncio.sleep(5) async def _handle_kafka_event(self, topic: str, event: Dict[str, Any]): """Handle a Kafka event""" try: event_type = event.get('type') data = event.get('data', {}) if topic == 'user_events': await self._index_user_event(event_type, data) elif topic == 'file_events': await self._index_file_event(event_type, data) elif topic == 'content_events': await self._index_content_event(event_type, data) except Exception as e: logger.error(f"Failed to handle event: {e}") async def _index_user_event(self, event_type: str, data: Dict): """Index user-related events""" if event_type == 'user_created' or event_type == 'user_updated': user_doc = { 'id': f"user_{data.get('user_id')}", 'doc_type': 'user', 'user_id': data.get('user_id'), 'username': data.get('username'), 'email': data.get('email'), 'name': data.get('name', ''), 'bio': data.get('bio', ''), 'tags': data.get('tags', []), 'created_at': data.get('created_at'), 'updated_at': datetime.utcnow().isoformat() } self.solr.index_document(user_doc) elif event_type == 'user_deleted': self.solr.delete_document(f"user_{data.get('user_id')}") async def _index_file_event(self, event_type: str, data: Dict): """Index file-related events""" if event_type == 'file_uploaded': file_doc = { 'id': f"file_{data.get('file_id')}", 'doc_type': 'file', 'file_id': data.get('file_id'), 'filename': data.get('filename'), 'content_type': data.get('content_type'), 'size': data.get('size'), 'user_id': data.get('user_id'), 'tags': 
data.get('tags', []), 'description': data.get('description', ''), 'created_at': data.get('created_at'), 'updated_at': datetime.utcnow().isoformat() } self.solr.index_document(file_doc) elif event_type == 'file_deleted': self.solr.delete_document(f"file_{data.get('file_id')}") async def _index_content_event(self, event_type: str, data: Dict): """Index content-related events""" if event_type in ['content_created', 'content_updated']: content_doc = { 'id': f"content_{data.get('content_id')}", 'doc_type': 'content', 'content_id': data.get('content_id'), 'title': data.get('title'), 'content': data.get('content', ''), 'summary': data.get('summary', ''), 'author_id': data.get('author_id'), 'tags': data.get('tags', []), 'category': data.get('category'), 'status': data.get('status', 'draft'), 'created_at': data.get('created_at'), 'updated_at': datetime.utcnow().isoformat() } self.solr.index_document(content_doc) elif event_type == 'content_deleted': self.solr.delete_document(f"content_{data.get('content_id')}") async def _periodic_sync(self): """Periodically sync data from MongoDB""" while self.running: try: # Sync every 5 minutes await asyncio.sleep(300) await self.sync_all_data() except Exception as e: logger.error(f"Periodic sync error: {e}") async def sync_all_data(self): """Sync all data from MongoDB to Solr""" try: logger.info("Starting full data sync") # Sync users await self._sync_users() # Sync files await self._sync_files() # Optimize index after bulk sync self.solr.optimize_index() logger.info("Full data sync completed") except Exception as e: logger.error(f"Full sync failed: {e}") async def _sync_users(self): """Sync users from MongoDB""" try: db = self.mongo_client['users_db'] collection = db['users'] users = [] async for user in collection.find({'deleted_at': None}): user_doc = { 'id': f"user_{str(user['_id'])}", 'doc_type': 'user', 'user_id': str(user['_id']), 'username': user.get('username'), 'email': user.get('email'), 'name': user.get('name', ''), 'bio': user.get('bio', ''), 'tags': user.get('tags', []), 'created_at': user.get('created_at').isoformat() if user.get('created_at') else None, 'updated_at': datetime.utcnow().isoformat() } users.append(user_doc) # Bulk index every 100 documents if len(users) >= 100: self.solr.bulk_index(users, 'user') users = [] # Index remaining users if users: self.solr.bulk_index(users, 'user') logger.info(f"Synced users to Solr") except Exception as e: logger.error(f"Failed to sync users: {e}") async def _sync_files(self): """Sync files from MongoDB""" try: db = self.mongo_client['files_db'] collection = db['file_metadata'] files = [] async for file in collection.find({'deleted_at': None}): file_doc = { 'id': f"file_{str(file['_id'])}", 'doc_type': 'file', 'file_id': str(file['_id']), 'filename': file.get('filename'), 'original_name': file.get('original_name'), 'content_type': file.get('content_type'), 'size': file.get('size'), 'user_id': file.get('user_id'), 'tags': list(file.get('tags', {}).keys()), 'description': file.get('metadata', {}).get('description', ''), 'created_at': file.get('created_at').isoformat() if file.get('created_at') else None, 'updated_at': datetime.utcnow().isoformat() } files.append(file_doc) # Bulk index every 100 documents if len(files) >= 100: self.solr.bulk_index(files, 'file') files = [] # Index remaining files if files: self.solr.bulk_index(files, 'file') logger.info(f"Synced files to Solr") except Exception as e: logger.error(f"Failed to sync files: {e}") async def reindex_collection(self, collection_name: str, 
doc_type: str): """Reindex a specific collection""" try: # Delete existing documents of this type self.solr.delete_by_query(f'doc_type:{doc_type}') # Sync the collection if collection_name == 'users': await self._sync_users() elif collection_name == 'files': await self._sync_files() logger.info(f"Reindexed {collection_name}") except Exception as e: logger.error(f"Failed to reindex {collection_name}: {e}")
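

# --- Usage sketch (illustrative only) ---
# A minimal way to run the indexer as a standalone process: wire up the Solr
# client, start the background consumers, trigger one full sync, and keep the
# loop alive. The SolrClient constructor argument and the localhost connection
# strings below are assumptions for this sketch; adjust them to match the
# actual solr_client module and deployment configuration.

async def _example_main():
    solr = SolrClient("http://localhost:8983/solr/search_core")  # assumed constructor signature
    indexer = DataIndexer(
        solr_client=solr,
        mongodb_url="mongodb://localhost:27017",
        kafka_servers="localhost:9092",
    )
    await indexer.start()
    try:
        # Run one full sync up front instead of waiting for the periodic task,
        # then block forever while the Kafka consumer handles incremental updates.
        await indexer.sync_all_data()
        await asyncio.Event().wait()
    finally:
        await indexer.stop()


if __name__ == "__main__":
    asyncio.run(_example_main())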