Skip to content

ccproxy.services.credentials.manager

ccproxy.services.credentials.manager

Credentials manager for coordinating storage and OAuth operations.

CredentialsManager

CredentialsManager(
    config=None,
    storage=None,
    oauth_client=None,
    http_client=None,
)

Manager for Claude credentials with storage and OAuth support.

Parameters:

Name Type Description Default
config AuthSettings | None

Credentials configuration (uses defaults if not provided)

None
storage TokenStorage | None

Storage backend (uses JSON file storage if not provided)

None
oauth_client OAuthClient | None

OAuth client (creates one if not provided)

None
http_client AsyncClient | None

HTTP client for OAuth operations

None
Source code in ccproxy/services/credentials/manager.py
def __init__(
    self,
    config: AuthSettings | None = None,
    storage: CredentialsStorageBackend | None = None,
    oauth_client: OAuthClient | None = None,
    http_client: httpx.AsyncClient | None = None,
):
    """Initialize credentials manager.

    Args:
        config: Credentials configuration (uses defaults if not provided)
        storage: Storage backend (uses JSON file storage if not provided)
        oauth_client: OAuth client (creates one if not provided)
        http_client: HTTP client for OAuth operations
    """
    self.config = config or AuthSettings()
    self._storage = storage
    self._oauth_client = oauth_client
    self._http_client = http_client
    self._owns_http_client = http_client is None
    self._refresh_lock = asyncio.Lock()

    # Initialize OAuth client if not provided
    if self._oauth_client is None:
        self._oauth_client = OAuthClient(
            config=self.config.oauth,
        )

storage property

storage

Get the storage backend, creating default if needed.

find_credentials_file async

find_credentials_file()

Find existing credentials file in configured paths.

Returns:

Type Description
Path | None

Path to credentials file if found, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def find_credentials_file(self) -> Path | None:
    """Find existing credentials file in configured paths.

    Returns:
        Path to credentials file if found, None otherwise
    """
    for path_str in self.config.storage.storage_paths:
        path = Path(path_str).expanduser()
        logger.debug("checking_credentials_path", path=str(path))
        if path.exists() and path.is_file():
            logger.info("credentials_file_found", path=str(path))
            return path
        else:
            logger.debug("credentials_path_not_found", path=str(path))

    logger.warning(
        "no_credentials_files_found",
        searched_paths=self.config.storage.storage_paths,
    )
    return None

load async

load()

Load credentials from storage.

Returns:

Type Description
ClaudeCredentials | None

Credentials if found and valid, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from storage.

    Returns:
        Credentials if found and valid, None otherwise
    """
    try:
        return await self.storage.load()
    except Exception as e:
        logger.error("credentials_load_failed", error=str(e))
        return None

save async

save(credentials)

Save credentials to storage.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Source code in ccproxy/services/credentials/manager.py
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to storage.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise
    """
    try:
        return await self.storage.save(credentials)
    except Exception as e:
        logger.error("credentials_save_failed", error=str(e))
        return False

login async

login()

Perform OAuth login and save credentials.

Returns:

Type Description
ClaudeCredentials

New credentials from login

Raises:

Type Description
OAuthLoginError

If login fails

Source code in ccproxy/services/credentials/manager.py
async def login(self) -> ClaudeCredentials:
    """Perform OAuth login and save credentials.

    Returns:
        New credentials from login

    Raises:
        OAuthLoginError: If login fails
    """
    if self._oauth_client is None:
        raise RuntimeError("OAuth client not initialized")
    credentials = await self._oauth_client.login()

    # Fetch and save user profile after successful login
    try:
        profile = await self._oauth_client.fetch_user_profile(
            credentials.claude_ai_oauth.access_token
        )
        if profile:
            # Save profile data
            await self._save_account_profile(profile)

            # Update subscription type based on profile
            determined_subscription = self._determine_subscription_type(profile)
            credentials.claude_ai_oauth.subscription_type = determined_subscription

            logger.debug(
                "subscription_type_set", subscription_type=determined_subscription
            )
        else:
            logger.debug(
                "profile_fetch_skipped", context="login", reason="no_profile_data"
            )
    except Exception as e:
        logger.warning("profile_fetch_failed", context="login", error=str(e))
        # Continue with login even if profile fetch fails

    await self.save(credentials)
    return credentials

get_valid_credentials async

get_valid_credentials()

Get valid credentials, refreshing if necessary.

Returns:

Type Description
ClaudeCredentials

Valid credentials

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

CredentialsExpiredError

If credentials expired and refresh fails

Source code in ccproxy/services/credentials/manager.py
async def get_valid_credentials(self) -> ClaudeCredentials:
    """Get valid credentials, refreshing if necessary.

    Returns:
        Valid credentials

    Raises:
        CredentialsNotFoundError: If no credentials found
        CredentialsExpiredError: If credentials expired and refresh fails
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError("No credentials found. Please login first.")

    # Check if token needs refresh
    oauth_token = credentials.claude_ai_oauth
    should_refresh = self._should_refresh_token(oauth_token)

    if should_refresh:
        async with self._refresh_lock:
            # Re-check if refresh is still needed after acquiring lock
            # Another request might have already refreshed the token
            credentials = await self.load()
            if not credentials:
                raise CredentialsNotFoundError(
                    "No credentials found. Please login first."
                )

            oauth_token = credentials.claude_ai_oauth
            should_refresh = self._should_refresh_token(oauth_token)

            if should_refresh:
                logger.info(
                    "token_refresh_start", reason="expired_or_expiring_soon"
                )
                try:
                    credentials = await self._refresh_token_with_profile(
                        credentials
                    )
                except Exception as e:
                    logger.error(
                        "token_refresh_failed", error=str(e), exc_info=True
                    )
                    if oauth_token.is_expired:
                        raise CredentialsExpiredError(
                            "Token expired and refresh failed. Please login again."
                        ) from e
                    # If not expired yet but refresh failed, return existing token
                    logger.warning(
                        "token_refresh_fallback",
                        reason="refresh_failed_but_token_not_expired",
                    )

    return credentials

get_access_token async

get_access_token()

Get valid access token, refreshing if necessary.

Returns:

Type Description
str

Access token string

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

CredentialsExpiredError

If credentials expired and refresh fails

Source code in ccproxy/services/credentials/manager.py
async def get_access_token(self) -> str:
    """Get valid access token, refreshing if necessary.

    Returns:
        Access token string

    Raises:
        CredentialsNotFoundError: If no credentials found
        CredentialsExpiredError: If credentials expired and refresh fails
    """
    credentials = await self.get_valid_credentials()
    return credentials.claude_ai_oauth.access_token

refresh_token async

refresh_token()

Refresh the access token without checking expiration.

This method directly refreshes the token regardless of whether it's expired. Useful for force-refreshing tokens or testing.

Returns:

Type Description
ClaudeCredentials

Updated credentials with new token

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

RuntimeError

If OAuth client not initialized

ValueError

If no refresh token available

Exception

If token refresh fails

Source code in ccproxy/services/credentials/manager.py
async def refresh_token(self) -> ClaudeCredentials:
    """Refresh the access token without checking expiration.

    This method directly refreshes the token regardless of whether it's expired.
    Useful for force-refreshing tokens or testing.

    Returns:
        Updated credentials with new token

    Raises:
        CredentialsNotFoundError: If no credentials found
        RuntimeError: If OAuth client not initialized
        ValueError: If no refresh token available
        Exception: If token refresh fails
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError("No credentials found. Please login first.")

    logger.info("token_refresh_start", reason="forced")
    return await self._refresh_token_with_profile(credentials)

fetch_user_profile async

fetch_user_profile()

Fetch user profile information.

Returns:

Type Description
UserProfile | None

UserProfile if successful, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def fetch_user_profile(self) -> UserProfile | None:
    """Fetch user profile information.

    Returns:
        UserProfile if successful, None otherwise
    """
    try:
        credentials = await self.get_valid_credentials()
        if self._oauth_client is None:
            raise RuntimeError("OAuth client not initialized")
        profile = await self._oauth_client.fetch_user_profile(
            credentials.claude_ai_oauth.access_token,
        )
        return profile
    except Exception as e:
        logger.error(
            "user_profile_fetch_failed",
            error=str(e),
            exc_info=True,
        )
        return None

get_account_profile async

get_account_profile()

Get saved account profile information.

Returns:

Type Description
UserProfile | None

UserProfile if available, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def get_account_profile(self) -> UserProfile | None:
    """Get saved account profile information.

    Returns:
        UserProfile if available, None otherwise
    """
    return await self._load_account_profile()

validate async

validate()

Validate current credentials.

Returns:

Type Description
ValidationResult

ValidationResult with credentials status and details

Source code in ccproxy/services/credentials/manager.py
async def validate(self) -> ValidationResult:
    """Validate current credentials.

    Returns:
        ValidationResult with credentials status and details
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError()

    return ValidationResult(
        valid=True,
        expired=credentials.claude_ai_oauth.is_expired,
        credentials=credentials,
        path=self.storage.get_location(),
    )

logout async

logout()

Delete stored credentials.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Source code in ccproxy/services/credentials/manager.py
async def logout(self) -> bool:
    """Delete stored credentials.

    Returns:
        True if deleted successfully, False otherwise
    """
    try:
        # Delete both credentials and account profile
        success = await self.storage.delete()
        await self._delete_account_profile()
        return success
    except Exception as e:
        logger.error("credentials_delete_failed", error=str(e), exc_info=True)
        return False