Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 53 additions & 32 deletions tests/test_routes.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# tests/test_routes.py

# -*- coding: utf-8 -*-
# Copyright ...
# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details.

import os
import unittest
import asyncio
import ujson as json
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, patch
from httpx import AsyncClient, ASGITransport
from starlette.types import ASGIApp
from redis.exceptions import LockError
from fq_server import setup_server
from fq_server.server import FQServer

Expand Down Expand Up @@ -297,16 +296,17 @@ async def test_enqueue_get_queue_length_exception(self):
"interval": 1000,
}

# Mock get_queue_length to fail, but let enqueue succeed normally
with patch.object(self.queue, "get_queue_length", side_effect=Exception("Redis error")):
response = await self.client.post(
"/enqueue/sms/test_queue_3/",
content=json.dumps(request_params),
headers={"Content-Type": "application/json"},
)
# Even if get_queue_length fails, enqueue proceeds (prints error)
# The exception is caught and printed; enqueue still attempts
# Check if response indicates the error
self.assertIn(response.status_code, [201, 400])
# When get_queue_length fails, enqueue still succeeds with current_queue_length=0
self.assertEqual(response.status_code, 201)
self.assertEqual(response.json()["status"], "queued")
self.assertEqual(response.json()["current_queue_length"], 0)

async def test_enqueue_queue_enqueue_exception(self):
"""Test enqueue when queue.enqueue() raises an exception."""
Expand Down Expand Up @@ -416,14 +416,15 @@ async def test_metrics_with_queue_type_exception(self):
self.assertEqual(response.status_code, 400)

async def test_clear_queue_malformed_json(self):
"""Test clear_queue - testing through the server's request body parsing."""
# Note: httpx doesn't easily let us send raw body with DELETE,
# so we test the exception path via mocking instead
with patch.object(self.queue, "clear_queue", side_effect=Exception("Clear error")):
# The server will still try to parse a body even if empty
response = await self.client.delete("/deletequeue/sms/johndoe/")
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()["status"], "failure")
"""Test clear_queue with malformed JSON body."""
response = await self.client.request(
"DELETE",
"/deletequeue/sms/johndoe/",
content=b"invalid json",
headers={"Content-Type": "application/json"},
)
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()["status"], "failure")

async def test_clear_queue_exception(self):
"""Test clear_queue when queue.clear_queue() raises an exception."""
Expand Down Expand Up @@ -455,7 +456,7 @@ async def test_deep_status_exception(self):
"""Test deep_status when queue.deep_status() raises an exception."""
with patch.object(self.queue, "deep_status", side_effect=Exception("Status check failed")):
with self.assertRaises(Exception):
response = await self.client.get("/deepstatus/")
await self.client.get("/deepstatus/")

async def test_deep_status_success(self):
"""Test deep_status successful response."""
Expand Down Expand Up @@ -499,21 +500,35 @@ async def test_requeue_with_lock_disabled(self):

async def test_requeue_with_lock_lock_error(self):
"""Test requeue_with_lock when lock acquisition fails with LockError."""
from redis.exceptions import LockError
server = self.server

# Use a real redis lock that times out
requeue_task = asyncio.create_task(server.requeue_with_lock())

# Let it try to acquire lock and timeout (the default behavior when another process holds it)
await asyncio.sleep(0.15)
# Create an async context manager that raises LockError on enter
class FailingLock:
async def __aenter__(self):
raise LockError("Failed to acquire lock")

async def __aexit__(self, *args):
pass

# Cancel it
requeue_task.cancel()
# Mock redis_client with a lock method that returns the failing lock
mock_redis = AsyncMock()
# Make lock a regular (non-async) function that returns the context manager
mock_redis.lock = lambda *args, **kwargs: FailingLock()

try:
await requeue_task
except asyncio.CancelledError:
pass # Expected behavior
with patch.object(server.queue, "redis_client", return_value=mock_redis):
requeue_task = asyncio.create_task(server.requeue_with_lock())

# Let it try to acquire lock and handle LockError (sleeps and continues)
await asyncio.sleep(0.15)

# Cancel it
requeue_task.cancel()

try:
await requeue_task
except asyncio.CancelledError:
pass # Expected - loop continues after LockError, then cancelled

async def test_requeue_with_lock_inner_exception(self):
"""Test requeue_with_lock when requeue() inside lock context fails."""
Expand Down Expand Up @@ -563,8 +578,9 @@ async def test_lifespan_startup_shutdown(self):
# Exit lifespan (shutdown)
try:
await lifespan_cm.__aexit__(None, None, None)
except Exception:
pass # May raise if task is cancelled
except asyncio.CancelledError:
# Expected if the requeue task is cancelled during shutdown
pass

# Task should be cancelled or done
await asyncio.sleep(0.05)
Expand All @@ -575,17 +591,22 @@ async def test_lifespaninitializes_queue(self):
config_path = os.path.join(os.path.dirname(__file__), "test.conf")
server = setup_server(config_path)

with patch.object(server.queue, "initialize", new_callable=AsyncMock) as mock_init:
# Stub out both queue.initialize and the background requeue task to make
# startup/shutdown deterministic and avoid hitting an uninitialized queue.
with patch.object(server.queue, "initialize", new_callable=AsyncMock) as mock_init, \
patch.object(server, "requeue_with_lock", new_callable=AsyncMock):
lifespan_cm = server._lifespan(server.app)
await lifespan_cm.__aenter__()

mock_init.assert_called_once()

# Cleanup
server._requeue_task.cancel()
if server._requeue_task is not None and not server._requeue_task.done():
server._requeue_task.cancel()
try:
await lifespan_cm.__aexit__(None, None, None)
except:
except asyncio.CancelledError:
# Expected if the requeue task is cancelled during shutdown
pass


Expand Down
Loading