ccproxy.streaming

Generic streaming utilities for CCProxy.

This package provides transport-agnostic streaming functionality:

  • Stream interfaces and handlers
  • Buffer management
  • Deferred streaming for header preservation

StreamingBufferService

StreamingBufferService(
    http_client,
    request_tracer=None,
    hook_manager=None,
    http_pool_manager=None,
)

Service for handling stream-to-buffer conversion.

This service orchestrates the conversion of non-streaming requests to streaming requests internally, buffers the entire stream response, and converts it back to a non-streaming JSON response while maintaining full observability.
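
The buffering idea can be illustrated with a small SSE reader that collects every data: payload and folds the events into one JSON object. This is a minimal sketch, not the service's implementation: parse_sse_data and buffer_to_json are hypothetical helpers, and the {"delta": ...} event shape and the [DONE] sentinel are assumptions. Real providers use their own event formats, which the service handles through the SSE parser in the handler config.

```python
import json
from typing import Any


def parse_sse_data(raw: bytes) -> list[str]:
    """Split a fully buffered SSE body into the payloads of its data: fields.

    Events are separated by a blank line; several data: lines inside one
    event are joined with newlines, as the SSE format specifies.
    """
    text = raw.decode("utf-8").replace("\r\n", "\n")
    payloads: list[str] = []
    for block in text.split("\n\n"):
        data_lines = [
            line[5:].removeprefix(" ")  # drop "data:" and one optional space
            for line in block.split("\n")
            if line.startswith("data:")
        ]
        if data_lines:
            payloads.append("\n".join(data_lines))
    return payloads


def buffer_to_json(raw: bytes) -> dict[str, Any]:
    """Hypothetical reassembly: concatenate the "delta" text of each event."""
    text_parts: list[str] = []
    for payload in parse_sse_data(raw):
        if payload == "[DONE]":  # assumed end-of-stream sentinel
            break
        event = json.loads(payload)
        text_parts.append(event.get("delta", ""))
    return {"content": "".join(text_parts)}
```

For example, a buffered body of two delta events followed by data: [DONE] collapses into a single {"content": ...} object that can be returned as an ordinary JSON response.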

Parameters:

  • http_client (AsyncClient, required): HTTP client for making requests
  • request_tracer (IRequestTracer | None, default None): Optional request tracer for observability
  • hook_manager (HookManager | None, default None): Optional hook manager for event emission
  • http_pool_manager (HTTPPoolManager | None, default None): Optional HTTP pool manager for getting clients on demand

Source code in ccproxy/streaming/buffer.py
def __init__(
    self,
    http_client: httpx.AsyncClient,
    request_tracer: "IRequestTracer | None" = None,
    hook_manager: HookManager | None = None,
    http_pool_manager: "HTTPPoolManager | None" = None,
) -> None:
    """Initialize the streaming buffer service.

    Args:
        http_client: HTTP client for making requests
        request_tracer: Optional request tracer for observability
        hook_manager: Optional hook manager for event emission
        http_pool_manager: Optional HTTP pool manager for getting clients on demand
    """
    self.http_client = http_client
    self.request_tracer = request_tracer
    self.hook_manager = hook_manager
    self._http_pool_manager = http_pool_manager

handle_buffered_streaming_request async

handle_buffered_streaming_request(
    method,
    url,
    headers,
    body,
    handler_config,
    request_context,
    provider_name="unknown",
)

Main orchestration method for stream-to-buffer conversion.

This method:

  1. Transforms the request to enable streaming
  2. Makes a streaming request to the provider
  3. Collects and buffers the entire stream
  4. Parses the buffered stream using the SSE parser, if one is available
  5. Returns a non-streaming response with proper headers and observability
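
The source delegates step 1 to _transform_to_streaming_request, whose body is not shown on this page. Assuming it sets the JSON stream flag (the service logs body_changed, so the body may also come back unchanged), a standalone sketch might look like this. The helper name enable_streaming is hypothetical.

```python
import json


def enable_streaming(body: bytes) -> bytes:
    """Hypothetical stand-in for step 1: force "stream": true in a JSON body.

    Bodies that are not a JSON object are returned unchanged so that the
    caller can still forward them as-is.
    """
    try:
        data = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return body  # not JSON: leave untouched
    if not isinstance(data, dict):
        return body  # JSON, but not an object with fields to set
    data["stream"] = True
    return json.dumps(data).encode("utf-8")
```

Returning the original bytes on failure keeps the transformation side-effect free: the upstream, not the proxy, then decides how to reject a malformed request.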

Parameters:

  • method (str, required): HTTP method
  • url (str, required): Target API URL
  • headers (dict[str, str], required): Request headers
  • body (bytes, required): Request body
  • handler_config (HandlerConfig, required): Handler configuration with SSE parser and transformers
  • request_context (RequestContext, required): Request context for observability
  • provider_name (str, default 'unknown'): Name of the provider for hook events

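Returns:

  • Response: Non-streaming Response with JSON content

Raises:

  • HTTPException: If streaming fails or parsing fails. Any exception raised during the request is logged, reported through a PROVIDER_ERROR hook event when a hook manager is configured, and then re-raised unchanged, so other exception types can propagate as well.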

Source code in ccproxy/streaming/buffer.py
async def handle_buffered_streaming_request(
    self,
    method: str,
    url: str,
    headers: dict[str, str],
    body: bytes,
    handler_config: "HandlerConfig",
    request_context: "RequestContext",
    provider_name: str = "unknown",
) -> Response:
    """Main orchestration method for stream-to-buffer conversion.

    This method:
    1. Transforms the request to enable streaming
    2. Makes a streaming request to the provider
    3. Collects and buffers the entire stream
    4. Parses the buffered stream using SSE parser if available
    5. Returns a non-streaming response with proper headers and observability

    Args:
        method: HTTP method
        url: Target API URL
        headers: Request headers
        body: Request body
        handler_config: Handler configuration with SSE parser and transformers
        request_context: Request context for observability
        provider_name: Name of the provider for hook events

    Returns:
        Non-streaming Response with JSON content

    Raises:
        HTTPException: If streaming fails or parsing fails
    """
    try:
        request_preview, request_size, request_truncated = _stringify_payload(body)
        logger.info(
            "streaming_buffer_request_received",
            provider=provider_name,
            method=method,
            url=url,
            request_id=getattr(request_context, "request_id", None),
            body_preview=request_preview,
            body_size=request_size,
            body_truncated=request_truncated,
            category="streaming",
        )

        # Step 1: Transform request to enable streaming
        streaming_body = await self._transform_to_streaming_request(body)
        transformed_preview, transformed_size, transformed_truncated = (
            _stringify_payload(streaming_body)
        )
        logger.info(
            "streaming_buffer_request_transformed",
            provider=provider_name,
            method=method,
            url=url,
            request_id=getattr(request_context, "request_id", None),
            body_preview=transformed_preview,
            body_size=transformed_size,
            body_truncated=transformed_truncated,
            body_changed=streaming_body != body,
            category="streaming",
        )

        if handler_config.response_adapter:
            logger.info(
                "streaming_buffer_response_adapter_detected",
                provider=provider_name,
                adapter_type=type(handler_config.response_adapter).__name__,
                request_id=getattr(request_context, "request_id", None),
                category="format",
            )

        # Step 2: Collect and parse the stream
        (
            final_data,
            status_code,
            response_headers,
        ) = await self._collect_and_parse_stream(
            method=method,
            url=url,
            headers=headers,
            body=streaming_body,
            handler_config=handler_config,
            request_context=request_context,
            provider_name=provider_name,
        )

        # Step 3: Build non-streaming response
        return await self._build_non_streaming_response(
            final_data=final_data,
            status_code=status_code,
            response_headers=response_headers,
            request_context=request_context,
            provider_name=provider_name,
        )

    except Exception as e:
        logger.error(
            "streaming_buffer_service_error",
            method=method,
            url=url,
            error=str(e),
            provider=provider_name,
            request_id=getattr(request_context, "request_id", None),
            exc_info=e,
        )
        # Emit error hook if hook manager is available
        if self.hook_manager:
            try:
                error_context = HookContext(
                    event=HookEvent.PROVIDER_ERROR,
                    timestamp=datetime.now(),
                    provider=provider_name,
                    data={
                        "url": url,
                        "method": method,
                        "error": str(e),
                        "phase": "streaming_buffer_service",
                    },
                    metadata={
                        "request_id": getattr(request_context, "request_id", None),
                    },
                    error=e,
                )
                await self.hook_manager.emit_with_context(error_context)
            except Exception as hook_error:
                logger.debug(
                    "hook_emission_failed",
                    event="PROVIDER_ERROR",
                    error=str(hook_error),
                    category="hooks",
                )
        raise
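
The except block above follows a "report, then re-raise" pattern: the hook is emitted best-effort, a failing hook is only logged, and the original exception always propagates. The pattern can be isolated without ccproxy's HookManager; run_with_error_hook and the emit_error callable are hypothetical names used only for this sketch.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def run_with_error_hook(
    operation: Callable[[], Awaitable[T]],
    emit_error: Callable[[Exception], Awaitable[None]] | None = None,
) -> T:
    """Await operation; on failure, notify emit_error best-effort, then re-raise."""
    try:
        return await operation()
    except Exception as exc:
        if emit_error is not None:
            try:
                await emit_error(exc)
            except Exception:
                # A broken hook must never mask the original error, so its
                # failure is swallowed (the real service logs it at debug level).
                pass
        raise  # re-raise the original exception unchanged


async def _demo() -> None:
    async def failing() -> int:
        raise ValueError("upstream failed")

    async def broken_hook(exc: Exception) -> None:
        raise RuntimeError("hook failed")

    try:
        await run_with_error_hook(failing, broken_hook)
    except ValueError as exc:
        print("re-raised:", exc)  # the hook's RuntimeError never surfaces


if __name__ == "__main__":
    asyncio.run(_demo())
```

The bare raise is what preserves the original traceback; raising a new exception here would hide where the upstream call actually failed.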

BufferService

BufferService is documented identically to StreamingBufferService: the same constructor parameters and the same handle_buffered_streaming_request method, with source in ccproxy/streaming/buffer.py. See the StreamingBufferService entry above.

DeferredStreaming

DeferredStreaming(
    method,
    url,
    headers,
    body,
    client,
    media_type="text/event-stream",
    handler_config=None,
    request_context=None,
    hook_manager=None,
    close_client_on_finish=False,
    on_headers=None,
)

Bases: StreamingResponse

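Deferred StreamingResponse that starts the upstream request only when the response is sent, captures the upstream headers, and processes SSE through handler_config when one is provided.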

Parameters:

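  • method (str, required): HTTP method
  • url (str, required): Target URL
  • headers (dict[str, str], required): Request headers
  • body (bytes, required): Request body
  • client (AsyncClient, required): HTTP client to use
  • media_type (str, default 'text/event-stream'): Response media type
  • handler_config (HandlerConfig | None, default None): Optional handler config for SSE processing
  • request_context (RequestContext | None, default None): Optional request context for tracking
  • hook_manager (HookManager | None, default None): Optional hook manager for emitting stream events
  • close_client_on_finish (bool, default False): Close client when the stream finishes; StreamingHandler sets this when it created the client itself
  • on_headers (Any | None, default None): Not described in the docstring
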
Source code in ccproxy/streaming/deferred.py
def __init__(
    self,
    method: str,
    url: str,
    headers: dict[str, str],
    body: bytes,
    client: httpx.AsyncClient,
    media_type: str = "text/event-stream",
    handler_config: "HandlerConfig | None" = None,
    request_context: "RequestContext | None" = None,
    hook_manager: HookManager | None = None,
    close_client_on_finish: bool = False,
    on_headers: Any | None = None,
):
    """Store request details to execute later.

    Args:
        method: HTTP method
        url: Target URL
        headers: Request headers
        body: Request body
        client: HTTP client to use
        media_type: Response media type
        handler_config: Optional handler config for SSE processing
        request_context: Optional request context for tracking
        hook_manager: Optional hook manager for emitting stream events
    """
    # Store attributes first
    self.method = method
    self.url = url
    self.request_headers = headers
    self.body = body
    self.client = client
    self.media_type = media_type
    self.handler_config = handler_config
    self.request_context = request_context
    self.hook_manager = hook_manager
    self._close_client_on_finish = close_client_on_finish
    self.on_headers = on_headers
    self._stream_accumulator: StreamAccumulator | None = None

    # Create an async generator for the streaming content
    async def generate_content() -> AsyncGenerator[bytes, None]:
        # This will be replaced when __call__ is invoked
        yield b""

    # Initialize StreamingResponse with a generator
    super().__init__(content=generate_content(), media_type=media_type)
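
The constructor above only stores the request and installs a placeholder generator that, per its comment, is replaced when __call__ is invoked. The underlying "defer until consumed" idea can be shown without Starlette or httpx. DeferredStreamSketch and the open_upstream callable are hypothetical stand-ins; the real class additionally runs SSE processing and hooks.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable

# Hypothetical upstream call: returns response headers plus a body iterator.
OpenUpstream = Callable[[], Awaitable[tuple[dict[str, str], AsyncIterator[bytes]]]]


class DeferredStreamSketch:
    """Store request details now; contact the upstream only when iterated."""

    def __init__(self, open_upstream: OpenUpstream) -> None:
        self._open_upstream = open_upstream
        # Unknown until the upstream request has actually been started.
        self.upstream_headers: dict[str, str] | None = None

    async def __aiter__(self) -> AsyncIterator[bytes]:
        headers, chunks = await self._open_upstream()
        self.upstream_headers = headers  # available before the first chunk is sent
        async for chunk in chunks:
            yield chunk


async def _demo() -> None:
    async def fake_upstream() -> tuple[dict[str, str], AsyncIterator[bytes]]:
        async def body() -> AsyncIterator[bytes]:
            yield b"data: hello\n\n"

        return {"content-type": "text/event-stream"}, body()

    stream = DeferredStreamSketch(fake_upstream)
    print(stream.upstream_headers)  # None: nothing has been sent yet
    async for chunk in stream:
        print(stream.upstream_headers, chunk)


if __name__ == "__main__":
    asyncio.run(_demo())
```

Because the upstream call happens inside iteration, its headers are known before the first body chunk is yielded, which is what allows a deferred response to forward them.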

StreamingHandler

StreamingHandler(hook_manager=None)

Manages streaming request processing with header preservation and SSE adaptation.

Parameters:

  • hook_manager (HookManager | None, default None): Optional hook manager for emitting stream events

Source code in ccproxy/streaming/handler.py
def __init__(
    self,
    hook_manager: HookManager | None = None,
) -> None:
    """Initialize with hook manager for stream events.

    Args:
        hook_manager: Optional hook manager for emitting stream events
    """
    self.hook_manager = hook_manager

should_stream_response

should_stream_response(headers)

Detect streaming intent from request headers.

  • Prefer the client's Accept: text/event-stream
  • Fall back to a provider-style Content-Type: text/event-stream (rare for requests)
  • Match header values case-insensitively
Source code in ccproxy/streaming/handler.py
def should_stream_response(self, headers: dict[str, str]) -> bool:
    """Detect streaming intent from request headers.

    - Prefer client `Accept: text/event-stream`
    - Fallback to provider-style `Content-Type: text/event-stream` (rare for requests)
    - Case-insensitive checks
    """
    accept = str(headers.get("accept", "")).lower()
    if "text/event-stream" in accept:
        return True

    content_type = str(headers.get("content-type", "")).lower()
    return "text/event-stream" in content_type

should_stream async

should_stream(request_body, handler_config)

Check whether the request body sets the JSON field stream to true.

  • Returns False if provider doesn't support streaming
  • Parses JSON body for 'stream' field
  • Handles parse errors gracefully
Source code in ccproxy/streaming/handler.py
async def should_stream(
    self, request_body: bytes, handler_config: HandlerConfig
) -> bool:
    """Check if request body has stream:true flag.

    - Returns False if provider doesn't support streaming
    - Parses JSON body for 'stream' field
    - Handles parse errors gracefully
    """
    if not handler_config.supports_streaming:
        return False

    try:
        data = json.loads(request_body)
        return data.get("stream", False) is True
    except (json.JSONDecodeError, TypeError):
        return False
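
should_stream accepts only the JSON boolean true (a string "true" yields False because of the is True check). As written, a body that is valid JSON but not an object, such as [] or null, makes data.get raise AttributeError, and bytes that are not valid UTF-8 raise UnicodeDecodeError; neither is caught by the except clause. A defensive variant could look like this. body_requests_stream is a hypothetical standalone helper that takes the supports_streaming flag directly instead of a HandlerConfig.

```python
import json


def body_requests_stream(request_body: bytes, supports_streaming: bool = True) -> bool:
    """Variant of should_stream that tolerates any JSON value or undecodable bytes."""
    if not supports_streaming:
        return False
    try:
        data = json.loads(request_body)
    except (ValueError, TypeError):
        # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return False
    # Only a JSON object with a literal boolean true counts as a stream request.
    return isinstance(data, dict) and data.get("stream") is True
```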

handle_streaming_request async

handle_streaming_request(
    method,
    url,
    headers,
    body,
    handler_config,
    request_context,
    on_headers=None,
    client_config=None,
    client=None,
)

Create a deferred streaming response that preserves headers.

This always returns a DeferredStreaming response, which:

  • Defers the actual HTTP request until FastAPI sends the response
  • Captures all upstream headers correctly
  • Supports SSE processing through handler_config
  • Provides request tracing and metrics

Source code in ccproxy/streaming/handler.py
async def handle_streaming_request(
    self,
    method: str,
    url: str,
    headers: dict[str, str],
    body: bytes,
    handler_config: HandlerConfig,
    request_context: RequestContext,
    on_headers: Any | None = None,
    client_config: dict[str, Any] | None = None,
    client: httpx.AsyncClient | None = None,
) -> DeferredStreaming:
    """Create a deferred streaming response that preserves headers.

    This always returns a DeferredStreaming response which:
    - Defers the actual HTTP request until FastAPI sends the response
    - Captures all upstream headers correctly
    - Supports SSE processing through handler_config
    - Provides request tracing and metrics
    """

    # Use provided client or create a short-lived one
    owns_client = False
    if client is None:
        client = httpx.AsyncClient(**(client_config or {}))
        owns_client = True

    # Log that we're creating a deferred response
    logger.debug(
        "streaming_handler_creating_deferred_response",
        url=url,
        method=method,
        has_sse_adapter=bool(handler_config.response_adapter),
        adapter_type=type(handler_config.response_adapter).__name__
        if handler_config.response_adapter
        else None,
    )

    # Return the deferred response with format adapter from handler config
    return DeferredStreaming(
        method=method,
        url=url,
        headers=headers,
        body=body,
        client=client,
        media_type="text/event-stream; charset=utf-8",
        handler_config=handler_config,  # Contains format adapter if needed
        request_context=request_context,
        hook_manager=self.hook_manager,
        on_headers=on_headers,
        close_client_on_finish=owns_client,
    )
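
When no client is passed, handle_streaming_request creates a short-lived httpx.AsyncClient and passes close_client_on_finish=owns_client to DeferredStreaming, so only a client the handler created itself is closed. The ownership rule can be sketched independently of httpx. FakeClient and ClientOwningStream are hypothetical stand-ins; FakeClient.aclose only mirrors the name of httpx's async close method.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator


class FakeClient:
    """Stand-in for httpx.AsyncClient; only tracks whether aclose() ran."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


class ClientOwningStream:
    """Closes the client after streaming only if it created that client."""

    def __init__(self, client: FakeClient | None = None) -> None:
        # Mirrors owns_client / close_client_on_finish in the handler above.
        self.owns_client = client is None
        self.client = client if client is not None else FakeClient()

    async def iterate(self, chunks: list[bytes]) -> AsyncIterator[bytes]:
        try:
            for chunk in chunks:
                yield chunk
        finally:
            # Runs on normal completion and when the consumer closes the
            # generator early; a caller-supplied client is left open.
            if self.owns_client:
                await self.client.aclose()


async def _demo() -> None:
    owned = ClientOwningStream()
    _ = [chunk async for chunk in owned.iterate([b"a", b"b"])]
    print(owned.client.closed)  # True: the stream created this client


if __name__ == "__main__":
    asyncio.run(_demo())
```

Putting the close in finally rather than after the loop matters for streaming: if the downstream client disconnects and the generator is closed early, the owned client is still released, while a shared or pooled client is never closed out from under other requests.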

IStreamingMetricsCollector

Bases: Protocol

Interface for provider-specific streaming metrics collection.

Providers implement this interface to extract token usage and other metrics from their specific streaming response formats.

process_chunk

process_chunk(chunk_str)

Process a streaming chunk to extract metrics.

Parameters:

  • chunk_str (str, required): Raw chunk string from streaming response

Returns:

  • bool: True if this was the final chunk with complete metrics, False otherwise

Source code in ccproxy/streaming/interfaces.py
def process_chunk(self, chunk_str: str) -> bool:
    """Process a streaming chunk to extract metrics.

    Args:
        chunk_str: Raw chunk string from streaming response

    Returns:
        True if this was the final chunk with complete metrics, False otherwise
    """
    ...

process_raw_chunk

process_raw_chunk(chunk_str)

Process a raw provider chunk before any format conversion.

This method is called with chunks in the provider's native format, before any OpenAI/Anthropic format conversion happens.

Parameters:

  • chunk_str (str, required): Raw chunk string in provider's native format

Returns:

  • bool: True if this was the final chunk with complete metrics, False otherwise

Source code in ccproxy/streaming/interfaces.py
def process_raw_chunk(self, chunk_str: str) -> bool:
    """Process a raw provider chunk before any format conversion.

    This method is called with chunks in the provider's native format,
    before any OpenAI/Anthropic format conversion happens.

    Args:
        chunk_str: Raw chunk string in provider's native format

    Returns:
        True if this was the final chunk with complete metrics, False otherwise
    """
    ...

process_converted_chunk

process_converted_chunk(chunk_str)

Process a chunk after format conversion.

This method is called with chunks after they've been converted to a different format (e.g., OpenAI format).

Parameters:

  • chunk_str (str, required): Chunk string after format conversion

Returns:

  • bool: True if this was the final chunk with complete metrics, False otherwise

Source code in ccproxy/streaming/interfaces.py
def process_converted_chunk(self, chunk_str: str) -> bool:
    """Process a chunk after format conversion.

    This method is called with chunks after they've been converted
    to a different format (e.g., OpenAI format).

    Args:
        chunk_str: Chunk string after format conversion

    Returns:
        True if this was the final chunk with complete metrics, False otherwise
    """
    ...

get_metrics

get_metrics()

Get the collected metrics.

Returns:

  • StreamingMetrics: Dictionary with provider-specific metrics (tokens, costs, etc.)

Source code in ccproxy/streaming/interfaces.py
def get_metrics(self) -> StreamingMetrics:
    """Get the collected metrics.

    Returns:
        Dictionary with provider-specific metrics (tokens, costs, etc.)
    """
    ...
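
Because IStreamingMetricsCollector is a Protocol, a provider collector only has to supply these four methods; no inheritance is required. The sketch below assumes a hypothetical event format in which the final chunk carries a usage object with input_tokens and output_tokens, and uses a hypothetical StreamingMetricsSketch TypedDict because the real StreamingMetrics keys are not listed on this page. MetricsCollector is a local re-declaration of the protocol for illustration only.

```python
from __future__ import annotations

import json
from typing import Protocol, TypedDict, runtime_checkable


class StreamingMetricsSketch(TypedDict, total=False):
    # Hypothetical fields; the real StreamingMetrics keys are not shown here.
    tokens_input: int
    tokens_output: int


@runtime_checkable
class MetricsCollector(Protocol):
    def process_chunk(self, chunk_str: str) -> bool: ...
    def process_raw_chunk(self, chunk_str: str) -> bool: ...
    def process_converted_chunk(self, chunk_str: str) -> bool: ...
    def get_metrics(self) -> StreamingMetricsSketch: ...


class UsageCollector:
    """Reads a usage object from SSE data: lines (assumed event format)."""

    def __init__(self) -> None:
        self._metrics: StreamingMetricsSketch = {}

    def process_chunk(self, chunk_str: str) -> bool:
        found_usage = False
        for line in chunk_str.splitlines():
            if not line.startswith("data:"):
                continue
            try:
                event = json.loads(line[5:].strip())
            except json.JSONDecodeError:
                continue  # e.g. a "[DONE]" sentinel or partial data
            usage = event.get("usage") if isinstance(event, dict) else None
            if isinstance(usage, dict):
                self._metrics["tokens_input"] = int(usage.get("input_tokens", 0))
                self._metrics["tokens_output"] = int(usage.get("output_tokens", 0))
                found_usage = True  # treat the usage-bearing chunk as final
        return found_usage

    def process_raw_chunk(self, chunk_str: str) -> bool:
        # Native provider format: parsed the same way in this sketch.
        return self.process_chunk(chunk_str)

    def process_converted_chunk(self, chunk_str: str) -> bool:
        # Metrics are taken from raw chunks only, so converted ones are ignored.
        return False

    def get_metrics(self) -> StreamingMetricsSketch:
        return self._metrics.copy()
```

Reading metrics from raw chunks and ignoring converted ones is one way to avoid counting the same usage twice when both hooks see every chunk.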

StreamingMetrics

Bases: TypedDict

Standard streaming metrics structure.