ccproxy.streaming.simple_adapter

Simplified streaming adapter that bypasses complex type conversions.

This adapter provides a direct dict-based interface for streaming without the complexity of the shim layer.

SimpleStreamingAdapter

SimpleStreamingAdapter(name='simple_streaming')

Simple adapter for streaming responses that works directly with dicts.

Source code in ccproxy/streaming/simple_adapter.py
def __init__(self, name: str = "simple_streaming"):
    """Initialize the simple adapter."""
    self.name = name

adapt_request async

adapt_request(request)

Pass through request - no adaptation needed for streaming.

Source code in ccproxy/streaming/simple_adapter.py
async def adapt_request(self, request: dict[str, Any]) -> dict[str, Any]:
    """Pass through request - no adaptation needed for streaming."""
    return request

adapt_response async

adapt_response(response)

Pass through response - no adaptation needed for streaming.

Source code in ccproxy/streaming/simple_adapter.py
async def adapt_response(self, response: dict[str, Any]) -> dict[str, Any]:
    """Pass through response - no adaptation needed for streaming."""
    return response

adapt_stream

adapt_stream(stream)

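Pass through stream - no adaptation needed for simple streaming. Unlike the other methods, this is a regular (non-async) method that returns an async generator, so call it without await and iterate the result with async for.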

Source code in ccproxy/streaming/simple_adapter.py
def adapt_stream(
    self, stream: AsyncIterator[dict[str, Any]]
) -> AsyncGenerator[dict[str, Any], None]:
    """Pass through stream - no adaptation needed for simple streaming."""

    async def passthrough_stream() -> AsyncGenerator[dict[str, Any], None]:
        async for chunk in stream:
            yield chunk

    return passthrough_stream()
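
A minimal usage sketch of the passthrough behaviour may help here. It does not import ccproxy: the class below is a stand-in that copies only the constructor and adapt_stream from the listing above, and fake_upstream, collect_chunks, and the chunk fields ("type", "text") are hypothetical names chosen for illustration, not a real provider payload.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator
from typing import Any


class SimpleStreamingAdapter:
    """Stand-in mirroring the listing above (not imported from ccproxy)."""

    def __init__(self, name: str = "simple_streaming"):
        self.name = name

    def adapt_stream(
        self, stream: AsyncIterator[dict[str, Any]]
    ) -> AsyncGenerator[dict[str, Any], None]:
        async def passthrough_stream() -> AsyncGenerator[dict[str, Any], None]:
            # Re-yield every chunk exactly as received.
            async for chunk in stream:
                yield chunk

        return passthrough_stream()


async def fake_upstream() -> AsyncGenerator[dict[str, Any], None]:
    # Hypothetical chunks; the real shape depends on the upstream provider.
    for text in ("Hel", "lo"):
        yield {"type": "delta", "text": text}


async def collect_chunks() -> list[dict[str, Any]]:
    adapter = SimpleStreamingAdapter()
    # adapt_stream is a plain method: call it without await, then iterate.
    return [chunk async for chunk in adapter.adapt_stream(fake_upstream())]


if __name__ == "__main__":
    print(asyncio.run(collect_chunks()))
```

Because the wrapper is an async generator, nothing is read from the upstream iterator until the caller starts iterating.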

adapt_error async

adapt_error(error)

Pass through error - no adaptation needed.

Source code in ccproxy/streaming/simple_adapter.py
async def adapt_error(self, error: dict[str, Any]) -> dict[str, Any]:
    """Pass through error - no adaptation needed."""
    return error
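
One consequence of the three coroutine methods returning their argument directly is that the caller gets back the same dict object, not a copy. The sketch below illustrates this under the same assumption as the listings above; the class is a stand-in copied from them rather than an import from ccproxy, and the payload keys and the demo function are hypothetical.

```python
import asyncio
from typing import Any


class SimpleStreamingAdapter:
    """Stand-in mirroring the listings above (not imported from ccproxy)."""

    def __init__(self, name: str = "simple_streaming"):
        self.name = name

    async def adapt_request(self, request: dict[str, Any]) -> dict[str, Any]:
        return request

    async def adapt_response(self, response: dict[str, Any]) -> dict[str, Any]:
        return response

    async def adapt_error(self, error: dict[str, Any]) -> dict[str, Any]:
        return error


async def demo() -> bool:
    adapter = SimpleStreamingAdapter()
    # Hypothetical payloads, used only to show identity is preserved.
    request = {"model": "example-model", "stream": True}
    error = {"error": {"message": "example failure"}}

    same_request = (await adapter.adapt_request(request)) is request
    same_response = (await adapter.adapt_response(request)) is request
    same_error = (await adapter.adapt_error(error)) is error
    return same_request and same_response and same_error


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Since no copy is made, mutating the returned dict also mutates the caller's original; code that needs isolation would have to copy the dict itself.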