Skip to content

ccproxy.api.middleware.hooks

ccproxy.api.middleware.hooks

Hooks middleware for request lifecycle management.

HooksMiddleware

HooksMiddleware(app, hook_manager=None)

Bases: BaseHTTPMiddleware

Middleware that emits hook lifecycle events for requests.

This middleware wraps the entire request-response cycle and emits: - REQUEST_STARTED before processing request - REQUEST_COMPLETED on successful response - REQUEST_FAILED on error

It maintains RequestContext compatibility and provides centralized hook emission for both regular and streaming responses.

Parameters:

Name Type Description Default
app Any

ASGI application

required
hook_manager HookManager | None

Hook manager for emitting events

None
Source code in ccproxy/api/middleware/hooks.py
def __init__(self, app: Any, hook_manager: HookManager | None = None) -> None:
    """Initialize the hooks middleware.

    Args:
        app: ASGI application
        hook_manager: Hook manager for emitting events
    """
    super().__init__(app)
    self.hook_manager = hook_manager

dispatch async

dispatch(request, call_next)

Dispatch the request with hook emission.

Parameters:

Name Type Description Default
request Request

The incoming request

required
call_next Any

The next middleware/handler in the chain

required

Returns:

Type Description
Response

The response from downstream handlers

Source code in ccproxy/api/middleware/hooks.py
async def dispatch(self, request: Request, call_next: Any) -> Response:
    """Dispatch the request with hook emission.

    Args:
        request: The incoming request
        call_next: The next middleware/handler in the chain

    Returns:
        The response from downstream handlers
    """
    # Get hook manager from app state if not set during init
    hook_manager = self.hook_manager
    if not hook_manager and hasattr(request.app.state, "hook_manager"):
        hook_manager = request.app.state.hook_manager

    # Skip hook emission if no hook manager available
    if not hook_manager:
        return cast(Response, await call_next(request))

    # Extract request_id from ASGI scope extensions
    request_id = getattr(request.state, "request_id", None)
    if not request_id:
        # Fallback to headers or generate one
        request_id = request.headers.get(
            "X-Request-ID", f"req-{int(time.time() * 1000)}"
        )

    # Get or create RequestContext
    from ccproxy.core.request_context import RequestContext

    request_context = RequestContext.get_current()
    if not request_context:
        # Create minimal context if none exists
        start_time_perf = time.perf_counter()
        request_context = RequestContext(
            request_id=request_id,
            start_time=start_time_perf,
            logger=logger,
        )

    # Wall-clock time for human-readable timestamps
    start_time = time.time()

    # Create hook context for the request
    logger.debug("headers_on_request_start", headers=dict(request.headers))
    hook_context = HookContext(
        event=HookEvent.REQUEST_STARTED,  # Will be overridden in emit calls
        timestamp=datetime.fromtimestamp(start_time),
        data={
            "request_id": request_id,
            "method": request.method,
            "url": str(request.url),
            # Extract headers using utility function
            "headers": extract_request_headers(request),
        },
        metadata=getattr(request_context, "metadata", {}),
        request=request,
    )

    try:
        # Emit REQUEST_STARTED before processing
        await hook_manager.emit_with_context(hook_context)

        # Capture and emit HTTP_REQUEST hook with body
        (
            body_preview,
            body_size,
            body_truncated,
            body_is_json,
        ) = await self._emit_http_request_hook(hook_manager, request, hook_context)

        accept_header = request.headers.get("accept", "").lower()
        if "text/event-stream" not in accept_header:
            logger.info(
                "request_started",
                request_id=request_id,
                method=request.method,
                url=str(request.url),
                has_body=body_preview is not None,
                body_size=body_size,
                body_truncated=body_truncated,
                is_json=body_is_json,
                origin="client",
                streaming=False,
                category="http",
            )

        # Process the request
        response = cast(Response, await call_next(request))

        # Update hook context with response information
        end_time = time.time()
        response_hook_context = HookContext(
            event=HookEvent.REQUEST_COMPLETED,  # Will be overridden in emit calls
            timestamp=datetime.fromtimestamp(start_time),
            data={
                "request_id": request_id,
                "method": request.method,
                "url": str(request.url),
                "headers": extract_request_headers(request),
                "response_status": getattr(response, "status_code", 200),
                # Response headers preserved via extract_response_headers
                "response_headers": extract_response_headers(response),
                "duration": end_time - start_time,
            },
            metadata=getattr(request_context, "metadata", {}),
            request=request,
            response=response,
        )

        # Handle streaming responses specially
        # Check if it's a streaming response (including middleware wrapped streaming responses)
        is_streaming = (
            isinstance(response, StreamingResponse)
            or type(response).__name__ == "_StreamingResponse"
        )
        logger.debug(
            "hooks_middleware_checking_response_type",
            response_type=type(response).__name__,
            response_class=str(type(response)),
            is_streaming=is_streaming,
            request_id=request_id,
        )
        if is_streaming:
            # For streaming responses, wrap with hook emission on completion
            # Don't emit REQUEST_COMPLETED here - it will be emitted when streaming actually completes

            logger.debug(
                "hooks_middleware_wrapping_streaming_response",
                request_id=request_id,
                method=request.method,
                url=str(request.url),
                status_code=getattr(response, "status_code", 200),
                duration=end_time - start_time,
                response_type="streaming",
                category="hooks",
            )

            # Wrap the streaming response to emit hooks on completion
            request_data = {
                "method": request.method,
                "url": str(request.url),
                "headers": extract_request_headers(request),
            }

            # Include RequestContext metadata if available
            request_metadata: dict[str, Any] = {}
            if request_context:
                request_metadata = getattr(request_context, "metadata", {})

            response_stream = cast(StreamingResponse, response)
            is_sse = self._is_sse_response(response_stream)

            if is_sse:
                logger.info(
                    "sse_connection_started",
                    request_id=request_id,
                    method=request.method,
                    url=str(request.url),
                    origin="client",
                    streaming=True,
                    has_body=body_preview is not None,
                    body_size=body_size,
                    body_truncated=body_truncated,
                    is_json=body_is_json,
                    category="http",
                )

            # Coerce body iterator to AsyncGenerator[bytes]
            async def _coerce_bytes() -> Any:
                async for chunk in response_stream.body_iterator:
                    if isinstance(chunk, bytes):
                        yield chunk
                    elif isinstance(chunk, memoryview):
                        yield bytes(chunk)
                    else:
                        yield str(chunk).encode("utf-8", errors="replace")

            wrapped_response = StreamingResponseWithHooks(
                content=_coerce_bytes(),
                hook_manager=hook_manager,
                request_id=request_id,
                request_data=request_data,
                request_metadata=request_metadata,
                start_time=start_time,
                status_code=response_stream.status_code,
                origin="client",
                is_sse=is_sse,
                headers=dict(response_stream.headers),
                media_type=response_stream.media_type,
            )

            return wrapped_response
        else:
            # For regular responses, emit HTTP_RESPONSE and REQUEST_COMPLETED
            await self._emit_http_response_hook(
                hook_manager, request, response, hook_context
            )
            await hook_manager.emit_with_context(response_hook_context)

            duration_ms = round((end_time - start_time) * 1000, 3)
            logger.info(
                "request_completed",
                request_id=request_id,
                method=request.method,
                url=str(request.url),
                status_code=getattr(response, "status_code", 200),
                duration_ms=duration_ms,
                origin="client",
                streaming=False,
                success=True,
                category="http",
            )

            logger.debug(
                "hooks_middleware_request_completed",
                request_id=request_id,
                method=request.method,
                url=str(request.url),
                status_code=getattr(response, "status_code", 200),
                duration=end_time - start_time,
                response_type="regular",
                category="hooks",
            )

        return response

    except Exception as e:
        # Update hook context with error information
        end_time = time.time()
        error_hook_context = HookContext(
            event=HookEvent.REQUEST_FAILED,  # Will be overridden in emit calls
            timestamp=datetime.fromtimestamp(start_time),
            data={
                "request_id": request_id,
                "method": request.method,
                "url": str(request.url),
                "headers": extract_request_headers(request),
                "duration": end_time - start_time,
            },
            metadata=getattr(request_context, "metadata", {}),
            request=request,
            error=e,
        )

        # Emit REQUEST_FAILED on error
        try:
            await hook_manager.emit_with_context(error_hook_context)
        except Exception as hook_error:
            logger.error(
                "hooks_middleware_hook_emission_failed",
                request_id=request_id,
                original_error=str(e),
                hook_error=str(hook_error),
                category="hooks",
            )

        logger.debug(
            "hooks_middleware_request_failed",
            request_id=request_id,
            method=request.method,
            url=str(request.url),
            error=str(e),
            duration=end_time - start_time,
            category="hooks",
        )

        duration_ms = round((end_time - start_time) * 1000, 3)
        status_code = getattr(e, "status_code", None)
        logger.info(
            "request_completed",
            request_id=request_id,
            method=request.method,
            url=str(request.url),
            status_code=status_code,
            duration_ms=duration_ms,
            origin="client",
            streaming=False,
            success=False,
            error_type=type(e).__name__,
            category="http",
        )

        # Re-raise the original exception
        raise

create_hooks_middleware

create_hooks_middleware(hook_manager=None)

Create a hooks middleware class with the provided hook manager.

Parameters:

Name Type Description Default
hook_manager HookManager | None

Hook manager for emitting events

None

Returns:

Type Description
type[HooksMiddleware]

HooksMiddleware class configured with the hook manager

Source code in ccproxy/api/middleware/hooks.py
def create_hooks_middleware(
    hook_manager: HookManager | None = None,
) -> type[HooksMiddleware]:
    """Create a hooks middleware class with the provided hook manager.

    Args:
        hook_manager: Hook manager for emitting events

    Returns:
        HooksMiddleware class configured with the hook manager
    """

    class ConfiguredHooksMiddleware(HooksMiddleware):
        def __init__(self, app: Any) -> None:
            super().__init__(app, hook_manager)

    return ConfiguredHooksMiddleware