Skip to content

ccproxy.plugins.oauth_codex.manager

ccproxy.plugins.oauth_codex.manager

OpenAI/Codex token manager implementation for the Codex plugin.

CodexTokenManager

CodexTokenManager(storage=None)

Bases: BaseTokenManager[OpenAICredentials]

Manager for Codex/OpenAI token storage and operations.

Uses the generic storage and wrapper pattern for consistency.

Parameters:

Name Type Description Default
storage TokenStorage[OpenAICredentials] | None

Optional custom storage, defaults to standard location

None
Source code in ccproxy/plugins/oauth_codex/manager.py
def __init__(
    self,
    storage: TokenStorage[OpenAICredentials] | None = None,
):
    """Initialize Codex token manager.

    Args:
        storage: Optional custom storage, defaults to standard location
    """
    if storage is None:
        # Use the Codex-specific storage for ~/.codex/auth.json
        from .storage import CodexTokenStorage

        storage = CodexTokenStorage()
    super().__init__(storage)
    self._profile_cache: OpenAIProfileInfo | None = None

create async classmethod

create(storage=None)

Async factory for parity with other managers.

Codex/OpenAI does not need to preload remote data, but this keeps a consistent async creation API across managers.

Source code in ccproxy/plugins/oauth_codex/manager.py
@classmethod
async def create(
    cls, storage: TokenStorage[OpenAICredentials] | None = None
) -> "CodexTokenManager":
    """Async factory for parity with other managers.

    Codex/OpenAI does not need to preload remote data, but this keeps a
    consistent async creation API across managers.
    """
    return cls(storage=storage)

refresh_token async

refresh_token()

Refresh the access token using the refresh token.

Returns:

Type Description
OpenAICredentials | None

Updated credentials or None if refresh failed

Source code in ccproxy/plugins/oauth_codex/manager.py
async def refresh_token(self) -> OpenAICredentials | None:
    """Refresh the access token using the refresh token.

    Returns:
        Updated credentials or None if refresh failed
    """
    # Load current credentials
    credentials = await self.load_credentials()
    if not credentials:
        logger.error("no_credentials_to_refresh", category="auth")
        return None

    if not credentials.refresh_token:
        logger.error("no_refresh_token_available", category="auth")
        return None

    try:
        # Refresh directly using a local OAuth client/provider (no global registry)
        from .provider import CodexOAuthProvider

        provider = CodexOAuthProvider()
        new_credentials: OpenAICredentials = await provider.refresh_access_token(
            credentials.refresh_token
        )

        # Preserve account_id if not in new credentials
        if not new_credentials.account_id and credentials.account_id:
            # Preserve via nested tokens structure
            new_credentials.tokens.account_id = credentials.account_id

        # Save updated credentials
        if await self.save_credentials(new_credentials):
            logger.info(
                "Token refreshed successfully",
                account_id=new_credentials.account_id,
                category="auth",
            )
            # Clear profile cache as token changed
            self._profile_cache = None
            return new_credentials

        logger.error("failed_to_save_refreshed_credentials", category="auth")
        return None

    except Exception as e:
        logger.error(
            "Token refresh failed",
            error=str(e),
            exc_info=False,
            category="auth",
        )
        return None

is_expired

is_expired(credentials)

Check if credentials are expired using wrapper.

Source code in ccproxy/plugins/oauth_codex/manager.py
def is_expired(self, credentials: OpenAICredentials) -> bool:
    """Check if credentials are expired using wrapper."""
    if isinstance(credentials, OpenAICredentials):
        wrapper = OpenAITokenWrapper(credentials=credentials)
        return bool(wrapper.is_expired)

    expires_at = getattr(credentials, "expires_at", None)
    if not expires_at:
        return False

    if isinstance(expires_at, datetime):
        return expires_at <= datetime.now(UTC)

    return False

get_account_id

get_account_id(credentials)

Get account ID from credentials.

Source code in ccproxy/plugins/oauth_codex/manager.py
def get_account_id(self, credentials: OpenAICredentials) -> str | None:
    """Get account ID from credentials."""
    return credentials.account_id

get_expiration_time

get_expiration_time(credentials)

Get expiration time as datetime.

Source code in ccproxy/plugins/oauth_codex/manager.py
def get_expiration_time(self, credentials: OpenAICredentials) -> datetime | None:
    """Get expiration time as datetime."""
    return credentials.expires_at

get_profile_quick async

get_profile_quick()

Lightweight profile from cached data or JWT claims.

Avoids any remote calls. Uses cache if populated, otherwise derives directly from stored credentials' JWT claims.

Source code in ccproxy/plugins/oauth_codex/manager.py
async def get_profile_quick(self) -> OpenAIProfileInfo | None:
    """Lightweight profile from cached data or JWT claims.

    Avoids any remote calls. Uses cache if populated, otherwise derives
    directly from stored credentials' JWT claims.
    """
    if self._profile_cache:
        return self._profile_cache

    credentials = await self.load_credentials()
    if not credentials or self.is_expired(credentials):
        return None

    self._profile_cache = OpenAIProfileInfo.from_token(credentials)
    return self._profile_cache

get_profile async

get_profile()

Get user profile from JWT token.

OpenAI doesn't provide a profile API, so we extract all information from the JWT token claims.

Returns:

Type Description
OpenAIProfileInfo | None

OpenAIProfileInfo or None if not authenticated

Source code in ccproxy/plugins/oauth_codex/manager.py
async def get_profile(self) -> OpenAIProfileInfo | None:
    """Get user profile from JWT token.

    OpenAI doesn't provide a profile API, so we extract
    all information from the JWT token claims.

    Returns:
        OpenAIProfileInfo or None if not authenticated
    """
    if self._profile_cache:
        return self._profile_cache

    credentials = await self.load_credentials()
    if not credentials or self.is_expired(credentials):
        return None

    # Extract profile from JWT token claims
    self._profile_cache = OpenAIProfileInfo.from_token(credentials)
    return self._profile_cache

get_access_token_with_refresh async

get_access_token_with_refresh()

Get valid access token, automatically refreshing if expired.

Returns:

Type Description
str | None

Access token if available and valid, None otherwise

Source code in ccproxy/plugins/oauth_codex/manager.py
async def get_access_token_with_refresh(self) -> str | None:
    """Get valid access token, automatically refreshing if expired.

    Returns:
        Access token if available and valid, None otherwise
    """
    credentials = await self.load_credentials()
    if not credentials:
        logger.debug("no_credentials_found", category="auth")
        return None

    needs_refresh = self.should_refresh(credentials)

    if needs_refresh:
        logger.info(
            "openai_token_refresh_needed",
            reason="expired" if self.is_expired(credentials) else "expiring_soon",
            expires_in=self.seconds_until_expiration(credentials),
            category="auth",
        )

        if credentials.refresh_token:
            try:
                refreshed = await self.refresh_token()
            except Exception as exc:  # pragma: no cover - defensive
                logger.warning(
                    "openai_token_refresh_exception",
                    error=str(exc),
                    category="auth",
                )
                raise OAuthTokenRefreshError("OpenAI token refresh failed") from exc

            if refreshed:
                logger.info("OpenAI token refreshed successfully", category="auth")
                credentials = refreshed
            else:
                logger.warning("openai_token_refresh_failed", category="auth")
                raise OAuthTokenRefreshError("OpenAI token refresh failed")
        else:
            logger.warning(
                "Cannot refresh OpenAI token - no refresh token available",
                category="auth",
            )
            raise OAuthTokenRefreshError("OpenAI token refresh failed")

    return credentials.access_token

get_access_token async

get_access_token()

Override base method to return token even if expired.

Will attempt refresh if expired but still returns the token even if refresh fails, letting the API handle authorization.

Returns:

Type Description
str | None

Access token if available (expired or not), None only if no credentials

Source code in ccproxy/plugins/oauth_codex/manager.py
async def get_access_token(self) -> str | None:
    """Override base method to return token even if expired.

    Will attempt refresh if expired but still returns the token
    even if refresh fails, letting the API handle authorization.

    Returns:
        Access token if available (expired or not), None only if no credentials
    """
    credentials = await self.load_credentials()
    if not credentials:
        logger.debug("no_credentials_found", category="auth")
        return None

    # Check if token is expired
    needs_refresh = self.should_refresh(credentials)

    if needs_refresh:
        try:
            return await self.get_access_token_with_refresh()
        except OAuthTokenRefreshError as exc:
            logger.warning(
                "OpenAI token refresh failed, using existing token",
                error=str(exc),
                category="auth",
            )

    return credentials.access_token