Skip to content

ccproxy.services.adapters.http_adapter

ccproxy.services.adapters.http_adapter

BaseHTTPAdapter

BaseHTTPAdapter(
    config,
    auth_manager,
    http_pool_manager,
    streaming_handler=None,
    **kwargs,
)

Bases: BaseAdapter

Simplified HTTP adapter with format chain support.

Source code in ccproxy/services/adapters/http_adapter.py
def __init__(
    self,
    config: ProviderConfig,
    auth_manager: Any,
    http_pool_manager: Any,
    streaming_handler: StreamingHandler | None = None,
    **kwargs: Any,
) -> None:
    """Wire up the adapter's collaborators.

    Args:
        config: Provider configuration, forwarded to the parent constructor.
        auth_manager: Stored as ``self.auth_manager``.
        http_pool_manager: Stored as ``self.http_pool_manager``.
        streaming_handler: Optional streaming handler; ``None`` leaves the
            adapter without streaming support.
        **kwargs: Forwarded to the parent constructor. The optional
            ``format_registry``, ``context`` and ``model_mapper`` entries
            are also kept on the instance (``None`` when absent).
    """
    # The parent constructor is responsible for initializing config.
    super().__init__(config=config, **kwargs)

    # Optional collaborators arrive through kwargs.
    format_registry = kwargs.get("format_registry")
    context = kwargs.get("context")
    model_mapper = kwargs.get("model_mapper")

    self.auth_manager = auth_manager
    self.http_pool_manager = http_pool_manager
    self.streaming_handler = streaming_handler
    self.format_registry = format_registry
    self.context = context
    self.model_mapper = model_mapper

    streaming_configured = streaming_handler is not None
    registry_configured = self.format_registry is not None
    logger.debug(
        "base_http_adapter_initialized",
        has_streaming_handler=streaming_configured,
        has_format_registry=registry_configured,
    )

handle_request async

handle_request(request)

Handle request with streaming detection and format chain support.

Source code in ccproxy/services/adapters/http_adapter.py
async def handle_request(
    self, request: Request
) -> Response | StreamingResponse | DeferredStreaming:
    """Handle request with streaming detection and format chain support.

    Flow:
        1. Read the (model-mapped) body and the request headers.
        2. If a streaming handler is configured and either the JSON body has
           a truthy ``stream`` flag or the handler's header check asks for
           streaming, delegate to ``handle_streaming`` and return its result.
        3. Otherwise convert the request through the format chain (only when
           the chain has more than one entry), run the provider-specific
           preparation, execute the HTTP call, and convert the response back.

    Args:
        request: Incoming request. ``request.state.context`` must already be
            populated; it supplies ``metadata`` and ``format_chain``.

    Returns:
        The streaming result for streaming requests; a ``JSONResponse`` with
        status 400 when the request body cannot be parsed/converted, or 500
        when the response cannot be converted; otherwise the provider
        response (converted through the format chain when one applies).
    """

    # Get context from middleware (already initialized)
    ctx = request.state.context
    self._ensure_tool_accumulator(ctx)

    # Step 1: Extract request data
    body = await request.body()
    body = await self._map_request_model(ctx, body)
    headers = extract_request_headers(request)
    method = request.method
    endpoint = ctx.metadata.get("endpoint", "")

    # Fail fast if a format chain is configured without a registry
    self._ensure_format_registry(ctx.format_chain, endpoint)

    # Extra debug breadcrumbs to confirm code path and detection inputs
    logger.debug(
        "http_adapter_handle_request_entry",
        endpoint=endpoint,
        method=method,
        content_type=headers.get("content-type"),
        has_streaming_handler=bool(self.streaming_handler),
        category="stream_detection",
    )

    # Step 2: Early streaming detection
    if self.streaming_handler:
        logger.debug(
            "checking_should_stream",
            endpoint=endpoint,
            has_streaming_handler=True,
            content_type=headers.get("content-type"),
            category="stream_detection",
        )
        # Detect streaming via Accept header and/or body flag stream:true
        body_wants_stream = False
        parsed_payload: dict[str, Any] | None = None
        # Any failure while decoding/inspecting the body (invalid JSON, bad
        # encoding, a non-object payload) just means "no stream flag".
        try:
            parsed_payload = json.loads(body.decode()) if body else {}
            body_wants_stream = bool(parsed_payload.get("stream", False))
        except Exception:
            body_wants_stream = False
        header_wants_stream = self.streaming_handler.should_stream_response(headers)
        logger.debug(
            "should_stream_results",
            body_wants_stream=body_wants_stream,
            header_wants_stream=header_wants_stream,
            endpoint=endpoint,
            category="stream_detection",
        )
        if body_wants_stream or header_wants_stream:
            # When both signals are set, the log attributes detection to the
            # header check.
            logger.debug(
                "streaming_request_detected",
                endpoint=endpoint,
                detected_via=(
                    "content_type_sse"
                    if header_wants_stream
                    else "body_stream_flag"
                ),
                category="stream_detection",
            )
            if isinstance(parsed_payload, dict):
                self._record_tool_definitions(ctx, parsed_payload)
            # handle_streaming reads body and headers from the request again;
            # nothing extracted above is passed along except the endpoint.
            return await self.handle_streaming(request, endpoint)
        else:
            logger.debug(
                "not_streaming_request",
                endpoint=endpoint,
                category="stream_detection",
            )

    # Step 3: Execute format chain if specified (non-streaming)
    # A chain with a single entry is skipped: conversion needs at least a
    # source and a target format.
    request_payload: dict[str, Any] | None = None
    if ctx.format_chain and len(ctx.format_chain) > 1:
        try:
            request_payload = self._decode_json_body(body, context="request")
        except ValueError as exc:
            logger.error(
                "format_chain_request_parse_failed",
                error=str(exc),
                endpoint=endpoint,
                category="transform",
            )
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "type": "invalid_request_error",
                        "message": "Failed to parse request body for format conversion",
                        "details": str(exc),
                    }
                },
            )

        # Tool definitions are recorded from the payload as the client sent
        # it, i.e. before the format chain rewrites it.
        self._record_tool_definitions(ctx, request_payload)

        try:
            logger.debug(
                "format_chain_request_about_to_convert",
                chain=ctx.format_chain,
                endpoint=endpoint,
                category="transform",
            )
            request_payload = await self._apply_format_chain(
                data=request_payload,
                format_chain=ctx.format_chain,
                stage="request",
            )
            # From here on `body` is the converted payload.
            body = self._encode_json_body(request_payload)
            logger.trace(
                "format_chain_request_converted",
                from_format=ctx.format_chain[0],
                to_format=ctx.format_chain[-1],
                keys=list(request_payload.keys()),
                size_bytes=len(body),
                category="transform",
            )
            logger.info(
                "format_chain_applied",
                stage="request",
                endpoint=endpoint,
                chain=ctx.format_chain,
                steps=len(ctx.format_chain) - 1,
                category="format",
            )
        except Exception as e:
            # Conversion failures are reported to the client as a 400.
            logger.error(
                "format_chain_request_failed",
                error=str(e),
                endpoint=endpoint,
                exc_info=e,
                category="transform",
            )
            return JSONResponse(
                status_code=400,
                content={
                    "error": {
                        "type": "invalid_request_error",
                        "message": "Failed to convert request using format chain",
                        "details": str(e),
                    }
                },
            )
    # Step 4: Provider-specific preparation
    prepared_body, prepared_headers = await self.prepare_provider_request(
        body, headers, endpoint
    )
    # Trace logging is best-effort: any error while building the log entry
    # is suppressed so it cannot fail the request.
    with contextlib.suppress(Exception):
        logger.trace(
            "provider_request_prepared",
            endpoint=endpoint,
            header_keys=list(prepared_headers.keys()),
            body_size=len(prepared_body or b""),
            category="http",
        )

    # Step 5: Execute HTTP request
    target_url = await self.get_target_url(endpoint)
    # The values returned by the emit step replace the local method, URL,
    # body and headers used for the actual call.
    (
        method,
        target_url,
        prepared_body,
        prepared_headers,
    ) = await self._emit_provider_request_prepared(
        request_obj=request,
        ctx=ctx,
        method=method,
        endpoint=endpoint,
        target_url=target_url,
        prepared_body=prepared_body,
        prepared_headers=prepared_headers,
        is_streaming=False,
    )
    provider_response = await self._execute_http_request(
        method,
        target_url,
        prepared_headers,
        prepared_body,
    )
    logger.trace(
        "provider_response_received",
        status_code=getattr(provider_response, "status_code", None),
        content_type=getattr(provider_response, "headers", {}).get(
            "content-type", None
        ),
        category="http",
    )

    # Step 6: Provider-specific response processing
    response = await self.process_provider_response(provider_response, endpoint)

    # filter out hop-by-hop headers
    # NOTE: `headers` is rebound here; below it holds the filtered provider
    # *response* headers, no longer the request headers.
    headers = filter_response_headers(dict(provider_response.headers))

    # Step 7: Format the response
    if isinstance(response, StreamingResponse):
        logger.debug("process_provider_response_streaming")
        return await self._convert_streaming_response(
            response, ctx.format_chain, ctx
        )
    elif isinstance(response, Response):
        logger.debug("process_provider_response")
        response = self._restore_model_response(response, ctx)

        # httpx has already decoded provider payloads, so strip encoding
        # headers that no longer match the body we forward to clients.
        for header in ("content-encoding", "transfer-encoding", "content-length"):
            with contextlib.suppress(KeyError):
                del response.headers[header]
        if ctx.format_chain and len(ctx.format_chain) > 1:
            # Provider errors (status >= 400) go through the chain's "error"
            # stage instead of the "response" stage.
            stage: Literal["response", "error"] = (
                "error" if provider_response.status_code >= 400 else "response"
            )
            try:
                payload = self._decode_json_body(
                    cast(bytes, response.body), context=stage
                )
            except ValueError as exc:
                logger.error(
                    "format_chain_response_parse_failed",
                    error=str(exc),
                    endpoint=endpoint,
                    stage=stage,
                    category="transform",
                )
                # Body could not be decoded as JSON: forward the provider
                # response without conversion.
                return response

            try:
                payload = await self._apply_format_chain(
                    data=payload,
                    format_chain=ctx.format_chain,
                    stage=stage,
                )
                # Look up the model alias map: first in ctx.metadata, then
                # as an attribute on ctx itself.
                metadata = getattr(ctx, "metadata", None)
                if isinstance(metadata, dict):
                    alias_map = metadata.get("_model_alias_map")
                else:
                    alias_map = None
                if not alias_map:
                    alias_map = getattr(ctx, "_model_alias_map", None)
                if isinstance(metadata, dict):
                    # Map the top-level "model" value through the alias map;
                    # unknown names are left unchanged.
                    if (
                        isinstance(payload, dict)
                        and isinstance(alias_map, Mapping)
                        and isinstance(payload.get("model"), str)
                    ):
                        payload["model"] = alias_map.get(
                            payload["model"], payload["model"]
                        )
                    restore_model_aliases(payload, metadata)
                body_bytes = self._encode_json_body(payload)
                logger.info(
                    "format_chain_applied",
                    stage=stage,
                    endpoint=endpoint,
                    chain=ctx.format_chain,
                    steps=len(ctx.format_chain) - 1,
                    category="format",
                )
                # NOTE(review): `headers` are the filtered provider headers,
                # while the body was re-encoded above. Confirm that
                # filter_response_headers drops content-length and
                # content-encoding, otherwise they may not match this body.
                restored = Response(
                    content=body_bytes,
                    status_code=provider_response.status_code,
                    headers=headers,
                    media_type=provider_response.headers.get(
                        "content-type", "application/json"
                    ),
                )
                return self._restore_model_response(restored, ctx)
            except Exception as e:
                logger.error(
                    "format_chain_response_failed",
                    error=str(e),
                    endpoint=endpoint,
                    stage=stage,
                    exc_info=e,
                    category="transform",
                )
                # Return proper error instead of potentially malformed response
                return JSONResponse(
                    status_code=500,
                    content={
                        "error": {
                            "type": "internal_server_error",
                            "message": "Failed to convert response format",
                            "details": str(e),
                        }
                    },
                )
        else:
            logger.debug("format_chain_skipped", reason="no forward chain")
            # NOTE(review): the response already went through
            # _restore_model_response above; presumably the call is
            # idempotent, confirm.
            return self._restore_model_response(response, ctx)
    else:
        logger.warning(
            "unexpected_provider_response_type", type=type(response).__name__
        )
    # Fallback, reached only for an unexpected response type: rebuild a plain
    # Response from the raw provider content and the filtered headers.
    restored = Response(
        content=provider_response.content,
        status_code=provider_response.status_code,
        headers=headers,
        media_type=headers.get("content-type", "application/json"),
    )
    return self._restore_model_response(restored, ctx)

handle_streaming async

handle_streaming(request, endpoint, **kwargs)

Handle a streaming request using StreamingHandler with format chain support.

Source code in ccproxy/services/adapters/http_adapter.py
async def handle_streaming(
    self, request: Request, endpoint: str, **kwargs: Any
) -> StreamingResponse | DeferredStreaming:
    """Handle a streaming request using StreamingHandler with format chain support.

    The request body is read (and model-mapped), converted through the format
    chain when the chain has more than one entry, passed to the
    provider-specific preparation, and finally delegated to the configured
    streaming handler together with a response adapter composed from the
    chain.

    Args:
        request: Incoming request; ``request.state.context`` must be set.
        endpoint: Endpoint identifier used for URL resolution and logging.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        Whatever ``streaming_handler.handle_streaming_request`` returns.

    Raises:
        HTTPException: 500 when no streaming handler is configured or no
            adapter can be composed for a multi-step chain; 400 when the
            request cannot be converted (the original error is chained as
            ``__cause__``).
    """

    logger.debug("handle_streaming_called", endpoint=endpoint)

    if not self.streaming_handler:
        logger.error(
            "streaming_handler_missing",
            endpoint=endpoint,
            category="streaming",
        )
        raise HTTPException(
            status_code=500,
            detail={
                "error": {
                    "type": "configuration_error",
                    "message": "Streaming handler is not configured for this provider.",
                    "details": {
                        "endpoint": endpoint,
                    },
                }
            },
        )

    # Get context from middleware
    ctx = request.state.context
    method = request.method
    self._ensure_tool_accumulator(ctx)

    # Extract request data
    body = await request.body()
    body = await self._map_request_model(ctx, body)
    headers = extract_request_headers(request)

    # Fail fast on missing format registry if chain configured
    self._ensure_format_registry(ctx.format_chain, endpoint)

    # Step 1: Execute request-side format chain if specified (streaming)
    if ctx.format_chain and len(ctx.format_chain) > 1:
        try:
            stream_payload = self._decode_json_body(body, context="stream_request")
            stream_payload = await self._apply_format_chain(
                data=stream_payload,
                format_chain=ctx.format_chain,
                stage="request",
            )
            self._record_tool_definitions(ctx, stream_payload)
            body = self._encode_json_body(stream_payload)
            logger.trace(
                "format_chain_stream_request_converted",
                from_format=ctx.format_chain[0],
                to_format=ctx.format_chain[-1],
                keys=list(stream_payload.keys()),
                size_bytes=len(body),
                category="transform",
            )
            logger.info(
                "format_chain_applied",
                stage="stream_request",
                endpoint=endpoint,
                chain=ctx.format_chain,
                steps=len(ctx.format_chain) - 1,
                category="format",
            )
        except Exception as e:
            logger.error(
                "format_chain_stream_request_failed",
                error=str(e),
                endpoint=endpoint,
                exc_info=e,
                category="transform",
            )
            # Chain the original error so tracebacks keep the root cause.
            raise HTTPException(
                status_code=400,
                detail={
                    "error": {
                        "type": "invalid_request_error",
                        "message": "Failed to convert streaming request using format chain",
                        "details": str(e),
                    }
                },
            ) from e

    # Step 2: Provider-specific preparation (add auth headers, etc.)
    prepared_body, prepared_headers = await self.prepare_provider_request(
        body, headers, endpoint
    )

    # Best-effort: record tool definitions from the body handed to the
    # provider preparation (already converted when a format chain ran above).
    # A failure must not abort the stream, but it is logged instead of being
    # dropped silently.
    try:
        outbound_payload = json.loads(body.decode()) if body else {}
        if isinstance(outbound_payload, dict):
            self._record_tool_definitions(ctx, outbound_payload)
    except Exception as exc:
        logger.debug(
            "stream_tool_definitions_skipped",
            error=str(exc),
            endpoint=endpoint,
            category="streaming",
        )

    # Select the response adapter for the stream. The adapter actually used
    # is composed from the chain in its declared order; the direct
    # provider -> client lookup below only feeds the debug log.
    composed_adapter = None
    if ctx.format_chain and self.format_registry:
        from_format = ctx.format_chain[-1]  # provider format (e.g., "anthropic")
        to_format = ctx.format_chain[
            0
        ]  # client format (e.g., "openai.chat_completions")
        streaming_format_adapter = self.format_registry.get_if_exists(
            from_format, to_format
        )

        logger.debug(
            "streaming_adapter_lookup",
            format_chain=ctx.format_chain,
            from_format=from_format,
            to_format=to_format,
            adapter_found=streaming_format_adapter is not None,
            adapter_type=type(streaming_format_adapter).__name__
            if streaming_format_adapter
            else None,
        )

        composed_adapter = compose_from_chain(
            registry=self.format_registry, chain=ctx.format_chain
        )

    # A multi-step chain without an adapter cannot produce a correctly
    # formatted stream, so refuse instead of streaming the wrong format.
    if ctx.format_chain and len(ctx.format_chain) > 1 and composed_adapter is None:
        logger.error(
            "streaming_adapter_missing",
            endpoint=endpoint,
            chain=ctx.format_chain,
            category="format",
        )
        raise HTTPException(
            status_code=500,
            detail={
                "error": {
                    "type": "configuration_error",
                    "message": "No streaming format adapter available for configured format chain.",
                    "details": {
                        "endpoint": endpoint,
                        "format_chain": ctx.format_chain,
                    },
                }
            },
        )

    if composed_adapter is not None and ctx.format_chain:
        logger.debug(
            "streaming_format_adapter_selected",
            endpoint=endpoint,
            chain=ctx.format_chain,
            adapter_type=type(composed_adapter).__name__,
            category="format",
        )

    handler_config = HandlerConfig(
        supports_streaming=True,
        request_transformer=None,
        response_adapter=composed_adapter,  # use composed adapter when available
        format_context=None,
    )

    # Get target URL for proper client pool management
    target_url = await self.get_target_url(endpoint)

    # The values returned by the emit step replace the local method, URL,
    # body and headers used for the upstream call.
    (
        method,
        target_url,
        prepared_body,
        prepared_headers,
    ) = await self._emit_provider_request_prepared(
        request_obj=request,
        ctx=ctx,
        method=method,
        endpoint=endpoint,
        target_url=target_url,
        prepared_body=prepared_body,
        prepared_headers=prepared_headers,
        is_streaming=True,
    )

    # Get HTTP client from pool manager with base URL for hook integration
    parsed_url = urlparse(target_url)
    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

    # Delegate to StreamingHandler - no format chain needed since adapter is in config
    return await self.streaming_handler.handle_streaming_request(
        method=method,
        url=target_url,
        headers=prepared_headers,  # Use prepared headers with auth
        body=prepared_body,  # Use prepared body
        handler_config=handler_config,
        request_context=ctx,
        client=await self.http_pool_manager.get_client(base_url=base_url),
    )

prepare_provider_request abstractmethod async

prepare_provider_request(body, headers, endpoint)

Provider prepares request. Headers have lowercase keys.

Source code in ccproxy/services/adapters/http_adapter.py
@abstractmethod
async def prepare_provider_request(
    self, body: bytes, headers: dict[str, str], endpoint: str
) -> tuple[bytes, dict[str, str]]:
    """Prepare the outgoing request for the provider.

    Abstract hook with no base behavior. Header keys passed in are
    lowercase. Implementations return the ``(body, headers)`` pair to send.
    """
    ...

process_provider_response abstractmethod async

process_provider_response(response, endpoint)

Provider processes response.

Source code in ccproxy/services/adapters/http_adapter.py
@abstractmethod
async def process_provider_response(
    self, response: httpx.Response, endpoint: str
) -> Response | StreamingResponse:
    """Process the provider's HTTP response.

    Abstract hook with no base behavior. Implementations receive the raw
    ``httpx`` response plus the endpoint and return the response object to
    hand back to the caller.
    """
    ...

get_target_url abstractmethod async

get_target_url(endpoint)

Get target URL for this provider.

Source code in ccproxy/services/adapters/http_adapter.py
@abstractmethod
async def get_target_url(self, endpoint: str) -> str:
    """Return the target URL for this provider.

    Abstract hook with no base behavior; implementations resolve
    ``endpoint`` to the URL the request is sent to.
    """
    ...

cleanup async

cleanup()

Cleanup resources.

Source code in ccproxy/services/adapters/http_adapter.py
async def cleanup(self) -> None:
    """Release adapter resources.

    The base implementation holds nothing to release; it only emits a
    debug log entry marking that cleanup ran.
    """
    logger.debug("adapter_cleanup_completed")