Skip to content

ccproxy.services.adapters

ccproxy.services.adapters

Adapter subpackage exports.

DictFormatAdapter

DictFormatAdapter(
    *,
    request=None,
    response=None,
    error=None,
    stream=None,
    name=None,
)

Bases: FormatAdapterProtocol

Adapter built from per-stage callables with strict dict IO.

Source code in ccproxy/services/adapters/format_adapter.py
def __init__(
    self,
    *,
    request: Callable[[FormatDict], Awaitable[FormatDict]]
    | Callable[[FormatDict], FormatDict]
    | None = None,
    response: Callable[[FormatDict], Awaitable[FormatDict]]
    | Callable[[FormatDict], FormatDict]
    | None = None,
    error: Callable[[FormatDict], Awaitable[FormatDict]]
    | Callable[[FormatDict], FormatDict]
    | None = None,
    stream: Callable[[AsyncIterator[FormatDict]], AsyncIterator[FormatDict]]
    | Callable[[AsyncIterator[FormatDict]], Awaitable[AsyncIterator[FormatDict]]]
    | Callable[[AsyncIterator[FormatDict]], Awaitable[Any]]
    | None = None,
    name: str | None = None,
) -> None:
    self._request = request
    self._response = response
    self._error = error
    self._stream = stream
    self.name = name or self.__class__.__name__

FormatAdapterProtocol

Bases: Protocol

Protocol for format adapters operating on plain dictionaries.

convert_request async

convert_request(data)

Convert an outgoing request payload.

Source code in ccproxy/services/adapters/format_adapter.py
async def convert_request(self, data: FormatDict) -> FormatDict:
    """Convert an outgoing request payload."""

convert_response async

convert_response(data)

Convert a non-streaming response payload.

Source code in ccproxy/services/adapters/format_adapter.py
async def convert_response(self, data: FormatDict) -> FormatDict:
    """Convert a non-streaming response payload."""

convert_error async

convert_error(data)

Convert an error payload.

Source code in ccproxy/services/adapters/format_adapter.py
async def convert_error(self, data: FormatDict) -> FormatDict:
    """Convert an error payload."""

convert_stream

convert_stream(stream)

Convert a streaming response represented as an async iterator.

Source code in ccproxy/services/adapters/format_adapter.py
def convert_stream(
    self, stream: AsyncIterator[FormatDict]
) -> AsyncIterator[FormatDict]:
    """Convert a streaming response represented as an async iterator."""

FormatRegistry

FormatRegistry()

Registry mapping format pairs to concrete adapters.

Source code in ccproxy/services/adapters/format_registry.py
def __init__(self) -> None:
    self._adapters: dict[tuple[str, str], FormatAdapterProtocol] = {}
    self._registered_plugins: dict[tuple[str, str], str] = {}
    self._pending_logs: dict[str, list[dict[str, str]]] = {}
    self._current_batch_plugin: str | None = None