Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/app/api/routes/dlq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from app.dlq.manager import DLQManager
from app.dlq.models import DLQMessageStatus
from app.domain.enums.events import EventType
from app.domain.enums.kafka import KafkaTopic
from app.schemas_pydantic.dlq import (
DLQBatchRetryResponse,
DLQMessageDetail,
Expand Down Expand Up @@ -35,7 +36,7 @@ async def get_dlq_statistics(repository: FromDishka[DLQRepository]) -> DLQStats:
async def get_dlq_messages(
repository: FromDishka[DLQRepository],
status: DLQMessageStatus | None = Query(None),
topic: str | None = None,
topic: KafkaTopic | None = None,
event_type: EventType | None = Query(None),
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0),
Expand Down
80 changes: 19 additions & 61 deletions backend/app/api/routes/execution.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from datetime import datetime, timezone
from datetime import datetime
from typing import Annotated
from uuid import uuid4

from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute, inject
Expand All @@ -9,12 +8,12 @@
from app.api.dependencies import admin_user, current_user
from app.core.tracing import EventAttributes, add_span_attributes
from app.core.utils import get_client_ip
from app.db.repositories.redis.idempotency_repository import IdempotencyRepository
from app.domain.enums.events import EventType
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.user import UserRole
from app.domain.events.typed import BaseEvent, DomainEvent, EventMetadata
from app.domain.events.typed import DomainEvent, EventMetadata
from app.domain.exceptions import DomainError
from app.domain.idempotency import KeyStrategy
from app.schemas_pydantic.execution import (
CancelExecutionRequest,
CancelResponse,
Expand All @@ -31,7 +30,6 @@
from app.schemas_pydantic.user import UserResponse
from app.services.event_service import EventService
from app.services.execution_service import ExecutionService
from app.services.idempotency import IdempotencyManager
from app.services.kafka_event_service import KafkaEventService
from app.settings import Settings

Expand All @@ -58,7 +56,7 @@ async def create_execution(
current_user: Annotated[UserResponse, Depends(current_user)],
execution: ExecutionRequest,
execution_service: FromDishka[ExecutionService],
idempotency_manager: FromDishka[IdempotencyManager],
idempotency_repo: FromDishka[IdempotencyRepository],
idempotency_key: Annotated[str | None, Header(alias="Idempotency-Key")] = None,
) -> ExecutionResponse:
add_span_attributes(
Expand All @@ -74,33 +72,14 @@ async def create_execution(
)

# Handle idempotency if key provided
pseudo_event = None
if idempotency_key:
# Create a pseudo-event for idempotency tracking
pseudo_event = BaseEvent(
event_id=str(uuid4()),
event_type=EventType.EXECUTION_REQUESTED,
timestamp=datetime.now(timezone.utc),
metadata=EventMetadata(
user_id=current_user.user_id, correlation_id=str(uuid4()), service_name="api", service_version="1.0.0"
),
)

# Check for duplicate request using custom key
idempotency_result = await idempotency_manager.check_and_reserve(
event=pseudo_event,
key_strategy=KeyStrategy.CUSTOM,
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
ttl_seconds=86400, # 24 hours TTL for HTTP idempotency
)
idem_key = f"exec:{current_user.user_id}:{idempotency_key}" if idempotency_key else None

if idempotency_result.is_duplicate:
cached_json = await idempotency_manager.get_cached_json(
event=pseudo_event,
key_strategy=KeyStrategy.CUSTOM,
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
)
return ExecutionResponse.model_validate_json(cached_json)
if idem_key:
is_new = await idempotency_repo.try_reserve(idem_key, ttl=86400)
if not is_new:
cached = await idempotency_repo.get_result(idem_key)
if cached:
return ExecutionResponse.model_validate_json(cached)
Copy link

@cubic-dev-ai cubic-dev-ai bot Jan 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Duplicate requests can hit the reservation marker value ("1") and trigger model_validate_json errors, causing a 500 when a request races with the original. Guard against the reservation sentinel or return a 409/202 until a real cached JSON result is available.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/api/routes/execution.py, line 82:

<comment>Duplicate requests can hit the reservation marker value ("1") and trigger model_validate_json errors, causing a 500 when a request races with the original. Guard against the reservation sentinel or return a 409/202 until a real cached JSON result is available.</comment>

<file context>
@@ -74,33 +72,14 @@ async def create_execution(
+        if not is_new:
+            cached = await idempotency_repo.get_result(idem_key)
+            if cached:
+                return ExecutionResponse.model_validate_json(cached)
 
     try:
</file context>
Fix with Cubic

Comment on lines +77 to +82
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing handling when duplicate request finds no cached result.

If try_reserve returns False (duplicate) but get_result returns None (original request still processing), the code falls through and re-executes the request, defeating idempotency.

Consider returning a 409 Conflict or 202 Accepted to indicate the request is still being processed:

🐛 Proposed fix
     if idem_key:
         is_new = await idempotency_repo.try_reserve(idem_key, ttl=86400)
         if not is_new:
             cached = await idempotency_repo.get_result(idem_key)
             if cached:
                 return ExecutionResponse.model_validate_json(cached)
+            # Original request still processing - return conflict
+            raise HTTPException(
+                status_code=409,
+                detail="Request with this idempotency key is currently being processed"
+            )
🤖 Prompt for AI Agents
In `@backend/app/api/routes/execution.py` around lines 77 - 82, When idem_key is
present and idempotency_repo.try_reserve(idem_key, ttl=86400) returns False but
idempotency_repo.get_result(idem_key) returns None (meaning the original request
is still processing), avoid re-executing and instead short-circuit with an
appropriate HTTP response (e.g., raise HTTPException(status_code=202) or 409)
indicating the request is in-flight; update the block around idem_key /
try_reserve / get_result to check for cached is None and return a 202 Accepted
(or 409 Conflict) with a clear body or headers rather than falling through to
re-execution so idempotency is preserved.


try:
client_ip = get_client_ip(request)
Expand All @@ -114,37 +93,16 @@ async def create_execution(
user_agent=user_agent,
)

# Store result for idempotency if key was provided
if idempotency_key and pseudo_event:
response_model = ExecutionResponse.model_validate(exec_result)
await idempotency_manager.mark_completed_with_json(
event=pseudo_event,
cached_json=response_model.model_dump_json(),
key_strategy=KeyStrategy.CUSTOM,
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
)

return ExecutionResponse.model_validate(exec_result)

except DomainError as e:
# Mark as failed for idempotency
if idempotency_key and pseudo_event:
await idempotency_manager.mark_failed(
event=pseudo_event,
error=str(e),
key_strategy=KeyStrategy.CUSTOM,
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
)
response = ExecutionResponse.model_validate(exec_result)

if idem_key:
await idempotency_repo.store_result(idem_key, response.model_dump_json(), ttl=86400)

return response

except DomainError:
raise
except Exception as e:
# Mark as failed for idempotency
if idempotency_key and pseudo_event:
await idempotency_manager.mark_failed(
event=pseudo_event,
error=str(e),
key_strategy=KeyStrategy.CUSTOM,
custom_key=f"http:{current_user.user_id}:{idempotency_key}",
)
raise HTTPException(status_code=500, detail="Internal server error during script execution") from e


Expand Down
29 changes: 1 addition & 28 deletions backend/app/api/routes/sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@
from fastapi import APIRouter, Request
from sse_starlette.sse import EventSourceResponse

from app.domain.sse import SSEHealthDomain
from app.schemas_pydantic.sse import (
ShutdownStatusResponse,
SSEExecutionEventData,
SSEHealthResponse,
SSENotificationEventData,
)
from app.schemas_pydantic.sse import SSEExecutionEventData, SSENotificationEventData
from app.services.auth_service import AuthService
from app.services.sse.sse_service import SSEService

Expand Down Expand Up @@ -38,24 +32,3 @@ async def execution_events(
return EventSourceResponse(
sse_service.create_execution_stream(execution_id=execution_id, user_id=current_user.user_id)
)


@router.get("/health", response_model=SSEHealthResponse)
async def sse_health(
request: Request,
sse_service: FromDishka[SSEService],
auth_service: FromDishka[AuthService],
) -> SSEHealthResponse:
"""Get SSE service health status."""
_ = await auth_service.get_current_user(request)
domain: SSEHealthDomain = await sse_service.get_health_status()
return SSEHealthResponse(
status=domain.status,
kafka_enabled=domain.kafka_enabled,
active_connections=domain.active_connections,
active_executions=domain.active_executions,
active_consumers=domain.active_consumers,
max_connections_per_user=domain.max_connections_per_user,
shutdown=ShutdownStatusResponse(**vars(domain.shutdown)),
timestamp=domain.timestamp,
)
33 changes: 6 additions & 27 deletions backend/app/core/dishka_lifespan.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import asyncio
import logging
from contextlib import AsyncExitStack, asynccontextmanager
from contextlib import asynccontextmanager
from typing import AsyncGenerator

import redis.asyncio as redis
Expand All @@ -15,7 +15,6 @@
from app.core.startup import initialize_rate_limits
from app.core.tracing import init_tracing
from app.db.docs import ALL_DOCUMENTS
from app.events.event_store_consumer import EventStoreConsumer
from app.events.schema.schema_registry import SchemaRegistryManager, initialize_event_schemas
from app.services.notification_service import NotificationService
from app.services.sse.kafka_redis_bridge import SSEKafkaRedisBridge
Expand Down Expand Up @@ -76,43 +75,23 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
extra={"testing": settings.TESTING, "enable_tracing": settings.ENABLE_TRACING},
)

# Phase 1: Resolve all DI dependencies in parallel
(
schema_registry,
database,
redis_client,
rate_limit_metrics,
sse_bridge,
event_store_consumer,
notification_service,
) = await asyncio.gather(
# Resolve DI dependencies in parallel (fail fast on config issues)
schema_registry, database, redis_client, rate_limit_metrics, _, _ = await asyncio.gather(
container.get(SchemaRegistryManager),
container.get(Database),
container.get(redis.Redis),
container.get(RateLimitMetrics),
container.get(SSEKafkaRedisBridge),
container.get(EventStoreConsumer),
container.get(NotificationService),
)

# Phase 2: Initialize infrastructure in parallel (independent subsystems)
# Initialize infrastructure in parallel
await asyncio.gather(
initialize_event_schemas(schema_registry),
init_beanie(database=database, document_models=ALL_DOCUMENTS),
initialize_rate_limits(redis_client, settings, logger, rate_limit_metrics),
)
logger.info("Infrastructure initialized (schemas, beanie, rate limits)")

# Phase 3: Start Kafka consumers in parallel (providers already started them via async with,
# but __aenter__ is idempotent so this is safe and explicit)
async with AsyncExitStack() as stack:
stack.push_async_callback(sse_bridge.aclose)
stack.push_async_callback(event_store_consumer.aclose)
stack.push_async_callback(notification_service.aclose)
await asyncio.gather(
sse_bridge.__aenter__(),
event_store_consumer.__aenter__(),
notification_service.__aenter__(),
)
logger.info("SSE bridge, EventStoreConsumer, and NotificationService started")
yield
yield
# Container close handles all cleanup automatically
62 changes: 0 additions & 62 deletions backend/app/core/lifecycle.py

This file was deleted.

Loading
Loading