diff --git a/backend/app/core/metrics/database.py b/backend/app/core/metrics/database.py index 22ef837f..4590309c 100644 --- a/backend/app/core/metrics/database.py +++ b/backend/app/core/metrics/database.py @@ -89,14 +89,13 @@ def record_idempotency_duplicate_blocked(self, event_type: str) -> None: def record_idempotency_processing_duration(self, duration_seconds: float, operation: str) -> None: self.idempotency_processing_duration.record(duration_seconds, attributes={"operation": operation}) - def update_idempotency_keys_active(self, count: int, prefix: str) -> None: - # Track the delta for gauge-like behavior - key = f"_idempotency_keys_{prefix}" - current_val = getattr(self, key, 0) - delta = count - current_val - if delta != 0: - self.idempotency_keys_active.add(delta, attributes={"key_prefix": prefix}) - setattr(self, key, count) + def increment_idempotency_keys(self, prefix: str) -> None: + """Increment active idempotency keys count when a new key is created.""" + self.idempotency_keys_active.add(1, attributes={"key_prefix": prefix}) + + def decrement_idempotency_keys(self, prefix: str) -> None: + """Decrement active idempotency keys count when a key is removed.""" + self.idempotency_keys_active.add(-1, attributes={"key_prefix": prefix}) def record_idempotent_event_processed(self, event_type: str, result: str) -> None: self.event_store_operations.add( diff --git a/backend/app/core/providers.py b/backend/app/core/providers.py index b616506c..fa1c2bd9 100644 --- a/backend/app/core/providers.py +++ b/backend/app/core/providers.py @@ -61,8 +61,7 @@ from app.services.execution_service import ExecutionService from app.services.grafana_alert_processor import GrafanaAlertProcessor from app.services.idempotency import IdempotencyConfig, IdempotencyManager -from app.services.idempotency.idempotency_manager import create_idempotency_manager -from app.services.idempotency.middleware import IdempotentConsumerWrapper +from app.services.idempotency.middleware import IdempotentEventDispatcher from app.services.idempotency.redis_repository import RedisIdempotencyRepository from app.services.k8s_worker import KubernetesWorker from app.services.kafka_event_service import KafkaEventService @@ -185,17 +184,10 @@ def get_idempotency_repository(self, redis_client: redis.Redis) -> RedisIdempote return RedisIdempotencyRepository(redis_client, key_prefix="idempotency") @provide - async def get_idempotency_manager( + def get_idempotency_manager( self, repo: RedisIdempotencyRepository, logger: logging.Logger, database_metrics: DatabaseMetrics - ) -> AsyncIterator[IdempotencyManager]: - manager = create_idempotency_manager( - repository=repo, config=IdempotencyConfig(), logger=logger, database_metrics=database_metrics - ) - await manager.initialize() - try: - yield manager - finally: - await manager.close() + ) -> IdempotencyManager: + return IdempotencyManager(IdempotencyConfig(), repo, logger, database_metrics) class EventProvider(Provider): @@ -579,7 +571,6 @@ async def _provide_saga_orchestrator( schema_registry: SchemaRegistryManager, settings: Settings, event_store: EventStore, - idempotency_manager: IdempotencyManager, resource_allocation_repository: ResourceAllocationRepository, logger: logging.Logger, event_metrics: EventMetrics, @@ -591,7 +582,6 @@ async def _provide_saga_orchestrator( schema_registry_manager=schema_registry, settings=settings, event_store=event_store, - idempotency_manager=idempotency_manager, resource_allocation_repository=resource_allocation_repository, config=_create_default_saga_config(), logger=logger, @@ -612,13 +602,19 @@ async def _provide_execution_coordinator( event_metrics: EventMetrics, ) -> AsyncIterator[ExecutionCoordinator]: """Shared factory for ExecutionCoordinator with lifecycle management.""" + dispatcher = IdempotentEventDispatcher( + logger=logger, + idempotency_manager=idempotency_manager, + key_strategy=KeyStrategy.EVENT_BASED, + ttl_seconds=7200, + ) async with ExecutionCoordinator( producer=kafka_producer, schema_registry_manager=schema_registry, settings=settings, event_store=event_store, execution_repository=execution_repository, - idempotency_manager=idempotency_manager, + dispatcher=dispatcher, logger=logger, coordinator_metrics=coordinator_metrics, event_metrics=event_metrics, @@ -715,9 +711,16 @@ class K8sWorkerProvider(Provider): scope = Scope.APP @provide - def get_k8s_worker_dispatcher(self, logger: logging.Logger) -> EventDispatcher: - """Create EventDispatcher for K8s worker.""" - return EventDispatcher(logger=logger) + def get_k8s_worker_dispatcher( + self, logger: logging.Logger, idempotency_manager: IdempotencyManager + ) -> EventDispatcher: + """Create idempotent EventDispatcher for K8s worker.""" + return IdempotentEventDispatcher( + logger=logger, + idempotency_manager=idempotency_manager, + key_strategy=KeyStrategy.CONTENT_HASH, + ttl_seconds=3600, + ) @provide def get_kubernetes_worker( @@ -746,10 +749,9 @@ async def get_k8s_worker_consumer( dispatcher: EventDispatcher, schema_registry: SchemaRegistryManager, settings: Settings, - idempotency_manager: IdempotencyManager, logger: logging.Logger, event_metrics: EventMetrics, - ) -> AsyncIterator[IdempotentConsumerWrapper]: + ) -> AsyncIterator[UnifiedConsumer]: """Create and start consumer for K8s worker.""" consumer_config = ConsumerConfig( bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS, @@ -770,24 +772,14 @@ async def get_k8s_worker_consumer( event_metrics=event_metrics, ) - idempotent_consumer = IdempotentConsumerWrapper( - consumer=consumer, - idempotency_manager=idempotency_manager, - dispatcher=dispatcher, - logger=logger, - default_key_strategy=KeyStrategy.CONTENT_HASH, - default_ttl_seconds=3600, - enable_for_all_handlers=True, - ) - - await idempotent_consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.K8S_WORKER])) + await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.K8S_WORKER])) logger.info("K8s worker consumer started") try: - yield idempotent_consumer + yield consumer finally: await worker.wait_for_active_creations() - await idempotent_consumer.stop() + await consumer.stop() logger.info("K8s worker consumer stopped") diff --git a/backend/app/domain/idempotency/__init__.py b/backend/app/domain/idempotency/__init__.py index 8529b2de..9e9ffcd0 100644 --- a/backend/app/domain/idempotency/__init__.py +++ b/backend/app/domain/idempotency/__init__.py @@ -1,6 +1,5 @@ from .models import ( IdempotencyRecord, - IdempotencyStats, IdempotencyStatus, KeyStrategy, ) @@ -8,6 +7,5 @@ __all__ = [ "IdempotencyStatus", "IdempotencyRecord", - "IdempotencyStats", "KeyStrategy", ] diff --git a/backend/app/domain/idempotency/models.py b/backend/app/domain/idempotency/models.py index 6d4eca45..839d07d1 100644 --- a/backend/app/domain/idempotency/models.py +++ b/backend/app/domain/idempotency/models.py @@ -11,7 +11,6 @@ class IdempotencyStatus(StringEnum): PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" - EXPIRED = "expired" class KeyStrategy(StringEnum): @@ -34,10 +33,3 @@ class IdempotencyRecord: processing_duration_ms: int | None = None error: str | None = None result_json: str | None = None - - -@dataclass -class IdempotencyStats: - total_keys: int - status_counts: dict[IdempotencyStatus, int] - prefix: str diff --git a/backend/app/events/core/dispatcher.py b/backend/app/events/core/dispatcher.py index bc69a4a3..c521d6bf 100644 --- a/backend/app/events/core/dispatcher.py +++ b/backend/app/events/core/dispatcher.py @@ -6,7 +6,6 @@ from app.domain.enums.events import EventType from app.domain.events.typed import DomainEvent -from app.infrastructure.kafka.mappings import get_event_class_for_type T = TypeVar("T", bound=DomainEvent) EventHandler: TypeAlias = Callable[[DomainEvent], Awaitable[None]] @@ -18,20 +17,20 @@ class EventDispatcher: This dispatcher eliminates the need for manual if/elif routing by maintaining a direct mapping from event types to their handlers. + + Subclasses may override ``_wrap_handler`` to intercept handler registration + (e.g. ``IdempotentEventDispatcher`` adds idempotency protection). """ def __init__(self, logger: logging.Logger) -> None: self.logger = logger + # Map event types to their handlers self._handlers: dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]] = defaultdict(list) - # Map topics to event types that can appear on them - self._topic_event_types: dict[str, set[type[DomainEvent]]] = defaultdict(set) - - # Metrics per event type - self._event_metrics: dict[EventType, dict[str, int]] = defaultdict( - lambda: {"processed": 0, "failed": 0, "skipped": 0} - ) + def _wrap_handler(self, handler: EventHandler) -> EventHandler: + """Hook for subclasses to wrap handlers at registration time.""" + return handler def register( self, event_type: EventType @@ -51,7 +50,7 @@ async def handle_execution(event: ExecutionRequestedEvent) -> None: def decorator(handler: Callable[[T], Awaitable[None]]) -> Callable[[T], Awaitable[None]]: self.logger.info(f"Registering handler '{handler.__name__}' for event type '{event_type}'") # Safe: dispatch() routes by event_type, guaranteeing correct types at runtime - self._handlers[event_type].append(handler) # type: ignore[arg-type] + self._handlers[event_type].append(self._wrap_handler(handler)) # type: ignore[arg-type] return handler return decorator @@ -65,27 +64,7 @@ def register_handler(self, event_type: EventType, handler: EventHandler) -> None handler: The async handler function """ self.logger.info(f"Registering handler '{handler.__name__}' for event type '{event_type}'") - self._handlers[event_type].append(handler) - - def remove_handler(self, event_type: EventType, handler: EventHandler) -> bool: - """ - Remove a specific handler for an event type. - - Args: - event_type: The event type to remove handler from - handler: The handler function to remove - - Returns: - True if handler was found and removed, False otherwise - """ - if event_type in self._handlers and handler in self._handlers[event_type]: - self._handlers[event_type].remove(handler) - self.logger.info(f"Removed handler '{handler.__name__}' for event type '{event_type}'") - # Clean up empty lists - if not self._handlers[event_type]: - del self._handlers[event_type] - return True - return False + self._handlers[event_type].append(self._wrap_handler(handler)) async def dispatch(self, event: DomainEvent) -> None: """ @@ -102,25 +81,14 @@ async def dispatch(self, event: DomainEvent) -> None: ) if not handlers: - self._event_metrics[event_type]["skipped"] += 1 self.logger.debug(f"No handlers registered for event type {event_type}") return self.logger.debug(f"Dispatching {event_type} to {len(handlers)} handler(s)") # Run handlers concurrently for better performance - tasks = [] - for handler in handlers: - tasks.append(self._execute_handler(handler, event)) - - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Count successes and failures - for result in results: - if isinstance(result, Exception): - self._event_metrics[event_type]["failed"] += 1 - else: - self._event_metrics[event_type]["processed"] += 1 + tasks = [self._execute_handler(handler, event) for handler in handlers] + await asyncio.gather(*tasks) async def _execute_handler(self, handler: EventHandler, event: DomainEvent) -> None: """ @@ -140,38 +108,3 @@ async def _execute_handler(self, handler: EventHandler, event: DomainEvent) -> N ) raise - def get_topics_for_registered_handlers(self) -> set[str]: - """ - Get all topics that have registered handlers. - - Returns: - Set of topic names that should be subscribed to - """ - topics = set() - for event_type in self._handlers.keys(): - # Find event class for this type - event_class = get_event_class_for_type(event_type) - if event_class and hasattr(event_class, "topic"): - topics.add(str(event_class.topic)) - return topics - - def get_metrics(self) -> dict[str, dict[str, int]]: - """Get processing metrics for all event types.""" - return {event_type: metrics for event_type, metrics in self._event_metrics.items()} - - def clear_handlers(self) -> None: - """Clear all registered handlers (useful for testing).""" - self._handlers.clear() - self.logger.info("All event handlers cleared") - - def get_handlers(self, event_type: EventType) -> list[Callable[[DomainEvent], Awaitable[None]]]: - """Get all handlers for a specific event type.""" - return self._handlers.get(event_type, []).copy() - - def get_all_handlers(self) -> dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]]: - """Get all registered handlers (returns a copy).""" - return {k: v.copy() for k, v in self._handlers.items()} - - def replace_handlers(self, event_type: EventType, handlers: list[Callable[[DomainEvent], Awaitable[None]]]) -> None: - """Replace all handlers for a specific event type.""" - self._handlers[event_type] = handlers diff --git a/backend/app/services/coordinator/coordinator.py b/backend/app/services/coordinator/coordinator.py index e9e0591b..f938cac8 100644 --- a/backend/app/services/coordinator/coordinator.py +++ b/backend/app/services/coordinator/coordinator.py @@ -20,7 +20,6 @@ ExecutionFailedEvent, ExecutionRequestedEvent, ) -from app.domain.idempotency import KeyStrategy from app.events.core import ConsumerConfig, EventDispatcher, UnifiedConsumer, UnifiedProducer from app.events.event_store import EventStore from app.events.schema.schema_registry import ( @@ -28,8 +27,6 @@ ) from app.services.coordinator.queue_manager import QueueManager, QueuePriority from app.services.coordinator.resource_manager import ResourceAllocation, ResourceManager -from app.services.idempotency import IdempotencyManager -from app.services.idempotency.middleware import IdempotentConsumerWrapper from app.settings import Settings EventHandler: TypeAlias = Coroutine[Any, Any, None] @@ -55,7 +52,7 @@ def __init__( settings: Settings, event_store: EventStore, execution_repository: ExecutionRepository, - idempotency_manager: IdempotencyManager, + dispatcher: EventDispatcher, logger: logging.Logger, coordinator_metrics: CoordinatorMetrics, event_metrics: EventMetrics, @@ -92,12 +89,10 @@ def __init__( # Kafka components self.consumer: UnifiedConsumer | None = None - self.idempotent_consumer: IdempotentConsumerWrapper | None = None self.producer: UnifiedProducer = producer # Persistence via repositories self.execution_repository = execution_repository - self.idempotency_manager = idempotency_manager self._event_store = event_store # Scheduling @@ -110,7 +105,7 @@ def __init__( self._active_executions: set[str] = set() self._execution_resources: ExecutionMap = {} self._schema_registry_manager = schema_registry_manager - self.dispatcher = EventDispatcher(logger=self.logger) + self.dispatcher = dispatcher async def _on_start(self) -> None: """Start the coordinator service.""" @@ -118,8 +113,6 @@ async def _on_start(self) -> None: await self.queue_manager.start() - await self.idempotency_manager.initialize() - consumer_config = ConsumerConfig( bootstrap_servers=self.kafka_servers, group_id=self.consumer_group, @@ -159,19 +152,9 @@ async def handle_failed(event: ExecutionFailedEvent) -> None: async def handle_cancelled(event: ExecutionCancelledEvent) -> None: await self._route_execution_event(event) - self.idempotent_consumer = IdempotentConsumerWrapper( - consumer=self.consumer, - idempotency_manager=self.idempotency_manager, - dispatcher=self.dispatcher, - logger=self.logger, - default_key_strategy=KeyStrategy.EVENT_BASED, # Use event ID for deduplication - default_ttl_seconds=7200, # 2 hours TTL for coordinator events - enable_for_all_handlers=True, # Enable idempotency for ALL handlers - ) + self.logger.info("COORDINATOR: Event handlers registered") - self.logger.info("COORDINATOR: Event handlers registered with idempotency protection") - - await self.idempotent_consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.EXECUTION_COORDINATOR])) + await self.consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.EXECUTION_COORDINATOR])) # Start scheduling task self._scheduling_task = asyncio.create_task(self._scheduling_loop()) @@ -190,16 +173,11 @@ async def _on_stop(self) -> None: except asyncio.CancelledError: pass - # Stop consumer (idempotent wrapper only) - if self.idempotent_consumer: - await self.idempotent_consumer.stop() + if self.consumer: + await self.consumer.stop() await self.queue_manager.stop() - # Close idempotency manager - if hasattr(self, "idempotency_manager") and self.idempotency_manager: - await self.idempotency_manager.close() - self.logger.info(f"ExecutionCoordinator service stopped. Active executions: {len(self._active_executions)}") async def _route_execution_event(self, event: ExecutionRequestedEvent | ExecutionCancelledEvent) -> None: diff --git a/backend/app/services/idempotency/__init__.py b/backend/app/services/idempotency/__init__.py index 82af12f0..484465db 100644 --- a/backend/app/services/idempotency/__init__.py +++ b/backend/app/services/idempotency/__init__.py @@ -1,21 +1,16 @@ from app.domain.idempotency import IdempotencyStatus from app.services.idempotency.idempotency_manager import ( IdempotencyConfig, - IdempotencyKeyStrategy, IdempotencyManager, IdempotencyResult, - create_idempotency_manager, ) -from app.services.idempotency.middleware import IdempotentConsumerWrapper, IdempotentEventHandler, idempotent_handler +from app.services.idempotency.middleware import IdempotentEventDispatcher, IdempotentEventHandler __all__ = [ - "IdempotencyManager", "IdempotencyConfig", + "IdempotencyManager", "IdempotencyResult", "IdempotencyStatus", - "IdempotencyKeyStrategy", - "create_idempotency_manager", + "IdempotentEventDispatcher", "IdempotentEventHandler", - "idempotent_handler", - "IdempotentConsumerWrapper", ] diff --git a/backend/app/services/idempotency/idempotency_manager.py b/backend/app/services/idempotency/idempotency_manager.py index af952289..41dc64ac 100644 --- a/backend/app/services/idempotency/idempotency_manager.py +++ b/backend/app/services/idempotency/idempotency_manager.py @@ -1,16 +1,15 @@ -import asyncio import hashlib import json import logging from datetime import datetime, timedelta, timezone -from typing import Protocol from pydantic import BaseModel from pymongo.errors import DuplicateKeyError from app.core.metrics import DatabaseMetrics from app.domain.events.typed import BaseEvent -from app.domain.idempotency import IdempotencyRecord, IdempotencyStats, IdempotencyStatus, KeyStrategy +from app.domain.idempotency import IdempotencyRecord, IdempotencyStatus, KeyStrategy +from app.services.idempotency.redis_repository import RedisIdempotencyRepository class IdempotencyResult(BaseModel): @@ -30,79 +29,38 @@ class IdempotencyConfig(BaseModel): processing_timeout_seconds: int = 300 enable_result_caching: bool = True max_result_size_bytes: int = 1048576 - enable_metrics: bool = True - collection_name: str = "idempotency_keys" - - -class IdempotencyKeyStrategy: - @staticmethod - def event_based(event: BaseEvent) -> str: - return f"{event.event_type}:{event.event_id}" - - @staticmethod - def content_hash(event: BaseEvent, fields: set[str] | None = None) -> str: - event_dict = event.model_dump(mode="json") - event_dict.pop("event_id", None) - event_dict.pop("timestamp", None) - event_dict.pop("metadata", None) - - if fields: - event_dict = {k: v for k, v in event_dict.items() if k in fields} - - content = json.dumps(event_dict, sort_keys=True) - return hashlib.sha256(content.encode()).hexdigest() - - @staticmethod - def custom(event: BaseEvent, custom_key: str) -> str: - return f"{event.event_type}:{custom_key}" - - -class IdempotencyRepoProtocol(Protocol): - async def find_by_key(self, key: str) -> IdempotencyRecord | None: ... - async def insert_processing(self, record: IdempotencyRecord) -> None: ... - async def update_record(self, record: IdempotencyRecord) -> int: ... - async def delete_key(self, key: str) -> int: ... - async def aggregate_status_counts(self, key_prefix: str) -> dict[str, int]: ... - async def health_check(self) -> None: ... class IdempotencyManager: def __init__( self, config: IdempotencyConfig, - repository: IdempotencyRepoProtocol, + repository: RedisIdempotencyRepository, logger: logging.Logger, database_metrics: DatabaseMetrics, ) -> None: self.config = config self.metrics = database_metrics - self._repo: IdempotencyRepoProtocol = repository - self._stats_update_task: asyncio.Task[None] | None = None + self._repo = repository self.logger = logger - - async def initialize(self) -> None: - if self.config.enable_metrics and self._stats_update_task is None: - self._stats_update_task = asyncio.create_task(self._update_stats_loop()) - self.logger.info("Idempotency manager ready") - - async def close(self) -> None: - if self._stats_update_task: - self._stats_update_task.cancel() - try: - await self._stats_update_task - except asyncio.CancelledError: - pass - self.logger.info("Closed idempotency manager") + self.logger.info("Idempotency manager initialized") def _generate_key( self, event: BaseEvent, key_strategy: KeyStrategy, custom_key: str | None = None, fields: set[str] | None = None ) -> str: if key_strategy == KeyStrategy.EVENT_BASED: - key = IdempotencyKeyStrategy.event_based(event) + key = f"{event.event_type}:{event.event_id}" elif key_strategy == KeyStrategy.CONTENT_HASH: - key = IdempotencyKeyStrategy.content_hash(event, fields) + event_dict = event.model_dump(mode="json") + event_dict.pop("event_id", None) + event_dict.pop("timestamp", None) + event_dict.pop("metadata", None) + if fields: + event_dict = {k: v for k, v in event_dict.items() if k in fields} + content = json.dumps(event_dict, sort_keys=True) + key = hashlib.sha256(content.encode()).hexdigest() elif key_strategy == KeyStrategy.CUSTOM and custom_key: - key = IdempotencyKeyStrategy.custom(event, custom_key) + key = f"{event.event_type}:{custom_key}" else: raise ValueError(f"Invalid key strategy: {key_strategy}") return f"{self.config.key_prefix}:{key}" @@ -188,6 +146,7 @@ async def _create_new_key(self, full_key: str, event: BaseEvent, ttl: int) -> Id ttl_seconds=ttl, ) await self._repo.insert_processing(record) + self.metrics.increment_idempotency_keys(self.config.key_prefix) return IdempotencyResult( is_duplicate=False, status=IdempotencyStatus.PROCESSING, created_at=created_at, key=full_key ) @@ -240,7 +199,6 @@ async def mark_completed( if not existing: self.logger.warning(f"Idempotency key {full_key} not found when marking completed") return False - # mark_completed does not accept arbitrary result today; use mark_completed_with_cache for cached payloads return await self._update_key_status(full_key, existing, IdempotencyStatus.COMPLETED, cached_json=None) async def mark_failed( @@ -282,50 +240,3 @@ async def get_cached_json( existing = await self._repo.find_by_key(full_key) assert existing and existing.result_json is not None, "Invariant: cached result must exist when requested" return existing.result_json - - async def remove( - self, - event: BaseEvent, - key_strategy: KeyStrategy = KeyStrategy.EVENT_BASED, - custom_key: str | None = None, - fields: set[str] | None = None, - ) -> bool: - full_key = self._generate_key(event, key_strategy, custom_key, fields) - try: - deleted = await self._repo.delete_key(full_key) - return deleted > 0 - except Exception as e: - self.logger.error(f"Failed to remove idempotency key: {e}") - return False - - async def get_stats(self) -> IdempotencyStats: - counts_raw = await self._repo.aggregate_status_counts(self.config.key_prefix) - status_counts: dict[IdempotencyStatus, int] = { - IdempotencyStatus.PROCESSING: counts_raw.get(IdempotencyStatus.PROCESSING, 0), - IdempotencyStatus.COMPLETED: counts_raw.get(IdempotencyStatus.COMPLETED, 0), - IdempotencyStatus.FAILED: counts_raw.get(IdempotencyStatus.FAILED, 0), - } - total = sum(status_counts.values()) - return IdempotencyStats(total_keys=total, status_counts=status_counts, prefix=self.config.key_prefix) - - async def _update_stats_loop(self) -> None: - while True: - try: - stats = await self.get_stats() - self.metrics.update_idempotency_keys_active(stats.total_keys, self.config.key_prefix) - await asyncio.sleep(60) - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Failed to update idempotency stats: {e}") - await asyncio.sleep(300) - - -def create_idempotency_manager( - *, - repository: IdempotencyRepoProtocol, - config: IdempotencyConfig | None = None, - logger: logging.Logger, - database_metrics: DatabaseMetrics, -) -> IdempotencyManager: - return IdempotencyManager(config or IdempotencyConfig(), repository, logger, database_metrics) diff --git a/backend/app/services/idempotency/middleware.py b/backend/app/services/idempotency/middleware.py index 04a4f931..13b8b59a 100644 --- a/backend/app/services/idempotency/middleware.py +++ b/backend/app/services/idempotency/middleware.py @@ -1,18 +1,14 @@ -import asyncio import logging -from collections.abc import Awaitable -from typing import Any, Callable +from collections.abc import Awaitable, Callable -from app.domain.enums.events import EventType -from app.domain.enums.kafka import KafkaTopic from app.domain.events.typed import DomainEvent from app.domain.idempotency import KeyStrategy -from app.events.core import EventDispatcher, UnifiedConsumer +from app.events.core.dispatcher import EventDispatcher, EventHandler from app.services.idempotency.idempotency_manager import IdempotencyManager class IdempotentEventHandler: - """Wrapper for event handlers with idempotency support""" + """Wraps a single event handler with idempotency check-and-reserve logic.""" def __init__( self, @@ -20,260 +16,73 @@ def __init__( idempotency_manager: IdempotencyManager, logger: logging.Logger, key_strategy: KeyStrategy = KeyStrategy.EVENT_BASED, - custom_key_func: Callable[[DomainEvent], str] | None = None, fields: set[str] | None = None, ttl_seconds: int | None = None, - cache_result: bool = True, - on_duplicate: Callable[[DomainEvent, Any], Any] | None = None, ): self.handler = handler self.idempotency_manager = idempotency_manager self.logger = logger self.key_strategy = key_strategy - self.custom_key_func = custom_key_func self.fields = fields self.ttl_seconds = ttl_seconds - self.cache_result = cache_result - self.on_duplicate = on_duplicate + self.__name__ = handler.__name__ async def __call__(self, event: DomainEvent) -> None: - """Process event with idempotency check""" + """Process event with idempotency check.""" self.logger.info( f"IdempotentEventHandler called for event {event.event_type}, " - f"id={event.event_id}, handler={self.handler.__name__}" + f"id={event.event_id}, handler={self.__name__}" ) - # Generate custom key if function provided - custom_key = None - if self.key_strategy == KeyStrategy.CUSTOM and self.custom_key_func: - custom_key = self.custom_key_func(event) - # Check idempotency idempotency_result = await self.idempotency_manager.check_and_reserve( event=event, key_strategy=self.key_strategy, - custom_key=custom_key, ttl_seconds=self.ttl_seconds, fields=self.fields, ) if idempotency_result.is_duplicate: - # Handle duplicate self.logger.info( f"Duplicate event detected: {event.event_type} ({event.event_id}), status: {idempotency_result.status}" ) - - # Call duplicate handler if provided - if self.on_duplicate: - if asyncio.iscoroutinefunction(self.on_duplicate): - await self.on_duplicate(event, idempotency_result) - else: - await asyncio.to_thread(self.on_duplicate, event, idempotency_result) - - # For duplicate, just return without error return - # Not a duplicate, process the event try: - # Call the actual handler - it returns None await self.handler(event) - - # Mark as completed await self.idempotency_manager.mark_completed( - event=event, key_strategy=self.key_strategy, custom_key=custom_key, fields=self.fields + event=event, key_strategy=self.key_strategy, fields=self.fields ) - except Exception as e: - # Mark as failed await self.idempotency_manager.mark_failed( - event=event, error=str(e), key_strategy=self.key_strategy, custom_key=custom_key, fields=self.fields + event=event, error=str(e), key_strategy=self.key_strategy, fields=self.fields ) raise -def idempotent_handler( - idempotency_manager: IdempotencyManager, - logger: logging.Logger, - key_strategy: KeyStrategy = KeyStrategy.EVENT_BASED, - custom_key_func: Callable[[DomainEvent], str] | None = None, - fields: set[str] | None = None, - ttl_seconds: int | None = None, - cache_result: bool = True, - on_duplicate: Callable[[DomainEvent, Any], Any] | None = None, -) -> Callable[[Callable[[DomainEvent], Awaitable[None]]], Callable[[DomainEvent], Awaitable[None]]]: - """Decorator for making event handlers idempotent""" - - def decorator(func: Callable[[DomainEvent], Awaitable[None]]) -> Callable[[DomainEvent], Awaitable[None]]: - handler = IdempotentEventHandler( - handler=func, - idempotency_manager=idempotency_manager, - logger=logger, - key_strategy=key_strategy, - custom_key_func=custom_key_func, - fields=fields, - ttl_seconds=ttl_seconds, - cache_result=cache_result, - on_duplicate=on_duplicate, - ) - return handler # IdempotentEventHandler is already callable with the right signature - - return decorator - +class IdempotentEventDispatcher(EventDispatcher): + """EventDispatcher that automatically wraps every handler with idempotency. -class IdempotentConsumerWrapper: - """Wrapper for Kafka consumer with automatic idempotency""" + Drop-in replacement for ``EventDispatcher`` — DI providers create this + subclass for services that need idempotent event handling. + """ def __init__( self, - consumer: UnifiedConsumer, - idempotency_manager: IdempotencyManager, - dispatcher: EventDispatcher, logger: logging.Logger, - default_key_strategy: KeyStrategy = KeyStrategy.EVENT_BASED, - default_ttl_seconds: int = 3600, - enable_for_all_handlers: bool = True, - ): - self.consumer = consumer - self.idempotency_manager = idempotency_manager - self.dispatcher = dispatcher - self.logger = logger - self.default_key_strategy = default_key_strategy - self.default_ttl_seconds = default_ttl_seconds - self.enable_for_all_handlers = enable_for_all_handlers - self._original_handlers: dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]] = {} - - def make_handlers_idempotent(self) -> None: - """Wrap all registered handlers with idempotency""" - self.logger.info( - f"make_handlers_idempotent called: enable_for_all={self.enable_for_all_handlers}, " - f"dispatcher={self.dispatcher is not None}" - ) - if not self.enable_for_all_handlers or not self.dispatcher: - self.logger.warning("Skipping handler wrapping - conditions not met") - return - - # Store original handlers using public API - self._original_handlers = self.dispatcher.get_all_handlers() - self.logger.info(f"Got {len(self._original_handlers)} event types with handlers to wrap") - - # Wrap each handler - for event_type, handlers in self._original_handlers.items(): - wrapped_handlers: list[Callable[[DomainEvent], Awaitable[None]]] = [] - for handler in handlers: - # Wrap with idempotency - IdempotentEventHandler is callable with the right signature - wrapped = IdempotentEventHandler( - handler=handler, - idempotency_manager=self.idempotency_manager, - logger=self.logger, - key_strategy=self.default_key_strategy, - ttl_seconds=self.default_ttl_seconds, - ) - wrapped_handlers.append(wrapped) - - # Replace handlers using public API - self.logger.info( - f"Replacing {len(handlers)} handlers for {event_type} with {len(wrapped_handlers)} wrapped handlers" - ) - self.dispatcher.replace_handlers(event_type, wrapped_handlers) - - self.logger.info("Handler wrapping complete") - - def subscribe_idempotent_handler( - self, - event_type: str, - handler: Callable[[DomainEvent], Awaitable[None]], - key_strategy: KeyStrategy | None = None, - custom_key_func: Callable[[DomainEvent], str] | None = None, - fields: set[str] | None = None, - ttl_seconds: int | None = None, - cache_result: bool = True, - on_duplicate: Callable[[DomainEvent, Any], Any] | None = None, + idempotency_manager: IdempotencyManager, + key_strategy: KeyStrategy = KeyStrategy.EVENT_BASED, + ttl_seconds: int = 3600, ) -> None: - """Subscribe an idempotent handler for specific event type""" - # Create the idempotent handler wrapper - idempotent_wrapper = IdempotentEventHandler( + super().__init__(logger=logger) + self._idempotency_manager = idempotency_manager + self._key_strategy = key_strategy + self._ttl_seconds = ttl_seconds + + def _wrap_handler(self, handler: EventHandler) -> EventHandler: + return IdempotentEventHandler( handler=handler, - idempotency_manager=self.idempotency_manager, + idempotency_manager=self._idempotency_manager, logger=self.logger, - key_strategy=key_strategy or self.default_key_strategy, - custom_key_func=custom_key_func, - fields=fields, - ttl_seconds=ttl_seconds or self.default_ttl_seconds, - cache_result=cache_result, - on_duplicate=on_duplicate, + key_strategy=self._key_strategy, + ttl_seconds=self._ttl_seconds, ) - - # Create an async handler that processes the message - async def async_handler(message: Any) -> Any: - self.logger.info(f"IDEMPOTENT HANDLER CALLED for {event_type}") - - # Extract event from confluent-kafka Message - if not hasattr(message, "value"): - self.logger.error(f"Received non-Message object for {event_type}: {type(message)}") - return None - - # Debug log to check message details - self.logger.info( - f"Handler for {event_type} - Message type: {type(message)}, " - f"has key: {hasattr(message, 'key')}, " - f"has topic: {hasattr(message, 'topic')}" - ) - - raw_value = message.value - - # Debug the raw value - self.logger.info(f"Raw value extracted: {raw_value[:100] if raw_value else 'None or empty'}") - - # Handle tombstone messages (null value for log compaction) - if raw_value is None: - self.logger.warning(f"Received empty message for {event_type} - tombstone or consumed value") - return None - - # Handle empty messages - if not raw_value: - self.logger.warning(f"Received empty message for {event_type} - empty bytes") - return None - - try: - # Deserialize using schema registry if available - event = await self.consumer._schema_registry.deserialize_event(raw_value, message.topic) - if not event: - self.logger.error(f"Failed to deserialize event for {event_type}") - return None - - # Call the idempotent wrapper directly in async context - await idempotent_wrapper(event) - - self.logger.debug(f"Successfully processed {event_type} event: {event.event_id}") - return None - except Exception as e: - self.logger.error(f"Failed to process message for {event_type}: {e}", exc_info=True) - raise - - # Register with the dispatcher if available - if self.dispatcher: - # Create wrapper for EventDispatcher - async def dispatch_handler(event: DomainEvent) -> None: - await idempotent_wrapper(event) - - self.dispatcher.register(EventType(event_type))(dispatch_handler) - else: - # Fallback to direct consumer registration if no dispatcher - self.logger.error(f"No EventDispatcher available for registering idempotent handler for {event_type}") - - async def start(self, topics: list[KafkaTopic]) -> None: - """Start the consumer with idempotency""" - self.logger.info(f"IdempotentConsumerWrapper.start called with topics: {topics}") - # Make handlers idempotent before starting - self.make_handlers_idempotent() - - # Start the consumer with required topics parameter - await self.consumer.start(topics) - self.logger.info("IdempotentConsumerWrapper started successfully") - - async def stop(self) -> None: - """Stop the consumer""" - await self.consumer.stop() - - # Delegate other methods to the wrapped consumer - def __getattr__(self, name: str) -> Any: - return getattr(self.consumer, name) diff --git a/backend/app/services/idempotency/redis_repository.py b/backend/app/services/idempotency/redis_repository.py index 54b9eb1a..f649a737 100644 --- a/backend/app/services/idempotency/redis_repository.py +++ b/backend/app/services/idempotency/redis_repository.py @@ -121,21 +121,5 @@ async def delete_key(self, key: str) -> int: k = self._full_key(key) return int(await self._r.delete(k) or 0) - async def aggregate_status_counts(self, key_prefix: str) -> dict[str, int]: - pattern = f"{key_prefix.rstrip(':')}:*" - counts: dict[str, int] = {} - # SCAN to avoid blocking Redis - async for k in self._r.scan_iter(match=pattern, count=200): - try: - raw = await self._r.get(k) - if not raw: - continue - doc = json.loads(raw) - status = str(doc.get("status", "")) - counts[status] = counts.get(status, 0) + 1 - except Exception: - continue - return counts - async def health_check(self) -> None: await self._r.ping() # type: ignore[misc] # redis-py returns Awaitable[bool] | bool diff --git a/backend/app/services/result_processor/processor.py b/backend/app/services/result_processor/processor.py index 464584c7..3d5361ec 100644 --- a/backend/app/services/result_processor/processor.py +++ b/backend/app/services/result_processor/processor.py @@ -22,11 +22,8 @@ ResultStoredEvent, ) from app.domain.execution import ExecutionNotFoundError, ExecutionResultDomain -from app.domain.idempotency import KeyStrategy from app.events.core import ConsumerConfig, EventDispatcher, UnifiedConsumer, UnifiedProducer from app.events.schema.schema_registry import SchemaRegistryManager -from app.services.idempotency import IdempotencyManager -from app.services.idempotency.middleware import IdempotentConsumerWrapper from app.settings import Settings @@ -61,7 +58,7 @@ def __init__( producer: UnifiedProducer, schema_registry: SchemaRegistryManager, settings: Settings, - idempotency_manager: IdempotencyManager, + dispatcher: EventDispatcher, logger: logging.Logger, execution_metrics: ExecutionMetrics, event_metrics: EventMetrics, @@ -75,24 +72,19 @@ def __init__( self._settings = settings self._metrics = execution_metrics self._event_metrics = event_metrics - self._idempotency_manager: IdempotencyManager = idempotency_manager + self._dispatcher = dispatcher self._state = ProcessingState.IDLE - self._consumer: IdempotentConsumerWrapper | None = None - self._dispatcher: EventDispatcher | None = None + self._consumer: UnifiedConsumer | None = None self.logger = logger async def _on_start(self) -> None: """Start the result processor.""" self.logger.info("Starting ResultProcessor...") - # Initialize idempotency manager (safe to call multiple times) - await self._idempotency_manager.initialize() - self.logger.info("Idempotency manager initialized for ResultProcessor") - - self._dispatcher = self._create_dispatcher() + self._register_handlers() self._consumer = await self._create_consumer() self._state = ProcessingState.PROCESSING - self.logger.info("ResultProcessor started successfully with idempotency protection") + self.logger.info("ResultProcessor started successfully") async def _on_stop(self) -> None: """Stop the result processor.""" @@ -102,23 +94,17 @@ async def _on_stop(self) -> None: if self._consumer: await self._consumer.stop() - await self._idempotency_manager.close() # Note: producer is managed by DI container, not stopped here self.logger.info("ResultProcessor stopped") - def _create_dispatcher(self) -> EventDispatcher: - """Create and configure event dispatcher with handlers.""" - dispatcher = EventDispatcher(logger=self.logger) - - # Register handlers for specific event types - dispatcher.register_handler(EventType.EXECUTION_COMPLETED, self._handle_completed_wrapper) - dispatcher.register_handler(EventType.EXECUTION_FAILED, self._handle_failed_wrapper) - dispatcher.register_handler(EventType.EXECUTION_TIMEOUT, self._handle_timeout_wrapper) - - return dispatcher + def _register_handlers(self) -> None: + """Register event handlers on the dispatcher.""" + self._dispatcher.register_handler(EventType.EXECUTION_COMPLETED, self._handle_completed_wrapper) + self._dispatcher.register_handler(EventType.EXECUTION_FAILED, self._handle_failed_wrapper) + self._dispatcher.register_handler(EventType.EXECUTION_TIMEOUT, self._handle_timeout_wrapper) - async def _create_consumer(self) -> IdempotentConsumerWrapper: - """Create and configure idempotent Kafka consumer.""" + async def _create_consumer(self) -> UnifiedConsumer: + """Create and start Kafka consumer.""" consumer_config = ConsumerConfig( bootstrap_servers=self._settings.KAFKA_BOOTSTRAP_SERVERS, group_id=self.config.consumer_group, @@ -131,11 +117,7 @@ async def _create_consumer(self) -> IdempotentConsumerWrapper: request_timeout_ms=self._settings.KAFKA_REQUEST_TIMEOUT_MS, ) - # Create consumer with schema registry and dispatcher - if not self._dispatcher: - raise RuntimeError("Event dispatcher not initialized") - - base_consumer = UnifiedConsumer( + consumer = UnifiedConsumer( consumer_config, event_dispatcher=self._dispatcher, schema_registry=self._schema_registry, @@ -143,17 +125,8 @@ async def _create_consumer(self) -> IdempotentConsumerWrapper: logger=self.logger, event_metrics=self._event_metrics, ) - wrapper = IdempotentConsumerWrapper( - consumer=base_consumer, - idempotency_manager=self._idempotency_manager, - dispatcher=self._dispatcher, - logger=self.logger, - default_key_strategy=KeyStrategy.CONTENT_HASH, - default_ttl_seconds=7200, - enable_for_all_handlers=True, - ) - await wrapper.start(self.config.topics) - return wrapper + await consumer.start(self.config.topics) + return consumer # Wrappers accepting DomainEvent to satisfy dispatcher typing diff --git a/backend/app/services/saga/saga_orchestrator.py b/backend/app/services/saga/saga_orchestrator.py index 7d607bb6..d1f3d9b8 100644 --- a/backend/app/services/saga/saga_orchestrator.py +++ b/backend/app/services/saga/saga_orchestrator.py @@ -14,14 +14,11 @@ from app.domain.enums.events import EventType from app.domain.enums.saga import SagaState from app.domain.events.typed import DomainEvent, EventMetadata, SagaCancelledEvent -from app.domain.idempotency import KeyStrategy from app.domain.saga.models import Saga, SagaConfig from app.events.core import ConsumerConfig, EventDispatcher, UnifiedConsumer, UnifiedProducer from app.events.event_store import EventStore from app.events.schema.schema_registry import SchemaRegistryManager from app.infrastructure.kafka.mappings import get_topic_for_event -from app.services.idempotency import IdempotentConsumerWrapper -from app.services.idempotency.idempotency_manager import IdempotencyManager from app.settings import Settings from .base_saga import BaseSaga @@ -40,7 +37,6 @@ def __init__( schema_registry_manager: SchemaRegistryManager, settings: Settings, event_store: EventStore, - idempotency_manager: IdempotencyManager, resource_allocation_repository: ResourceAllocationRepository, logger: logging.Logger, event_metrics: EventMetrics, @@ -49,8 +45,7 @@ def __init__( self.config = config self._sagas: dict[str, type[BaseSaga]] = {} self._running_instances: dict[str, Saga] = {} - self._consumer: IdempotentConsumerWrapper | None = None - self._idempotency_manager: IdempotencyManager = idempotency_manager + self._consumer: UnifiedConsumer | None = None self._producer = producer self._schema_registry_manager = schema_registry_manager self._settings = settings @@ -89,8 +84,6 @@ async def _on_stop(self) -> None: if self._consumer: await self._consumer.stop() - await self._idempotency_manager.close() - for task in self._tasks: if not task.done(): task.cancel() @@ -147,7 +140,7 @@ async def _start_consumer(self) -> None: dispatcher.register_handler(event_type, self._handle_event) self.logger.info(f"Registered handler for event type: {event_type}") - base_consumer = UnifiedConsumer( + self._consumer = UnifiedConsumer( config=consumer_config, event_dispatcher=dispatcher, schema_registry=self._schema_registry_manager, @@ -155,17 +148,7 @@ async def _start_consumer(self) -> None: logger=self.logger, event_metrics=self._event_metrics, ) - self._consumer = IdempotentConsumerWrapper( - consumer=base_consumer, - idempotency_manager=self._idempotency_manager, - dispatcher=dispatcher, - logger=self.logger, - default_key_strategy=KeyStrategy.EVENT_BASED, - default_ttl_seconds=7200, - enable_for_all_handlers=False, - ) - assert self._consumer is not None await self._consumer.start(list(topics)) self.logger.info(f"Saga consumer started for topics: {topics}") @@ -611,7 +594,6 @@ def create_saga_orchestrator( schema_registry_manager: SchemaRegistryManager, settings: Settings, event_store: EventStore, - idempotency_manager: IdempotencyManager, resource_allocation_repository: ResourceAllocationRepository, config: SagaConfig, logger: logging.Logger, @@ -625,7 +607,6 @@ def create_saga_orchestrator( schema_registry_manager: Schema registry manager for event serialization settings: Application settings event_store: Event store instance for event sourcing - idempotency_manager: Manager for idempotent event processing resource_allocation_repository: Repository for resource allocations config: Saga configuration logger: Logger instance @@ -641,7 +622,6 @@ def create_saga_orchestrator( schema_registry_manager=schema_registry_manager, settings=settings, event_store=event_store, - idempotency_manager=idempotency_manager, resource_allocation_repository=resource_allocation_repository, logger=logger, event_metrics=event_metrics, diff --git a/backend/tests/e2e/idempotency/test_consumer_idempotent.py b/backend/tests/e2e/idempotency/test_consumer_idempotent.py index fd943edd..5b500d90 100644 --- a/backend/tests/e2e/idempotency/test_consumer_idempotent.py +++ b/backend/tests/e2e/idempotency/test_consumer_idempotent.py @@ -91,26 +91,6 @@ async def test_idempotency_manager_mark_failed(scope: AsyncContainer) -> None: assert result2.status == IdempotencyStatus.FAILED -@pytest.mark.asyncio -async def test_idempotency_manager_remove_key(scope: AsyncContainer) -> None: - """Test removing an idempotency key allows reprocessing.""" - idm: IdempotencyManager = await scope.get(IdempotencyManager) - - event = make_execution_requested_event(execution_id=f"e-{uuid.uuid4().hex[:8]}") - - # Reserve the key - result1 = await idm.check_and_reserve(event, key_strategy=KeyStrategy.EVENT_BASED) - assert result1.is_duplicate is False - - # Remove the key - removed = await idm.remove(event, key_strategy=KeyStrategy.EVENT_BASED) - assert removed is True - - # Now the same event can be processed again - result2 = await idm.check_and_reserve(event, key_strategy=KeyStrategy.EVENT_BASED) - assert result2.is_duplicate is False - - @pytest.mark.asyncio async def test_idempotency_content_hash_strategy(scope: AsyncContainer) -> None: """Test content hash strategy blocks events with same content but different IDs.""" diff --git a/backend/tests/e2e/idempotency/test_decorator_idempotent.py b/backend/tests/e2e/idempotency/test_decorator_idempotent.py deleted file mode 100644 index 9638c4a3..00000000 --- a/backend/tests/e2e/idempotency/test_decorator_idempotent.py +++ /dev/null @@ -1,53 +0,0 @@ -import logging - -import pytest -from app.domain.events.typed import DomainEvent -from app.domain.idempotency import KeyStrategy -from app.services.idempotency.idempotency_manager import IdempotencyManager -from app.services.idempotency.middleware import idempotent_handler -from dishka import AsyncContainer - -from tests.conftest import make_execution_requested_event - -_test_logger = logging.getLogger("test.idempotency.decorator_idempotent") - - -pytestmark = [pytest.mark.e2e] - - -@pytest.mark.asyncio -async def test_decorator_blocks_duplicate_event(scope: AsyncContainer) -> None: - idm: IdempotencyManager = await scope.get(IdempotencyManager) - - calls = {"n": 0} - - @idempotent_handler(idempotency_manager=idm, key_strategy=KeyStrategy.EVENT_BASED, logger=_test_logger) - async def h(ev: DomainEvent) -> None: - calls["n"] += 1 - - ev = make_execution_requested_event(execution_id="exec-deco-1") - - await h(ev) - await h(ev) # duplicate - assert calls["n"] == 1 - - -@pytest.mark.asyncio -async def test_decorator_custom_key_blocks(scope: AsyncContainer) -> None: - idm: IdempotencyManager = await scope.get(IdempotencyManager) - - calls = {"n": 0} - - def fixed_key(_ev: DomainEvent) -> str: - return "fixed-key" - - @idempotent_handler(idempotency_manager=idm, key_strategy=KeyStrategy.CUSTOM, custom_key_func=fixed_key, logger=_test_logger) - async def h(ev: DomainEvent) -> None: - calls["n"] += 1 - - e1 = make_execution_requested_event(execution_id="exec-deco-2a") - e2 = make_execution_requested_event(execution_id="exec-deco-2b") - - await h(e1) - await h(e2) # different event ids but same custom key - assert calls["n"] == 1 diff --git a/backend/tests/e2e/idempotency/test_idempotency.py b/backend/tests/e2e/idempotency/test_idempotency.py index 9dc444ec..6019f9ce 100644 --- a/backend/tests/e2e/idempotency/test_idempotency.py +++ b/backend/tests/e2e/idempotency/test_idempotency.py @@ -2,9 +2,7 @@ import json import logging import uuid -from collections.abc import AsyncGenerator from datetime import datetime, timedelta, timezone -from typing import Any import pytest import redis.asyncio as redis @@ -12,7 +10,7 @@ from app.domain.events.typed import DomainEvent from app.domain.idempotency import IdempotencyRecord, IdempotencyStatus, KeyStrategy from app.services.idempotency.idempotency_manager import IdempotencyConfig, IdempotencyManager -from app.services.idempotency.middleware import IdempotentEventHandler, idempotent_handler +from app.services.idempotency.middleware import IdempotentEventHandler from app.services.idempotency.redis_repository import RedisIdempotencyRepository from app.settings import Settings @@ -28,7 +26,7 @@ class TestIdempotencyManager: """IdempotencyManager backed by real Redis repository (DI-provided client).""" @pytest.fixture - async def manager(self, redis_client: redis.Redis, test_settings: Settings) -> AsyncGenerator[IdempotencyManager, None]: + def manager(self, redis_client: redis.Redis, test_settings: Settings) -> IdempotencyManager: prefix = f"idemp_ut:{uuid.uuid4().hex[:6]}" cfg = IdempotencyConfig( key_prefix=prefix, @@ -36,16 +34,10 @@ async def manager(self, redis_client: redis.Redis, test_settings: Settings) -> A processing_timeout_seconds=5, enable_result_caching=True, max_result_size_bytes=1024, - enable_metrics=False, ) repo = RedisIdempotencyRepository(redis_client, key_prefix=prefix) database_metrics = DatabaseMetrics(test_settings) - m = IdempotencyManager(cfg, repo, _test_logger, database_metrics=database_metrics) - await m.initialize() - try: - yield m - finally: - await m.close() + return IdempotencyManager(cfg, repo, _test_logger, database_metrics=database_metrics) @pytest.mark.asyncio async def test_complete_flow_new_event(self, manager: IdempotencyManager) -> None: @@ -200,74 +192,17 @@ async def test_result_caching(self, manager: IdempotencyManager) -> None: assert duplicate_result.is_duplicate is True assert duplicate_result.has_cached_result is True - @pytest.mark.asyncio - async def test_stats_aggregation(self, manager: IdempotencyManager) -> None: - """Test statistics aggregation""" - # Create various events with different statuses - events = [] - for i in range(10): - event = make_execution_requested_event( - execution_id=f"exec-{i}", - script=f"print({i})", - service_name="test-service", - ) - events.append(event) - - # Process events with different outcomes - for i, event in enumerate(events): - await manager.check_and_reserve(event, key_strategy=KeyStrategy.EVENT_BASED) - - if i < 6: - await manager.mark_completed(event, key_strategy=KeyStrategy.EVENT_BASED) - elif i < 8: - await manager.mark_failed(event, "Test error", key_strategy=KeyStrategy.EVENT_BASED) - # Leave rest in processing - - # Get stats - stats = await manager.get_stats() - - assert stats.total_keys == 10 - assert stats.status_counts[IdempotencyStatus.COMPLETED] == 6 - assert stats.status_counts[IdempotencyStatus.FAILED] == 2 - assert stats.status_counts[IdempotencyStatus.PROCESSING] == 2 - assert stats.prefix == manager.config.key_prefix - - @pytest.mark.asyncio - async def test_remove_key(self, manager: IdempotencyManager) -> None: - """Test removing idempotency keys""" - real_event = make_execution_requested_event(execution_id="exec-remove-1") - # Add a key - result = await manager.check_and_reserve(real_event, key_strategy=KeyStrategy.EVENT_BASED) - assert result.is_duplicate is False - - # Remove it - removed = await manager.remove(real_event, key_strategy=KeyStrategy.EVENT_BASED) - assert removed is True - - # Verify it's gone - record = await manager._repo.find_by_key(result.key) - assert record is None - - # Can process again - result2 = await manager.check_and_reserve(real_event, key_strategy=KeyStrategy.EVENT_BASED) - assert result2.is_duplicate is False - class TestIdempotentEventHandlerIntegration: """Test IdempotentEventHandler with real components""" @pytest.fixture - async def manager(self, redis_client: redis.Redis, test_settings: Settings) -> AsyncGenerator[IdempotencyManager, None]: + def manager(self, redis_client: redis.Redis, test_settings: Settings) -> IdempotencyManager: prefix = f"handler_test:{uuid.uuid4().hex[:6]}" - config = IdempotencyConfig(key_prefix=prefix, enable_metrics=False) + config = IdempotencyConfig(key_prefix=prefix) repo = RedisIdempotencyRepository(redis_client, key_prefix=prefix) database_metrics = DatabaseMetrics(test_settings) - m = IdempotencyManager(config, repo, _test_logger, database_metrics=database_metrics) - await m.initialize() - try: - yield m - finally: - await m.close() + return IdempotencyManager(config, repo, _test_logger, database_metrics=database_metrics) @pytest.mark.asyncio async def test_handler_processes_new_event(self, manager: IdempotencyManager) -> None: @@ -344,118 +279,6 @@ async def failing_handler(event: DomainEvent) -> None: # noqa: ARG001 assert record.error is not None assert "Processing failed" in record.error - @pytest.mark.asyncio - async def test_handler_duplicate_callback(self, manager: IdempotencyManager) -> None: - """Test duplicate callback is invoked""" - duplicate_events: list[tuple[DomainEvent, Any]] = [] - - async def actual_handler(event: DomainEvent) -> None: # noqa: ARG001 - pass # Do nothing - - async def on_duplicate(event: DomainEvent, result: Any) -> None: - duplicate_events.append((event, result)) - - handler = IdempotentEventHandler( - handler=actual_handler, - idempotency_manager=manager, - key_strategy=KeyStrategy.EVENT_BASED, - on_duplicate=on_duplicate, - logger=_test_logger, - ) - - # Process twice - real_event = make_execution_requested_event(execution_id="handler-dup-cb-1") - await handler(real_event) - await handler(real_event) - - # Verify duplicate callback was called - assert len(duplicate_events) == 1 - assert duplicate_events[0][0] == real_event - assert duplicate_events[0][1].is_duplicate is True - - @pytest.mark.asyncio - async def test_decorator_integration(self, manager: IdempotencyManager) -> None: - """Test the @idempotent_handler decorator""" - processed_events: list[DomainEvent] = [] - - @idempotent_handler( - idempotency_manager=manager, - key_strategy=KeyStrategy.CONTENT_HASH, - ttl_seconds=300, - logger=_test_logger, - ) - async def my_handler(event: DomainEvent) -> None: - processed_events.append(event) - - # Process same event twice - real_event = make_execution_requested_event(execution_id="decor-1") - await my_handler(real_event) - await my_handler(real_event) - - # Should only process once - assert len(processed_events) == 1 - - # Create event with same ID and same content for content hash match - similar_event = make_execution_requested_event( - execution_id=real_event.execution_id, - script=real_event.script, - ) - - # Should still be blocked (content hash) - await my_handler(similar_event) - assert len(processed_events) == 1 # Still only one - - @pytest.mark.asyncio - async def test_custom_key_function(self, manager: IdempotencyManager) -> None: - """Test handler with custom key function""" - processed_scripts: list[str] = [] - - async def process_script(event: DomainEvent) -> None: - script: str = getattr(event, "script", "") - processed_scripts.append(script) - - def extract_script_key(event: DomainEvent) -> str: - # Custom key based on script content only - script: str = getattr(event, "script", "") - return f"script:{hash(script)}" - - handler = IdempotentEventHandler( - handler=process_script, - idempotency_manager=manager, - key_strategy=KeyStrategy.CUSTOM, - custom_key_func=extract_script_key, - logger=_test_logger, - ) - - # Events with same script - event1 = make_execution_requested_event( - execution_id="id1", - script="print('hello')", - service_name="test-service", - ) - - event2 = make_execution_requested_event( - execution_id="id2", - language="python", - language_version="3.9", # Different version - runtime_image="python:3.9-slim", - runtime_command=("python",), - runtime_filename="main.py", - timeout_seconds=60, # Different timeout - cpu_limit="200m", - memory_limit="256Mi", - cpu_request="100m", - memory_request="128Mi", - service_name="test-service", - ) - - await handler(event1) - await handler(event2) - - # Should only process once (same script) - assert len(processed_scripts) == 1 - assert processed_scripts[0] == "print('hello')" - @pytest.mark.asyncio async def test_invalid_key_strategy(self, manager: IdempotencyManager) -> None: """Test that invalid key strategy raises error""" @@ -512,21 +335,6 @@ async def test_cleanup_expired_keys(self, manager: IdempotencyManager) -> None: record = await manager._repo.find_by_key(expired_key) assert record is not None # Still exists until explicit cleanup - @pytest.mark.asyncio - async def test_metrics_enabled(self, redis_client: redis.Redis, test_settings: Settings) -> None: - """Test manager with metrics enabled""" - config = IdempotencyConfig(key_prefix=f"metrics:{uuid.uuid4().hex[:6]}", enable_metrics=True) - repository = RedisIdempotencyRepository(redis_client, key_prefix=config.key_prefix) - database_metrics = DatabaseMetrics(test_settings) - manager = IdempotencyManager(config, repository, _test_logger, database_metrics=database_metrics) - - # Initialize with metrics - await manager.initialize() - assert manager._stats_update_task is not None - - # Cleanup - await manager.close() - @pytest.mark.asyncio async def test_content_hash_with_fields(self, manager: IdempotencyManager) -> None: """Test content hash with specific fields""" diff --git a/backend/tests/e2e/result_processor/test_result_processor.py b/backend/tests/e2e/result_processor/test_result_processor.py index 4f5b11f3..e5f7aa06 100644 --- a/backend/tests/e2e/result_processor/test_result_processor.py +++ b/backend/tests/e2e/result_processor/test_result_processor.py @@ -20,7 +20,9 @@ from app.events.core.dispatcher import EventDispatcher from app.events.core.types import ConsumerConfig from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas +from app.domain.idempotency import KeyStrategy from app.services.idempotency import IdempotencyManager +from app.services.idempotency.middleware import IdempotentEventDispatcher from app.services.result_processor.processor import ResultProcessor from app.settings import Settings from dishka import AsyncContainer @@ -63,12 +65,18 @@ async def test_result_processor_persists_and_emits(scope: AsyncContainer) -> Non execution_id = created.execution_id # Build and start the processor + proc_dispatcher = IdempotentEventDispatcher( + logger=_test_logger, + idempotency_manager=idem, + key_strategy=KeyStrategy.CONTENT_HASH, + ttl_seconds=7200, + ) processor = ResultProcessor( execution_repo=repo, producer=producer, schema_registry=registry, settings=settings, - idempotency_manager=idem, + dispatcher=proc_dispatcher, logger=_test_logger, execution_metrics=execution_metrics, event_metrics=event_metrics, diff --git a/backend/tests/e2e/services/idempotency/test_redis_repository.py b/backend/tests/e2e/services/idempotency/test_redis_repository.py index c346c8f6..a295d307 100644 --- a/backend/tests/e2e/services/idempotency/test_redis_repository.py +++ b/backend/tests/e2e/services/idempotency/test_redis_repository.py @@ -136,35 +136,6 @@ async def test_update_record_when_missing( assert res == 0 -@pytest.mark.asyncio -async def test_aggregate_status_counts( - repository: RedisIdempotencyRepository, redis_client: redis.Redis -) -> None: - # Use unique prefix to avoid collision with other tests - prefix = uuid.uuid4().hex[:8] - # Seed few keys directly using repository - statuses = (IdempotencyStatus.PROCESSING, IdempotencyStatus.PROCESSING, IdempotencyStatus.COMPLETED) - for i, status in enumerate(statuses): - rec = IdempotencyRecord( - key=f"{prefix}_k{i}", - status=status, - event_type="t", - event_id=f"{prefix}_e{i}", - created_at=datetime.now(timezone.utc), - ttl_seconds=60, - ) - await repository.insert_processing(rec) - if status != IdempotencyStatus.PROCESSING: - rec.status = status - rec.completed_at = datetime.now(timezone.utc) - await repository.update_record(rec) - - counts = await repository.aggregate_status_counts("idempotency") - # Counts include all records in namespace, check we have at least our seeded counts - assert counts[IdempotencyStatus.PROCESSING] >= 2 - assert counts[IdempotencyStatus.COMPLETED] >= 1 - - @pytest.mark.asyncio async def test_health_check(repository: RedisIdempotencyRepository) -> None: await repository.health_check() # should not raise diff --git a/backend/tests/e2e/services/sse/test_partitioned_event_router.py b/backend/tests/e2e/services/sse/test_partitioned_event_router.py index 6bb6b71f..315dea86 100644 --- a/backend/tests/e2e/services/sse/test_partitioned_event_router.py +++ b/backend/tests/e2e/services/sse/test_partitioned_event_router.py @@ -43,7 +43,7 @@ async def test_router_bridges_to_redis(redis_client: redis.Redis, test_settings: subscription = await bus.open_subscription(execution_id) ev = make_execution_requested_event(execution_id=execution_id) - handler = disp.get_handlers(ev.event_type)[0] + handler = disp._handlers[ev.event_type][0] await handler(ev) # Await the subscription directly - true async, no polling diff --git a/backend/tests/unit/core/metrics/test_database_and_dlq_metrics.py b/backend/tests/unit/core/metrics/test_database_and_dlq_metrics.py index 623e20b6..08a14b69 100644 --- a/backend/tests/unit/core/metrics/test_database_and_dlq_metrics.py +++ b/backend/tests/unit/core/metrics/test_database_and_dlq_metrics.py @@ -17,8 +17,8 @@ def test_database_metrics_methods(test_settings: Settings) -> None: m.record_idempotency_cache_miss("etype", "check") m.record_idempotency_duplicate_blocked("etype") m.record_idempotency_processing_duration(0.4, "process") - m.update_idempotency_keys_active(10, "prefix") - m.update_idempotency_keys_active(5, "prefix") + m.increment_idempotency_keys("prefix") + m.decrement_idempotency_keys("prefix") m.record_idempotent_event_processed("etype", "blocked") m.record_idempotent_processing_duration(0.5, "etype") m.update_database_connections(1) diff --git a/backend/tests/unit/events/test_event_dispatcher.py b/backend/tests/unit/events/test_event_dispatcher.py index ea526c95..38f34f93 100644 --- a/backend/tests/unit/events/test_event_dispatcher.py +++ b/backend/tests/unit/events/test_event_dispatcher.py @@ -13,23 +13,6 @@ def make_event() -> DomainEvent: return make_execution_requested_event(execution_id="e1") -async def _async_noop(_: DomainEvent) -> None: - return None - - -def test_register_and_remove_handler() -> None: - disp = EventDispatcher(logger=_test_logger) - - # Register via direct method - disp.register_handler(EventType.EXECUTION_REQUESTED, _async_noop) - assert len(disp.get_handlers(EventType.EXECUTION_REQUESTED)) == 1 - - # Remove - ok = disp.remove_handler(EventType.EXECUTION_REQUESTED, _async_noop) - assert ok is True - assert len(disp.get_handlers(EventType.EXECUTION_REQUESTED)) == 0 - - def test_decorator_registration() -> None: disp = EventDispatcher(logger=_test_logger) @@ -37,10 +20,10 @@ def test_decorator_registration() -> None: async def handler(ev: DomainEvent) -> None: # noqa: ARG001 return None - assert len(disp.get_handlers(EventType.EXECUTION_REQUESTED)) == 1 + assert len(disp._handlers[EventType.EXECUTION_REQUESTED]) == 1 -async def test_dispatch_metrics_processed_and_skipped() -> None: +async def test_dispatch_calls_matching_handler() -> None: disp = EventDispatcher(logger=_test_logger) called = {"n": 0} @@ -49,13 +32,10 @@ async def handler(_: DomainEvent) -> None: called["n"] += 1 await disp.dispatch(make_event()) - # Dispatch event with no handlers (different type) - # Reuse base event but fake type by replacing value + + # Dispatch event with no handlers (different type) — should be a no-op e = make_event() e.event_type = EventType.EXECUTION_FAILED await disp.dispatch(e) - metrics = disp.get_metrics() assert called["n"] == 1 - assert metrics[EventType.EXECUTION_REQUESTED]["processed"] >= 1 - assert metrics[EventType.EXECUTION_FAILED]["skipped"] >= 1 diff --git a/backend/tests/unit/services/idempotency/test_idempotency_manager.py b/backend/tests/unit/services/idempotency/test_idempotency_manager.py index aa7fb056..98d2c433 100644 --- a/backend/tests/unit/services/idempotency/test_idempotency_manager.py +++ b/backend/tests/unit/services/idempotency/test_idempotency_manager.py @@ -7,7 +7,6 @@ from app.domain.idempotency import KeyStrategy from app.services.idempotency.idempotency_manager import ( IdempotencyConfig, - IdempotencyKeyStrategy, IdempotencyManager, ) @@ -17,46 +16,6 @@ _test_logger = logging.getLogger("test.idempotency_manager") -class TestIdempotencyKeyStrategy: - def test_event_based(self) -> None: - event = MagicMock(spec=BaseEvent) - event.event_type = "test.event" - event.event_id = "event-123" - key = IdempotencyKeyStrategy.event_based(event) - assert key == "test.event:event-123" - - def test_content_hash_all_fields(self) -> None: - event = MagicMock(spec=BaseEvent) - event.model_dump.return_value = { - "event_id": "123", - "event_type": "test", - "timestamp": "2025-01-01", - "metadata": {}, - "field1": "value1", - "field2": "value2", - } - key = IdempotencyKeyStrategy.content_hash(event) - assert isinstance(key, str) and len(key) == 64 - - def test_content_hash_specific_fields(self) -> None: - event = MagicMock(spec=BaseEvent) - event.model_dump.return_value = { - "event_id": "123", - "event_type": "test", - "field1": "value1", - "field2": "value2", - "field3": "value3", - } - key = IdempotencyKeyStrategy.content_hash(event, fields={"field1", "field3"}) - assert isinstance(key, str) and len(key) == 64 - - def test_custom(self) -> None: - event = MagicMock(spec=BaseEvent) - event.event_type = "test.event" - key = IdempotencyKeyStrategy.custom(event, "custom-key-123") - assert key == "test.event:custom-key-123" - - class TestIdempotencyConfig: def test_default_config(self) -> None: config = IdempotencyConfig() @@ -65,8 +24,6 @@ def test_default_config(self) -> None: assert config.processing_timeout_seconds == 300 assert config.enable_result_caching is True assert config.max_result_size_bytes == 1048576 - assert config.enable_metrics is True - assert config.collection_name == "idempotency_keys" def test_custom_config(self) -> None: config = IdempotencyConfig( @@ -75,16 +32,12 @@ def test_custom_config(self) -> None: processing_timeout_seconds=600, enable_result_caching=False, max_result_size_bytes=2048, - enable_metrics=False, - collection_name="custom_keys", ) assert config.key_prefix == "custom" assert config.default_ttl_seconds == 7200 assert config.processing_timeout_seconds == 600 assert config.enable_result_caching is False assert config.max_result_size_bytes == 2048 - assert config.enable_metrics is False - assert config.collection_name == "custom_keys" def test_manager_generate_key_variants(database_metrics: DatabaseMetrics) -> None: @@ -101,4 +54,3 @@ def test_manager_generate_key_variants(database_metrics: DatabaseMetrics) -> Non assert mgr._generate_key(ev, KeyStrategy.CUSTOM, custom_key="k") == "idempotency:t:k" with pytest.raises(ValueError): mgr._generate_key(ev, KeyStrategy.CUSTOM) # CUSTOM requires custom_key - diff --git a/backend/tests/unit/services/idempotency/test_middleware.py b/backend/tests/unit/services/idempotency/test_middleware.py index 58f92ec3..eb43f6e0 100644 --- a/backend/tests/unit/services/idempotency/test_middleware.py +++ b/backend/tests/unit/services/idempotency/test_middleware.py @@ -42,7 +42,6 @@ def idempotent_event_handler( idempotency_manager=mock_idempotency_manager, key_strategy=KeyStrategy.EVENT_BASED, ttl_seconds=3600, - cache_result=True, logger=_test_logger ) @@ -76,7 +75,6 @@ async def test_call_with_fields( mock_idempotency_manager.check_and_reserve.assert_called_once_with( event=event, key_strategy=KeyStrategy.CONTENT_HASH, - custom_key=None, ttl_seconds=None, fields=fields ) @@ -108,15 +106,5 @@ async def test_call_handler_exception( event=event, error="Handler error", key_strategy=KeyStrategy.EVENT_BASED, - custom_key=None, fields=None ) - - # Duplicate handler and custom key behavior covered by integration tests - - -class TestIdempotentHandlerDecorator: - pass - -class TestIdempotentConsumerWrapper: - pass diff --git a/backend/tests/unit/services/result_processor/test_processor.py b/backend/tests/unit/services/result_processor/test_processor.py index c13fe0ab..a6947279 100644 --- a/backend/tests/unit/services/result_processor/test_processor.py +++ b/backend/tests/unit/services/result_processor/test_processor.py @@ -5,6 +5,7 @@ from app.core.metrics import EventMetrics, ExecutionMetrics from app.domain.enums.events import EventType from app.domain.enums.kafka import CONSUMER_GROUP_SUBSCRIPTIONS, GroupId, KafkaTopic +from app.events.core import EventDispatcher from app.services.result_processor.processor import ResultProcessor, ResultProcessorConfig pytestmark = pytest.mark.unit @@ -29,21 +30,21 @@ def test_custom_values(self) -> None: assert config.processing_timeout == 600 -def test_create_dispatcher_registers_handlers( +def test_register_handlers_populates_dispatcher( execution_metrics: ExecutionMetrics, event_metrics: EventMetrics ) -> None: + dispatcher = EventDispatcher(logger=_test_logger) rp = ResultProcessor( execution_repo=MagicMock(), producer=MagicMock(), schema_registry=MagicMock(), settings=MagicMock(), - idempotency_manager=MagicMock(), + dispatcher=dispatcher, logger=_test_logger, execution_metrics=execution_metrics, event_metrics=event_metrics, ) - dispatcher = rp._create_dispatcher() - assert dispatcher is not None + rp._register_handlers() assert EventType.EXECUTION_COMPLETED in dispatcher._handlers assert EventType.EXECUTION_FAILED in dispatcher._handlers assert EventType.EXECUTION_TIMEOUT in dispatcher._handlers diff --git a/backend/tests/unit/services/saga/test_saga_orchestrator_unit.py b/backend/tests/unit/services/saga/test_saga_orchestrator_unit.py index 8f2b35f9..f2ce43f4 100644 --- a/backend/tests/unit/services/saga/test_saga_orchestrator_unit.py +++ b/backend/tests/unit/services/saga/test_saga_orchestrator_unit.py @@ -12,7 +12,6 @@ from app.events.core import UnifiedProducer from app.events.event_store import EventStore from app.events.schema.schema_registry import SchemaRegistryManager -from app.services.idempotency.idempotency_manager import IdempotencyManager from app.services.saga.base_saga import BaseSaga from app.services.saga.saga_orchestrator import SagaOrchestrator from app.services.saga.saga_step import CompensationStep, SagaContext, SagaStep @@ -52,16 +51,6 @@ async def produce( return None -class _FakeIdem(IdempotencyManager): - """Fake IdempotencyManager for testing.""" - - def __init__(self) -> None: - pass # Skip parent __init__ - - async def close(self) -> None: - return None - - class _FakeStore(EventStore): """Fake EventStore for testing.""" @@ -108,7 +97,6 @@ def _orch(event_metrics: EventMetrics) -> SagaOrchestrator: schema_registry_manager=MagicMock(spec=SchemaRegistryManager), settings=MagicMock(spec=Settings), event_store=_FakeStore(), - idempotency_manager=_FakeIdem(), resource_allocation_repository=_FakeAlloc(), logger=_test_logger, event_metrics=event_metrics, @@ -136,7 +124,6 @@ async def test_should_trigger_and_existing_short_circuit(event_metrics: EventMet schema_registry_manager=MagicMock(spec=SchemaRegistryManager), settings=MagicMock(spec=Settings), event_store=_FakeStore(), - idempotency_manager=_FakeIdem(), resource_allocation_repository=_FakeAlloc(), logger=_test_logger, event_metrics=event_metrics, diff --git a/backend/tests/unit/services/sse/test_kafka_redis_bridge.py b/backend/tests/unit/services/sse/test_kafka_redis_bridge.py index 6fa5d1ef..2ad7f19a 100644 --- a/backend/tests/unit/services/sse/test_kafka_redis_bridge.py +++ b/backend/tests/unit/services/sse/test_kafka_redis_bridge.py @@ -48,7 +48,7 @@ async def test_register_and_route_events_without_kafka() -> None: disp = EventDispatcher(_test_logger) bridge._register_routing_handlers(disp) - handlers = disp.get_handlers(EventType.EXECUTION_STARTED) + handlers = disp._handlers[EventType.EXECUTION_STARTED] assert len(handlers) > 0 # Event with empty execution_id is ignored diff --git a/backend/workers/run_k8s_worker.py b/backend/workers/run_k8s_worker.py index d398a508..ddba5c46 100644 --- a/backend/workers/run_k8s_worker.py +++ b/backend/workers/run_k8s_worker.py @@ -8,8 +8,8 @@ from app.core.tracing import init_tracing from app.db.docs import ALL_DOCUMENTS from app.domain.enums.kafka import GroupId +from app.events.core import UnifiedConsumer from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas -from app.services.idempotency.middleware import IdempotentConsumerWrapper from app.services.k8s_worker import KubernetesWorker from app.settings import Settings from beanie import init_beanie @@ -35,7 +35,7 @@ async def run_kubernetes_worker(settings: Settings) -> None: # Get consumer (triggers consumer creation and start) # Consumer runs in background via its internal consume loop - await container.get(IdempotentConsumerWrapper) + await container.get(UnifiedConsumer) # Bootstrap: ensure image pre-puller DaemonSet exists # Save task to variable to prevent premature garbage collection diff --git a/backend/workers/run_result_processor.py b/backend/workers/run_result_processor.py index 5431b011..1576c8e0 100644 --- a/backend/workers/run_result_processor.py +++ b/backend/workers/run_result_processor.py @@ -10,9 +10,11 @@ from app.db.docs import ALL_DOCUMENTS from app.db.repositories.execution_repository import ExecutionRepository from app.domain.enums.kafka import GroupId +from app.domain.idempotency import KeyStrategy from app.events.core import UnifiedProducer from app.events.schema.schema_registry import SchemaRegistryManager from app.services.idempotency import IdempotencyManager +from app.services.idempotency.middleware import IdempotentEventDispatcher from app.services.result_processor.processor import ProcessingState, ResultProcessor from app.settings import Settings from beanie import init_beanie @@ -36,13 +38,20 @@ async def run_result_processor(settings: Settings) -> None: logger = await container.get(logging.Logger) logger.info(f"Beanie ODM initialized with {len(ALL_DOCUMENTS)} document models") + dispatcher = IdempotentEventDispatcher( + logger=logger, + idempotency_manager=idempotency_manager, + key_strategy=KeyStrategy.CONTENT_HASH, + ttl_seconds=7200, + ) + # ResultProcessor is manually created (not from DI), so we own its lifecycle processor = ResultProcessor( execution_repo=execution_repo, producer=producer, schema_registry=schema_registry, settings=settings, - idempotency_manager=idempotency_manager, + dispatcher=dispatcher, logger=logger, execution_metrics=execution_metrics, event_metrics=event_metrics, diff --git a/docs/architecture/idempotency.md b/docs/architecture/idempotency.md index 6b036d68..55c7b477 100644 --- a/docs/architecture/idempotency.md +++ b/docs/architecture/idempotency.md @@ -46,7 +46,7 @@ this when the same logical operation might produce different event IDs but ident execution per user per minute"). ```python ---8<-- "backend/app/services/idempotency/idempotency_manager.py:37:57" +--8<-- "backend/app/services/idempotency/idempotency_manager.py:48:66" ``` ## Status Lifecycle @@ -54,7 +54,7 @@ execution per user per minute"). Each idempotency record transitions through defined states: ```python ---8<-- "backend/app/domain/idempotency/models.py:11:15" +--8<-- "backend/app/domain/idempotency/models.py:10:13" ``` When an event arrives, the manager checks for an existing key. If none exists, it creates a record in `PROCESSING` state @@ -66,16 +66,18 @@ the previous processor crashed and allows a retry. ## Middleware Integration -The `IdempotentEventHandler` wraps Kafka event handlers with automatic duplicate detection: +The `IdempotentEventHandler` wraps a single Kafka event handler with automatic duplicate detection: ```python ---8<-- "backend/app/services/idempotency/middleware.py:39:73" +--8<-- "backend/app/services/idempotency/middleware.py:10:59" ``` -For bulk registration, the `IdempotentConsumerWrapper` wraps all handlers in a dispatcher: +`IdempotentEventDispatcher` is an `EventDispatcher` subclass that automatically wraps every registered +handler with idempotency. DI providers create this subclass for services that need idempotent event handling +(coordinator, k8s worker, result processor); services that don't (saga orchestrator) use a plain `EventDispatcher`: ```python ---8<-- "backend/app/services/idempotency/middleware.py:122:141" +--8<-- "backend/app/services/idempotency/middleware.py:62:88" ``` ## Redis Storage @@ -98,7 +100,7 @@ reservation—if two processes race to claim the same key, only one succeeds: | `max_result_size_bytes` | `1048576` | Maximum cached result size (1MB) | ```python ---8<-- "backend/app/services/idempotency/idempotency_manager.py:27:34" +--8<-- "backend/app/services/idempotency/idempotency_manager.py:26:31" ``` ## Result Caching @@ -125,5 +127,5 @@ The idempotency system exposes several metrics for monitoring: |--------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------| | [`services/idempotency/idempotency_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/idempotency/idempotency_manager.py) | Core idempotency logic | | [`services/idempotency/redis_repository.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/idempotency/redis_repository.py) | Redis storage adapter | -| [`services/idempotency/middleware.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/idempotency/middleware.py) | Handler wrappers and consumer integration | +| [`services/idempotency/middleware.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/idempotency/middleware.py) | Handler wrapper and `IdempotentEventDispatcher` | | [`domain/idempotency/`](https://github.com/HardMax71/Integr8sCode/tree/main/backend/app/domain/idempotency) | Domain models | diff --git a/docs/architecture/services-overview.md b/docs/architecture/services-overview.md index bce59980..4636504c 100644 --- a/docs/architecture/services-overview.md +++ b/docs/architecture/services-overview.md @@ -44,7 +44,7 @@ The saved_script_service.py handles CRUD for saved scripts with ownership checks The rate_limit_service.py is a Redis-backed sliding window / token bucket implementation with dynamic configuration per endpoint group, user overrides, and IP fallback. It has a safe failure mode (fail open) with explicit metrics when Redis is unavailable. -The idempotency/ module provides middleware and wrappers to make Kafka consumption idempotent using content-hash or custom keys, used for SAGA_COMMANDS to avoid duplicate pod creation. +The idempotency/ module provides `IdempotentEventDispatcher`, a subclass of `EventDispatcher` that wraps every registered handler with duplicate detection via content-hash or event-based keys. DI providers create this dispatcher for services consuming Kafka events (coordinator, k8s worker, result processor). The saga_service.py provides read-model access for saga state and guardrails like enforcing access control on saga inspection routes.