Skip to content

ccproxy.plugins.codex.adapter

ccproxy.plugins.codex.adapter

CodexAdapter

CodexAdapter(detection_service, config=None, **kwargs)

Bases: BaseHTTPAdapter

Simplified Codex adapter.

Source code in ccproxy/plugins/codex/adapter.py
def __init__(
    self,
    detection_service: DetectionServiceProtocol,
    config: Any = None,
    **kwargs: Any,
) -> None:
    """Set up the adapter on top of the base HTTP adapter.

    Args:
        detection_service: Stored as-is on ``self.detection_service``.
        config: Passed through to the base initializer; ``self.config`` must
            afterwards expose a string ``base_url``.
        **kwargs: Forwarded unchanged to the base initializer.
    """
    # The base initializer runs first: the lines below read self.auth_manager
    # and self.config, which are not assigned anywhere in this method
    # (presumably the base class sets them -- confirm in BaseHTTPAdapter).
    super().__init__(config=config, **kwargs)

    self.detection_service: DetectionServiceProtocol = detection_service

    # cast() only informs the type checker; the object is not converted.
    auth_manager = self.auth_manager
    self.token_manager: ProfiledTokenManagerProtocol = cast(
        ProfiledTokenManagerProtocol, auth_manager
    )

    # Store the upstream base URL without any trailing slashes.
    configured_base_url = self.config.base_url
    self.base_url = configured_base_url.rstrip("/")

handle_request async

handle_request(request)

Handle request with Codex-specific streaming behavior.

Codex upstream only supports streaming. If the client requests a non-streaming response, we internally stream and buffer it, then return a standard Response.

Source code in ccproxy/plugins/codex/adapter.py
async def handle_request(
    self, request: Request
) -> Response | StreamingResponse | DeferredStreaming:
    """Handle request with Codex-specific streaming behavior.

    Codex upstream only supports streaming. If the client requests a non-streaming
    response, we internally stream and buffer it, then return a standard Response.

    Flow:
        1. Read the body and decide whether the client asked for streaming by
           looking at the JSON ``stream`` flag (anything unparsable counts as
           non-streaming).
        2. Streaming clients are handed to ``handle_streaming`` when a
           streaming handler is configured.
        3. Otherwise, when ``config.buffer_non_streaming`` is truthy (default
           ``True`` if the attribute is absent), the request is converted
           through the format chain, sent via ``StreamingBufferService``, and
           the buffered body is converted back through the format chain.
        4. If buffering is disabled, the base class ``handle_request`` is used.

    Args:
        request: Incoming request; ``request.state.context`` must already be
            populated with the request context.

    Returns:
        The streaming handler's response, the buffered (and possibly
        format-converted) response, a ``JSONResponse`` with status 400 when
        request conversion fails or 502 when response conversion fails, or
        the base class result.
    """
    # Context + request info
    ctx = request.state.context
    self._ensure_tool_accumulator(ctx)
    endpoint = ctx.metadata.get("endpoint", "")
    body = await request.body()
    # NOTE(review): presumably rewrites the client-facing model name to the
    # upstream one and records the mapping on ctx -- confirm in helper.
    body = await self._map_request_model(ctx, body)
    headers = extract_request_headers(request)

    # Determine client streaming intent from body flag (fallback to False)
    wants_stream = False
    try:
        data = json.loads(body.decode()) if body else {}
        wants_stream = bool(data.get("stream", False))
    except Exception:  # Malformed/missing JSON -> assume non-streaming
        wants_stream = False
    logger.trace(
        "codex_adapter_request_intent",
        wants_stream=wants_stream,
        endpoint=endpoint,
        format_chain=getattr(ctx, "format_chain", []),
        category="streaming",
    )

    # Explicitly set service_type for downstream helpers
    # (best effort: any failure to write the metadata is ignored)
    with contextlib.suppress(Exception):
        ctx.metadata.setdefault("service_type", "codex")

    # If client wants streaming, delegate to streaming handler directly.
    # When no streaming handler is configured, a streaming request falls
    # through to the buffered / base paths below.
    if wants_stream and self.streaming_handler:
        logger.trace(
            "codex_adapter_delegating_streaming",
            endpoint=endpoint,
            category="streaming",
        )
        return await self.handle_streaming(request, endpoint)

    # Otherwise, buffer the upstream streaming response into a standard one
    if getattr(self.config, "buffer_non_streaming", True):
        # 1) Prepare provider request (adds auth, sets stream=true, etc.)
        # Apply request format conversion if specified
        # (a chain of length 1 means source and target format are the same)
        if ctx.format_chain and len(ctx.format_chain) > 1:
            try:
                request_payload = self._decode_json_body(
                    body, context="codex_request"
                )
                request_payload = await self._apply_format_chain(
                    data=request_payload,
                    format_chain=ctx.format_chain,
                    stage="request",
                )
                body = self._encode_json_body(request_payload)
            except Exception as e:
                # Conversion failure is reported as a client error (400)
                # and nothing is sent upstream.
                logger.error(
                    "codex_format_chain_request_failed",
                    error=str(e),
                    exc_info=e,
                    category="transform",
                )
                return JSONResponse(
                    status_code=400,
                    content={
                        "error": {
                            "type": "invalid_request_error",
                            "message": "Failed to convert request using format chain",
                            "details": str(e),
                        }
                    },
                )

        prepared_body, prepared_headers = await self.prepare_provider_request(
            body, headers, endpoint
        )
        # Only header names are logged here, never header values.
        logger.trace(
            "codex_adapter_prepared_provider_request",
            header_keys=list(prepared_headers.keys()),
            body_size=len(prepared_body or b""),
            category="http",
        )

        # 2) Build handler config using composed adapter from format_chain (unified path)

        composed_adapter = (
            compose_from_chain(
                registry=self.format_registry, chain=ctx.format_chain
            )
            if self.format_registry and ctx.format_chain
            else None
        )

        handler_config = HandlerConfig(
            supports_streaming=True,
            request_transformer=None,
            response_adapter=composed_adapter,
            format_context=None,
        )

        # 3) Use StreamingBufferService to convert upstream stream -> regular response
        target_url = await self.get_target_url(endpoint)
        # Try to use a client with base_url for better hook integration
        # NOTE(review): no base_url is actually passed to get_client() here,
        # unlike handle_streaming -- confirm whether that is intended.
        http_client = await self.http_pool_manager.get_client()
        # Reuse the streaming handler's hook manager when one is available.
        hook_manager = (
            getattr(self.streaming_handler, "hook_manager", None)
            if self.streaming_handler
            else None
        )
        buffer_service = StreamingBufferService(
            http_client=http_client,
            request_tracer=None,
            hook_manager=hook_manager,
            http_pool_manager=self.http_pool_manager,
        )

        buffered_response = await buffer_service.handle_buffered_streaming_request(
            method=request.method,
            url=target_url,
            headers=prepared_headers,
            body=prepared_body,
            handler_config=handler_config,
            request_context=ctx,
            provider_name="codex",
        )
        # NOTE(review): the log field below is spelled "buffer_respones_preview"
        # (sic); renaming it would change the emitted log key.
        logger.trace(
            "codex_adapter_buffered_response_ready",
            status_code=buffered_response.status_code,
            buffer_respones_preview=buffered_response.body[:300],
            category="streaming",
            format_chain=getattr(ctx, "format_chain", []),
        )

        # 4) Apply reverse format chain on buffered body if needed
        if ctx.format_chain and len(ctx.format_chain) > 1:
            from typing import Literal

            # Upstream failures (status >= 400) go through the chain's
            # "error" stage instead of the "response" stage.
            mode: Literal["error", "response"] = (
                "error" if buffered_response.status_code >= 400 else "response"
            )
            try:
                # Normalise the body to bytes before decoding it as JSON.
                body_bytes = (
                    buffered_response.body
                    if isinstance(buffered_response.body, bytes)
                    else bytes(buffered_response.body)
                )
                response_payload = self._decode_json_body(
                    body_bytes, context=f"codex_{mode}"
                )
                response_payload = await self._apply_format_chain(
                    data=response_payload,
                    format_chain=ctx.format_chain,
                    stage=mode,
                )
                metadata = getattr(ctx, "metadata", None)
                alias_map = getattr(ctx, "_model_alias_map", None)
                if isinstance(metadata, dict):
                    # Replace the model name in the payload with its entry
                    # in the alias map, leaving unknown names unchanged.
                    if (
                        isinstance(alias_map, dict)
                        and isinstance(response_payload, dict)
                        and isinstance(response_payload.get("model"), str)
                    ):
                        response_payload["model"] = alias_map.get(
                            response_payload["model"], response_payload["model"]
                        )
                    restore_model_aliases(response_payload, metadata)
                converted_body = self._encode_json_body(response_payload)
            except Exception as e:
                # The provider answered but its body could not be converted:
                # reported as a 502 rather than passing the raw body through.
                logger.error(
                    "codex_format_chain_response_failed",
                    error=str(e),
                    mode=mode,
                    exc_info=e,
                    category="transform",
                )
                return JSONResponse(
                    status_code=502,
                    content={
                        "error": {
                            "type": "server_error",
                            "message": "Failed to convert provider response using format chain",
                            "details": str(e),
                        }
                    },
                )

            # Keep the upstream status code; headers are passed through
            # filter_response_headers before being returned.
            headers_out = filter_response_headers(dict(buffered_response.headers))
            return Response(
                content=converted_body,
                status_code=buffered_response.status_code,
                headers=headers_out,
                media_type="application/json",
            )

        # No conversion needed; return buffered response as-is
        return buffered_response

    # Fallback: no buffering requested, use base non-streaming flow
    return await super().handle_request(request)

process_provider_response async

process_provider_response(response, endpoint)

Return a plain Response; streaming handled upstream by BaseHTTPAdapter.

The BaseHTTPAdapter is responsible for detecting streaming and delegating to the shared StreamingHandler. For non-streaming responses, adapters should return a simple Starlette Response.

Source code in ccproxy/plugins/codex/adapter.py
async def process_provider_response(
    self, response: httpx.Response, endpoint: str
) -> Response | StreamingResponse:
    """Wrap a provider reply in a plain, non-streaming ``Response``.

    Streaming is detected and delegated to the shared StreamingHandler by
    BaseHTTPAdapter, so this method only has to mirror the upstream reply.

    Args:
        response: The upstream HTTP response; its body, status code and
            content type are copied onto the returned response.
        endpoint: Not used by this implementation.

    Returns:
        A ``Response`` carrying the upstream body and status, the headers
        produced by ``extract_response_headers``, and the upstream
        ``content-type`` as media type (``None`` when that header is absent).
    """
    forwarded_headers = extract_response_headers(response)
    upstream_content_type = response.headers.get("content-type")

    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=forwarded_headers,
        media_type=upstream_content_type,
    )

handle_streaming async

handle_streaming(request, endpoint, **kwargs)

Handle streaming with request conversion for Codex.

Applies request format conversion (e.g., anthropic.messages -> openai.responses) before preparing the provider request, then delegates to StreamingHandler with a streaming response adapter for reverse conversion as needed.

Source code in ccproxy/plugins/codex/adapter.py
async def handle_streaming(
    self, request: Request, endpoint: str, **kwargs: Any
) -> StreamingResponse | DeferredStreaming:
    """Stream a Codex request, converting formats in both directions.

    The request body is converted through the format chain (for example
    anthropic.messages -> openai.responses) before the provider request is
    prepared. The call is then handed to the StreamingHandler together with
    a response adapter that converts streamed output back to the client's
    format when one is registered.

    Args:
        request: Incoming request with ``request.state.context`` populated.
        endpoint: Endpoint used to prepare the request and resolve the
            target URL.
        **kwargs: Only forwarded to the base implementation when no
            streaming handler is configured.

    Returns:
        The StreamingHandler's response, a 400 ``StreamingResponse`` with a
        JSON error body when request conversion fails, or the base class
        result when no streaming handler is configured.
    """
    # Without a streaming handler there is nothing Codex-specific to do.
    if not self.streaming_handler:
        return await super().handle_streaming(request, endpoint, **kwargs)

    context = request.state.context
    self._ensure_tool_accumulator(context)

    # Raw body (after model mapping) plus the client's request headers.
    raw_body = await request.body()
    raw_body = await self._map_request_model(context, raw_body)
    client_headers = extract_request_headers(request)

    # Make sure format adapters are available before any conversion.
    self._ensure_format_registry(context.format_chain, endpoint)

    # A chain with more than one entry means a conversion is required.
    if len(context.format_chain or ()) > 1:
        try:
            converted = self._decode_json_body(
                raw_body, context="codex_stream_request"
            )
            converted = await self._apply_format_chain(
                data=converted,
                format_chain=context.format_chain,
                stage="request",
            )
            self._record_tool_definitions(context, converted)
            raw_body = self._encode_json_body(converted)
        except Exception as exc:
            logger.error(
                "codex_format_chain_request_failed",
                error=str(exc),
                exc_info=exc,
                category="transform",
            )

            # Report the failure as a one-chunk streaming response so the
            # return type stays consistent for streaming callers.
            failure_bytes = json.dumps(
                {
                    "error": {
                        "type": "invalid_request_error",
                        "message": "Failed to convert request using format chain",
                        "details": str(exc),
                    }
                }
            ).encode("utf-8")

            async def single_chunk() -> Any:
                yield failure_bytes

            return StreamingResponse(
                content=single_chunk(),
                status_code=400,
                media_type="application/json",
            )

    # Provider-specific preparation (adds auth, sets stream=true)
    upstream_body, upstream_headers = await self.prepare_provider_request(
        raw_body, client_headers, endpoint
    )

    # Look up the adapter that maps the provider format (last chain entry)
    # back to the client format (first chain entry); any lookup failure
    # leaves the adapter unset.
    reverse_adapter = None
    if len(context.format_chain or ()) > 1 and self.format_registry:
        provider_format = context.format_chain[-1]
        client_format = context.format_chain[0]
        with contextlib.suppress(Exception):
            reverse_adapter = self.format_registry.get_if_exists(
                provider_format, client_format
            )

    stream_config = HandlerConfig(
        supports_streaming=True,
        request_transformer=None,
        response_adapter=reverse_adapter,
        format_context=None,
    )

    upstream_url = await self.get_target_url(endpoint)

    # The pooled client is requested by origin (scheme + host[:port]) only.
    url_parts = urlparse(upstream_url)
    origin = f"{url_parts.scheme}://{url_parts.netloc}"
    pooled_client = await self.http_pool_manager.get_client(base_url=origin)

    return await self.streaming_handler.handle_streaming_request(
        method=request.method,
        url=upstream_url,
        headers=upstream_headers,
        body=upstream_body,
        handler_config=stream_config,
        request_context=context,
        client=pooled_client,
    )

adapt_error

adapt_error(error_body)

Convert Codex error format to appropriate API error format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| error_body | dict[str, Any] | Codex error response | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | API-formatted error response |

Source code in ccproxy/plugins/codex/adapter.py
def adapt_error(self, error_body: dict[str, Any]) -> dict[str, Any]:
    """Convert Codex error format to appropriate API error format.

    Args:
        error_body: Codex error response

    Returns:
        API-formatted error response
    """
    # Handle the specific "Stream must be set to true" error
    if isinstance(error_body, dict) and "detail" in error_body:
        detail = error_body["detail"]
        if "Stream must be set to true" in detail:
            # Convert to generic invalid request error
            return {
                "error": {
                    "type": "invalid_request_error",
                    "message": "Invalid streaming parameter",
                }
            }

    # Handle other error formats that might have "error" key
    if "error" in error_body:
        return error_body

    # Default: wrap non-standard errors
    return {
        "error": {
            "type": "internal_server_error",
            "message": "An error occurred processing the request",
        }
    }