Skip to content

ccproxy.streaming.buffer

ccproxy.streaming.buffer

Streaming buffer service for converting streaming requests to non-streaming responses.

This service handles the pattern where a non-streaming request needs to be converted internally to a streaming request, buffered, and then returned as a non-streaming response.

StreamingBufferService

StreamingBufferService(
    http_client,
    request_tracer=None,
    hook_manager=None,
    http_pool_manager=None,
)

Service for handling stream-to-buffer conversion.

This service orchestrates the conversion of non-streaming requests to streaming requests internally, buffers the entire stream response, and converts it back to a non-streaming JSON response while maintaining full observability.

Parameters:

Name Type Description Default
http_client AsyncClient

HTTP client for making requests

required
request_tracer IRequestTracer | None

Optional request tracer for observability

None
hook_manager HookManager | None

Optional hook manager for event emission

None
http_pool_manager HTTPPoolManager | None

Optional HTTP pool manager for getting clients on demand

None
Source code in ccproxy/streaming/buffer.py
def __init__(
    self,
    http_client: httpx.AsyncClient,
    request_tracer: "IRequestTracer | None" = None,
    hook_manager: HookManager | None = None,
    http_pool_manager: "HTTPPoolManager | None" = None,
) -> None:
    """Initialize the streaming buffer service.

    Args:
        http_client: HTTP client for making requests
        request_tracer: Optional request tracer for observability
        hook_manager: Optional hook manager for event emission
        http_pool_manager: Optional HTTP pool manager for getting clients on demand
    """
    self.http_client = http_client
    self.request_tracer = request_tracer
    self.hook_manager = hook_manager
    self._http_pool_manager = http_pool_manager

handle_buffered_streaming_request async

handle_buffered_streaming_request(
    method,
    url,
    headers,
    body,
    handler_config,
    request_context,
    provider_name="unknown",
)

Main orchestration method for stream-to-buffer conversion.

This method: 1. Transforms the request to enable streaming 2. Makes a streaming request to the provider 3. Collects and buffers the entire stream 4. Parses the buffered stream using SSE parser if available 5. Returns a non-streaming response with proper headers and observability

Parameters:

Name Type Description Default
method str

HTTP method

required
url str

Target API URL

required
headers dict[str, str]

Request headers

required
body bytes

Request body

required
handler_config HandlerConfig

Handler configuration with SSE parser and transformers

required
request_context RequestContext

Request context for observability

required
provider_name str

Name of the provider for hook events

'unknown'

Returns:

Type Description
Response

Non-streaming Response with JSON content

Raises:

Type Description
HTTPException

If streaming fails or parsing fails

Source code in ccproxy/streaming/buffer.py
async def handle_buffered_streaming_request(
    self,
    method: str,
    url: str,
    headers: dict[str, str],
    body: bytes,
    handler_config: "HandlerConfig",
    request_context: "RequestContext",
    provider_name: str = "unknown",
) -> Response:
    """Main orchestration method for stream-to-buffer conversion.

    This method:
    1. Transforms the request to enable streaming
    2. Makes a streaming request to the provider
    3. Collects and buffers the entire stream
    4. Parses the buffered stream using SSE parser if available
    5. Returns a non-streaming response with proper headers and observability

    Args:
        method: HTTP method
        url: Target API URL
        headers: Request headers
        body: Request body
        handler_config: Handler configuration with SSE parser and transformers
        request_context: Request context for observability
        provider_name: Name of the provider for hook events

    Returns:
        Non-streaming Response with JSON content

    Raises:
        HTTPException: If streaming fails or parsing fails
    """
    try:
        request_preview, request_size, request_truncated = _stringify_payload(body)
        logger.info(
            "streaming_buffer_request_received",
            provider=provider_name,
            method=method,
            url=url,
            request_id=getattr(request_context, "request_id", None),
            body_preview=request_preview,
            body_size=request_size,
            body_truncated=request_truncated,
            category="streaming",
        )

        # Step 1: Transform request to enable streaming
        streaming_body = await self._transform_to_streaming_request(body)
        transformed_preview, transformed_size, transformed_truncated = (
            _stringify_payload(streaming_body)
        )
        logger.info(
            "streaming_buffer_request_transformed",
            provider=provider_name,
            method=method,
            url=url,
            request_id=getattr(request_context, "request_id", None),
            body_preview=transformed_preview,
            body_size=transformed_size,
            body_truncated=transformed_truncated,
            body_changed=streaming_body != body,
            category="streaming",
        )

        if handler_config.response_adapter:
            logger.info(
                "streaming_buffer_response_adapter_detected",
                provider=provider_name,
                adapter_type=type(handler_config.response_adapter).__name__,
                request_id=getattr(request_context, "request_id", None),
                category="format",
            )

        # Step 2: Collect and parse the stream
        (
            final_data,
            status_code,
            response_headers,
        ) = await self._collect_and_parse_stream(
            method=method,
            url=url,
            headers=headers,
            body=streaming_body,
            handler_config=handler_config,
            request_context=request_context,
            provider_name=provider_name,
        )

        # Step 3: Build non-streaming response
        return await self._build_non_streaming_response(
            final_data=final_data,
            status_code=status_code,
            response_headers=response_headers,
            request_context=request_context,
            provider_name=provider_name,
        )

    except Exception as e:
        logger.error(
            "streaming_buffer_service_error",
            method=method,
            url=url,
            error=str(e),
            provider=provider_name,
            request_id=getattr(request_context, "request_id", None),
            exc_info=e,
        )
        # Emit error hook if hook manager is available
        if self.hook_manager:
            try:
                error_context = HookContext(
                    event=HookEvent.PROVIDER_ERROR,
                    timestamp=datetime.now(),
                    provider=provider_name,
                    data={
                        "url": url,
                        "method": method,
                        "error": str(e),
                        "phase": "streaming_buffer_service",
                    },
                    metadata={
                        "request_id": getattr(request_context, "request_id", None),
                    },
                    error=e,
                )
                await self.hook_manager.emit_with_context(error_context)
            except Exception as hook_error:
                logger.debug(
                    "hook_emission_failed",
                    event="PROVIDER_ERROR",
                    error=str(hook_error),
                    category="hooks",
                )
        raise