diff --git a/docker-compose.yml b/docker-compose.yml index f22ce3f..bc26e5e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.9" - services: redis: image: redis:7-alpine diff --git a/src/fq/queue.py b/src/fq/queue.py index 1c150bb..6434dae 100644 --- a/src/fq/queue.py +++ b/src/fq/queue.py @@ -87,12 +87,44 @@ async def _initialize(self): else: raise FQException("Unknown redis conn_type: %s" % redis_connection_type) + await self._validate_redis_connection() self._load_lua_scripts() + async def _validate_redis_connection(self): + """Ping redis once to surface bad connection details early.""" + if self._r is None: + raise FQException("Redis client is not initialized") + + ping = getattr(self._r, "ping", None) + if not callable(ping): + return + + try: + result = await ping() + except Exception as exc: + raise FQException("Failed to connect to Redis: %s" % exc) from exc + + if result is False: + raise FQException("Failed to connect to Redis: ping returned False") + def _load_config(self): """Read the configuration file and load it into memory.""" + if not os.path.isfile(self.config_path): + raise FQException("Config file not found: %s" % self.config_path) + self._config = configparser.ConfigParser() - self._config.read(self.config_path) + read_files = self._config.read(self.config_path) + + if not read_files: + raise FQException("Unable to read config file: %s" % self.config_path) + + if not self._config.has_section("redis") or not self._config.has_section( + "fq" + ): + raise FQException( + "Config file missing required sections: redis, fq (path: %s)" + % self.config_path + ) def redis_client(self): return self._r diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py new file mode 100644 index 0000000..87c8721 --- /dev/null +++ b/tests/test_edge_cases.py @@ -0,0 +1,274 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. + + +import os +import tempfile +import unittest +from unittest.mock import patch + +from fq import FQ +from fq.utils import is_valid_identifier +from fq.exceptions import BadArgumentException, FQException + + +class FakeCluster: + def __init__(self, startup_nodes=None, decode_responses=False, socket_timeout=None): + self.startup_nodes = startup_nodes or [] + self.decode_responses = decode_responses + self.socket_timeout = socket_timeout + + def register_script(self, _): + async def _runner(*args, **kwargs): + return [] + + return _runner + + async def ping(self): + return True + + +class FakeRedisForClose: + def __init__(self): + self.closed = False + self.waited = False + self.disconnected = False + self.connection_pool = self + + async def close(self): + self.closed = True + + async def wait_closed(self): + self.waited = True + + async def disconnect(self): + self.disconnected = True + + +class FakeRedisForDeepStatus: + def __init__(self): + self.key_set = None + + async def set(self, key, value): + self.key_set = (key, value) + return True + + +class FakeRedisConnectionFailure: + def __init__(self, *args, **kwargs): + pass + + async def ping(self): + raise ConnectionError("boom") + + def register_script(self, _): + async def _runner(*args, **kwargs): + return [] + + return _runner + + +class FakeLuaDequeue: + def __init__(self): + self.called = False + + async def __call__(self, keys=None, args=None): + self.called = True + return [b"q1", b"j1", None, b"0"] + + +class FakePipe: + def __init__(self): + self.hdel_calls = [] + self.deleted = [] + self.executed = False + + def hdel(self, *args): + self.hdel_calls.append(args) + + def delete(self, *args): + self.deleted.append(args) + + async def execute(self): + self.executed = True + + +class FakeRedisForClear: + def __init__(self): + self.pipe = FakePipe() + self.deleted_keys = [] + + async def zrem(self, _primary_set, _queue_id): + return 1 + + async def lrange(self, _key, _start, _end): + return [None, b"job-bytes", "job-str"] + + def pipeline(self): + return self.pipe + + async def delete(self, key): + self.deleted_keys.append(key) + + +class TestEdgeCases(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + cwd = os.path.dirname(os.path.realpath(__file__)) + self.config_path = os.path.join(cwd, "test.conf") + self.fq_instance = None + + async def asyncTearDown(self): + """Clean up Redis state and close connections after each test.""" + # If a test initialized FQ with real Redis, clean up + if self.fq_instance is not None: + try: + if self.fq_instance._r is not None: + await self.fq_instance._r.flushdb() + await self.fq_instance.close() + except Exception: + # Ignore errors during cleanup - tests may have mocked or closed connections + # This prevents tearDown failures from masking test failures + pass + self.fq_instance = None + + def test_missing_config_file_raises(self): + with self.assertRaisesRegex(FQException, "Config file not found"): + FQ("/tmp/does-not-exist.conf") + + async def test_initialize_fails_fast_on_bad_redis(self): + with patch("fq.queue.Redis", FakeRedisConnectionFailure): + fq = FQ(self.config_path) + with self.assertRaisesRegex(FQException, "Failed to connect to Redis"): + await fq.initialize() + + async def test_cluster_initialization(self): + """Covers clustered Redis path (queue.py lines 69-75, 104-106).""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f: + f.write( + """[fq] +job_expire_interval : 5000 +job_requeue_interval : 5000 +default_job_requeue_limit : -1 + +[redis] +db : 0 +key_prefix : test_fq_cluster +conn_type : tcp_sock +host : 127.0.0.1 +port : 6379 +clustered : true +password : +""" + ) + config_path = f.name + + try: + with patch("fq.queue.RedisCluster", FakeCluster): + fq = FQ(config_path) + await fq._initialize() + self.assertIsInstance(fq.redis_client(), FakeCluster) + await fq.close() + finally: + os.unlink(config_path) + + async def test_dequeue_payload_none(self): + """Covers dequeue branch where payload is None (queue.py line 212).""" + fq = FQ(self.config_path) + self.fq_instance = fq + await fq._initialize() + fake_dequeue = FakeLuaDequeue() + fq._lua_dequeue = fake_dequeue + result = await fq.dequeue() + self.assertEqual(result["status"], "failure") + self.assertTrue(fake_dequeue.called) + + async def test_clear_queue_delete_only(self): + """Covers clear_queue else branch (queue.py lines 499, 502).""" + fq = FQ(self.config_path) + self.fq_instance = fq + await fq._initialize() + await fq._r.flushdb() + response = await fq.clear_queue(queue_type="noqueue", queue_id="missing") + self.assertEqual(response["status"], "Failure") + + async def test_close_fallback_paths(self): + """Covers close() fallback paths (queue.py lines 528-549).""" + fq = FQ(self.config_path) + fq._r = FakeRedisForClose() + await fq.close() + self.assertIsNone(fq._r) + + async def test_deep_status_calls_set(self): + """Covers deep_status (queue.py line 420).""" + fq = FQ(self.config_path) + fq._key_prefix = fq._config.get("redis", "key_prefix") + fq._r = FakeRedisForDeepStatus() + await fq.deep_status() + self.assertEqual( + fq._r.key_set, + ("fq:deep_status:{}".format(fq._key_prefix), "sharq_deep_status"), + ) + + def test_is_valid_identifier_non_string(self): + """Covers utils.is_valid_identifier non-string check (utils.py line 22).""" + self.assertFalse(is_valid_identifier(123)) + self.assertFalse(is_valid_identifier(None)) + self.assertFalse(is_valid_identifier(["a"])) + + async def test_reload_config_with_new_path(self): + """Covers reload_config branch (queue.py lines 104-106).""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f: + f.write( + """[fq] +job_expire_interval : 5000 +job_requeue_interval : 5000 +default_job_requeue_limit : -1 + +[redis] +db : 0 +key_prefix : new_prefix +conn_type : tcp_sock +port : 6379 +host : 127.0.0.1 +clustered : false +password : +""" + ) + new_config = f.name + + try: + fq = FQ(self.config_path) + fq.reload_config(new_config) + self.assertEqual(fq.config_path, new_config) + self.assertEqual(fq._config.get("redis", "key_prefix"), "new_prefix") + finally: + os.unlink(new_config) + + async def test_clear_queue_purge_all_with_mixed_job_ids(self): + """Covers purge_all loop branches (queue.py lines 463-468, 474-479).""" + fq = FQ(self.config_path) + fq._key_prefix = fq._config.get("redis", "key_prefix") + fq._r = FakeRedisForClear() + response = await fq.clear_queue("qt", "qid", purge_all=True) + self.assertEqual(response["status"], "Success") + self.assertTrue(fq._r.pipe.executed) + + async def test_get_queue_length_invalid_params(self): + """Covers validation branches (queue.py lines 499, 502).""" + fq = FQ(self.config_path) + with self.assertRaises(BadArgumentException): + await fq.get_queue_length("bad type", "qid") + with self.assertRaises(BadArgumentException): + await fq.get_queue_length("qtype", "bad id") + + async def test_deep_status_real_redis(self): + """Covers deep_status with real redis (queue.py line 420).""" + fq = FQ(self.config_path) + self.fq_instance = fq + await fq._initialize() + result = await fq.deep_status() + self.assertTrue(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_func.py b/tests/test_func.py index 776a8b8..0116167 100644 --- a/tests/test_func.py +++ b/tests/test_func.py @@ -7,7 +7,10 @@ import asyncio import unittest import msgpack +import tempfile +from unittest.mock import AsyncMock, MagicMock from fq import FQ +from fq.exceptions import FQException from fq.utils import generate_epoch, deserialize_payload @@ -1724,6 +1727,226 @@ async def test_clear_queue_with_non_existing_queue_id_with_purge(self): self.assertEqual(queue_clear_response["status"], "Failure") self.assertEqual(queue_clear_response["message"], "No queued calls found") + async def test_deep_status(self): + """Test deep_status method for Redis availability check.""" + result = await self.queue.deep_status() + self.assertIsNotNone(result) + + async def test_initialize_public_method(self): + """Test the public initialize() method.""" + cwd = os.path.dirname(os.path.realpath(__file__)) + config_path = os.path.join(cwd, "test.conf") + fq = FQ(config_path) + + # Public initialize() should work + await fq.initialize() + + # Verify initialization succeeded + self.assertIsNotNone(fq._r) + self.assertIsNotNone(fq._lua_enqueue) + + # Cleanup + await fq.close() + + async def test_reload_lua_scripts(self): + """Test reload_lua_scripts method.""" + # Just verify it doesn't crash and scripts work after reload + self.queue.reload_lua_scripts() + + # Verify scripts are still functional + job_id = self._get_job_id() + response = await self.queue.enqueue( + payload=self._test_payload_1, + interval=1000, + job_id=job_id, + queue_id=self._test_queue_id, + queue_type=self._test_queue_type, + ) + self.assertEqual(response["status"], "queued") + + async def test_get_queue_length(self): + """Test get_queue_length method.""" + # Initially empty + length = await self.queue.get_queue_length( + self._test_queue_type, self._test_queue_id + ) + self.assertEqual(length, 0) + + # Add a job + job_id = self._get_job_id() + await self.queue.enqueue( + payload=self._test_payload_1, + interval=1000, + job_id=job_id, + queue_id=self._test_queue_id, + queue_type=self._test_queue_type, + ) + + # Check length + length = await self.queue.get_queue_length( + self._test_queue_type, self._test_queue_id + ) + self.assertEqual(length, 1) + + async def test_redis_client_getter(self): + """Test redis_client() method.""" + client = self.queue.redis_client() + self.assertIsNotNone(client) + # Verify it's the same client + self.assertIs(client, self.queue._r) + + async def test_close_properly_closes_connection(self): + """Test close() method properly closes Redis connection.""" + cwd = os.path.dirname(os.path.realpath(__file__)) + config_path = os.path.join(cwd, "test.conf") + fq = FQ(config_path) + await fq._initialize() + + self.assertIsNotNone(fq._r) + await fq.close() + self.assertIsNone(fq._r) + + async def test_close_with_none_client(self): + """Test close() when redis client is None.""" + cwd = os.path.dirname(os.path.realpath(__file__)) + config_path = os.path.join(cwd, "test.conf") + fq = FQ(config_path) + # Don't initialize, so _r is None + await fq.close() # Should not crash + self.assertIsNone(fq._r) + + async def test_initialize_unix_socket_connection(self): + """Test initialization with Unix socket connection - tests line 59.""" + # Create a temporary config with unix_sock + with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f: + f.write("""[fq] +job_expire_interval : 5000 +job_requeue_interval : 5000 +default_job_requeue_limit : -1 + +[redis] +db : 0 +key_prefix : test_fq_unix +conn_type : unix_sock +unix_socket_path : /tmp/redis_nonexistent.sock +""") + config_path = f.name + + try: + # Create a mock Redis class to capture initialization parameters + mock_redis_instance = MagicMock() + mock_redis_instance.ping = AsyncMock(return_value=True) + mock_redis_instance.register_script = MagicMock(return_value=MagicMock()) + mock_redis_instance.aclose = AsyncMock() + + redis_init_kwargs = {} + + def mock_redis_constructor(**kwargs): + redis_init_kwargs.update(kwargs) + return mock_redis_instance + + # Patch Redis to intercept the initialization + with unittest.mock.patch('fq.queue.Redis', side_effect=mock_redis_constructor): + fq = FQ(config_path) + await fq._initialize() + + # Verify that Redis was initialized with unix_socket_path + self.assertIn('unix_socket_path', redis_init_kwargs) + self.assertEqual(redis_init_kwargs['unix_socket_path'], '/tmp/redis_nonexistent.sock') + self.assertEqual(int(redis_init_kwargs['db']), 0) + + await fq.close() + finally: + os.unlink(config_path) + + async def test_initialize_unknown_connection_type(self): + """Test initialization with invalid connection type raises error - tests line 88.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f: + f.write("""[fq] +job_expire_interval : 5000 +job_requeue_interval : 5000 +default_job_requeue_limit : -1 + +[redis] +db : 0 +key_prefix : test_fq +conn_type : invalid_type +""") + config_path = f.name + + try: + fq = FQ(config_path) + # This tests line 88 - unknown conn_type + with self.assertRaisesRegex(FQException, "Unknown redis conn_type"): + await fq._initialize() + finally: + os.unlink(config_path) + + async def test_clear_queue_with_purge_all_and_string_job_uuid(self): + """Test clear_queue with purge_all=True handles string job UUIDs - tests lines 464, 468.""" + job_id = self._get_job_id() + await self.queue.enqueue( + payload=self._test_payload_1, + interval=10000, + job_id=job_id, + queue_id=self._test_queue_id, + queue_type=self._test_queue_type, + ) + + # Clear with purge_all + result = await self.queue.clear_queue( + queue_type=self._test_queue_type, + queue_id=self._test_queue_id, + purge_all=True + ) + self.assertEqual(result["status"], "Success") + self.assertIn("purged", result["message"]) + + async def test_deserialize_payload_old_format(self): + """Test deserialize_payload with old quote-wrapped format - tests utils.py line 63.""" + test_data = {"key": "value", "number": 42} + # Simulate old format: msgpack wrapped in quotes + packed = msgpack.packb(test_data, use_bin_type=True) + old_format_payload = b'"' + packed + b'"' + + result = deserialize_payload(old_format_payload) + self.assertEqual(result, test_data) + + async def test_deserialize_payload_new_format(self): + """Test deserialize_payload handles new unwrapped format - tests utils.py line 63.""" + test_data = {"key": "value", "nested": {"inner": "data"}} + packed = msgpack.packb(test_data, use_bin_type=True) + + result = deserialize_payload(packed) + self.assertEqual(result, test_data) + + async def test_dequeue_empty_queue_returns_failure(self): + """Test dequeue on empty queue returns failure status - tests queue.py line 212.""" + result = await self.queue.dequeue(queue_type="nonexistent_type") + self.assertEqual(result["status"], "failure") + # Verify no payload key in response + self.assertNotIn("payload", result) + + async def test_deep_status_redis_availability(self): + """Test deep_status method checks Redis availability - tests queue.py line 420.""" + result = await self.queue.deep_status() + # Should succeed with Redis running + self.assertIsNotNone(result) + + async def test_convert_to_str_with_mixed_types(self): + """Test convert_to_str handles both bytes and strings.""" + from fq.utils import convert_to_str + + # Mixed bytes and string set + mixed_set = {b"key1", "key2", b"key3"} + result = convert_to_str(mixed_set) + + # All should be strings + self.assertTrue(all(isinstance(x, str) for x in result)) + self.assertIn("key1", result) + self.assertIn("key2", result) + self.assertIn("key3", result) + async def asyncTearDown(self): await self.queue._r.flushdb() await self.queue.close() diff --git a/uv.lock b/uv.lock index ec3d39c..b1ce756 100644 --- a/uv.lock +++ b/uv.lock @@ -257,7 +257,6 @@ wheels = [ [[package]] name = "flowdacity-queue" -version = "0.1.0" source = { editable = "." } dependencies = [ { name = "msgpack" },