Skip to content

ccproxy.plugins.claude_api.adapter

ccproxy.plugins.claude_api.adapter

ClaudeAPIAdapter

ClaudeAPIAdapter(detection_service, config=None, **kwargs)

Bases: BaseHTTPAdapter

Simplified Claude API adapter.

Source code in ccproxy/plugins/claude_api/adapter.py
def __init__(
    self,
    detection_service: DetectionServiceProtocol,
    config: ClaudeAPISettings | None = None,
    **kwargs: Any,
) -> None:
    """Set up the adapter with its detection service and settings.

    Args:
        detection_service: Stored on the instance as ``detection_service``.
        config: Adapter settings. When falsy (e.g. ``None``), a default
            ``ClaudeAPISettings()`` instance is created and used instead.
        **kwargs: Passed through unchanged to the parent initialiser.
    """
    effective_config = config or ClaudeAPISettings()
    super().__init__(config=effective_config, **kwargs)

    self.detection_service: DetectionServiceProtocol = detection_service

    # ``auth_manager`` is not assigned in this method (presumably provided by
    # the parent initialiser); the cast only narrows its static type.
    auth_manager = self.auth_manager
    self.token_manager: TokenManagerProtocol = cast(
        TokenManagerProtocol, auth_manager
    )

    # Store the base URL without trailing slashes.
    configured_url = self.config.base_url
    self.base_url = configured_url.rstrip("/")

process_provider_response async

process_provider_response(response, endpoint)

Return a plain Response; streaming handled upstream by BaseHTTPAdapter.

The BaseHTTPAdapter is responsible for detecting streaming and delegating to the shared StreamingHandler. For non-streaming responses, adapters should return a simple Starlette Response.

Source code in ccproxy/plugins/claude_api/adapter.py
async def process_provider_response(
    self, response: httpx.Response, endpoint: str
) -> Response | StreamingResponse:
    """Build a plain, non-streaming Response from the provider's reply.

    Streaming is handled upstream by BaseHTTPAdapter, which is responsible
    for detecting it and delegating to the shared StreamingHandler; this
    method only produces a simple Starlette Response.

    Args:
        response: The provider's HTTP response.
        endpoint: Endpoint identifier, used to decide whether the body
            should be converted to the OpenAI format.

    Returns:
        A Response carrying the provider's status code and the extracted
        headers. The body is the provider's raw content, unless conversion
        applies and yields a result, in which case the body is that result
        serialised as JSON with media type ``application/json``.
    """
    forwarded_headers = extract_response_headers(response)

    # Default: pass the provider's payload and content type through as-is.
    payload = response.content
    content_type = response.headers.get("content-type")

    # Conversion is attempted only for endpoints that ask for it; a ``None``
    # result means "keep the original body".
    openai_payload = (
        self._convert_anthropic_to_openai_response(response)
        if self._needs_openai_conversion(endpoint)
        else None
    )
    if openai_payload is not None:
        # NOTE(review): the forwarded headers are left untouched here; confirm
        # extract_response_headers drops length/type headers that would no
        # longer match the re-encoded body.
        payload = json.dumps(openai_payload).encode()
        content_type = "application/json"

    return Response(
        content=payload,
        status_code=response.status_code,
        headers=forwarded_headers,
        media_type=content_type,
    )