#!/usr/bin/env python3
"""
Direct dequeue test.

Connects to Redis, reports the length of the RSS collection queue, previews
the head item and then pops one message with ``BLPOP`` and prints a summary.

NOTE: ``BLPOP`` removes the message from the queue, so running this script
consumes one queued message.
"""

import asyncio
import json

import redis.asyncio as redis

REDIS_URL = "redis://redis:6379"
QUEUE_KEY = "queue:rss_collection"
BLPOP_TIMEOUT_SECONDS = 5
PREVIEW_CHARS = 200


def _print_message_summary(data: str) -> None:
    """Parse *data* as JSON and print the identifying fields of the message.

    Prints a ``Failed to parse message`` line instead of raising when the
    payload is not valid JSON or does not have the expected object shape.
    """
    try:
        message = json.loads(data)
    except json.JSONDecodeError as e:
        print(f"Failed to parse message: {e}")
        return

    # Valid JSON may still be a list/str/number; .get() needs an object.
    if not isinstance(message, dict):
        print(
            "Failed to parse message: expected a JSON object, "
            f"got {type(message).__name__}"
        )
        return

    print(f"Message ID: {message.get('message_id')}")
    print(f"Queue Name: {message.get('queue_name')}")

    if 'job' in message:
        job = message['job']
        if not isinstance(job, dict):
            print(
                "Failed to parse message: expected 'job' to be a JSON object, "
                f"got {type(job).__name__}"
            )
            return
        print(f"Job ID: {job.get('job_id')}")
        print(f"Keyword: {job.get('keyword')}")


async def test_dequeue() -> None:
    """Test dequeue directly.

    Reports the queue length, previews the first item without removing it,
    then attempts a blocking pop with a timeout and prints what was dequeued.
    The Redis client is closed even if one of the commands raises.
    """
    # Connect to Redis
    redis_client = await redis.from_url(
        REDIS_URL,
        encoding="utf-8",
        decode_responses=True,
    )
    print("Connected to Redis")

    try:
        # Check queue length
        length = await redis_client.llen(QUEUE_KEY)
        print(f"Queue length: {length}")

        if length > 0:
            # Get the first item (non-destructive peek)
            item = await redis_client.lrange(QUEUE_KEY, 0, 0)
            # Another consumer may have drained the queue since LLEN, in
            # which case LRANGE returns an empty list.
            if item:
                print(f"First item preview: {item[0][:PREVIEW_CHARS]}...")

            # Try blpop with timeout
            print(f"Trying blpop with timeout={BLPOP_TIMEOUT_SECONDS}...")
            result = await redis_client.blpop(
                QUEUE_KEY, timeout=BLPOP_TIMEOUT_SECONDS
            )

            if result:
                queue, data = result
                print(f"Successfully dequeued from {queue}")
                print(f"Data: {data[:PREVIEW_CHARS]}...")

                # Parse the message
                _print_message_summary(data)
            else:
                print("blpop timed out - no result")
        else:
            print("Queue is empty")
    finally:
        # Always release the connection, including on command errors.
        await redis_client.close()


if __name__ == "__main__":
    asyncio.run(test_dequeue())