Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions backend/app/core/metrics/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,13 @@ def record_idempotency_duplicate_blocked(self, event_type: str) -> None:
def record_idempotency_processing_duration(self, duration_seconds: float, operation: str) -> None:
    """Record the elapsed time of one idempotent operation.

    Args:
        duration_seconds: Wall-clock processing time, in seconds.
        operation: Label identifying the operation being timed; emitted
            as the ``operation`` metric attribute.
    """
    attrs = {"operation": operation}
    self.idempotency_processing_duration.record(duration_seconds, attributes=attrs)

def update_idempotency_keys_active(self, count: int, prefix: str) -> None:
# Track the delta for gauge-like behavior
key = f"_idempotency_keys_{prefix}"
current_val = getattr(self, key, 0)
delta = count - current_val
if delta != 0:
self.idempotency_keys_active.add(delta, attributes={"key_prefix": prefix})
setattr(self, key, count)
def increment_idempotency_keys(self, prefix: str) -> None:
    """Bump the active-idempotency-keys counter by one for *prefix*.

    Called when a new idempotency key is created; the prefix is emitted
    as the ``key_prefix`` metric attribute.
    """
    attrs = {"key_prefix": prefix}
    self.idempotency_keys_active.add(1, attributes=attrs)

def decrement_idempotency_keys(self, prefix: str) -> None:
    """Drop the active-idempotency-keys counter by one for *prefix*.

    Called when an idempotency key is removed; the prefix is emitted
    as the ``key_prefix`` metric attribute.
    """
    attrs = {"key_prefix": prefix}
    self.idempotency_keys_active.add(-1, attributes=attrs)

def record_idempotent_event_processed(self, event_type: str, result: str) -> None:
self.event_store_operations.add(
Expand Down
58 changes: 25 additions & 33 deletions backend/app/core/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@
from app.services.execution_service import ExecutionService
from app.services.grafana_alert_processor import GrafanaAlertProcessor
from app.services.idempotency import IdempotencyConfig, IdempotencyManager
from app.services.idempotency.idempotency_manager import create_idempotency_manager
from app.services.idempotency.middleware import IdempotentConsumerWrapper
from app.services.idempotency.middleware import IdempotentEventDispatcher
from app.services.idempotency.redis_repository import RedisIdempotencyRepository
from app.services.k8s_worker import KubernetesWorker
from app.services.kafka_event_service import KafkaEventService
Expand Down Expand Up @@ -185,17 +184,10 @@ def get_idempotency_repository(self, redis_client: redis.Redis) -> RedisIdempote
return RedisIdempotencyRepository(redis_client, key_prefix="idempotency")

@provide
async def get_idempotency_manager(
def get_idempotency_manager(
self, repo: RedisIdempotencyRepository, logger: logging.Logger, database_metrics: DatabaseMetrics
) -> AsyncIterator[IdempotencyManager]:
manager = create_idempotency_manager(
repository=repo, config=IdempotencyConfig(), logger=logger, database_metrics=database_metrics
)
await manager.initialize()
try:
yield manager
finally:
await manager.close()
) -> IdempotencyManager:
return IdempotencyManager(IdempotencyConfig(), repo, logger, database_metrics)


class EventProvider(Provider):
Expand Down Expand Up @@ -579,7 +571,6 @@ async def _provide_saga_orchestrator(
schema_registry: SchemaRegistryManager,
settings: Settings,
event_store: EventStore,
idempotency_manager: IdempotencyManager,
resource_allocation_repository: ResourceAllocationRepository,
logger: logging.Logger,
event_metrics: EventMetrics,
Expand All @@ -591,7 +582,6 @@ async def _provide_saga_orchestrator(
schema_registry_manager=schema_registry,
settings=settings,
event_store=event_store,
idempotency_manager=idempotency_manager,
resource_allocation_repository=resource_allocation_repository,
config=_create_default_saga_config(),
logger=logger,
Expand All @@ -612,13 +602,19 @@ async def _provide_execution_coordinator(
event_metrics: EventMetrics,
) -> AsyncIterator[ExecutionCoordinator]:
"""Shared factory for ExecutionCoordinator with lifecycle management."""
dispatcher = IdempotentEventDispatcher(
logger=logger,
idempotency_manager=idempotency_manager,
key_strategy=KeyStrategy.EVENT_BASED,
ttl_seconds=7200,
)
async with ExecutionCoordinator(
producer=kafka_producer,
schema_registry_manager=schema_registry,
settings=settings,
event_store=event_store,
execution_repository=execution_repository,
idempotency_manager=idempotency_manager,
dispatcher=dispatcher,
logger=logger,
coordinator_metrics=coordinator_metrics,
event_metrics=event_metrics,
Expand Down Expand Up @@ -715,9 +711,16 @@ class K8sWorkerProvider(Provider):
scope = Scope.APP

@provide
def get_k8s_worker_dispatcher(self, logger: logging.Logger) -> EventDispatcher:
"""Create EventDispatcher for K8s worker."""
return EventDispatcher(logger=logger)
def get_k8s_worker_dispatcher(
    self, logger: logging.Logger, idempotency_manager: IdempotencyManager
) -> EventDispatcher:
    """Create idempotent EventDispatcher for K8s worker.

    Wraps handler registration with idempotency protection: events are
    deduplicated by content hash, and keys expire after one hour.
    """
    # NOTE(review): CONTENT_HASH + 3600s TTL mirrors the previous consumer
    # wrapper defaults for this worker — confirm against consumer config.
    dispatcher = IdempotentEventDispatcher(
        logger=logger,
        idempotency_manager=idempotency_manager,
        key_strategy=KeyStrategy.CONTENT_HASH,
        ttl_seconds=3600,
    )
    return dispatcher

@provide
def get_kubernetes_worker(
Expand Down Expand Up @@ -746,10 +749,9 @@ async def get_k8s_worker_consumer(
dispatcher: EventDispatcher,
schema_registry: SchemaRegistryManager,
settings: Settings,
idempotency_manager: IdempotencyManager,
logger: logging.Logger,
event_metrics: EventMetrics,
) -> AsyncIterator[IdempotentConsumerWrapper]:
) -> AsyncIterator[UnifiedConsumer]:
"""Create and start consumer for K8s worker."""
consumer_config = ConsumerConfig(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
Expand All @@ -770,24 +772,14 @@ async def get_k8s_worker_consumer(
event_metrics=event_metrics,
)

idempotent_consumer = IdempotentConsumerWrapper(
consumer=consumer,
idempotency_manager=idempotency_manager,
dispatcher=dispatcher,
logger=logger,
default_key_strategy=KeyStrategy.CONTENT_HASH,
default_ttl_seconds=3600,
enable_for_all_handlers=True,
)

await idempotent_consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.K8S_WORKER]))
await consumer.start(list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.K8S_WORKER]))
logger.info("K8s worker consumer started")

try:
yield idempotent_consumer
yield consumer
finally:
await worker.wait_for_active_creations()
await idempotent_consumer.stop()
await consumer.stop()
logger.info("K8s worker consumer stopped")


Expand Down
2 changes: 0 additions & 2 deletions backend/app/domain/idempotency/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from .models import (
IdempotencyRecord,
IdempotencyStats,
IdempotencyStatus,
KeyStrategy,
)

__all__ = [
"IdempotencyStatus",
"IdempotencyRecord",
"IdempotencyStats",
"KeyStrategy",
]
8 changes: 0 additions & 8 deletions backend/app/domain/idempotency/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ class IdempotencyStatus(StringEnum):
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
EXPIRED = "expired"


class KeyStrategy(StringEnum):
Expand All @@ -34,10 +33,3 @@ class IdempotencyRecord:
processing_duration_ms: int | None = None
error: str | None = None
result_json: str | None = None


@dataclass
class IdempotencyStats:
total_keys: int
status_counts: dict[IdempotencyStatus, int]
prefix: str
89 changes: 11 additions & 78 deletions backend/app/events/core/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from app.domain.enums.events import EventType
from app.domain.events.typed import DomainEvent
from app.infrastructure.kafka.mappings import get_event_class_for_type

T = TypeVar("T", bound=DomainEvent)
EventHandler: TypeAlias = Callable[[DomainEvent], Awaitable[None]]
Expand All @@ -18,20 +17,20 @@ class EventDispatcher:

This dispatcher eliminates the need for manual if/elif routing by maintaining
a direct mapping from event types to their handlers.

Subclasses may override ``_wrap_handler`` to intercept handler registration
(e.g. ``IdempotentEventDispatcher`` adds idempotency protection).
"""

def __init__(self, logger: logging.Logger) -> None:
self.logger = logger

# Map event types to their handlers
self._handlers: dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]] = defaultdict(list)

# Map topics to event types that can appear on them
self._topic_event_types: dict[str, set[type[DomainEvent]]] = defaultdict(set)

# Metrics per event type
self._event_metrics: dict[EventType, dict[str, int]] = defaultdict(
lambda: {"processed": 0, "failed": 0, "skipped": 0}
)
def _wrap_handler(self, handler: EventHandler) -> EventHandler:
    """Hook for subclasses to wrap handlers at registration time.

    The base implementation is the identity function: handlers are
    registered unchanged. Subclasses (e.g. an idempotent dispatcher)
    override this to decorate every handler as it is registered.
    """
    return handler

def register(
self, event_type: EventType
Expand All @@ -51,7 +50,7 @@ async def handle_execution(event: ExecutionRequestedEvent) -> None:
def decorator(handler: Callable[[T], Awaitable[None]]) -> Callable[[T], Awaitable[None]]:
self.logger.info(f"Registering handler '{handler.__name__}' for event type '{event_type}'")
# Safe: dispatch() routes by event_type, guaranteeing correct types at runtime
self._handlers[event_type].append(handler) # type: ignore[arg-type]
self._handlers[event_type].append(self._wrap_handler(handler)) # type: ignore[arg-type]
return handler

return decorator
Expand All @@ -65,27 +64,7 @@ def register_handler(self, event_type: EventType, handler: EventHandler) -> None
handler: The async handler function
"""
self.logger.info(f"Registering handler '{handler.__name__}' for event type '{event_type}'")
self._handlers[event_type].append(handler)

def remove_handler(self, event_type: EventType, handler: EventHandler) -> bool:
"""
Remove a specific handler for an event type.

Args:
event_type: The event type to remove handler from
handler: The handler function to remove

Returns:
True if handler was found and removed, False otherwise
"""
if event_type in self._handlers and handler in self._handlers[event_type]:
self._handlers[event_type].remove(handler)
self.logger.info(f"Removed handler '{handler.__name__}' for event type '{event_type}'")
# Clean up empty lists
if not self._handlers[event_type]:
del self._handlers[event_type]
return True
return False
self._handlers[event_type].append(self._wrap_handler(handler))

async def dispatch(self, event: DomainEvent) -> None:
"""
Expand All @@ -102,25 +81,14 @@ async def dispatch(self, event: DomainEvent) -> None:
)

if not handlers:
self._event_metrics[event_type]["skipped"] += 1
self.logger.debug(f"No handlers registered for event type {event_type}")
return

self.logger.debug(f"Dispatching {event_type} to {len(handlers)} handler(s)")

# Run handlers concurrently for better performance
tasks = []
for handler in handlers:
tasks.append(self._execute_handler(handler, event))

results = await asyncio.gather(*tasks, return_exceptions=True)

# Count successes and failures
for result in results:
if isinstance(result, Exception):
self._event_metrics[event_type]["failed"] += 1
else:
self._event_metrics[event_type]["processed"] += 1
tasks = [self._execute_handler(handler, event) for handler in handlers]
await asyncio.gather(*tasks)

async def _execute_handler(self, handler: EventHandler, event: DomainEvent) -> None:
"""
Expand All @@ -140,38 +108,3 @@ async def _execute_handler(self, handler: EventHandler, event: DomainEvent) -> N
)
raise

def get_topics_for_registered_handlers(self) -> set[str]:
"""
Get all topics that have registered handlers.

Returns:
Set of topic names that should be subscribed to
"""
topics = set()
for event_type in self._handlers.keys():
# Find event class for this type
event_class = get_event_class_for_type(event_type)
if event_class and hasattr(event_class, "topic"):
topics.add(str(event_class.topic))
return topics

def get_metrics(self) -> dict[str, dict[str, int]]:
"""Get processing metrics for all event types."""
return {event_type: metrics for event_type, metrics in self._event_metrics.items()}

def clear_handlers(self) -> None:
"""Clear all registered handlers (useful for testing)."""
self._handlers.clear()
self.logger.info("All event handlers cleared")

def get_handlers(self, event_type: EventType) -> list[Callable[[DomainEvent], Awaitable[None]]]:
"""Get all handlers for a specific event type."""
return self._handlers.get(event_type, []).copy()

def get_all_handlers(self) -> dict[EventType, list[Callable[[DomainEvent], Awaitable[None]]]]:
"""Get all registered handlers (returns a copy)."""
return {k: v.copy() for k, v in self._handlers.items()}

def replace_handlers(self, event_type: EventType, handlers: list[Callable[[DomainEvent], Awaitable[None]]]) -> None:
"""Replace all handlers for a specific event type."""
self._handlers[event_type] = handlers
Loading
Loading