""" Test script for Notification Service """ import asyncio import httpx import websockets import json from datetime import datetime, timedelta BASE_URL = "http://localhost:8013" WS_URL = "ws://localhost:8013/ws/notifications" async def test_notification_api(): """Test notification API endpoints""" async with httpx.AsyncClient() as client: print("\nšŸ”” Testing Notification Service API...") # Test health check print("\n1. Testing health check...") response = await client.get(f"{BASE_URL}/health") print(f"Health check: {response.json()}") # Test sending single notification print("\n2. Testing single notification...") notification_data = { "user_id": "test_user_123", "title": "Welcome to Our App!", "message": "Thank you for joining our platform. We're excited to have you!", "channels": ["in_app", "email"], "priority": "high", "category": "system", "data": { "action_url": "https://example.com/welcome", "icon": "welcome" } } response = await client.post( f"{BASE_URL}/api/notifications/send", json=notification_data ) notification_result = response.json() print(f"Notification sent: {notification_result}") notification_id = notification_result.get("notification_id") # Test bulk notifications print("\n3. Testing bulk notifications...") bulk_data = { "user_ids": ["user1", "user2", "user3"], "title": "System Maintenance Notice", "message": "We will be performing system maintenance tonight from 2-4 AM.", "channels": ["in_app", "push"], "priority": "normal", "category": "update" } response = await client.post( f"{BASE_URL}/api/notifications/send-bulk", json=bulk_data ) print(f"Bulk notifications: {response.json()}") # Test scheduled notification print("\n4. Testing scheduled notification...") scheduled_time = datetime.now() + timedelta(minutes=5) scheduled_data = { "user_id": "test_user_123", "title": "Reminder: Meeting in 5 minutes", "message": "Your scheduled meeting is about to start.", "channels": ["in_app", "push"], "priority": "urgent", "category": "system", "schedule_at": scheduled_time.isoformat() } response = await client.post( f"{BASE_URL}/api/notifications/send", json=scheduled_data ) print(f"Scheduled notification: {response.json()}") # Test get user notifications print("\n5. Testing get user notifications...") response = await client.get( f"{BASE_URL}/api/notifications/user/test_user_123" ) notifications = response.json() print(f"User notifications: Found {notifications['count']} notifications") # Test mark as read if notification_id: print("\n6. Testing mark as read...") response = await client.patch( f"{BASE_URL}/api/notifications/{notification_id}/read" ) print(f"Mark as read: {response.json()}") # Test templates print("\n7. Testing templates...") response = await client.get(f"{BASE_URL}/api/templates") templates = response.json() print(f"Available templates: {len(templates['templates'])} templates") # Test preferences print("\n8. Testing user preferences...") # Get preferences response = await client.get( f"{BASE_URL}/api/preferences/test_user_123" ) print(f"Current preferences: {response.json()}") # Update preferences new_preferences = { "user_id": "test_user_123", "channels": { "email": True, "sms": False, "push": True, "in_app": True }, "categories": { "system": True, "marketing": False, "transaction": True, "social": True, "security": True, "update": True }, "email_frequency": "daily", "timezone": "America/New_York", "language": "en" } response = await client.put( f"{BASE_URL}/api/preferences/test_user_123", json=new_preferences ) print(f"Update preferences: {response.json()}") # Test unsubscribe response = await client.post( f"{BASE_URL}/api/preferences/test_user_123/unsubscribe/marketing" ) print(f"Unsubscribe from marketing: {response.json()}") # Test notification with template print("\n9. Testing notification with template...") template_notification = { "user_id": "test_user_123", "title": "Password Reset Request", "message": "", # Will be filled by template "channels": ["email"], "priority": "high", "category": "security", "template_id": "password_reset", "data": { "user_name": "John Doe", "app_name": "Our App", "reset_link": "https://example.com/reset/abc123", "expiry_hours": 24 } } response = await client.post( f"{BASE_URL}/api/notifications/send", json=template_notification ) print(f"Template notification: {response.json()}") # Test queue status print("\n10. Testing queue status...") response = await client.get(f"{BASE_URL}/api/queue/status") print(f"Queue status: {response.json()}") # Test analytics print("\n11. Testing analytics...") response = await client.get(f"{BASE_URL}/api/analytics") analytics = response.json() print(f"Analytics overview: {analytics}") # Test notification history print("\n12. Testing notification history...") response = await client.get( f"{BASE_URL}/api/history", params={"user_id": "test_user_123", "limit": 10} ) history = response.json() print(f"Notification history: {history['count']} entries") # Test device registration print("\n13. Testing device registration...") response = await client.post( f"{BASE_URL}/api/devices/register", params={ "user_id": "test_user_123", "device_token": "dummy_token_12345", "device_type": "ios" } ) print(f"Device registration: {response.json()}") async def test_websocket(): """Test WebSocket connection for real-time notifications""" print("\n\n🌐 Testing WebSocket Connection...") try: uri = f"{WS_URL}/test_user_123" async with websockets.connect(uri) as websocket: print(f"Connected to WebSocket at {uri}") # Listen for welcome message message = await websocket.recv() data = json.loads(message) print(f"Welcome message: {data}") # Send ping await websocket.send("ping") pong = await websocket.recv() print(f"Ping response: {pong}") # Send notification via API while connected async with httpx.AsyncClient() as client: notification_data = { "user_id": "test_user_123", "title": "Real-time Test", "message": "This should appear in WebSocket!", "channels": ["in_app"], "priority": "normal" } response = await client.post( f"{BASE_URL}/api/notifications/send", json=notification_data ) print(f"Sent notification: {response.json()}") # Wait for real-time notification print("Waiting for real-time notification...") try: notification = await asyncio.wait_for(websocket.recv(), timeout=5.0) print(f"Received real-time notification: {json.loads(notification)}") except asyncio.TimeoutError: print("No real-time notification received (timeout)") print("WebSocket test completed") except Exception as e: print(f"WebSocket error: {e}") async def main(): """Run all tests""" print("=" * 60) print("NOTIFICATION SERVICE TEST SUITE") print("=" * 60) # Test API endpoints await test_notification_api() # Test WebSocket await test_websocket() print("\n" + "=" * 60) print("āœ… All tests completed!") print("=" * 60) if __name__ == "__main__": asyncio.run(main())