ccproxy.streaming.deferred

Deferred streaming response that preserves headers.

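This implementation addresses the header timing issue: the upstream stream is started first so that its response headers are available before the response is sent. It also supports SSE processing.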

DeferredStreaming

DeferredStreaming(
    method,
    url,
    headers,
    body,
    client,
    media_type="text/event-stream",
    handler_config=None,
    request_context=None,
    hook_manager=None,
    close_client_on_finish=False,
    on_headers=None,
)

Bases: StreamingResponse

Deferred response that starts the stream to get headers and processes SSE.
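
The ordering described above (open the upstream stream, read its headers, then start the response) can be sketched with the standard library alone. This is an illustration of the general idea, not the ccproxy implementation: `FakeUpstream`, `open_upstream`, `deferred_response`, and the event tuples are all hypothetical names invented for this sketch.

```python
import asyncio
from typing import AsyncIterator


class FakeUpstream:
    """Hypothetical stand-in for an upstream HTTP response (not a ccproxy type)."""

    def __init__(self) -> None:
        self.headers = {
            "content-type": "text/event-stream",
            "x-request-id": "abc123",
        }

    async def chunks(self) -> AsyncIterator[bytes]:
        for part in (b"data: one\n\n", b"data: two\n\n"):
            await asyncio.sleep(0)  # yield control, as real network I/O would
            yield part


async def open_upstream() -> FakeUpstream:
    """Pretend to send the request and wait until the response headers arrive."""
    await asyncio.sleep(0)
    return FakeUpstream()


async def deferred_response(send) -> None:
    """Start the upstream stream first, then begin the downstream response."""
    upstream = await open_upstream()  # headers are known from this point on
    await send(("start", dict(upstream.headers)))  # forward them before any body
    async for chunk in upstream.chunks():
        await send(("body", chunk))
    await send(("end", None))


async def collect() -> list:
    """Run the sketch and record every event passed to `send`."""
    events = []

    async def send(event) -> None:
        events.append(event)

    await deferred_response(send)
    return events


if __name__ == "__main__":
    for event in asyncio.run(collect()):
        print(event)
```

Because nothing is sent downstream until `open_upstream()` returns, the first event can carry the upstream headers. A response that starts sending before contacting the upstream would have to commit to its own headers first.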

Parameters:

Name             Type                    Description                                       Default
method           str                     HTTP method                                       required
url              str                     Target URL                                        required
headers          dict[str, str]          Request headers                                   required
body             bytes                   Request body                                      required
client           AsyncClient             HTTP client to use                                required
media_type       str                     Response media type                               'text/event-stream'
handler_config   HandlerConfig | None    Optional handler config for SSE processing        None
request_context  RequestContext | None   Optional request context for tracking             None
hook_manager     HookManager | None      Optional hook manager for emitting stream events  None

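
For readers unfamiliar with the `text/event-stream` format that `media_type` defaults to, the following is a minimal sketch of splitting a byte stream into SSE events. It is not how `handler_config` or ccproxy processes events; `parse_event` and `iter_sse_events` are hypothetical helpers, and the sketch assumes `\n` line endings and UTF-8 payloads.

```python
from typing import Iterable, Iterator


def parse_event(block: str) -> dict[str, str]:
    """Parse one SSE event block (the lines between two blank lines)."""
    event: dict[str, str] = {}
    data_lines: list[str] = []
    for line in block.split("\n"):
        if not line or line.startswith(":"):
            continue  # skip empty lines and comment lines
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data":
            data_lines.append(value)  # multiple data lines are joined below
        else:
            event[field] = value
    if data_lines:
        event["data"] = "\n".join(data_lines)
    return event


def iter_sse_events(chunks: Iterable[bytes]) -> Iterator[dict[str, str]]:
    """Yield parsed events from raw chunks that may split events arbitrarily."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # A blank line (two consecutive newlines) terminates an event.
        while b"\n\n" in buffer:
            raw, buffer = buffer.split(b"\n\n", 1)
            event = parse_event(raw.decode("utf-8"))
            if event:
                yield event


if __name__ == "__main__":
    stream = [b"event: mess", b"age\ndata: hel", b"lo\n\n"]
    print(list(iter_sse_events(stream)))
```

Buffering until a blank line is seen matters because network chunks do not align with event boundaries, so a single event can arrive split across several chunks.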
Source code in ccproxy/streaming/deferred.py
def __init__(
    self,
    method: str,
    url: str,
    headers: dict[str, str],
    body: bytes,
    client: httpx.AsyncClient,
    media_type: str = "text/event-stream",
    handler_config: "HandlerConfig | None" = None,
    request_context: "RequestContext | None" = None,
    hook_manager: HookManager | None = None,
    close_client_on_finish: bool = False,
    on_headers: Any | None = None,
):
    """Store request details to execute later.

    Args:
        method: HTTP method
        url: Target URL
        headers: Request headers
        body: Request body
        client: HTTP client to use
        media_type: Response media type
        handler_config: Optional handler config for SSE processing
        request_context: Optional request context for tracking
        hook_manager: Optional hook manager for emitting stream events
    """
    # Store attributes first
    self.method = method
    self.url = url
    self.request_headers = headers
    self.body = body
    self.client = client
    self.media_type = media_type
    self.handler_config = handler_config
    self.request_context = request_context
    self.hook_manager = hook_manager
    self._close_client_on_finish = close_client_on_finish
    self.on_headers = on_headers
    self._stream_accumulator: StreamAccumulator | None = None

    # Create an async generator for the streaming content
    async def generate_content() -> AsyncGenerator[bytes, None]:
        # This will be replaced when __call__ is invoked
        yield b""

    # Initialize StreamingResponse with a generator
    super().__init__(content=generate_content(), media_type=media_type)
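
The constructor above passes a placeholder generator to the base class and notes that it is replaced when `__call__` is invoked. The replacement itself is not shown in this section, so the following is only a sketch of that pattern under stated assumptions: `BaseStreamingResponse` is a hypothetical stand-in for a framework response class, and `DeferredSketch`, `make_stream`, and `body_iterator` are names chosen for illustration.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, Callable


class BaseStreamingResponse:
    """Hypothetical minimal base class that requires content at construction."""

    def __init__(self, content: AsyncIterator[bytes]) -> None:
        self.body_iterator = content

    async def __call__(self, send) -> None:
        async for chunk in self.body_iterator:
            await send(chunk)


class DeferredSketch(BaseStreamingResponse):
    """Stores how to build the stream, and builds it only when called."""

    def __init__(self, make_stream: Callable[[], AsyncIterator[bytes]]) -> None:
        self.make_stream = make_stream  # stored now, executed later

        async def placeholder() -> AsyncGenerator[bytes, None]:
            yield b""  # never iterated; only satisfies the base constructor

        super().__init__(content=placeholder())

    async def __call__(self, send) -> None:
        # Swap the placeholder for the real stream at call time.
        self.body_iterator = self.make_stream()
        await super().__call__(send)


async def real_stream() -> AsyncGenerator[bytes, None]:
    yield b"a"
    yield b"b"


async def run() -> list:
    """Call the sketch response and record what it sends."""
    sent = []

    async def send(chunk: bytes) -> None:
        sent.append(chunk)

    await DeferredSketch(real_stream)(send)
    return sent


if __name__ == "__main__":
    print(asyncio.run(run()))
```

The placeholder exists only because the base constructor needs some content up front. The real work is deferred until the response object is actually called.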