#!/usr/bin/env python3
"""
Test script for File Management Service.

Runs a sequence of HTTP smoke checks against the service at ``BASE_URL``.
Each step prints its outcome (the response payload on HTTP 200, the status
code otherwise); nothing is asserted, so read the output to judge success.
The service must already be running when this script is started.
"""

import asyncio
import base64
import json
import os
from datetime import datetime

import httpx

BASE_URL = "http://localhost:8014"

# Sample image for testing (1x1 pixel PNG)
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg=="
TEST_IMAGE_DATA = base64.b64decode(TEST_IMAGE_BASE64)


async def test_file_api():
    """Exercise the file management API endpoints in order.

    Steps 1-3 and 8-11 always run. Steps 4-7 (metadata, thumbnail, download,
    share link) and step 12 (deletion) need the ``file_id`` returned by the
    single-file upload in step 2 and are skipped when that upload failed or
    returned no ``file_id``.
    """
    async with httpx.AsyncClient() as client:
        print("\n📁 Testing File Management Service API...")

        # Test health check
        print("\n1. Testing health check...")
        response = await client.get(f"{BASE_URL}/health")
        # Guard on the status like every other step, so an error response
        # is reported instead of being parsed as JSON.
        if response.status_code == 200:
            print(f"Health check: {response.json()}")
        else:
            print(f"Health check failed: {response.status_code}")

        # Test file upload
        print("\n2. Testing file upload...")
        files = {
            'file': ('test_image.png', TEST_IMAGE_DATA, 'image/png')
        }
        data = {
            'user_id': 'test_user_123',
            'bucket': 'default',
            'public': 'false',
            'generate_thumbnail': 'true',
            # Form fields are strings, so the tag mapping is sent as JSON text.
            'tags': json.dumps({"test": "true", "category": "sample"})
        }

        response = await client.post(
            f"{BASE_URL}/api/files/upload",
            files=files,
            data=data
        )

        if response.status_code == 200:
            upload_result = response.json()
            print(f"File uploaded: {upload_result}")
            file_id = upload_result.get("file_id")
        else:
            print(f"Upload failed: {response.status_code} - {response.text}")
            file_id = None

        # Test multiple file upload
        print("\n3. Testing multiple file upload...")
        # A list of pairs (not a dict) so the 'files' field can repeat.
        files = [
            ('files', ('test1.png', TEST_IMAGE_DATA, 'image/png')),
            ('files', ('test2.png', TEST_IMAGE_DATA, 'image/png')),
            ('files', ('test3.png', TEST_IMAGE_DATA, 'image/png'))
        ]
        data = {
            'user_id': 'test_user_123',
            'bucket': 'default',
            'public': 'false'
        }

        response = await client.post(
            f"{BASE_URL}/api/files/upload-multiple",
            files=files,
            data=data
        )

        if response.status_code == 200:
            print(f"Multiple files uploaded: {response.json()}")
        else:
            print(f"Multiple upload failed: {response.status_code}")

        # The next four steps address the file uploaded in step 2, so they
        # only make sense when that upload produced a file_id.
        if file_id:
            # Test file metadata retrieval
            print("\n4. Testing file metadata retrieval...")
            response = await client.get(f"{BASE_URL}/api/files/{file_id}/metadata")
            if response.status_code == 200:
                print(f"File metadata: {response.json()}")
            else:
                print(f"Metadata retrieval failed: {response.status_code}")

            # Test thumbnail generation
            print("\n5. Testing thumbnail retrieval...")
            response = await client.get(
                f"{BASE_URL}/api/files/{file_id}/thumbnail",
                params={"width": 100, "height": 100}
            )
            if response.status_code == 200:
                print(f"Thumbnail retrieved: {len(response.content)} bytes")
            else:
                print(f"Thumbnail retrieval failed: {response.status_code}")

            # Test file download
            print("\n6. Testing file download...")
            response = await client.get(f"{BASE_URL}/api/files/{file_id}/download")
            if response.status_code == 200:
                print(f"File downloaded: {len(response.content)} bytes")
            else:
                print(f"Download failed: {response.status_code}")

            # Test share link generation
            print("\n7. Testing share link generation...")
            response = await client.get(
                f"{BASE_URL}/api/files/{file_id}/share",
                params={"expires_in": 3600}
            )
            if response.status_code == 200:
                share_result = response.json()
                # `or` also covers a present-but-null value, which a plain
                # .get() default would pass through to the slice and crash.
                share_url = share_result.get('share_url') or 'N/A'
                print(f"Share link generated: {str(share_url)[:50]}...")
            else:
                print(f"Share link generation failed: {response.status_code}")

        # Test file listing
        print("\n8. Testing file listing...")
        response = await client.get(
            f"{BASE_URL}/api/files",
            params={
                "user_id": "test_user_123",
                "limit": 10,
                "offset": 0
            }
        )
        if response.status_code == 200:
            files_list = response.json()
            print(f"Files found: {files_list.get('total', 0)} files")
        else:
            print(f"File listing failed: {response.status_code}")

        # Test storage statistics
        print("\n9. Testing storage statistics...")
        response = await client.get(f"{BASE_URL}/api/storage/stats")
        if response.status_code == 200:
            stats = response.json()
            print(f"Storage stats: {stats}")
        else:
            print(f"Storage stats failed: {response.status_code}")

        # Test bucket operations
        print("\n10. Testing bucket operations...")
        response = await client.post(
            f"{BASE_URL}/api/storage/buckets",
            params={"bucket_name": "test-bucket", "public": False}
        )
        if response.status_code == 200:
            print(f"Bucket created: {response.json()}")
        else:
            # Deliberately not worded as a failure in the original output.
            print(f"Bucket creation: {response.status_code}")

        response = await client.get(f"{BASE_URL}/api/storage/buckets")
        if response.status_code == 200:
            print(f"Available buckets: {response.json()}")
        else:
            print(f"Bucket listing failed: {response.status_code}")

        # Test presigned URL generation
        print("\n11. Testing presigned URL generation...")
        response = await client.post(
            f"{BASE_URL}/api/files/presigned-upload",
            params={
                "filename": "test_upload.txt",
                "content_type": "text/plain",
                "bucket": "default",
                "expires_in": 3600
            }
        )
        if response.status_code == 200:
            presigned = response.json()
            # Same null-safe preview as the share link above.
            upload_url = presigned.get('upload_url') or 'N/A'
            print(f"Presigned upload URL generated: {str(upload_url)[:50]}...")
        else:
            print(f"Presigned URL generation failed: {response.status_code}")

        # Test file deletion
        if file_id:
            print("\n12. Testing file deletion...")
            response = await client.delete(
                f"{BASE_URL}/api/files/{file_id}",
                params={"user_id": "test_user_123"}
            )
            if response.status_code == 200:
                print(f"File deleted: {response.json()}")
            else:
                print(f"File deletion failed: {response.status_code}")


async def test_large_file_upload():
    """Upload a 1 MiB in-memory payload and print the reported id and size.

    Uses a 30-second client timeout instead of the httpx default.
    """
    print("\n\n📦 Testing Large File Upload...")

    # Create a larger test file (1MB)
    large_data = b"x" * (1024 * 1024)  # 1MB of data

    async with httpx.AsyncClient(timeout=30.0) as client:
        files = {
            'file': ('large_test.bin', large_data, 'application/octet-stream')
        }
        data = {
            'user_id': 'test_user_123',
            'bucket': 'default',
            'public': 'false'
        }

        print("Uploading 1MB file...")
        response = await client.post(
            f"{BASE_URL}/api/files/upload",
            files=files,
            data=data
        )

        if response.status_code == 200:
            result = response.json()
            print(f"Large file uploaded successfully: {result.get('file_id')}")
            print(f"File size: {result.get('size')} bytes")
        else:
            print(f"Large file upload failed: {response.status_code}")


async def test_duplicate_detection():
    """Upload the same file twice and report whether the second is flagged.

    The second response is considered a detected duplicate when its JSON
    body carries a truthy ``duplicate`` field. A failed upload at either
    stage is reported and ends the check early.
    """
    print("\n\n🔍 Testing Duplicate Detection...")

    async with httpx.AsyncClient() as client:
        # Upload the same file twice
        files = {
            'file': ('duplicate_test.png', TEST_IMAGE_DATA, 'image/png')
        }
        data = {
            'user_id': 'test_user_123',
            'bucket': 'default',
            'public': 'false'
        }

        print("Uploading file first time...")
        response1 = await client.post(
            f"{BASE_URL}/api/files/upload",
            files=files,
            data=data
        )
        if response1.status_code != 200:
            # Without a first copy there is nothing to be a duplicate of.
            print(f"First upload failed: {response1.status_code}")
            return

        result1 = response1.json()
        print(f"First upload: {result1.get('file_id')}")

        print("Uploading same file again...")
        response2 = await client.post(
            f"{BASE_URL}/api/files/upload",
            files=files,
            data=data
        )
        if response2.status_code != 200:
            print(f"Second upload failed: {response2.status_code}")
            return

        result2 = response2.json()
        print(f"Second upload: {result2.get('file_id')}")

        if result2.get('duplicate'):
            print("✅ Duplicate detected successfully!")
        else:
            print("❌ Duplicate not detected")


async def main():
    """Run all test groups in sequence, framed by a start/finish banner.

    "Completed" in the closing banner means every group ran to the end
    without raising; individual step failures are only visible in the
    output above it.
    """
    print("=" * 60)
    print("FILE MANAGEMENT SERVICE TEST SUITE")
    print("=" * 60)
    print(f"Started at: {datetime.now().isoformat()}")

    # Run tests
    await test_file_api()
    await test_large_file_upload()
    await test_duplicate_detection()

    print("\n" + "=" * 60)
    print("✅ All tests completed!")
    print(f"Finished at: {datetime.now().isoformat()}")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())