Skip to content

ccproxy.plugins.copilot.oauth.provider

ccproxy.plugins.copilot.oauth.provider

OAuth provider implementation for GitHub Copilot.

CopilotOAuthProvider

CopilotOAuthProvider(
    config=None,
    storage=None,
    http_client=None,
    hook_manager=None,
    detection_service=None,
)

Bases: ProfileLoggingMixin

GitHub Copilot OAuth provider implementation.

Parameters:

Name Type Description Default
config CopilotOAuthConfig | None

OAuth configuration

None
storage CopilotOAuthStorage | None

Token storage

None
http_client AsyncClient | None

Optional HTTP client for request tracing

None
hook_manager Any | None

Optional hook manager for events

None
detection_service CLIDetectionService | None

Optional CLI detection service

None
Source code in ccproxy/plugins/copilot/oauth/provider.py
def __init__(
    self,
    config: CopilotOAuthConfig | None = None,
    storage: CopilotOAuthStorage | None = None,
    http_client: httpx.AsyncClient | None = None,
    hook_manager: Any | None = None,
    detection_service: "CLIDetectionService | None" = None,
):
    """Initialize Copilot OAuth provider.

    Args:
        config: OAuth configuration
        storage: Token storage
        http_client: Optional HTTP client for request tracing
        hook_manager: Optional hook manager for events
        detection_service: Optional CLI detection service
    """
    self.config = config or CopilotOAuthConfig()
    self.storage = storage or CopilotOAuthStorage()
    self.hook_manager = hook_manager
    self.detection_service = detection_service
    self.http_client = http_client
    self._cached_profile: StandardProfileFields | None = None

    self.client = CopilotOAuthClient(
        self.config,
        self.storage,
        http_client,
        hook_manager=hook_manager,
        detection_service=detection_service,
    )

provider_name property

provider_name

Internal provider name.

provider_display_name property

provider_display_name

Display name for UI.

supports_pkce property

supports_pkce

Whether this provider supports PKCE.

supports_refresh property

supports_refresh

Whether this provider supports token refresh.

requires_client_secret property

requires_client_secret

Whether this provider requires a client secret.

cli property

cli

Get CLI authentication configuration for this provider.

get_authorization_url async

get_authorization_url(
    state, code_verifier=None, redirect_uri=None
)

Get the authorization URL for GitHub Device Code Flow.

For device code flow, this returns the device authorization endpoint. The actual user verification happens at the verification_uri returned by start_device_flow().

Parameters:

Name Type Description Default
state str

OAuth state parameter (not used in device flow)

required
code_verifier str | None

PKCE code verifier (not used in device flow)

None

Returns:

Type Description
str

Device authorization URL

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_authorization_url(
    self,
    state: str,
    code_verifier: str | None = None,
    redirect_uri: str | None = None,
) -> str:
    """Get the authorization URL for GitHub Device Code Flow.

    For device code flow, this returns the device authorization endpoint.
    The actual user verification happens at the verification_uri returned
    by start_device_flow().

    Args:
        state: OAuth state parameter (not used in device flow)
        code_verifier: PKCE code verifier (not used in device flow)

    Returns:
        Device authorization URL
    """
    # For device code flow, we return the device authorization endpoint
    # The actual flow is handled by the device flow methods
    return self.config.authorize_url

start_device_flow async

start_device_flow()

Start the GitHub device code authorization flow.

Returns:

Type Description
tuple[str, str, str, int]

Tuple of (device_code, user_code, verification_uri, expires_in)

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def start_device_flow(self) -> tuple[str, str, str, int]:
    """Start the GitHub device code authorization flow.

    Returns:
        Tuple of (device_code, user_code, verification_uri, expires_in)
    """
    device_response = await self.client.start_device_flow()

    logger.info(
        "device_flow_started",
        user_code=device_response.user_code,
        verification_uri=device_response.verification_uri,
        expires_in=device_response.expires_in,
    )

    return (
        device_response.device_code,
        device_response.user_code,
        device_response.verification_uri,
        device_response.expires_in,
    )

complete_device_flow async

complete_device_flow(
    device_code, interval=5, expires_in=900
)

Complete the device flow authorization.

Parameters:

Name Type Description Default
device_code str

Device code from start_device_flow

required
interval int

Polling interval in seconds

5
expires_in int

Code expiration time in seconds

900

Returns:

Type Description
CopilotCredentials

Complete Copilot credentials

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def complete_device_flow(
    self, device_code: str, interval: int = 5, expires_in: int = 900
) -> CopilotCredentials:
    """Complete the device flow authorization.

    Args:
        device_code: Device code from start_device_flow
        interval: Polling interval in seconds
        expires_in: Code expiration time in seconds

    Returns:
        Complete Copilot credentials
    """
    return await self.client.complete_authorization(
        device_code, interval, expires_in
    )

handle_callback async

handle_callback(
    code, state, code_verifier=None, redirect_uri=None
)

Handle OAuth callback (not used in device flow).

This method is required by the CLI flow protocol but not used for device code flow. Use complete_device_flow instead.

Parameters:

Name Type Description Default
code str

Authorization code from OAuth callback

required
state str

State parameter for validation

required
code_verifier str | None

PKCE code verifier (if PKCE is used)

None
redirect_uri str | None

Redirect URI used in authorization (optional)

None
Source code in ccproxy/plugins/copilot/oauth/provider.py
async def handle_callback(
    self,
    code: str,
    state: str,
    code_verifier: str | None = None,
    redirect_uri: str | None = None,
) -> Any:
    """Handle OAuth callback (not used in device flow).

    This method is required by the CLI flow protocol but not used for
    device code flow. Use complete_device_flow instead.

    Args:
        code: Authorization code from OAuth callback
        state: State parameter for validation
        code_verifier: PKCE code verifier (if PKCE is used)
        redirect_uri: Redirect URI used in authorization (optional)
    """
    raise NotImplementedError(
        "Copilot uses device code flow. Browser callback is not supported."
    )

exchange_code async

exchange_code(code, state, code_verifier=None)

Exchange authorization code for token (not used in device flow).

This method is required by the OAuth protocol but not used for device code flow. Use complete_device_flow instead.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def exchange_code(
    self, code: str, state: str, code_verifier: str | None = None
) -> dict[str, Any]:
    """Exchange authorization code for token (not used in device flow).

    This method is required by the OAuth protocol but not used for
    device code flow. Use complete_device_flow instead.
    """
    raise NotImplementedError(
        "Device code flow doesn't use authorization code exchange. "
        "Use complete_device_flow instead."
    )

refresh_token async

refresh_token(refresh_token)

Refresh access token using refresh token.

For Copilot, this refreshes the Copilot service token using the stored OAuth token.

Parameters:

Name Type Description Default
refresh_token str

Not used for Copilot (uses OAuth token instead)

required

Returns:

Type Description
dict[str, Any]

Token information

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def refresh_token(self, refresh_token: str) -> dict[str, Any]:
    """Refresh access token using refresh token.

    For Copilot, this refreshes the Copilot service token using the
    stored OAuth token.

    Args:
        refresh_token: Not used for Copilot (uses OAuth token instead)

    Returns:
        Token information
    """
    credentials = await self.storage.load_credentials()
    if not credentials:
        raise ValueError("No credentials found for refresh")

    refreshed_credentials = await self.client.refresh_copilot_token(credentials)

    # Return token info in standard format
    if refreshed_credentials.copilot_token is not None:
        return {
            "access_token": refreshed_credentials.copilot_token.token.get_secret_value(),
            "token_type": "bearer",
            "expires_at": refreshed_credentials.copilot_token.expires_at,
            "provider": self.provider_name,
        }
    else:
        raise ValueError("Failed to refresh Copilot token")

get_user_profile async

get_user_profile(access_token=None)

Get user profile information.

Parameters:

Name Type Description Default
access_token str | None

Optional OAuth access token (not Copilot token)

None

Returns:

Type Description
StandardProfileFields

User profile information

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_user_profile(
    self, access_token: str | None = None
) -> StandardProfileFields:
    """Get user profile information.

    Args:
        access_token: Optional OAuth access token (not Copilot token)

    Returns:
        User profile information
    """
    oauth_token: CopilotOAuthToken | None = None

    if access_token:
        from pydantic import SecretStr

        oauth_token = CopilotOAuthToken(
            access_token=SecretStr(access_token), expires_in=None, created_at=None
        )
    else:
        credentials = await self.storage.load_credentials()
        if not credentials:
            raise ValueError("No credentials found")
        oauth_token = credentials.oauth_token

    profile = await self.client.get_standard_profile(oauth_token)
    self._cached_profile = profile
    return profile

get_standard_profile async

get_standard_profile(credentials=None)

Get standardized profile information from credentials.

Parameters:

Name Type Description Default
credentials Any | None

Copilot credentials object (optional)

None

Returns:

Type Description
StandardProfileFields | None

Standardized profile fields or None if not available

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_standard_profile(
    self, credentials: Any | None = None
) -> StandardProfileFields | None:
    """Get standardized profile information from credentials.

    Args:
        credentials: Copilot credentials object (optional)

    Returns:
        Standardized profile fields or None if not available
    """
    try:
        # If credentials is None, try to load from storage
        if credentials is None:
            try:
                credentials = await self.storage.load_credentials()
                if not credentials:
                    return None
            except Exception:
                return None

        # If credentials has OAuth token, use it directly
        if hasattr(credentials, "oauth_token") and credentials.oauth_token:
            return await self.client.get_standard_profile(credentials.oauth_token)
        else:
            # Fallback to loading from storage
            return await self.get_user_profile()
    except Exception as e:
        logger.debug(
            "get_standard_profile_failed",
            error=str(e),
            exc_info=e,
        )
        # Return fallback profile using _extract_standard_profile if we have credentials
        if credentials is not None:
            return self._extract_standard_profile(credentials)
        return None

get_token_info async

get_token_info()

Get current token information.

Returns:

Type Description
CopilotTokenInfo | None

Token information if available

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_token_info(self) -> CopilotTokenInfo | None:
    """Get current token information.

    Returns:
        Token information if available
    """
    credentials = await self.storage.load_credentials()
    if not credentials:
        return None

    oauth_expires_at = credentials.oauth_token.expires_at_datetime
    copilot_expires_at = None

    if credentials.copilot_token and credentials.copilot_token.expires_at:
        # expires_at is now a datetime object, no need to parse
        copilot_expires_at = credentials.copilot_token.expires_at

    # Get profile for additional info
    profile = None
    with contextlib.suppress(Exception):
        profile = await self.get_user_profile()

    copilot_access = False
    if profile is not None:
        features = getattr(profile, "features", {}) or {}
        copilot_access = bool(features.get("copilot_access"))
        if not copilot_access and getattr(profile, "subscription_type", None):
            copilot_access = True

    if not copilot_access and credentials.copilot_token is not None:
        token = credentials.copilot_token
        indicative_flags = [
            getattr(token, "chat_enabled", None),
            getattr(token, "annotations_enabled", None),
            getattr(token, "individual", None),
        ]
        if any(flag is True for flag in indicative_flags if flag is not None):
            copilot_access = True
        else:
            copilot_access = (
                True  # Possession of a copilot token implies active access
            )

    if not copilot_access:
        copilot_access = credentials.copilot_token is not None

    return CopilotTokenInfo(
        provider="copilot",
        oauth_expires_at=oauth_expires_at,
        copilot_expires_at=copilot_expires_at,
        account_type=credentials.account_type,
        copilot_access=copilot_access,
    )

get_token_snapshot async

get_token_snapshot()

Return a token snapshot built from stored credentials.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_token_snapshot(self) -> TokenSnapshot | None:
    """Return a token snapshot built from stored credentials."""

    try:
        manager = await self.create_token_manager(storage=self.storage)
        snapshot = await manager.get_token_snapshot()
        if snapshot:
            return snapshot
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug("copilot_snapshot_via_manager_failed", error=str(exc))

    try:
        credentials = await self.storage.load_credentials()
        if not credentials:
            return None

        from ..manager import CopilotTokenManager

        temp_manager = CopilotTokenManager(storage=self.storage)
        return temp_manager._build_token_snapshot(credentials)
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug("copilot_snapshot_from_credentials_failed", error=str(exc))
        return None

is_authenticated async

is_authenticated()

Check if user is authenticated with valid tokens.

Returns:

Type Description
bool

True if authenticated with valid tokens

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def is_authenticated(self) -> bool:
    """Check if user is authenticated with valid tokens.

    Returns:
        True if authenticated with valid tokens
    """
    credentials = await self.storage.load_credentials()
    if not credentials:
        return False

    # Check if OAuth token is expired
    if credentials.oauth_token.is_expired:
        return False

    # Check if we have a valid (non-expired) Copilot token
    if not credentials.copilot_token:
        return False

    # Check if Copilot token is expired
    return not credentials.copilot_token.is_expired

get_copilot_token async

get_copilot_token()

Get current Copilot service token for API requests.

Returns:

Type Description
str | None

Copilot token if available and valid, None otherwise

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_copilot_token(self) -> str | None:
    """Get current Copilot service token for API requests.

    Returns:
        Copilot token if available and valid, None otherwise
    """
    credentials = await self.storage.load_credentials()
    if not credentials or not credentials.copilot_token:
        return None

    # Check if token is expired
    if credentials.copilot_token.is_expired:
        logger.info(
            "copilot_token_expired_in_get",
            expires_at=credentials.copilot_token.expires_at,
        )
        return None

    return credentials.copilot_token.token.get_secret_value()

ensure_oauth_token async

ensure_oauth_token()

Ensure we have a valid OAuth token.

Returns:

Type Description
str

Valid OAuth token

Raises:

Type Description
ValueError

If unable to get valid token

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def ensure_oauth_token(self) -> str:
    """Ensure we have a valid OAuth token.

    Returns:
        Valid OAuth token

    Raises:
        ValueError: If unable to get valid token
    """
    credentials = await self.storage.load_credentials()
    if not credentials:
        raise ValueError("No credentials found - authorization required")

    if credentials.oauth_token.is_expired:
        raise ValueError("OAuth token expired - re-authorization required")

    return credentials.oauth_token.access_token.get_secret_value()

logout async

logout()

Clear stored credentials.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def logout(self) -> None:
    """Clear stored credentials."""
    await self.storage.clear_credentials()

get_storage

get_storage()

Get storage implementation for this provider.

Returns:

Type Description
Any

Storage implementation

Source code in ccproxy/plugins/copilot/oauth/provider.py
def get_storage(self) -> Any:
    """Get storage implementation for this provider.

    Returns:
        Storage implementation
    """
    return self.storage

load_credentials async

load_credentials(custom_path=None)

Load credentials from provider's storage.

Parameters:

Name Type Description Default
custom_path Any | None

Optional custom storage path (Path object)

None

Returns:

Type Description
Any | None

Credentials if found, None otherwise

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def load_credentials(self, custom_path: Any | None = None) -> Any | None:
    """Load credentials from provider's storage.

    Args:
        custom_path: Optional custom storage path (Path object)

    Returns:
        Credentials if found, None otherwise
    """
    try:
        if custom_path:
            # Create storage with custom path
            from pathlib import Path

            from .storage import CopilotOAuthStorage

            storage = CopilotOAuthStorage(credentials_path=Path(custom_path))
            credentials = await storage.load_credentials()
        else:
            # Load from default storage
            credentials = await self.storage.load_credentials()

        # Use standardized profile logging
        self._log_credentials_loaded("copilot", credentials)

        return credentials
    except Exception as e:
        logger.debug(
            "copilot_load_credentials_failed",
            error=str(e),
            exc_info=e,
        )
        return None

save_credentials async

save_credentials(credentials)

Save credentials to storage.

Parameters:

Name Type Description Default
credentials CopilotCredentials | None

Copilot credentials to save (None to clear)

required

Returns:

Type Description
bool

True if save was successful

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def save_credentials(self, credentials: CopilotCredentials | None) -> bool:
    """Save credentials to storage.

    Args:
        credentials: Copilot credentials to save (None to clear)

    Returns:
        True if save was successful
    """
    try:
        if credentials is None:
            await self.storage.clear_credentials()
            logger.info("copilot_credentials_cleared")
            return True
        else:
            await self.storage.save_credentials(credentials)
            logger.info(
                "copilot_credentials_saved",
                account_type=credentials.account_type,
                has_oauth=bool(credentials.oauth_token),
                has_copilot_token=bool(credentials.copilot_token),
            )
            return True
    except Exception as e:
        logger.error(
            "copilot_credentials_save_failed",
            error=str(e),
            exc_info=e,
        )
        return False

create_token_manager async

create_token_manager(storage=None)

Create a token manager instance wired to this provider's context.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def create_token_manager(
    self, storage: Any | None = None
) -> "CopilotTokenManager":
    """Create a token manager instance wired to this provider's context."""

    from ..manager import CopilotTokenManager

    return await CopilotTokenManager.create(
        storage=storage or self.storage,
        config=self.config,
        http_client=self.http_client,
        hook_manager=self.hook_manager,
        detection_service=self.detection_service,
    )

cleanup async

cleanup()

Cleanup resources.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def cleanup(self) -> None:
    """Cleanup resources."""
    try:
        await self.client.close()
    except Exception as e:
        logger.error(
            "provider_cleanup_failed",
            error=str(e),
            exc_info=e,
        )

get_provider_info

get_provider_info()

Get provider information for registry.

Source code in ccproxy/plugins/copilot/oauth/provider.py
def get_provider_info(self) -> OAuthProviderInfo:
    """Get provider information for registry."""
    return OAuthProviderInfo(
        name=self.provider_name,
        display_name=self.provider_display_name,
        description="GitHub Copilot OAuth authentication",
        supports_pkce=self.supports_pkce,
        scopes=["read:user", "copilot"],
        is_available=True,
        plugin_name="copilot",
    )

exchange_manual_code async

exchange_manual_code(code)

Exchange manual authorization code for tokens.

Note: Copilot primarily uses device code flow, but this method is provided for completeness.

Parameters:

Name Type Description Default
code str

Authorization code from manual entry

required

Returns:

Type Description
Any

Copilot credentials object

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def exchange_manual_code(self, code: str) -> Any:
    """Exchange manual authorization code for tokens.

    Note: Copilot primarily uses device code flow, but this method
    is provided for completeness.

    Args:
        code: Authorization code from manual entry

    Returns:
        Copilot credentials object
    """
    # Copilot doesn't typically support manual code entry as it uses device flow
    # This is a placeholder implementation
    raise NotImplementedError(
        "Copilot uses device code flow. Manual code entry is not supported."
    )