Files
site11/services/files/backend/test_files.py
2025-09-28 20:41:57 +09:00

281 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Test script for File Management Service
"""
import asyncio
import httpx
import os
import json
from datetime import datetime
import base64
BASE_URL = "http://localhost:8014"
# Sample image for testing (1x1 pixel PNG)
TEST_IMAGE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg=="
TEST_IMAGE_DATA = base64.b64decode(TEST_IMAGE_BASE64)
async def test_file_api():
"""Test file management API endpoints"""
async with httpx.AsyncClient() as client:
print("\n📁 Testing File Management Service API...")
# Test health check
print("\n1. Testing health check...")
response = await client.get(f"{BASE_URL}/health")
print(f"Health check: {response.json()}")
# Test file upload
print("\n2. Testing file upload...")
files = {
'file': ('test_image.png', TEST_IMAGE_DATA, 'image/png')
}
data = {
'user_id': 'test_user_123',
'bucket': 'default',
'public': 'false',
'generate_thumbnail': 'true',
'tags': json.dumps({"test": "true", "category": "sample"})
}
response = await client.post(
f"{BASE_URL}/api/files/upload",
files=files,
data=data
)
if response.status_code == 200:
upload_result = response.json()
print(f"File uploaded: {upload_result}")
file_id = upload_result.get("file_id")
else:
print(f"Upload failed: {response.status_code} - {response.text}")
file_id = None
# Test multiple file upload
print("\n3. Testing multiple file upload...")
files = [
('files', ('test1.png', TEST_IMAGE_DATA, 'image/png')),
('files', ('test2.png', TEST_IMAGE_DATA, 'image/png')),
('files', ('test3.png', TEST_IMAGE_DATA, 'image/png'))
]
data = {
'user_id': 'test_user_123',
'bucket': 'default',
'public': 'false'
}
response = await client.post(
f"{BASE_URL}/api/files/upload-multiple",
files=files,
data=data
)
if response.status_code == 200:
print(f"Multiple files uploaded: {response.json()}")
else:
print(f"Multiple upload failed: {response.status_code}")
# Test file metadata retrieval
if file_id:
print("\n4. Testing file metadata retrieval...")
response = await client.get(f"{BASE_URL}/api/files/{file_id}/metadata")
if response.status_code == 200:
print(f"File metadata: {response.json()}")
else:
print(f"Metadata retrieval failed: {response.status_code}")
# Test thumbnail generation
print("\n5. Testing thumbnail retrieval...")
response = await client.get(
f"{BASE_URL}/api/files/{file_id}/thumbnail",
params={"width": 100, "height": 100}
)
if response.status_code == 200:
print(f"Thumbnail retrieved: {len(response.content)} bytes")
else:
print(f"Thumbnail retrieval failed: {response.status_code}")
# Test file download
print("\n6. Testing file download...")
response = await client.get(f"{BASE_URL}/api/files/{file_id}/download")
if response.status_code == 200:
print(f"File downloaded: {len(response.content)} bytes")
else:
print(f"Download failed: {response.status_code}")
# Test share link generation
print("\n7. Testing share link generation...")
response = await client.get(
f"{BASE_URL}/api/files/{file_id}/share",
params={"expires_in": 3600}
)
if response.status_code == 200:
share_result = response.json()
print(f"Share link generated: {share_result.get('share_url', 'N/A')[:50]}...")
else:
print(f"Share link generation failed: {response.status_code}")
# Test file listing
print("\n8. Testing file listing...")
response = await client.get(
f"{BASE_URL}/api/files",
params={
"user_id": "test_user_123",
"limit": 10,
"offset": 0
}
)
if response.status_code == 200:
files_list = response.json()
print(f"Files found: {files_list.get('total', 0)} files")
else:
print(f"File listing failed: {response.status_code}")
# Test storage statistics
print("\n9. Testing storage statistics...")
response = await client.get(f"{BASE_URL}/api/storage/stats")
if response.status_code == 200:
stats = response.json()
print(f"Storage stats: {stats}")
else:
print(f"Storage stats failed: {response.status_code}")
# Test bucket operations
print("\n10. Testing bucket operations...")
response = await client.post(
f"{BASE_URL}/api/storage/buckets",
params={"bucket_name": "test-bucket", "public": False}
)
if response.status_code == 200:
print(f"Bucket created: {response.json()}")
else:
print(f"Bucket creation: {response.status_code}")
response = await client.get(f"{BASE_URL}/api/storage/buckets")
if response.status_code == 200:
print(f"Available buckets: {response.json()}")
else:
print(f"Bucket listing failed: {response.status_code}")
# Test presigned URL generation
print("\n11. Testing presigned URL generation...")
response = await client.post(
f"{BASE_URL}/api/files/presigned-upload",
params={
"filename": "test_upload.txt",
"content_type": "text/plain",
"bucket": "default",
"expires_in": 3600
}
)
if response.status_code == 200:
presigned = response.json()
print(f"Presigned upload URL generated: {presigned.get('upload_url', 'N/A')[:50]}...")
else:
print(f"Presigned URL generation failed: {response.status_code}")
# Test file deletion
if file_id:
print("\n12. Testing file deletion...")
response = await client.delete(
f"{BASE_URL}/api/files/{file_id}",
params={"user_id": "test_user_123"}
)
if response.status_code == 200:
print(f"File deleted: {response.json()}")
else:
print(f"File deletion failed: {response.status_code}")
async def test_large_file_upload():
"""Test large file upload"""
print("\n\n📦 Testing Large File Upload...")
# Create a larger test file (1MB)
large_data = b"x" * (1024 * 1024) # 1MB of data
async with httpx.AsyncClient(timeout=30.0) as client:
files = {
'file': ('large_test.bin', large_data, 'application/octet-stream')
}
data = {
'user_id': 'test_user_123',
'bucket': 'default',
'public': 'false'
}
print("Uploading 1MB file...")
response = await client.post(
f"{BASE_URL}/api/files/upload",
files=files,
data=data
)
if response.status_code == 200:
result = response.json()
print(f"Large file uploaded successfully: {result.get('file_id')}")
print(f"File size: {result.get('size')} bytes")
else:
print(f"Large file upload failed: {response.status_code}")
async def test_duplicate_detection():
"""Test duplicate file detection"""
print("\n\n🔍 Testing Duplicate Detection...")
async with httpx.AsyncClient() as client:
# Upload the same file twice
files = {
'file': ('duplicate_test.png', TEST_IMAGE_DATA, 'image/png')
}
data = {
'user_id': 'test_user_123',
'bucket': 'default',
'public': 'false'
}
print("Uploading file first time...")
response1 = await client.post(
f"{BASE_URL}/api/files/upload",
files=files,
data=data
)
if response1.status_code == 200:
result1 = response1.json()
print(f"First upload: {result1.get('file_id')}")
print("Uploading same file again...")
response2 = await client.post(
f"{BASE_URL}/api/files/upload",
files=files,
data=data
)
if response2.status_code == 200:
result2 = response2.json()
print(f"Second upload: {result2.get('file_id')}")
if result2.get('duplicate'):
print("✅ Duplicate detected successfully!")
else:
print("❌ Duplicate not detected")
async def main():
"""Run all tests"""
print("=" * 60)
print("FILE MANAGEMENT SERVICE TEST SUITE")
print("=" * 60)
print(f"Started at: {datetime.now().isoformat()}")
# Run tests
await test_file_api()
await test_large_file_upload()
await test_duplicate_detection()
print("\n" + "=" * 60)
print("✅ All tests completed!")
print(f"Finished at: {datetime.now().isoformat()}")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())