Skip to content

ccproxy.observability.streaming_response

ccproxy.observability.streaming_response

FastAPI StreamingResponse with automatic access logging on completion.

This module provides a reusable StreamingResponseWithLogging class that wraps any async generator and handles access logging when the stream completes, eliminating code duplication between different streaming endpoints.

StreamingResponseWithLogging

StreamingResponseWithLogging(
    content,
    request_context,
    metrics=None,
    status_code=200,
    **kwargs,
)

Bases: StreamingResponse

FastAPI StreamingResponse that triggers access logging on completion.

This class wraps a streaming response generator to automatically trigger access logging when the stream completes (either successfully or with an error). This eliminates the need for manual access logging in individual stream processors.

Parameters:

Name Type Description Default
content AsyncGenerator[bytes, None] | AsyncIterator[bytes]

The async generator producing streaming content

required
request_context RequestContext

The request context for access logging

required
metrics PrometheusMetrics | None

Optional PrometheusMetrics instance for recording metrics

None
status_code int

HTTP status code for the response

200
**kwargs Any

Additional arguments passed to StreamingResponse

{}
Source code in ccproxy/observability/streaming_response.py
def __init__(
    self,
    content: AsyncGenerator[bytes, None] | AsyncIterator[bytes],
    request_context: RequestContext,
    metrics: PrometheusMetrics | None = None,
    status_code: int = 200,
    **kwargs: Any,
) -> None:
    """Initialize streaming response with logging capability.

    Args:
        content: The async generator producing streaming content
        request_context: The request context for access logging
        metrics: Optional PrometheusMetrics instance for recording metrics
        status_code: HTTP status code for the response
        **kwargs: Additional arguments passed to StreamingResponse
    """
    # Wrap the content generator to add logging
    logged_content = self._wrap_with_logging(
        content, request_context, metrics, status_code
    )
    super().__init__(logged_content, status_code=status_code, **kwargs)