ccproxy.services.proxy_service

Proxy service for orchestrating Claude API requests with business logic.

RequestData

Bases: TypedDict

Typed structure for transformed request data.

ResponseData

Bases: TypedDict

Typed structure for transformed response data.
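
The individual fields are not reproduced on this page, but from how the structures are used in `handle_request` below, a `RequestData` appears to carry `method`, `url`, `headers`, and `body`, while a `ResponseData` carries `status_code`, `headers`, and `body`. A sketch of those shapes, with the field names inferred from usage rather than copied from the definitions:

```python
from typing import TypedDict

class RequestData(TypedDict):
    """Transformed request data consumed by the proxy client (inferred shape)."""
    method: str
    url: str
    headers: dict[str, str]
    body: bytes | None

class ResponseData(TypedDict):
    """Transformed response data returned to the caller (inferred shape)."""
    status_code: int
    headers: dict[str, str]
    body: bytes
```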

ProxyService

ProxyService(
    proxy_client,
    credentials_manager,
    settings,
    proxy_mode="full",
    target_base_url="https://api.anthropic.com",
    metrics=None,
)

Claude-specific proxy orchestration with business logic.

This service orchestrates the complete proxy flow, including:

- Authentication management
- Request/response transformations
- Metrics collection (future)
- Error handling and logging

Pure HTTP forwarding is delegated to BaseProxyClient.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `proxy_client` | `BaseProxyClient` | HTTP client for pure forwarding | *required* |
| `credentials_manager` | `CredentialsManager` | Authentication manager | *required* |
| `settings` | `Settings` | Application settings | *required* |
| `proxy_mode` | `str` | Transformation mode, either `"minimal"` or `"full"` | `'full'` |
| `target_base_url` | `str` | Base URL for the target API | `'https://api.anthropic.com'` |
| `metrics` | `PrometheusMetrics \| None` | Prometheus metrics collector (optional) | `None` |
Source code in ccproxy/services/proxy_service.py
```python
def __init__(
    self,
    proxy_client: BaseProxyClient,
    credentials_manager: CredentialsManager,
    settings: Settings,
    proxy_mode: str = "full",
    target_base_url: str = "https://api.anthropic.com",
    metrics: PrometheusMetrics | None = None,
) -> None:
    """Initialize the proxy service.

    Args:
        proxy_client: HTTP client for pure forwarding
        credentials_manager: Authentication manager
        settings: Application settings
        proxy_mode: Transformation mode - "minimal" or "full"
        target_base_url: Base URL for the target API
        metrics: Prometheus metrics collector (optional)
    """
    self.proxy_client = proxy_client
    self.credentials_manager = credentials_manager
    self.settings = settings
    self.proxy_mode = proxy_mode
    self.target_base_url = target_base_url.rstrip("/")
    self.metrics = metrics or get_metrics()

    # Create concrete transformers
    self.request_transformer = HTTPRequestTransformer()
    self.response_transformer = HTTPResponseTransformer()

    # Create OpenAI adapter for stream transformation
    from ccproxy.adapters.openai.adapter import OpenAIAdapter

    self.openai_adapter = OpenAIAdapter()

    # Create mock response generator for bypass mode
    self.mock_generator = RealisticMockResponseGenerator()

    # Cache environment-based configuration
    self._proxy_url = self._init_proxy_url()
    self._ssl_context = self._init_ssl_context()
    self._verbose_streaming = (
        os.environ.get("CCPROXY_VERBOSE_STREAMING", "false").lower() == "true"
    )
    self._verbose_api = (
        os.environ.get("CCPROXY_VERBOSE_API", "false").lower() == "true"
    )
    self._request_log_dir = os.environ.get("CCPROXY_REQUEST_LOG_DIR")

    # Create request log directory if specified
    if self._request_log_dir and self._verbose_api:
        Path(self._request_log_dir).mkdir(parents=True, exist_ok=True)

    # Track current request ID for logging
    self._current_request_id: str | None = None
```
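
For orientation, here is a minimal wiring sketch. The `proxy_client`, `credentials_manager`, and `settings` objects are assumed to come from the application's own factories and configuration loading; they are not constructed in this module:

```python
# Hypothetical wiring; only the ProxyService constructor itself is from this module.
proxy_service = ProxyService(
    proxy_client=proxy_client,                # BaseProxyClient doing pure HTTP forwarding
    credentials_manager=credentials_manager,  # supplies OAuth credentials/access tokens
    settings=settings,                        # application Settings
    proxy_mode="full",                        # or "minimal" for lighter transformations
    target_base_url="https://api.anthropic.com",
)
```

Note that the verbose-logging switches (`CCPROXY_VERBOSE_API`, `CCPROXY_VERBOSE_STREAMING`, `CCPROXY_REQUEST_LOG_DIR`) are read from the environment in the constructor, so they must be set before the service is created.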

handle_request async

handle_request(
    method,
    path,
    headers,
    body=None,
    query_params=None,
    timeout=240.0,
    request=None,
)

Handle a proxy request with full business logic orchestration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `method` | `str` | HTTP method | *required* |
| `path` | `str` | Request path (without /unclaude prefix) | *required* |
| `headers` | `dict[str, str]` | Request headers | *required* |
| `body` | `bytes \| None` | Request body | `None` |
| `query_params` | `dict[str, str \| list[str]] \| None` | Query parameters | `None` |
| `timeout` | `float` | Request timeout in seconds | `240.0` |
| `request` | `Request \| None` | Optional FastAPI Request object for accessing request context | `None` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[int, dict[str, str], bytes] \| StreamingResponse` | Tuple of `(status_code, headers, body)`, or a `StreamingResponse` for streaming responses |

Raises:

| Type | Description |
| --- | --- |
| `HTTPException` | If the request fails |
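
As a rough usage sketch from an async route handler (the FastAPI route, the `Response`/`StreamingResponse` imports, and the `proxy_service` instance are assumptions, not part of this module):

```python
from fastapi import Request
from fastapi.responses import Response, StreamingResponse

async def proxy_endpoint(request: Request) -> Response:
    # Hypothetical call site; `proxy_service` is an already-constructed ProxyService.
    result = await proxy_service.handle_request(
        method=request.method,
        path="/v1/messages",
        headers=dict(request.headers),
        body=await request.body(),
        request=request,  # lets the service reuse the middleware's request context
    )

    if isinstance(result, StreamingResponse):
        return result  # streaming responses are passed through unchanged
    status_code, response_headers, response_body = result
    return Response(content=response_body, status_code=status_code, headers=response_headers)
```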

Source code in ccproxy/services/proxy_service.py
```python
async def handle_request(
    self,
    method: str,
    path: str,
    headers: dict[str, str],
    body: bytes | None = None,
    query_params: dict[str, str | list[str]] | None = None,
    timeout: float = 240.0,
    request: Request | None = None,  # Optional FastAPI Request object
) -> tuple[int, dict[str, str], bytes] | StreamingResponse:
    """Handle a proxy request with full business logic orchestration.

    Args:
        method: HTTP method
        path: Request path (without /unclaude prefix)
        headers: Request headers
        body: Request body
        query_params: Query parameters
        timeout: Request timeout in seconds
        request: Optional FastAPI Request object for accessing request context

    Returns:
        Tuple of (status_code, headers, body) or StreamingResponse for streaming

    Raises:
        HTTPException: If request fails
    """
    # Extract request metadata
    model, streaming = self._extract_request_metadata(body)
    endpoint = path.split("/")[-1] if path else "unknown"

    # Handle /v1/models endpoint specially
    if path == "/v1/models":
        return await self.handle_models_request(headers, timeout)

    # Use existing context from request if available, otherwise create new one
    if request and hasattr(request, "state") and hasattr(request.state, "context"):
        # Use existing context from middleware
        ctx = request.state.context
        # Add service-specific metadata
        ctx.add_metadata(
            endpoint=endpoint,
            model=model,
            streaming=streaming,
            service_type="proxy_service",
        )
        # Create a context manager that preserves the existing context's lifecycle
        # This ensures __aexit__ is called for proper access logging
        from contextlib import asynccontextmanager

        @asynccontextmanager
        async def existing_context_manager() -> AsyncGenerator[Any, None]:
            try:
                yield ctx
            finally:
                # Let the existing context handle its own lifecycle
                # The middleware or parent context will call __aexit__
                pass

        context_manager: Any = existing_context_manager()
    else:
        # Create new context for observability
        context_manager = request_context(
            method=method,
            path=path,
            endpoint=endpoint,
            model=model,
            streaming=streaming,
            service_type="proxy_service",
            metrics=self.metrics,
        )

    async with context_manager as ctx:
        # Store the current request ID for file logging
        self._current_request_id = ctx.request_id

        try:
            # 1. Authentication - get access token
            async with timed_operation("oauth_token", ctx.request_id):
                logger.debug("oauth_token_retrieval_start")
                access_token = await self._get_access_token()

            # 2. Request transformation
            async with timed_operation("request_transform", ctx.request_id):
                logger.debug("request_transform_start")
                transformed_request = await self._transform_request(
                    method, path, headers, body, query_params, access_token
                )

            # 3. Check for bypass header to skip upstream forwarding
            bypass_upstream = (
                headers.get("X-CCProxy-Bypass-Upstream", "").lower() == "true"
            )

            if bypass_upstream:
                logger.debug("bypassing_upstream_forwarding_due_to_header")
                # Determine message type from request body for realistic response generation
                message_type = self._extract_message_type_from_body(body)

                # Check if this will be a streaming response
                should_stream = streaming or self._should_stream_response(
                    transformed_request["headers"]
                )

                # Determine response format based on original request path
                is_openai_format = self.response_transformer._is_openai_request(
                    path
                )

                if should_stream:
                    return await self._generate_bypass_streaming_response(
                        model, is_openai_format, ctx, message_type
                    )
                else:
                    return await self._generate_bypass_standard_response(
                        model, is_openai_format, ctx, message_type
                    )

            # 3. Forward request using proxy client
            logger.debug("request_forwarding_start", url=transformed_request["url"])

            # Check if this will be a streaming response
            should_stream = streaming or self._should_stream_response(
                transformed_request["headers"]
            )

            if should_stream:
                logger.debug("streaming_response_detected")
                return await self._handle_streaming_request(
                    transformed_request, path, timeout, ctx
                )
            else:
                logger.debug("non_streaming_response_detected")

            # Log the outgoing request if verbose API logging is enabled
            self._log_verbose_api_request(transformed_request)

            # Handle regular request
            async with timed_operation("api_call", ctx.request_id) as api_op:
                start_time = time.perf_counter()

                (
                    status_code,
                    response_headers,
                    response_body,
                ) = await self.proxy_client.forward(
                    method=transformed_request["method"],
                    url=transformed_request["url"],
                    headers=transformed_request["headers"],
                    body=transformed_request["body"],
                    timeout=timeout,
                )

                end_time = time.perf_counter()
                api_duration = end_time - start_time
                api_op["duration_seconds"] = api_duration

            # Log the received response if verbose API logging is enabled
            self._log_verbose_api_response(
                status_code, response_headers, response_body
            )

            # 4. Response transformation
            async with timed_operation("response_transform", ctx.request_id):
                logger.debug("response_transform_start")
                # For error responses, transform to OpenAI format if needed
                transformed_response: ResponseData
                if status_code >= 400:
                    logger.info(
                        "upstream_error_received",
                        status_code=status_code,
                        has_body=bool(response_body),
                        content_length=len(response_body) if response_body else 0,
                    )

                    # Transform error to OpenAI format if this is an OpenAI endpoint
                    transformed_error_body = response_body
                    if self.response_transformer._is_openai_request(path):
                        try:
                            error_data = json.loads(response_body.decode("utf-8"))
                            openai_error = self.openai_adapter.adapt_error(
                                error_data
                            )
                            transformed_error_body = json.dumps(
                                openai_error
                            ).encode("utf-8")
                        except (json.JSONDecodeError, UnicodeDecodeError):
                            # Keep original error if parsing fails
                            pass

                    transformed_response = ResponseData(
                        status_code=status_code,
                        headers=response_headers,
                        body=transformed_error_body,
                    )
                else:
                    transformed_response = await self._transform_response(
                        status_code, response_headers, response_body, path
                    )

            # 5. Extract response metrics using direct JSON parsing
            tokens_input = tokens_output = cache_read_tokens = (
                cache_write_tokens
            ) = cost_usd = None
            if transformed_response["body"]:
                try:
                    response_data = json.loads(
                        transformed_response["body"].decode("utf-8")
                    )
                    usage = response_data.get("usage", {})
                    tokens_input = usage.get("input_tokens")
                    tokens_output = usage.get("output_tokens")
                    cache_read_tokens = usage.get("cache_read_input_tokens")
                    cache_write_tokens = usage.get("cache_creation_input_tokens")

                    # Calculate cost including cache tokens if we have tokens and model
                    from ccproxy.utils.cost_calculator import calculate_token_cost

                    cost_usd = calculate_token_cost(
                        tokens_input,
                        tokens_output,
                        model,
                        cache_read_tokens,
                        cache_write_tokens,
                    )
                except (json.JSONDecodeError, UnicodeDecodeError):
                    pass  # Keep all values as None if parsing fails

            # 6. Update context with response data
            ctx.add_metadata(
                status_code=status_code,
                tokens_input=tokens_input,
                tokens_output=tokens_output,
                cache_read_tokens=cache_read_tokens,
                cache_write_tokens=cache_write_tokens,
                cost_usd=cost_usd,
            )

            # 7. Log comprehensive access log (includes Prometheus metrics)
            await log_request_access(
                context=ctx,
                status_code=status_code,
                method=method,
                metrics=self.metrics,
            )

            return (
                transformed_response["status_code"],
                transformed_response["headers"],
                transformed_response["body"],
            )

        except Exception as e:
            # Record error metrics via access logger
            error_type = type(e).__name__

            # Log the error with access logger (includes metrics)
            await log_request_access(
                context=ctx,
                method=method,
                error_message=str(e),
                metrics=self.metrics,
                error_type=error_type,
            )

            logger.exception(
                "proxy_request_failed",
                method=method,
                path=path,
                error=str(e),
                exc_info=True,
            )
            # Re-raise the exception without transformation
            # Let higher layers handle specific error types
            raise
        finally:
            # Reset current request ID
            self._current_request_id = None
```
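
One behavior worth calling out from the code above: a request that carries the `X-CCProxy-Bypass-Upstream: true` header never reaches the upstream API; a mock response is generated instead (useful for load or integration testing without spending tokens). A minimal sketch, assuming an existing `proxy_service` instance and that the call happens inside an async function:

```python
import json

bypass_body = json.dumps({
    "model": "claude-3-5-sonnet-20241022",
    "max_tokens": 16,
    "messages": [{"role": "user", "content": "ping"}],
}).encode("utf-8")

# With the bypass header set, handle_request short-circuits forwarding and
# returns a generated (status_code, headers, body) tuple for non-streaming requests.
status_code, response_headers, response_body = await proxy_service.handle_request(
    method="POST",
    path="/v1/messages",
    headers={
        "content-type": "application/json",
        "X-CCProxy-Bypass-Upstream": "true",
    },
    body=bypass_body,
)
```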

handle_models_request async

handle_models_request(headers, timeout=240.0)

Handle a /v1/models request to list available models.

Since the Anthropic API does not support the /v1/models endpoint, this returns a hardcoded list of Anthropic models together with recent OpenAI models.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `headers` | `dict[str, str]` | Request headers | *required* |
| `timeout` | `float` | Request timeout in seconds | `240.0` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[int, dict[str, str], bytes]` | Tuple of `(status_code, headers, body)` |
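
The body mixes Anthropic-style entries (`type`, `id`, `display_name`, `created_at`) and OpenAI-style entries (`id`, `object`, `created`, `owned_by`) under a single `data` list. A small decoding sketch, assuming an existing `proxy_service` instance called from an async context:

```python
import json

status_code, response_headers, body = await proxy_service.handle_models_request(headers={})
models = json.loads(body)

assert status_code == 200 and models["object"] == "list"
anthropic_ids = [m["id"] for m in models["data"] if m.get("type") == "model"]
openai_ids = [m["id"] for m in models["data"] if m.get("owned_by") == "openai"]
```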

Source code in ccproxy/services/proxy_service.py
```python
async def handle_models_request(
    self,
    headers: dict[str, str],
    timeout: float = 240.0,
) -> tuple[int, dict[str, str], bytes]:
    """Handle a /v1/models request to list available models.

    Since Anthropic API doesn't support /v1/models endpoint,
    returns a hardcoded list of Anthropic models and recent OpenAI models.

    Args:
        headers: Request headers
        timeout: Request timeout in seconds

    Returns:
        Tuple of (status_code, headers, body)
    """
    # Define hardcoded Anthropic models
    anthropic_models = [
        {
            "type": "model",
            "id": "claude-opus-4-20250514",
            "display_name": "Claude Opus 4",
            "created_at": 1747526400,  # 2025-05-22
        },
        {
            "type": "model",
            "id": "claude-sonnet-4-20250514",
            "display_name": "Claude Sonnet 4",
            "created_at": 1747526400,  # 2025-05-22
        },
        {
            "type": "model",
            "id": "claude-3-7-sonnet-20250219",
            "display_name": "Claude Sonnet 3.7",
            "created_at": 1740268800,  # 2025-02-24
        },
        {
            "type": "model",
            "id": "claude-3-5-sonnet-20241022",
            "display_name": "Claude Sonnet 3.5 (New)",
            "created_at": 1729555200,  # 2024-10-22
        },
        {
            "type": "model",
            "id": "claude-3-5-haiku-20241022",
            "display_name": "Claude Haiku 3.5",
            "created_at": 1729555200,  # 2024-10-22
        },
        {
            "type": "model",
            "id": "claude-3-5-sonnet-20240620",
            "display_name": "Claude Sonnet 3.5 (Old)",
            "created_at": 1718841600,  # 2024-06-20
        },
        {
            "type": "model",
            "id": "claude-3-haiku-20240307",
            "display_name": "Claude Haiku 3",
            "created_at": 1709769600,  # 2024-03-07
        },
        {
            "type": "model",
            "id": "claude-3-opus-20240229",
            "display_name": "Claude Opus 3",
            "created_at": 1709164800,  # 2024-02-29
        },
    ]

    # Define recent OpenAI models to include (GPT-4 variants and O1 models)
    openai_models = [
        {
            "id": "gpt-4o",
            "object": "model",
            "created": 1715367049,
            "owned_by": "openai",
        },
        {
            "id": "gpt-4o-mini",
            "object": "model",
            "created": 1721172741,
            "owned_by": "openai",
        },
        {
            "id": "gpt-4-turbo",
            "object": "model",
            "created": 1712361441,
            "owned_by": "openai",
        },
        {
            "id": "gpt-4-turbo-preview",
            "object": "model",
            "created": 1706037777,
            "owned_by": "openai",
        },
        {
            "id": "o1",
            "object": "model",
            "created": 1734375816,
            "owned_by": "openai",
        },
        {
            "id": "o1-mini",
            "object": "model",
            "created": 1725649008,
            "owned_by": "openai",
        },
        {
            "id": "o1-preview",
            "object": "model",
            "created": 1725648897,
            "owned_by": "openai",
        },
        {
            "id": "o3",
            "object": "model",
            "created": 1744225308,
            "owned_by": "openai",
        },
        {
            "id": "o3-mini",
            "object": "model",
            "created": 1737146383,
            "owned_by": "openai",
        },
    ]

    # Combine models - mixed format with both Anthropic and OpenAI fields
    combined_response = {
        "data": anthropic_models + openai_models,
        "has_more": False,
        "object": "list",  # Add OpenAI-style field
    }

    # Serialize response
    response_body = json.dumps(combined_response).encode("utf-8")

    # Create response headers
    response_headers = {
        "content-type": "application/json",
        "content-length": str(len(response_body)),
    }

    return 200, response_headers, response_body
```

close async

close()

Close any resources held by the proxy service.

Source code in ccproxy/services/proxy_service.py
```python
async def close(self) -> None:
    """Close any resources held by the proxy service."""
    if self.proxy_client:
        await self.proxy_client.close()
    if self.credentials_manager:
        await self.credentials_manager.__aexit__(None, None, None)
```
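
Because `close()` releases the underlying HTTP client and exits the credentials manager, a natural place to call it is application shutdown. A hedged sketch using a FastAPI lifespan hook (the surrounding application and the `proxy_service` instance are assumptions about the host application, not part of this module):

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # proxy_service is assumed to be constructed here (see __init__ above).
    yield
    await proxy_service.close()  # release HTTP client and credentials manager on shutdown

app = FastAPI(lifespan=lifespan)
```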