Skip to content

ccproxy.plugins.copilot.adapter

ccproxy.plugins.copilot.adapter

CopilotAdapter

CopilotAdapter(
    config,
    auth_manager,
    detection_service,
    http_pool_manager,
    oauth_provider=None,
    **kwargs,
)

Bases: BaseHTTPAdapter

Simplified Copilot adapter.

Source code in ccproxy/plugins/copilot/adapter.py
def __init__(
    self,
    config: CopilotConfig,
    auth_manager: CopilotTokenManager | None,
    detection_service: CopilotDetectionService,
    http_pool_manager: Any,
    oauth_provider: CopilotOAuthProvider | None = None,
    **kwargs: Any,
) -> None:
    """Initialise the adapter on top of the shared HTTP adapter base class.

    Args:
        config: Copilot configuration; its ``base_url`` is read here.
        auth_manager: Token manager handed to the base class (may be None).
        detection_service: Kept on the instance as ``detection_service``.
        http_pool_manager: Handed to the base class unchanged.
        oauth_provider: Optional OAuth provider kept on the instance.
        **kwargs: Additional keyword arguments passed through to the base
            class initialiser.
    """
    # The base initialiser receives everything except the Copilot-specific
    # collaborators, which are stored on this instance below.
    super().__init__(
        config=config,
        auth_manager=auth_manager,
        http_pool_manager=http_pool_manager,
        **kwargs,
    )

    self.detection_service = detection_service
    self.oauth_provider = oauth_provider

    # Typed alias of the auth manager so callers get the Copilot-specific
    # type without repeating the cast.
    manager = cast(CopilotTokenManager | None, self.auth_manager)
    self.token_manager: CopilotTokenManager | None = manager

    # Stored without trailing slashes.
    configured_url = self.config.base_url
    self.base_url = configured_url.rstrip("/")

process_provider_response async

process_provider_response(response, endpoint)

Process provider response with format conversion support.

Source code in ccproxy/plugins/copilot/adapter.py
async def process_provider_response(
    self, response: httpx.Response, endpoint: str
) -> Response:
    """Turn an upstream response into a plain (non-streaming) ``Response``.

    Successful JSON replies for two endpoint families are rewritten before
    being returned:

    * ``.../chat/completions``: a payload that has ``choices`` but lacks an
      integer ``created`` field gets ``created`` set to the current time.
    * ``.../responses``: the payload is passed through
      ``self._normalize_response_payload`` and, when that returns a value,
      the normalized payload is returned instead.

    Everything else, including any payload whose normalization raises a
    decoding or value error, is passed through with its original body.

    Args:
        response: The upstream ``httpx`` response.
        endpoint: The endpoint path used to select the normalization rule.

    Returns:
        A ``Response`` with the upstream status code, the extracted
        response headers and either the rewritten or the original body.
    """
    # Streaming is handled by the base adapter; this method only deals with
    # fully-buffered responses.
    forwarded_headers = extract_response_headers(response)
    content_type = response.headers.get("content-type")

    def _reply(content: bytes) -> Response:
        # Single place where the outgoing response is assembled.
        return Response(
            content=content,
            status_code=response.status_code,
            headers=forwarded_headers,
            media_type=content_type,
        )

    is_json_success = (
        response.status_code < 400 and "json" in (content_type or "").lower()
    )
    recoverable = (json.JSONDecodeError, UnicodeDecodeError, ValueError)

    if is_json_success and endpoint.endswith("/chat/completions"):
        # The OpenAI-compatible schema requires an integer "created"
        # timestamp, which the upstream API occasionally omits.
        try:
            chat_payload = response.json()
            if (
                isinstance(chat_payload, dict)
                and "choices" in chat_payload
                and not isinstance(chat_payload.get("created"), int)
            ):
                chat_payload["created"] = int(time.time())
                return _reply(json.dumps(chat_payload).encode())
        except recoverable:
            # Best effort only: fall through to the raw body below.
            pass

    if is_json_success and endpoint.endswith("/responses"):
        try:
            normalized = self._normalize_response_payload(response.json())
            if normalized is not None:
                return _reply(json.dumps(normalized).encode())
        except recoverable:
            # Best effort only: fall through to the raw body below.
            pass

    return _reply(response.content)

handle_request_gh_api async

handle_request_gh_api(request)

Forward request to GitHub API with proper authentication.

Parameters:

Name Type Description Default
request

Request

Incoming request to forward; its method, body and headers are reused, and the endpoint stored in its context metadata selects the GitHub API path.

required
Source code in ccproxy/plugins/copilot/adapter.py
async def handle_request_gh_api(self, request: Request) -> Response:
    """Forward a request to the GitHub API using the OAuth access token.

    The target URL is ``https://api.github.com`` followed by the
    ``endpoint`` value stored in the request context metadata. The HTTP
    method and body are taken from the incoming request. Incoming headers
    are filtered (without preserving the caller's auth header) and then
    overlaid with a bearer ``authorization`` header and a JSON ``accept``
    header.

    Args:
        request: Incoming request. ``request.state.context`` must already
            be set; its ``metadata`` mapping supplies ``endpoint``.

    Returns:
        A ``Response`` carrying the upstream body, the upstream status
        code and the filtered upstream headers. The media type falls back
        to ``application/json`` when upstream sends no content type.

    Raises:
        AuthenticationError: If no auth manager or no OAuth provider is
            configured on this adapter.
    """
    # Imported at call time, as in the original code; merged into a single
    # import instead of one copy per failure branch.
    from ccproxy.core.errors import AuthenticationError

    if self.auth_manager is None:
        logger.warning(
            "auth_manager_override_not_resolved",
            plugin="copilot",
            # Only needed for this log entry, so resolved here rather than
            # on every call.
            auth_manager_name=(
                getattr(self.config, "auth_manager", None) or "oauth_copilot"
            ),
            category="auth",
        )
        raise AuthenticationError(
            "Authentication manager not configured for Copilot provider"
        )

    oauth_provider = self.oauth_provider
    if oauth_provider is None:
        logger.warning(
            "oauth_provider_not_available",
            plugin="copilot",
            category="auth",
        )
        raise AuthenticationError(
            "OAuth provider not configured for Copilot provider"
        )

    access_token = await oauth_provider.ensure_oauth_token()
    base_url = "https://api.github.com"

    base_headers = {
        "authorization": f"Bearer {access_token}",
        "accept": "application/json",
    }

    # Context is read from request state; it is expected to be set before
    # this handler runs.
    ctx = request.state.context

    # Collect what to send upstream from the incoming request.
    body = await request.body()
    request_headers = extract_request_headers(request)
    method = request.method
    endpoint = ctx.metadata.get("endpoint", "")
    target_url = f"{base_url}{endpoint}"

    # Drop the caller's own auth header, then apply the GitHub credentials.
    outgoing_headers = filter_request_headers(request_headers, preserve_auth=False)
    outgoing_headers.update(base_headers)

    provider_response = await self._execute_http_request(
        method,
        target_url,
        outgoing_headers,
        body,
    )

    filtered_headers = filter_response_headers(dict(provider_response.headers))

    return Response(
        content=provider_response.content,
        status_code=provider_response.status_code,
        headers=filtered_headers,
        media_type=provider_response.headers.get(
            "content-type", "application/json"
        ),
    )