Skip to content

ccproxy.plugins.copilot.oauth

ccproxy.plugins.copilot.oauth

OAuth implementation for GitHub Copilot plugin.

CopilotOAuthClient

CopilotOAuthClient(
    config,
    storage,
    http_client=None,
    hook_manager=None,
    detection_service=None,
)

OAuth client for GitHub Copilot using Device Code Flow.

Parameters:

Name Type Description Default
config CopilotOAuthConfig

OAuth configuration

required
storage CopilotOAuthStorage

Token storage

required
http_client AsyncClient | None

Optional HTTP client for request tracing

None
hook_manager Any | None

Optional hook manager for events

None
detection_service CLIDetectionService | None

Optional CLI detection service

None
Source code in ccproxy/plugins/copilot/oauth/client.py
def __init__(
    self,
    config: CopilotOAuthConfig,
    storage: CopilotOAuthStorage,
    http_client: httpx.AsyncClient | None = None,
    hook_manager: Any | None = None,
    detection_service: "CLIDetectionService | None" = None,
):
    """Set up the client with its configuration, storage and collaborators.

    Args:
        config: OAuth configuration
        storage: Token storage
        http_client: Optional HTTP client for request tracing
        hook_manager: Optional hook manager for events
        detection_service: Optional CLI detection service
    """
    # Ownership is recorded only when the caller did not supply a client;
    # ``close()`` consults this flag before closing anything.
    self._owns_client = http_client is None
    self._http_client = http_client

    self.detection_service = detection_service
    self.hook_manager = hook_manager
    self.storage = storage
    self.config = config

close async

close()

Close HTTP client if we own it.

Source code in ccproxy/plugins/copilot/oauth/client.py
async def close(self) -> None:
    """Release the HTTP client, but only when this instance owns it."""
    if not (self._owns_client and self._http_client):
        # Either the client belongs to the caller or there is nothing to close.
        return

    await self._http_client.aclose()
    self._http_client = None

start_device_flow async

start_device_flow()

Start the GitHub device code authorization flow.

Returns:

Type Description
DeviceCodeResponse

Device code response with verification details

Source code in ccproxy/plugins/copilot/oauth/client.py
async def start_device_flow(self) -> DeviceCodeResponse:
    """Request a device code from GitHub to begin the device code flow.

    Returns:
        Device code response with verification details

    Raises:
        httpx.HTTPError: If the request fails or GitHub answers with an
            error status; the failure is logged before being re-raised.
    """
    http = await self._get_http_client()

    requested_scopes = self.config.scopes
    form = {
        "client_id": self.config.client_id,
        "scope": " ".join(requested_scopes),
    }

    # Only the first eight characters of the client id are logged.
    logger.debug(
        "requesting_device_code",
        client_id=self.config.client_id[:8] + "...",
        scopes=requested_scopes,
    )

    try:
        reply = await http.post(
            self.config.authorize_url,
            data=form,
            headers={"Accept": "application/json"},
        )
        reply.raise_for_status()
        parsed = DeviceCodeResponse.model_validate(reply.json())
    except httpx.HTTPError as exc:
        # Not every HTTP error carries a ``response``; report None in that case.
        failed_reply = getattr(exc, "response", None)
        logger.error(
            "device_code_request_failed",
            error=str(exc),
            status_code=getattr(failed_reply, "status_code", None),
            exc_info=exc,
        )
        raise

    logger.debug(
        "device_code_received",
        user_code=parsed.user_code,
        verification_uri=parsed.verification_uri,
        expires_in=parsed.expires_in,
    )
    return parsed

poll_for_token async

poll_for_token(device_code, interval, expires_in)

Poll GitHub for OAuth token after user authorization.

Parameters:

Name Type Description Default
device_code str

Device code from device flow

required
interval int

Polling interval in seconds

required
expires_in int

Code expiration time in seconds

required

Returns:

Type Description
CopilotOAuthToken

OAuth token once authorized

Raises:

Type Description
TimeoutError

If device code expires

ValueError

If user denies authorization

Source code in ccproxy/plugins/copilot/oauth/client.py
async def poll_for_token(
    self, device_code: str, interval: int, expires_in: int
) -> CopilotOAuthToken:
    """Poll GitHub for OAuth token after user authorization.

    The token endpoint is queried after every ``interval`` seconds (longer
    once GitHub answers ``slow_down``) until it reports success, a terminal
    error, or ``expires_in`` seconds have elapsed. Transport failures and
    unparseable (non-JSON) responses are logged and polling continues.

    Args:
        device_code: Device code from device flow
        interval: Polling interval in seconds
        expires_in: Code expiration time in seconds

    Returns:
        OAuth token once authorized

    Raises:
        TimeoutError: If device code expires
        ValueError: If user denies authorization, or GitHub reports an
            OAuth error this client does not recognise
    """
    client = await self._get_http_client()

    # Measure the deadline on the monotonic clock so wall-clock adjustments
    # cannot shorten or extend the polling window.
    deadline = time.monotonic() + expires_in
    current_interval = interval

    # The request is identical for every poll, so build it once.
    data = {
        "client_id": self.config.client_id,
        "device_code": device_code,
        "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
    }
    headers = {"Accept": "application/json"}

    logger.debug(
        "polling_for_token",
        interval=interval,
        expires_in=expires_in,
    )

    while True:
        # Check if we've exceeded the expiration time
        if time.monotonic() > deadline:
            raise TimeoutError("Device code has expired")

        await asyncio.sleep(current_interval)

        try:
            response = await client.post(
                self.config.token_url,
                data=data,
                headers=headers,
            )
        except httpx.HTTPError as e:
            logger.error(
                "token_poll_request_failed",
                error=str(e),
                status_code=getattr(
                    getattr(e, "response", None), "status_code", None
                ),
                exc_info=e,
            )
            # Continue polling on HTTP errors, waiting one extra interval
            # first so a struggling endpoint is polled less often.
            await asyncio.sleep(current_interval)
            continue

        try:
            payload = response.json()
        except ValueError as e:
            # A body that is not JSON (for example an error page) is a
            # transient server-side problem, not a denial. Letting the
            # decode error escape would surface as the ValueError that
            # callers interpret as "authorization denied".
            logger.warning(
                "token_poll_invalid_response",
                status_code=response.status_code,
                error=str(e),
            )
            continue

        poll_response = DeviceTokenPollResponse.model_validate(payload)

        if poll_response.is_success:
            # Success! Create OAuth token
            oauth_token = CopilotOAuthToken(
                access_token=SecretStr(poll_response.access_token or ""),
                token_type=poll_response.token_type or "bearer",
                scope=poll_response.scope or " ".join(self.config.scopes),
                created_at=int(time.time()),
                expires_in=None,  # GitHub tokens don't typically expire
            )

            logger.debug(
                "oauth_token_received",
                token_type=oauth_token.token_type,
                scope=oauth_token.scope,
            )

            return oauth_token

        if poll_response.is_pending:
            # Still waiting for user authorization
            logger.debug("authorization_pending")
            continue

        if poll_response.is_slow_down:
            # Need to slow down polling
            current_interval += 5
            logger.debug("slowing_down_poll", new_interval=current_interval)
            continue

        if poll_response.is_expired:
            raise TimeoutError("Device code has expired")

        if poll_response.is_denied:
            raise ValueError("User denied authorization")

        # Unknown error
        logger.error(
            "unknown_oauth_error",
            error=poll_response.error,
            error_description=poll_response.error_description,
        )
        raise ValueError(f"OAuth error: {poll_response.error}")

exchange_for_copilot_token async

exchange_for_copilot_token(oauth_token)

Exchange GitHub OAuth token for Copilot service token.

Parameters:

Name Type Description Default
oauth_token CopilotOAuthToken

GitHub OAuth token

required

Returns:

Type Description
CopilotTokenResponse

Copilot service token response

Source code in ccproxy/plugins/copilot/oauth/client.py
async def exchange_for_copilot_token(
    self, oauth_token: CopilotOAuthToken
) -> CopilotTokenResponse:
    """Trade a GitHub OAuth token for a Copilot service token.

    Args:
        oauth_token: GitHub OAuth token

    Returns:
        Copilot service token response

    Raises:
        httpx.HTTPError: If the request fails or returns an error status;
            the failure is logged before being re-raised.
    """
    http = await self._get_http_client()
    token_endpoint = self.config.copilot_token_url

    logger.debug(
        "exchanging_for_copilot_token",
        copilot_token_url=token_endpoint,
    )

    bearer = oauth_token.access_token.get_secret_value()
    try:
        reply = await http.get(
            token_endpoint,
            headers={
                "Authorization": f"Bearer {bearer}",
                "Accept": "application/json",
            },
        )
        reply.raise_for_status()
        service_token = CopilotTokenResponse.model_validate(reply.json())
    except httpx.HTTPError as exc:
        # Not every HTTP error carries a ``response``; report None in that case.
        failed_reply = getattr(exc, "response", None)
        logger.error(
            "copilot_token_exchange_failed",
            error=str(exc),
            status_code=getattr(failed_reply, "status_code", None),
            exc_info=exc,
        )
        raise

    logger.debug(
        "copilot_token_received",
        expires_at=service_token.expires_at,
        refresh_in=service_token.refresh_in,
    )
    return service_token

get_user_profile async

get_user_profile(oauth_token)

Get user profile information from GitHub API.

Parameters:

Name Type Description Default
oauth_token CopilotOAuthToken

GitHub OAuth token

required

Returns:

Type Description
CopilotProfileInfo

User profile information

Source code in ccproxy/plugins/copilot/oauth/client.py
async def get_user_profile(
    self, oauth_token: CopilotOAuthToken
) -> CopilotProfileInfo:
    """Fetch the GitHub user record and probe for Copilot access.

    The business-accounts endpoint is queried first; when it answers 404 the
    individual-plan endpoint is tried instead. HTTP failures during this
    probe are logged and ignored, leaving Copilot access reported as absent.

    Args:
        oauth_token: GitHub OAuth token

    Returns:
        User profile information

    Raises:
        httpx.HTTPError: If fetching the basic user record fails; the
            failure is logged before being re-raised.
    """
    http = await self._get_http_client()

    try:
        # Every GitHub API call below is authenticated the same way.
        api_headers = {
            "Authorization": f"Bearer {oauth_token.access_token.get_secret_value()}",
            "Accept": "application/vnd.github.v3+json",
        }

        user_reply = await http.get(
            "https://api.github.com/user", headers=api_headers
        )
        user_reply.raise_for_status()
        user = user_reply.json()

        has_copilot = False
        plan = None

        try:
            business_reply = await http.get(
                "https://api.github.com/user/copilot_business_accounts",
                headers=api_headers,
            )
            status = business_reply.status_code
            if status == 200:
                accounts = business_reply.json().get(
                    "copilot_business_accounts", []
                )
                has_copilot = len(accounts) > 0
                plan = "business" if has_copilot else None
            elif status == 404:
                # No business endpoint for this user: try the individual plan.
                individual_reply = await http.get(
                    "https://api.github.com/copilot_internal/user",
                    headers=api_headers,
                )
                if individual_reply.status_code == 200:
                    has_copilot = True
                    plan = "individual"
        except httpx.HTTPError:
            # The access probe is best-effort; keep the defaults on failure.
            logger.debug("copilot_access_check_failed")

        login = user["login"]
        profile = CopilotProfileInfo(
            # Fall back to the login when GitHub returns no numeric id.
            account_id=str(user.get("id", login)),
            login=login,
            name=user.get("name"),
            email=user.get("email") or "",
            avatar_url=user.get("avatar_url"),
            html_url=user.get("html_url"),
            copilot_plan=plan,
            copilot_access=has_copilot,
        )

        logger.debug(
            "profile_retrieved",
            login=profile.login,
            user_name=profile.name,
            copilot_access=has_copilot,
            copilot_plan=plan,
        )

        return profile

    except httpx.HTTPError as exc:
        # Not every HTTP error carries a ``response``; report None in that case.
        failed_reply = getattr(exc, "response", None)
        logger.error(
            "profile_request_failed",
            error=str(exc),
            status_code=getattr(failed_reply, "status_code", None),
            exc_info=exc,
        )
        raise

to_standard_profile

to_standard_profile(profile)

Convert Copilot profile info into StandardProfileFields.

Source code in ccproxy/plugins/copilot/oauth/client.py
def to_standard_profile(self, profile: CopilotProfileInfo) -> StandardProfileFields:
    """Map a Copilot profile onto the generic `StandardProfileFields`."""
    plan = profile.copilot_plan

    # Prefer the profile's computed name; otherwise fall back through
    # display_name, name and finally login.
    shown_name = getattr(profile, "computed_display_name", None)
    if not shown_name:
        shown_name = profile.display_name or profile.name or profile.login

    # The plan is only advertised as a feature when one is known.
    plan_feature: dict[str, Any] = {"copilot_plan": plan} if plan else {}
    features: dict[str, Any] = {
        "copilot_access": profile.copilot_access,
        "login": profile.login,
        **plan_feature,
    }

    return StandardProfileFields(
        account_id=profile.account_id,
        provider_type="copilot",
        email=profile.email or None,
        display_name=shown_name,
        subscription_type=plan,
        features=features,
        raw_profile_data={"copilot_profile": profile.model_dump()},
    )

get_standard_profile async

get_standard_profile(oauth_token)

Fetch profile info and normalize it for generic consumers.

Source code in ccproxy/plugins/copilot/oauth/client.py
async def get_standard_profile(
    self, oauth_token: CopilotOAuthToken
) -> StandardProfileFields:
    """Retrieve the profile for ``oauth_token`` and return it in standard form."""
    return self.to_standard_profile(await self.get_user_profile(oauth_token))

complete_authorization async

complete_authorization(device_code, interval, expires_in)

Complete the full authorization flow.

Parameters:

Name Type Description Default
device_code str

Device code from device flow

required
interval int

Polling interval

required
expires_in int

Code expiration time

required

Returns:

Type Description
CopilotCredentials

Complete Copilot credentials

Source code in ccproxy/plugins/copilot/oauth/client.py
async def complete_authorization(
    self, device_code: str, interval: int, expires_in: int
) -> CopilotCredentials:
    """Run the remaining authorization steps and persist the credentials.

    Polls for the OAuth token, exchanges it for a Copilot token, fetches the
    user profile to derive the account type, then stores the credentials.

    Args:
        device_code: Device code from device flow
        interval: Polling interval
        expires_in: Code expiration time

    Returns:
        Complete Copilot credentials
    """
    oauth_token = await self.poll_for_token(device_code, interval, expires_in)
    copilot_token = await self.exchange_for_copilot_token(oauth_token)
    profile = await self.get_user_profile(oauth_token)

    # Anything that is neither "business" nor an enterprise plan is treated
    # as an individual account.
    plan = profile.copilot_plan
    if plan == "business":
        account_type = "business"
    elif plan and "enterprise" in plan:
        account_type = "enterprise"
    else:
        account_type = "individual"

    credentials = CopilotCredentials(
        oauth_token=oauth_token,
        copilot_token=copilot_token,
        account_type=account_type,
    )
    await self.storage.store_credentials(credentials)

    logger.debug(
        "authorization_completed",
        login=profile.login,
        account_type=account_type,
        copilot_access=profile.copilot_access,
    )

    return credentials

refresh_copilot_token async

refresh_copilot_token(credentials)

Refresh the Copilot service token using stored OAuth token.

Parameters:

Name Type Description Default
credentials CopilotCredentials

Current credentials

required

Returns:

Type Description
CopilotCredentials

Updated credentials with new Copilot token

Source code in ccproxy/plugins/copilot/oauth/client.py
async def refresh_copilot_token(
    self, credentials: CopilotCredentials
) -> CopilotCredentials:
    """Obtain a fresh Copilot service token with the stored OAuth token.

    The passed-in credentials object is updated in place, persisted to
    storage, and returned.

    Args:
        credentials: Current credentials

    Returns:
        Updated credentials with new Copilot token

    Raises:
        ValueError: If the OAuth token is expired
    """
    oauth_token = credentials.oauth_token
    if oauth_token.is_expired:
        # Without a valid OAuth token there is nothing to exchange.
        logger.warning("oauth_token_expired_cannot_refresh")
        raise ValueError("OAuth token is expired, re-authorization required")

    credentials.copilot_token = await self.exchange_for_copilot_token(oauth_token)
    credentials.refresh_updated_at()

    await self.storage.store_credentials(credentials)

    logger.debug(
        "copilot_token_refreshed",
        account_type=credentials.account_type,
    )

    return credentials

CopilotCredentials

Bases: BaseModel

Copilot credentials containing OAuth and Copilot tokens.

is_expired

is_expired()

Check if credentials are expired (BaseCredentials protocol).

Source code in ccproxy/plugins/copilot/oauth/models.py
def is_expired(self) -> bool:
    """Report whether the OAuth token is expired (BaseCredentials protocol)."""
    token = self.oauth_token
    return token.is_expired

to_dict

to_dict()

Convert to dictionary for storage (BaseCredentials protocol).

Source code in ccproxy/plugins/copilot/oauth/models.py
def to_dict(self) -> dict[str, Any]:
    """Dump the model in JSON mode for storage (BaseCredentials protocol)."""
    serialized = self.model_dump(mode="json")
    return serialized

from_dict classmethod

from_dict(data)

Create from dictionary (BaseCredentials protocol).

Source code in ccproxy/plugins/copilot/oauth/models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "CopilotCredentials":
    """Validate ``data`` into an instance (BaseCredentials protocol)."""
    instance = cls.model_validate(data)
    return instance

refresh_updated_at

refresh_updated_at()

Update the updated_at timestamp.

Source code in ccproxy/plugins/copilot/oauth/models.py
def refresh_updated_at(self) -> None:
    """Update the updated_at timestamp."""
    self.updated_at = int(datetime.now(UTC).timestamp())

CopilotOAuthToken

Bases: BaseModel

OAuth token information for GitHub Copilot.

is_expired property

is_expired

Check if the token is expired.

expires_at_datetime property

expires_at_datetime

Get expiration as datetime object.

serialize_secret

serialize_secret(value)

Serialize SecretStr to plain string for JSON output.

Source code in ccproxy/plugins/copilot/oauth/models.py
@field_serializer("access_token", "refresh_token")
def serialize_secret(self, value: SecretStr | None) -> str | None:
    """Expose the plain secret string for JSON output; falsy values become None."""
    if not value:
        return None
    return value.get_secret_value()

validate_tokens classmethod

validate_tokens(v)

Convert string values to SecretStr.

Source code in ccproxy/plugins/copilot/oauth/models.py
@field_validator("access_token", "refresh_token", mode="before")
@classmethod
def validate_tokens(cls, v: str | SecretStr | None) -> SecretStr | None:
    """Wrap plain strings in SecretStr; pass None and other values through."""
    return SecretStr(v) if isinstance(v, str) else v

CopilotProfileInfo

Bases: BaseProfileInfo

GitHub profile information for Copilot users.

computed_display_name

computed_display_name()

Display name for UI.

Source code in ccproxy/plugins/copilot/oauth/models.py
@computed_field
def computed_display_name(self) -> str:
    """Name to show in the UI: display_name, else name, else login."""
    return self.display_name or self.name or self.login

CopilotOAuthProvider

CopilotOAuthProvider(
    config=None,
    storage=None,
    http_client=None,
    hook_manager=None,
    detection_service=None,
)

Bases: ProfileLoggingMixin

GitHub Copilot OAuth provider implementation.

Parameters:

Name Type Description Default
config CopilotOAuthConfig | None

OAuth configuration

None
storage CopilotOAuthStorage | None

Token storage

None
http_client AsyncClient | None

Optional HTTP client for request tracing

None
hook_manager Any | None

Optional hook manager for events

None
detection_service CLIDetectionService | None

Optional CLI detection service

None
Source code in ccproxy/plugins/copilot/oauth/provider.py
def __init__(
    self,
    config: CopilotOAuthConfig | None = None,
    storage: CopilotOAuthStorage | None = None,
    http_client: httpx.AsyncClient | None = None,
    hook_manager: Any | None = None,
    detection_service: "CLIDetectionService | None" = None,
):
    """Build the provider and the OAuth client it delegates to.

    Args:
        config: OAuth configuration
        storage: Token storage
        http_client: Optional HTTP client for request tracing
        hook_manager: Optional hook manager for events
        detection_service: Optional CLI detection service
    """
    # Default-constructed config and storage stand in for omitted ones.
    self.config = config if config else CopilotOAuthConfig()
    self.storage = storage if storage else CopilotOAuthStorage()

    self.http_client = http_client
    self.hook_manager = hook_manager
    self.detection_service = detection_service
    self._cached_profile: StandardProfileFields | None = None

    # The client receives the resolved config/storage and the same collaborators.
    self.client = CopilotOAuthClient(
        config=self.config,
        storage=self.storage,
        http_client=http_client,
        hook_manager=hook_manager,
        detection_service=detection_service,
    )

provider_name property

provider_name

Internal provider name.

provider_display_name property

provider_display_name

Display name for UI.

supports_pkce property

supports_pkce

Whether this provider supports PKCE.

supports_refresh property

supports_refresh

Whether this provider supports token refresh.

requires_client_secret property

requires_client_secret

Whether this provider requires a client secret.

cli property

cli

Get CLI authentication configuration for this provider.

get_authorization_url async

get_authorization_url(
    state, code_verifier=None, redirect_uri=None
)

Get the authorization URL for GitHub Device Code Flow.

For device code flow, this returns the device authorization endpoint. The actual user verification happens at the verification_uri returned by start_device_flow().

Parameters:

Name Type Description Default
state str

OAuth state parameter (not used in device flow)

required
code_verifier str | None

PKCE code verifier (not used in device flow)

None

Returns:

Type Description
str

Device authorization URL

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_authorization_url(
    self,
    state: str,
    code_verifier: str | None = None,
    redirect_uri: str | None = None,
) -> str:
    """Get the authorization URL for GitHub Device Code Flow.

    For device code flow, this returns the device authorization endpoint.
    The actual user verification happens at the verification_uri returned
    by start_device_flow().

    Args:
        state: OAuth state parameter (not used in device flow)
        code_verifier: PKCE code verifier (not used in device flow)

    Returns:
        Device authorization URL
    """
    # For device code flow, we return the device authorization endpoint
    # The actual flow is handled by the device flow methods
    return self.config.authorize_url

start_device_flow async

start_device_flow()

Start the GitHub device code authorization flow.

Returns:

Type Description
tuple[str, str, str, int]

Tuple of (device_code, user_code, verification_uri, expires_in)

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def start_device_flow(self) -> tuple[str, str, str, int]:
    """Begin the device code flow via the client and unpack its response.

    Returns:
        Tuple of (device_code, user_code, verification_uri, expires_in)
    """
    flow = await self.client.start_device_flow()

    user_code = flow.user_code
    verification_uri = flow.verification_uri
    expires_in = flow.expires_in

    logger.info(
        "device_flow_started",
        user_code=user_code,
        verification_uri=verification_uri,
        expires_in=expires_in,
    )

    return flow.device_code, user_code, verification_uri, expires_in

complete_device_flow async

complete_device_flow(
    device_code, interval=5, expires_in=900
)

Complete the device flow authorization.

Parameters:

Name Type Description Default
device_code str

Device code from start_device_flow

required
interval int

Polling interval in seconds

5
expires_in int

Code expiration time in seconds

900

Returns:

Type Description
CopilotCredentials

Complete Copilot credentials

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def complete_device_flow(
    self, device_code: str, interval: int = 5, expires_in: int = 900
) -> CopilotCredentials:
    """Finish the device flow by delegating to the OAuth client.

    Args:
        device_code: Device code from start_device_flow
        interval: Polling interval in seconds
        expires_in: Code expiration time in seconds

    Returns:
        Complete Copilot credentials
    """
    credentials = await self.client.complete_authorization(
        device_code, interval, expires_in
    )
    return credentials

handle_callback async

handle_callback(
    code, state, code_verifier=None, redirect_uri=None
)

Handle OAuth callback (not used in device flow).

This method is required by the CLI flow protocol but not used for device code flow. Use complete_device_flow instead.

Parameters:

Name Type Description Default
code str

Authorization code from OAuth callback

required
state str

State parameter for validation

required
code_verifier str | None

PKCE code verifier (if PKCE is used)

None
redirect_uri str | None

Redirect URI used in authorization (optional)

None
Source code in ccproxy/plugins/copilot/oauth/provider.py
async def handle_callback(
    self,
    code: str,
    state: str,
    code_verifier: str | None = None,
    redirect_uri: str | None = None,
) -> Any:
    """Handle OAuth callback (not used in device flow).

    This method is required by the CLI flow protocol but not used for
    device code flow. Use complete_device_flow instead.

    Args:
        code: Authorization code from OAuth callback
        state: State parameter for validation
        code_verifier: PKCE code verifier (if PKCE is used)
        redirect_uri: Redirect URI used in authorization (optional)
    """
    raise NotImplementedError(
        "Copilot uses device code flow. Browser callback is not supported."
    )

exchange_code async

exchange_code(code, state, code_verifier=None)

Exchange authorization code for token (not used in device flow).

This method is required by the OAuth protocol but not used for device code flow. Use complete_device_flow instead.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def exchange_code(
    self, code: str, state: str, code_verifier: str | None = None
) -> dict[str, Any]:
    """Exchange authorization code for token (not used in device flow).

    This method is required by the OAuth protocol but not used for
    device code flow. Use complete_device_flow instead.
    """
    raise NotImplementedError(
        "Device code flow doesn't use authorization code exchange. "
        "Use complete_device_flow instead."
    )

refresh_token async

refresh_token(refresh_token)

Refresh access token using refresh token.

For Copilot, this refreshes the Copilot service token using the stored OAuth token.

Parameters:

Name Type Description Default
refresh_token str

Not used for Copilot (uses OAuth token instead)

required

Returns:

Type Description
dict[str, Any]

Token information

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def refresh_token(self, refresh_token: str) -> dict[str, Any]:
    """Refresh the Copilot service token from the stored credentials.

    The stored OAuth token, not the ``refresh_token`` argument, drives the
    refresh.

    Args:
        refresh_token: Not used for Copilot (uses OAuth token instead)

    Returns:
        Token information

    Raises:
        ValueError: If no credentials are stored, or the refreshed
            credentials carry no Copilot token
    """
    stored = await self.storage.load_credentials()
    if not stored:
        raise ValueError("No credentials found for refresh")

    refreshed = await self.client.refresh_copilot_token(stored)
    service_token = refreshed.copilot_token
    if service_token is None:
        raise ValueError("Failed to refresh Copilot token")

    # Return token info in standard format
    return {
        "access_token": service_token.token.get_secret_value(),
        "token_type": "bearer",
        "expires_at": service_token.expires_at,
        "provider": self.provider_name,
    }

get_user_profile async

get_user_profile(access_token=None)

Get user profile information.

Parameters:

Name Type Description Default
access_token str | None

Optional OAuth access token (not Copilot token)

None

Returns:

Type Description
StandardProfileFields

User profile information

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_user_profile(
    self, access_token: str | None = None
) -> StandardProfileFields:
    """Fetch the standardized profile and remember it on the provider.

    Args:
        access_token: Optional OAuth access token (not Copilot token);
            when omitted, the OAuth token from stored credentials is used

    Returns:
        User profile information

    Raises:
        ValueError: If no access token is given and no credentials are stored
    """
    if not access_token:
        stored = await self.storage.load_credentials()
        if not stored:
            raise ValueError("No credentials found")
        token = stored.oauth_token
    else:
        from pydantic import SecretStr

        # Wrap the raw token string in a token object with no expiry data.
        token = CopilotOAuthToken(
            access_token=SecretStr(access_token), expires_in=None, created_at=None
        )

    standard = await self.client.get_standard_profile(token)
    self._cached_profile = standard
    return standard

get_standard_profile async

get_standard_profile(credentials=None)

Get standardized profile information from credentials.

Parameters:

Name Type Description Default
credentials Any | None

Copilot credentials object (optional)

None

Returns:

Type Description
StandardProfileFields | None

Standardized profile fields or None if not available

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_standard_profile(
    self, credentials: Any | None = None
) -> StandardProfileFields | None:
    """Resolve a standardized profile, preferring a live lookup.

    When the live lookup fails, a profile extracted from the credentials
    via ``_extract_standard_profile`` is returned instead.

    Args:
        credentials: Copilot credentials object (optional); loaded from
            storage when omitted

    Returns:
        Standardized profile fields or None if not available
    """
    try:
        if credentials is None:
            # Nothing supplied: fall back to storage, giving up quietly if
            # loading fails or yields nothing.
            try:
                credentials = await self.storage.load_credentials()
                if not credentials:
                    return None
            except Exception:
                return None

        oauth_token = getattr(credentials, "oauth_token", None)
        if oauth_token:
            return await self.client.get_standard_profile(oauth_token)

        # No usable OAuth token on the object: go through get_user_profile(),
        # which reads the token from storage.
        return await self.get_user_profile()
    except Exception as exc:
        logger.debug(
            "get_standard_profile_failed",
            error=str(exc),
            exc_info=exc,
        )
        if credentials is None:
            return None
        # Build a fallback profile from the credentials alone.
        return self._extract_standard_profile(credentials)

get_token_info async

get_token_info()

Get current token information.

Returns:

Type Description
CopilotTokenInfo | None

Token information if available

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_token_info(self) -> CopilotTokenInfo | None:
    """Get current token information.

    Copilot access is reported as True when any of the following holds:
    the user profile's ``features`` mapping has a truthy ``copilot_access``
    entry, the profile has a truthy ``subscription_type``, or the stored
    credentials carry a Copilot token at all.

    Returns:
        Token information if credentials are stored, None otherwise
    """
    credentials = await self.storage.load_credentials()
    if not credentials:
        return None

    oauth_expires_at = credentials.oauth_token.expires_at_datetime
    copilot_token = credentials.copilot_token

    copilot_expires_at = None
    if copilot_token and copilot_token.expires_at:
        # expires_at is already a datetime object, no parsing needed
        copilot_expires_at = copilot_token.expires_at

    # The profile is optional extra information; a lookup failure must not
    # prevent reporting the token details.
    profile = None
    with contextlib.suppress(Exception):
        profile = await self.get_user_profile()

    copilot_access = False
    if profile is not None:
        features = getattr(profile, "features", {}) or {}
        copilot_access = bool(features.get("copilot_access")) or bool(
            getattr(profile, "subscription_type", None)
        )

    # Possession of a copilot token implies active access, whatever
    # per-feature flags the token carries.
    if not copilot_access:
        copilot_access = copilot_token is not None

    return CopilotTokenInfo(
        provider="copilot",
        oauth_expires_at=oauth_expires_at,
        copilot_expires_at=copilot_expires_at,
        account_type=credentials.account_type,
        copilot_access=copilot_access,
    )

get_token_snapshot async

get_token_snapshot()

Return a token snapshot built from stored credentials.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_token_snapshot(self) -> TokenSnapshot | None:
    """Build a token snapshot, preferring the token manager over raw storage.

    Returns:
        The manager's snapshot when it yields one; otherwise a snapshot
        built from stored credentials, or None if none are stored or the
        fallback fails
    """

    # First attempt: a fully wired token manager.
    try:
        token_manager = await self.create_token_manager(storage=self.storage)
        via_manager = await token_manager.get_token_snapshot()
        if via_manager:
            return via_manager
    except Exception as err:  # pragma: no cover - defensive logging
        logger.debug("copilot_snapshot_via_manager_failed", error=str(err))

    # Second attempt: read credentials and build the snapshot directly.
    try:
        stored = await self.storage.load_credentials()
        if not stored:
            return None

        from ..manager import CopilotTokenManager

        return CopilotTokenManager(storage=self.storage)._build_token_snapshot(
            stored
        )
    except Exception as err:  # pragma: no cover - defensive logging
        logger.debug("copilot_snapshot_from_credentials_failed", error=str(err))
        return None

is_authenticated async

is_authenticated()

Check if user is authenticated with valid tokens.

Returns:

Type Description
bool

True if authenticated with valid tokens

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def is_authenticated(self) -> bool:
    """Report whether the stored credentials are currently usable.

    Returns:
        True only when credentials exist, the OAuth token has not expired,
        and a non-expired Copilot token is present
    """
    stored = await self.storage.load_credentials()
    if not stored:
        return False

    # An expired OAuth token invalidates everything else.
    if stored.oauth_token.is_expired:
        return False

    service_token = stored.copilot_token
    # A missing Copilot token counts as unauthenticated, as does an expired one.
    if not service_token:
        return False
    return not service_token.is_expired

get_copilot_token async

get_copilot_token()

Get current Copilot service token for API requests.

Returns:

Type Description
str | None

Copilot token if available and valid, None otherwise

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def get_copilot_token(self) -> str | None:
    """Return the stored Copilot service token for use in API requests.

    Returns:
        The token's secret value when a non-expired Copilot token is
        stored, None otherwise
    """
    stored = await self.storage.load_credentials()
    service_token = stored.copilot_token if stored else None
    if not service_token:
        return None

    if service_token.is_expired:
        # Expired tokens are never handed out; just record the fact.
        logger.info(
            "copilot_token_expired_in_get",
            expires_at=service_token.expires_at,
        )
        return None

    return service_token.token.get_secret_value()

ensure_oauth_token async

ensure_oauth_token()

Ensure we have a valid OAuth token.

Returns:

Type Description
str

Valid OAuth token

Raises:

Type Description
ValueError

If unable to get valid token

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def ensure_oauth_token(self) -> str:
    """Return the stored OAuth access token, insisting that it is usable.

    Returns:
        The secret value of the stored, non-expired OAuth access token

    Raises:
        ValueError: If no credentials are stored or the OAuth token has
            expired
    """
    stored = await self.storage.load_credentials()
    if not stored:
        raise ValueError("No credentials found - authorization required")

    oauth = stored.oauth_token
    if oauth.is_expired:
        raise ValueError("OAuth token expired - re-authorization required")

    return oauth.access_token.get_secret_value()

logout async

logout()

Clear stored credentials.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def logout(self) -> None:
    """Log out by asking the provider's storage to clear its credentials."""
    store = self.storage
    await store.clear_credentials()

get_storage

get_storage()

Get storage implementation for this provider.

Returns:

Type Description
Any

Storage implementation

Source code in ccproxy/plugins/copilot/oauth/provider.py
def get_storage(self) -> Any:
    """Expose the storage backend this provider uses for credentials.

    Returns:
        The storage object held by this provider
    """
    backend = self.storage
    return backend

load_credentials async

load_credentials(custom_path=None)

Load credentials from provider's storage.

Parameters:

Name Type Description Default
custom_path Any | None

Optional custom storage path (Path object)

None

Returns:

Type Description
Any | None

Credentials if found, None otherwise

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def load_credentials(self, custom_path: Any | None = None) -> Any | None:
    """Load credentials from provider's storage.

    Args:
        custom_path: Optional custom storage path (Path object)

    Returns:
        Credentials if found, None otherwise
    """
    try:
        if custom_path:
            # Create storage with custom path
            from pathlib import Path

            from .storage import CopilotOAuthStorage

            storage = CopilotOAuthStorage(credentials_path=Path(custom_path))
            credentials = await storage.load_credentials()
        else:
            # Load from default storage
            credentials = await self.storage.load_credentials()

        # Use standardized profile logging
        self._log_credentials_loaded("copilot", credentials)

        return credentials
    except Exception as e:
        logger.debug(
            "copilot_load_credentials_failed",
            error=str(e),
            exc_info=e,
        )
        return None

save_credentials async

save_credentials(credentials)

Save credentials to storage.

Parameters:

Name Type Description Default
credentials CopilotCredentials | None

Copilot credentials to save (None to clear)

required

Returns:

Type Description
bool

True if save was successful

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def save_credentials(self, credentials: CopilotCredentials | None) -> bool:
    """Save credentials to storage.

    Args:
        credentials: Copilot credentials to save (None to clear)

    Returns:
        True if the operation completed; False if the storage call raised
        or explicitly reported a failed write by returning False
    """
    try:
        if credentials is None:
            await self.storage.clear_credentials()
            logger.info("copilot_credentials_cleared")
            return True

        outcome = await self.storage.save_credentials(credentials)
        # A storage backend may signal a failed write by returning False
        # instead of raising. Only an explicit False counts as failure, so
        # backends that return None keep their current behavior.
        if outcome is False:
            logger.error(
                "copilot_credentials_save_failed",
                error="storage reported a failed write",
            )
            return False

        logger.info(
            "copilot_credentials_saved",
            account_type=credentials.account_type,
            has_oauth=bool(credentials.oauth_token),
            has_copilot_token=bool(credentials.copilot_token),
        )
        return True
    except Exception as e:
        logger.error(
            "copilot_credentials_save_failed",
            error=str(e),
            exc_info=e,
        )
        return False

create_token_manager async

create_token_manager(storage=None)

Create a token manager instance wired to this provider's context.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def create_token_manager(
    self, storage: Any | None = None
) -> "CopilotTokenManager":
    """Build a token manager that shares this provider's collaborators.

    Args:
        storage: Storage for the manager; the provider's own storage is
            used when this is omitted or falsy

    Returns:
        The manager produced by ``CopilotTokenManager.create``
    """

    from ..manager import CopilotTokenManager

    chosen_storage = storage or self.storage
    token_manager = await CopilotTokenManager.create(
        storage=chosen_storage,
        config=self.config,
        http_client=self.http_client,
        hook_manager=self.hook_manager,
        detection_service=self.detection_service,
    )
    return token_manager

cleanup async

cleanup()

Cleanup resources.

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def cleanup(self) -> None:
    """Close the OAuth client; a failure is logged, never raised."""
    try:
        oauth_client = self.client
        await oauth_client.close()
    except Exception as exc:
        logger.error(
            "provider_cleanup_failed",
            error=str(exc),
            exc_info=exc,
        )

get_provider_info

get_provider_info()

Get provider information for registry.

Source code in ccproxy/plugins/copilot/oauth/provider.py
def get_provider_info(self) -> OAuthProviderInfo:
    """Describe this provider for the OAuth provider registry.

    Returns:
        Provider info built from this provider's name, display name and
        PKCE support, with fixed scopes and plugin name
    """
    registry_fields = {
        "name": self.provider_name,
        "display_name": self.provider_display_name,
        "description": "GitHub Copilot OAuth authentication",
        "supports_pkce": self.supports_pkce,
        "scopes": ["read:user", "copilot"],
        "is_available": True,
        "plugin_name": "copilot",
    }
    return OAuthProviderInfo(**registry_fields)

exchange_manual_code async

exchange_manual_code(code)

Exchange manual authorization code for tokens.

Note: Copilot primarily uses device code flow, but this method is provided for completeness.

Parameters:

Name Type Description Default
code str

Authorization code from manual entry

required

Returns:

Type Description
Any

Copilot credentials object

Source code in ccproxy/plugins/copilot/oauth/provider.py
async def exchange_manual_code(self, code: str) -> Any:
    """Reject manual authorization-code exchange.

    Copilot authenticates through the device code flow, so this method
    exists only for interface completeness and always raises.

    Args:
        code: Authorization code from manual entry (unused)

    Raises:
        NotImplementedError: Always
    """
    # Placeholder: there is no manual-entry path for the device flow.
    reason = "Copilot uses device code flow. Manual code entry is not supported."
    raise NotImplementedError(reason)

CopilotOAuthStorage

CopilotOAuthStorage(credentials_path=None)

Bases: BaseJsonStorage[CopilotCredentials]

Storage implementation for Copilot OAuth credentials.

Parameters:

Name Type Description Default
credentials_path Path | None

Path to credentials file (uses default if None)

None
Source code in ccproxy/plugins/copilot/oauth/storage.py
def __init__(self, credentials_path: Path | None = None) -> None:
    """Set up storage bound to a credentials file.

    Args:
        credentials_path: Path to credentials file; when None, the file
            ``~/.config/copilot/credentials.json`` is used
    """
    # Fall back to the standard GitHub Copilot storage location
    resolved_path = (
        credentials_path
        if credentials_path is not None
        else Path.home() / ".config" / "copilot" / "credentials.json"
    )

    super().__init__(resolved_path)

save async

save(credentials)

Store Copilot credentials to file.

Parameters:

Name Type Description Default
credentials CopilotCredentials

Credentials to store

required
Source code in ccproxy/plugins/copilot/oauth/storage.py
async def save(self, credentials: CopilotCredentials) -> bool:
    """Persist Copilot credentials to the credentials file.

    Args:
        credentials: Credentials to store; their updated-at timestamp is
            refreshed before writing

    Returns:
        True when the write completed, False when any step raised
    """
    try:
        # Stamp the credentials before serializing them
        credentials.refresh_updated_at()

        # Serialize to a JSON-compatible dict (None fields dropped) and hand
        # it to the parent class's writer
        await self._write_json(
            credentials.model_dump(mode="json", exclude_none=True)
        )

        logger.debug(
            "credentials_stored",
            path=str(self.file_path),
            account_type=credentials.account_type,
        )
        return True
    except Exception as exc:
        logger.error("credentials_save_failed", error=str(exc), exc_info=exc)
        return False

load async

load()

Load Copilot credentials from file.

Returns:

Type Description
CopilotCredentials | None

Credentials if found and valid, None otherwise

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def load(self) -> CopilotCredentials | None:
    """Read and validate Copilot credentials from the credentials file.

    Returns:
        The validated credentials, or None when the file yields no data or
        reading/validation raises
    """
    try:
        # Reading is delegated to the parent class
        raw = await self._read_json()
        if raw:
            parsed = CopilotCredentials.model_validate(raw)
            logger.debug(
                "credentials_loaded",
                path=str(self.file_path),
                account_type=parsed.account_type,
                is_expired=parsed.is_expired(),
            )
            return parsed

        logger.debug(
            "credentials_not_found",
            path=str(self.file_path),
        )
        return None
    except Exception as exc:
        logger.error(
            "credentials_load_failed",
            error=str(exc),
            exc_info=exc,
        )
        return None

delete async

delete()

Clear stored credentials.

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def delete(self) -> bool:
    """Remove stored credentials via the parent class and log the call.

    Returns:
        Whatever the parent class's ``delete`` returned
    """
    removed = await super().delete()
    logger.debug("credentials_cleared", path=str(self.file_path))
    return removed

update_oauth_token async

update_oauth_token(oauth_token)

Update OAuth token in stored credentials.

Parameters:

Name Type Description Default
oauth_token CopilotOAuthToken

New OAuth token to store

required
Source code in ccproxy/plugins/copilot/oauth/storage.py
async def update_oauth_token(self, oauth_token: CopilotOAuthToken) -> None:
    """Replace the OAuth token in stored credentials, creating them if absent.

    Args:
        oauth_token: New OAuth token to store
    """
    current = await self.load()
    if current:
        # Keep everything else and swap only the OAuth token
        current.oauth_token = oauth_token
    else:
        # Nothing stored yet: start with OAuth-only credentials
        current = CopilotCredentials(oauth_token=oauth_token, copilot_token=None)

    await self.save(current)

update_copilot_token async

update_copilot_token(copilot_token)

Update Copilot service token in stored credentials.

Parameters:

Name Type Description Default
copilot_token CopilotTokenResponse

New Copilot token to store

required
Source code in ccproxy/plugins/copilot/oauth/storage.py
async def update_copilot_token(self, copilot_token: CopilotTokenResponse) -> None:
    """Attach a new Copilot service token to the stored credentials.

    Args:
        copilot_token: New Copilot token to store

    Raises:
        ValueError: If no credentials are stored yet
    """
    current = await self.load()
    if current:
        current.copilot_token = copilot_token
        await self.save(current)
        return

    # A Copilot token cannot be stored when there are no credentials to
    # attach it to.
    logger.warning(
        "no_oauth_credentials_for_copilot_token",
        message="Cannot store Copilot token without OAuth credentials",
    )
    raise ValueError(
        "OAuth credentials must exist before storing Copilot token"
    )

get_oauth_token async

get_oauth_token()

Get OAuth token from stored credentials.

Returns:

Type Description
CopilotOAuthToken | None

OAuth token if available, None otherwise

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def get_oauth_token(self) -> CopilotOAuthToken | None:
    """Fetch the OAuth token held in stored credentials.

    Returns:
        The stored OAuth token, or None when no credentials are stored
    """
    stored = await self.load()
    if not stored:
        return None
    return stored.oauth_token

get_copilot_token async

get_copilot_token()

Get Copilot service token from stored credentials.

Returns:

Type Description
CopilotTokenResponse | None

Copilot token if available, None otherwise

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def get_copilot_token(self) -> CopilotTokenResponse | None:
    """Fetch the Copilot service token held in stored credentials.

    Returns:
        The stored Copilot token (which may itself be None), or None when
        no credentials are stored
    """
    stored = await self.load()
    if not stored:
        return None
    return stored.copilot_token

load_credentials async

load_credentials()

Legacy method name for backward compatibility.

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def load_credentials(self) -> CopilotCredentials | None:
    """Backward-compatible alias that forwards to ``load``."""
    loaded = await self.load()
    return loaded

store_credentials async

store_credentials(credentials)

Legacy method name for backward compatibility.

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def store_credentials(self, credentials: CopilotCredentials) -> None:
    """Backward-compatible alias that forwards to ``save``.

    The result of ``save`` is not returned.
    """
    _ = await self.save(credentials)

save_credentials async

save_credentials(credentials)

Save credentials method for OAuth provider compatibility.

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def save_credentials(self, credentials: CopilotCredentials) -> bool:
    """Save credentials method for OAuth provider compatibility.

    Args:
        credentials: Credentials to store

    Returns:
        The result of ``save``: True when the credentials were written,
        False when the write failed. Returning it (rather than discarding
        it) lets callers detect a failed write; callers that ignore the
        return value are unaffected.
    """
    return await self.save(credentials)

clear_credentials async

clear_credentials()

Legacy method name for backward compatibility.

Source code in ccproxy/plugins/copilot/oauth/storage.py
async def clear_credentials(self) -> None:
    """Backward-compatible alias that forwards to ``delete``.

    The result of ``delete`` is not returned.
    """
    _ = await self.delete()