Skip to content

ccproxy.services.adapters.chain_composer

ccproxy.services.adapters.chain_composer

ComposedAdapter

ComposedAdapter(
    *,
    request=None,
    response=None,
    error=None,
    stream=None,
    name=None,
)

Bases: DictFormatAdapter

A DictFormatAdapter composed from multiple pairwise adapters.

Source code in ccproxy/services/adapters/format_adapter.py
def __init__(
    self,
    *,
    request: Callable[[FormatDict], Awaitable[FormatDict]]
    | Callable[[FormatDict], FormatDict]
    | None = None,
    response: Callable[[FormatDict], Awaitable[FormatDict]]
    | Callable[[FormatDict], FormatDict]
    | None = None,
    error: Callable[[FormatDict], Awaitable[FormatDict]]
    | Callable[[FormatDict], FormatDict]
    | None = None,
    stream: Callable[[AsyncIterator[FormatDict]], AsyncIterator[FormatDict]]
    | Callable[[AsyncIterator[FormatDict]], Awaitable[AsyncIterator[FormatDict]]]
    | Callable[[AsyncIterator[FormatDict]], Awaitable[Any]]
    | None = None,
    name: str | None = None,
) -> None:
    """Remember the per-stage converter callables and a display name.

    Args:
        request: Optional sync or async callable mapping a FormatDict to a
            FormatDict, stored as ``_request``.
        response: Optional callable of the same shape, stored as ``_response``.
        error: Optional callable of the same shape, stored as ``_error``.
        stream: Optional callable taking an async iterator of FormatDict,
            stored as ``_stream``.
        name: Adapter name; when falsy (``None`` or ``""``) the name of the
            instance's class is used instead.
    """
    # The callables are stored untouched; nothing is invoked or validated here.
    self._request, self._response = request, response
    self._error, self._stream = error, stream

    if name:
        self.name = name
    else:
        self.name = self.__class__.__name__

compose_from_chain

compose_from_chain(*, registry, chain, name=None)

Compose a FormatAdapter from a format_chain using the registry.

The composed adapter applies the per-pair adapters one after another for each of the request, response, error, and stream stages.

Source code in ccproxy/services/adapters/chain_composer.py
def compose_from_chain(
    *,
    registry: FormatRegistry,
    chain: list[str],
    name: str | None = None,
) -> FormatAdapterProtocol:
    """Build a single adapter that walks ``chain`` one format pair at a time.

    For every stage, the ``(src, dst)`` pairs come from
    ``_pairs_from_chain(chain, stage)`` and each pair's adapter is fetched
    with ``registry.get(src, dst)``. Lookups happen when a converter is
    invoked, not when this function is called.

    Args:
        registry: Source of the pairwise adapters.
        chain: Format names describing the conversion path.
        name: Optional adapter name; when falsy, a name of the form
            ``ComposedAdapter(a -> b -> ...)`` is derived from ``chain``.

    Returns:
        A ``ComposedAdapter`` wired with request, response, error, and
        stream converters that apply the pairwise adapters in order.
    """

    async def _run_stage(
        payload: dict[str, Any], stage: Literal["request", "response", "error"]
    ) -> dict[str, Any]:
        # Thread the payload through each hop; the output of one adapter is
        # the input of the next.
        for source_fmt, target_fmt in _pairs_from_chain(chain, stage):
            hop = registry.get(source_fmt, target_fmt)
            if stage == "request":
                payload = await hop.convert_request(payload)
                continue
            if stage == "response":
                payload = await hop.convert_response(payload)
                continue
            # Error stage: a hop raising NotImplementedError leaves the
            # payload as it was and the walk carries on with the next hop.
            with contextlib.suppress(NotImplementedError):
                payload = await hop.convert_error(payload)
        return payload

    async def _request(data: dict[str, Any]) -> dict[str, Any]:
        converted = await _run_stage(data, "request")
        return converted

    async def _response(data: dict[str, Any]) -> dict[str, Any]:
        converted = await _run_stage(data, "response")
        return converted

    async def _error(data: dict[str, Any]) -> dict[str, Any]:
        converted = await _run_stage(data, "error")
        return converted

    async def _stream(
        stream: AsyncIterator[dict[str, Any]],
    ) -> AsyncIterator[dict[str, Any]]:
        # Wrap the incoming stream in each hop's convert_stream in turn, then
        # drain the outermost wrapper.
        piped = stream
        for source_fmt, target_fmt in _pairs_from_chain(chain, "stream"):
            piped = registry.get(source_fmt, target_fmt).convert_stream(piped)
        async for chunk in piped:
            yield chunk

    adapter_name = name if name else f"ComposedAdapter({' -> '.join(chain)})"
    return ComposedAdapter(
        request=_request,
        response=_response,
        error=_error,
        stream=_stream,
        name=adapter_name,
    )