Skip to content

ccproxy.auth

ccproxy.auth

Authentication module for centralized auth handling.

BearerTokenAuthManager

BearerTokenAuthManager(token)

Bases: BaseAuthManager

Authentication manager for static bearer tokens.

Parameters:

Name Type Description Default
token str

Bearer token string

required
Source code in ccproxy/auth/bearer.py
def __init__(self, token: str) -> None:
    """Initialize with a static bearer token.

    Args:
        token: Bearer token string
    """
    self.token = token.strip()
    if not self.token:
        raise ValueError("Token cannot be empty")

get_access_token async

get_access_token()

Get the bearer token.

Returns:

Type Description
str

Bearer token string

Raises:

Type Description
AuthenticationError

If token is invalid

Source code in ccproxy/auth/bearer.py
async def get_access_token(self) -> str:
    """Get the bearer token.

    Returns:
        Bearer token string

    Raises:
        AuthenticationError: If token is invalid
    """
    if not self.token:
        raise AuthenticationError("No bearer token available")
    return self.token

get_credentials async

get_credentials()

Get credentials (not supported for bearer tokens).

Raises:

Type Description
AuthenticationError

Bearer tokens don't support full credentials

Source code in ccproxy/auth/bearer.py
async def get_credentials(self) -> ClaudeCredentials:
    """Get credentials (not supported for bearer tokens).

    Raises:
        AuthenticationError: Bearer tokens don't support full credentials
    """
    raise AuthenticationError(
        "Bearer token authentication doesn't support full credentials"
    )

is_authenticated async

is_authenticated()

Check if bearer token is available.

Returns:

Type Description
bool

True if token is available, False otherwise

Source code in ccproxy/auth/bearer.py
async def is_authenticated(self) -> bool:
    """Check if bearer token is available.

    Returns:
        True if token is available, False otherwise
    """
    return bool(self.token)

get_user_profile async

get_user_profile()

Get user profile (not supported for bearer tokens).

Returns:

Type Description
UserProfile | None

None - bearer tokens don't support user profiles

Source code in ccproxy/auth/bearer.py
async def get_user_profile(self) -> UserProfile | None:
    """Get user profile (not supported for bearer tokens).

    Returns:
        None - bearer tokens don't support user profiles
    """
    return None

CredentialsAuthManager

CredentialsAuthManager(credentials_manager=None)

Bases: BaseAuthManager

Adapter to make CredentialsManager compatible with AuthManager interface.

Parameters:

Name Type Description Default
credentials_manager CredentialsManager | None

CredentialsManager instance, creates new if None

None
Source code in ccproxy/auth/credentials_adapter.py
def __init__(self, credentials_manager: CredentialsManager | None = None) -> None:
    """Initialize with credentials manager.

    Args:
        credentials_manager: CredentialsManager instance, creates new if None
    """
    self._credentials_manager = credentials_manager or CredentialsManager()

get_access_token async

get_access_token()

Get valid access token from credentials manager.

Returns:

Type Description
str

Access token string

Raises:

Type Description
AuthenticationError

If authentication fails

Source code in ccproxy/auth/credentials_adapter.py
async def get_access_token(self) -> str:
    """Get valid access token from credentials manager.

    Returns:
        Access token string

    Raises:
        AuthenticationError: If authentication fails
    """
    try:
        return await self._credentials_manager.get_access_token()
    except CredentialsNotFoundError as e:
        raise AuthenticationError("No credentials found") from e
    except CredentialsExpiredError as e:
        raise AuthenticationError("Credentials expired") from e
    except CredentialsError as e:
        raise AuthenticationError(f"Credentials error: {e}") from e

get_credentials async

get_credentials()

Get valid credentials from credentials manager.

Returns:

Type Description
ClaudeCredentials

Valid credentials

Raises:

Type Description
AuthenticationError

If authentication fails

Source code in ccproxy/auth/credentials_adapter.py
async def get_credentials(self) -> ClaudeCredentials:
    """Get valid credentials from credentials manager.

    Returns:
        Valid credentials

    Raises:
        AuthenticationError: If authentication fails
    """
    try:
        return await self._credentials_manager.get_valid_credentials()
    except CredentialsNotFoundError as e:
        raise AuthenticationError("No credentials found") from e
    except CredentialsExpiredError as e:
        raise AuthenticationError("Credentials expired") from e
    except CredentialsError as e:
        raise AuthenticationError(f"Credentials error: {e}") from e

is_authenticated async

is_authenticated()

Check if current authentication is valid.

Returns:

Type Description
bool

True if authenticated, False otherwise

Source code in ccproxy/auth/credentials_adapter.py
async def is_authenticated(self) -> bool:
    """Check if current authentication is valid.

    Returns:
        True if authenticated, False otherwise
    """
    try:
        await self._credentials_manager.get_valid_credentials()
        return True
    except CredentialsError:
        return False

get_user_profile async

get_user_profile()

Get user profile information.

Returns:

Type Description
UserProfile | None

UserProfile if available, None otherwise

Source code in ccproxy/auth/credentials_adapter.py
async def get_user_profile(self) -> UserProfile | None:
    """Get user profile information.

    Returns:
        UserProfile if available, None otherwise
    """
    try:
        return await self._credentials_manager.fetch_user_profile()
    except CredentialsError:
        return None

AuthenticationError

Bases: Exception

Base authentication error.

AuthenticationRequiredError

Bases: AuthenticationError

Authentication is required but not provided.

CredentialsError

Bases: AuthenticationError

Base credentials error.

CredentialsExpiredError

Bases: CredentialsError

Credentials expired error.

CredentialsInvalidError

Bases: CredentialsError

Credentials are invalid or malformed.

CredentialsNotFoundError

Bases: CredentialsError

Credentials not found error.

CredentialsStorageError

Bases: CredentialsError

Error occurred during credentials storage operations.

InsufficientPermissionsError

Bases: AuthenticationError

Insufficient permissions for the requested operation.

InvalidTokenError

Bases: AuthenticationError

Invalid or expired token.

OAuthCallbackError

Bases: OAuthError

OAuth callback failed.

OAuthError

Bases: AuthenticationError

Base OAuth error.

OAuthLoginError

Bases: OAuthError

OAuth login failed.

OAuthTokenRefreshError

Bases: OAuthError

OAuth token refresh failed.

AuthManager

Bases: Protocol

Protocol for authentication managers.

get_access_token async

get_access_token()

Get valid access token.

Returns:

Type Description
str

Access token string

Raises:

Type Description
AuthenticationError

If authentication fails

Source code in ccproxy/auth/manager.py
async def get_access_token(self) -> str:
    """Get valid access token.

    Returns:
        Access token string

    Raises:
        AuthenticationError: If authentication fails
    """
    ...

get_credentials async

get_credentials()

Get valid credentials.

Returns:

Type Description
ClaudeCredentials

Valid credentials

Raises:

Type Description
AuthenticationError

If authentication fails

Source code in ccproxy/auth/manager.py
async def get_credentials(self) -> ClaudeCredentials:
    """Get valid credentials.

    Returns:
        Valid credentials

    Raises:
        AuthenticationError: If authentication fails
    """
    ...

is_authenticated async

is_authenticated()

Check if current authentication is valid.

Returns:

Type Description
bool

True if authenticated, False otherwise

Source code in ccproxy/auth/manager.py
async def is_authenticated(self) -> bool:
    """Check if current authentication is valid.

    Returns:
        True if authenticated, False otherwise
    """
    ...

get_user_profile async

get_user_profile()

Get user profile information.

Returns:

Type Description
UserProfile | None

UserProfile if available, None otherwise

Source code in ccproxy/auth/manager.py
async def get_user_profile(self) -> UserProfile | None:
    """Get user profile information.

    Returns:
        UserProfile if available, None otherwise
    """
    ...

BaseAuthManager

Bases: ABC

Base class for authentication managers.

get_access_token abstractmethod async

get_access_token()

Get valid access token.

Returns:

Type Description
str

Access token string

Raises:

Type Description
AuthenticationError

If authentication fails

Source code in ccproxy/auth/manager.py
@abstractmethod
async def get_access_token(self) -> str:
    """Get valid access token.

    Returns:
        Access token string

    Raises:
        AuthenticationError: If authentication fails
    """
    pass

get_credentials abstractmethod async

get_credentials()

Get valid credentials.

Returns:

Type Description
ClaudeCredentials

Valid credentials

Raises:

Type Description
AuthenticationError

If authentication fails

Source code in ccproxy/auth/manager.py
@abstractmethod
async def get_credentials(self) -> ClaudeCredentials:
    """Get valid credentials.

    Returns:
        Valid credentials

    Raises:
        AuthenticationError: If authentication fails
    """
    pass

is_authenticated abstractmethod async

is_authenticated()

Check if current authentication is valid.

Returns:

Type Description
bool

True if authenticated, False otherwise

Source code in ccproxy/auth/manager.py
@abstractmethod
async def is_authenticated(self) -> bool:
    """Check if current authentication is valid.

    Returns:
        True if authenticated, False otherwise
    """
    pass

get_user_profile async

get_user_profile()

Get user profile information.

Returns:

Type Description
UserProfile | None

UserProfile if available, None otherwise

Source code in ccproxy/auth/manager.py
async def get_user_profile(self) -> UserProfile | None:
    """Get user profile information.

    Returns:
        UserProfile if available, None otherwise
    """
    return None

JsonFileTokenStorage

JsonFileTokenStorage(file_path)

Bases: TokenStorage

JSON file storage implementation for Claude credentials with keyring fallback.

Parameters:

Name Type Description Default
file_path Path

Path to the JSON credentials file

required
Source code in ccproxy/auth/storage/json_file.py
def __init__(self, file_path: Path):
    """Initialize JSON file storage.

    Args:
        file_path: Path to the JSON credentials file
    """
    self.file_path = file_path

load async

load()

Load credentials from JSON file .

Returns:

Type Description
ClaudeCredentials | None

Parsed credentials if found and valid, None otherwise

Raises:

Type Description
CredentialsInvalidError

If the JSON file is invalid

CredentialsStorageError

If there's an error reading the file

Source code in ccproxy/auth/storage/json_file.py
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from JSON file .

    Returns:
        Parsed credentials if found and valid, None otherwise

    Raises:
        CredentialsInvalidError: If the JSON file is invalid
        CredentialsStorageError: If there's an error reading the file
    """
    if not await self.exists():
        logger.debug("credentials_file_not_found", path=str(self.file_path))
        return None

    try:
        logger.debug(
            "credentials_load_start", source="file", path=str(self.file_path)
        )
        with self.file_path.open() as f:
            data = json.load(f)

        credentials = ClaudeCredentials.model_validate(data)
        logger.debug("credentials_load_completed", source="file")

        return credentials

    except json.JSONDecodeError as e:
        raise CredentialsInvalidError(
            f"Failed to parse credentials file {self.file_path}: {e}"
        ) from e
    except Exception as e:
        raise CredentialsStorageError(
            f"Error loading credentials from {self.file_path}: {e}"
        ) from e

save async

save(credentials)

Save credentials to both keyring and JSON file.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error writing the file

Source code in ccproxy/auth/storage/json_file.py
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to both keyring and JSON file.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise

    Raises:
        CredentialsStorageError: If there's an error writing the file
    """
    try:
        # Convert to dict with proper aliases
        data = credentials.model_dump(by_alias=True, mode="json")

        # Always save to file as well
        # Ensure parent directory exists
        self.file_path.parent.mkdir(parents=True, exist_ok=True)

        # Use atomic write: write to temp file then rename
        temp_path = self.file_path.with_suffix(".tmp")

        try:
            with temp_path.open("w") as f:
                json.dump(data, f, indent=2)

            # Set appropriate file permissions (read/write for owner only)
            temp_path.chmod(0o600)

            # Atomically replace the original file
            Path.replace(temp_path, self.file_path)

            logger.debug(
                "credentials_save_completed",
                source="file",
                path=str(self.file_path),
            )
            return True
        except Exception as e:
            raise
        finally:
            # Clean up temp file if it exists
            if temp_path.exists():
                with contextlib.suppress(Exception):
                    temp_path.unlink()

    except Exception as e:
        raise CredentialsStorageError(f"Error saving credentials: {e}") from e

exists async

exists()

Check if credentials file exists.

Returns:

Type Description
bool

True if file exists, False otherwise

Source code in ccproxy/auth/storage/json_file.py
async def exists(self) -> bool:
    """Check if credentials file exists.

    Returns:
        True if file exists, False otherwise
    """
    return self.file_path.exists() and self.file_path.is_file()

delete async

delete()

Delete credentials from both keyring and file.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error deleting the file

Source code in ccproxy/auth/storage/json_file.py
async def delete(self) -> bool:
    """Delete credentials from both keyring and file.

    Returns:
        True if deleted successfully, False otherwise

    Raises:
        CredentialsStorageError: If there's an error deleting the file
    """
    deleted = False

    # Delete from file
    try:
        if await self.exists():
            self.file_path.unlink()
            logger.debug(
                "credentials_delete_completed",
                source="file",
                path=str(self.file_path),
            )
            deleted = True
    except Exception as e:
        if not deleted:  # Only raise if we failed to delete from both
            raise CredentialsStorageError(f"Error deleting credentials: {e}") from e
        logger.debug("credentials_delete_partial", source="file", error=str(e))

    return deleted

get_location

get_location()

Get the storage location description.

Returns:

Type Description
str

Path to the JSON file with keyring info if available

Source code in ccproxy/auth/storage/json_file.py
def get_location(self) -> str:
    """Get the storage location description.

    Returns:
        Path to the JSON file with keyring info if available
    """
    return str(self.file_path)

KeyringTokenStorage

KeyringTokenStorage(
    service_name="claude-code-proxy", username="default"
)

Bases: TokenStorage

OS keyring storage implementation for Claude credentials.

Parameters:

Name Type Description Default
service_name str

Name of the service in the keyring

'claude-code-proxy'
username str

Username to associate with the stored credentials

'default'
Source code in ccproxy/auth/storage/keyring.py
def __init__(
    self, service_name: str = "claude-code-proxy", username: str = "default"
):
    """Initialize keyring storage.

    Args:
        service_name: Name of the service in the keyring
        username: Username to associate with the stored credentials
    """
    self.service_name = service_name
    self.username = username

load async

load()

Load credentials from the OS keyring.

Returns:

Type Description
ClaudeCredentials | None

Parsed credentials if found and valid, None otherwise

Raises:

Type Description
CredentialsStorageError

If the stored data is invalid

CredentialsStorageError

If there's an error reading from keyring

Source code in ccproxy/auth/storage/keyring.py
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from the OS keyring.

    Returns:
        Parsed credentials if found and valid, None otherwise

    Raises:
        CredentialsStorageError: If the stored data is invalid
        CredentialsStorageError: If there's an error reading from keyring
    """
    try:
        import keyring
    except ImportError as e:
        raise CredentialsStorageError(
            "keyring package is required for keyring storage. "
            "Install it with: pip install keyring"
        ) from e

    try:
        logger.debug(
            "credentials_load_start",
            source="keyring",
            service_name=self.service_name,
        )
        password = keyring.get_password(self.service_name, self.username)

        if password is None:
            logger.debug(
                "credentials_not_found",
                source="keyring",
                service_name=self.service_name,
            )
            return None

        # Parse the stored JSON
        data = json.loads(password)
        credentials = ClaudeCredentials.model_validate(data)

        self._log_credential_details(credentials)
        return credentials

    except json.JSONDecodeError as e:
        raise CredentialsStorageError(
            f"Failed to parse credentials from keyring: {e}"
        ) from e
    except Exception as e:
        raise CredentialsStorageError(
            f"Error loading credentials from keyring: {e}"
        ) from e

save async

save(credentials)

Save credentials to the OS keyring.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error writing to keyring

Source code in ccproxy/auth/storage/keyring.py
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to the OS keyring.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise

    Raises:
        CredentialsStorageError: If there's an error writing to keyring
    """
    try:
        import keyring
    except ImportError as e:
        raise CredentialsStorageError(
            "keyring package is required for keyring storage. "
            "Install it with: pip install keyring"
        ) from e

    try:
        # Convert to JSON string
        data = credentials.model_dump(by_alias=True)
        json_data = json.dumps(data)

        # Store in keyring
        keyring.set_password(self.service_name, self.username, json_data)

        logger.debug(
            "credentials_save_completed",
            source="keyring",
            service_name=self.service_name,
        )
        return True

    except Exception as e:
        raise CredentialsStorageError(
            f"Error saving credentials to keyring: {e}"
        ) from e

exists async

exists()

Check if credentials exist in the keyring.

Returns:

Type Description
bool

True if credentials exist, False otherwise

Source code in ccproxy/auth/storage/keyring.py
async def exists(self) -> bool:
    """Check if credentials exist in the keyring.

    Returns:
        True if credentials exist, False otherwise
    """
    try:
        import keyring
    except ImportError:
        return False

    try:
        password = keyring.get_password(self.service_name, self.username)
        return password is not None
    except Exception:
        return False

delete async

delete()

Delete credentials from the keyring.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error deleting from keyring

Source code in ccproxy/auth/storage/keyring.py
async def delete(self) -> bool:
    """Delete credentials from the keyring.

    Returns:
        True if deleted successfully, False otherwise

    Raises:
        CredentialsStorageError: If there's an error deleting from keyring
    """
    try:
        import keyring
    except ImportError as e:
        raise CredentialsStorageError(
            "keyring package is required for keyring storage. "
            "Install it with: pip install keyring"
        ) from e

    try:
        if await self.exists():
            keyring.delete_password(self.service_name, self.username)
            logger.debug(
                "credentials_delete_completed",
                source="keyring",
                service_name=self.service_name,
            )
            return True
        return False
    except Exception as e:
        raise CredentialsStorageError(
            f"Error deleting credentials from keyring: {e}"
        ) from e

get_location

get_location()

Get the storage location description.

Returns:

Type Description
str

Description of the keyring storage location

Source code in ccproxy/auth/storage/keyring.py
def get_location(self) -> str:
    """Get the storage location description.

    Returns:
        Description of the keyring storage location
    """
    return f"OS keyring (service: {self.service_name}, user: {self.username})"

TokenStorage

Bases: ABC

Abstract interface for token storage operations.

load abstractmethod async

load()

Load credentials from storage.

Returns:

Type Description
ClaudeCredentials | None

Parsed credentials if found and valid, None otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from storage.

    Returns:
        Parsed credentials if found and valid, None otherwise
    """
    pass

save abstractmethod async

save(credentials)

Save credentials to storage.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to storage.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise
    """
    pass

exists abstractmethod async

exists()

Check if credentials exist in storage.

Returns:

Type Description
bool

True if credentials exist, False otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def exists(self) -> bool:
    """Check if credentials exist in storage.

    Returns:
        True if credentials exist, False otherwise
    """
    pass

delete abstractmethod async

delete()

Delete credentials from storage.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def delete(self) -> bool:
    """Delete credentials from storage.

    Returns:
        True if deleted successfully, False otherwise
    """
    pass

get_location abstractmethod

get_location()

Get the storage location description.

Returns:

Type Description
str

Human-readable description of where credentials are stored

Source code in ccproxy/auth/storage/base.py
@abstractmethod
def get_location(self) -> str:
    """Get the storage location description.

    Returns:
        Human-readable description of where credentials are stored
    """
    pass

CredentialsManager

CredentialsManager(
    config=None,
    storage=None,
    oauth_client=None,
    http_client=None,
)

Manager for Claude credentials with storage and OAuth support.

Parameters:

Name Type Description Default
config AuthSettings | None

Credentials configuration (uses defaults if not provided)

None
storage TokenStorage | None

Storage backend (uses JSON file storage if not provided)

None
oauth_client OAuthClient | None

OAuth client (creates one if not provided)

None
http_client AsyncClient | None

HTTP client for OAuth operations

None
Source code in ccproxy/services/credentials/manager.py
def __init__(
    self,
    config: AuthSettings | None = None,
    storage: CredentialsStorageBackend | None = None,
    oauth_client: OAuthClient | None = None,
    http_client: httpx.AsyncClient | None = None,
):
    """Initialize credentials manager.

    Args:
        config: Credentials configuration (uses defaults if not provided)
        storage: Storage backend (uses JSON file storage if not provided)
        oauth_client: OAuth client (creates one if not provided)
        http_client: HTTP client for OAuth operations
    """
    self.config = config or AuthSettings()
    self._storage = storage
    self._oauth_client = oauth_client
    self._http_client = http_client
    self._owns_http_client = http_client is None
    self._refresh_lock = asyncio.Lock()

    # Initialize OAuth client if not provided
    if self._oauth_client is None:
        self._oauth_client = OAuthClient(
            config=self.config.oauth,
        )

storage property

storage

Get the storage backend, creating default if needed.

find_credentials_file async

find_credentials_file()

Find existing credentials file in configured paths.

Returns:

Type Description
Path | None

Path to credentials file if found, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def find_credentials_file(self) -> Path | None:
    """Find existing credentials file in configured paths.

    Returns:
        Path to credentials file if found, None otherwise
    """
    for path_str in self.config.storage.storage_paths:
        path = Path(path_str).expanduser()
        logger.debug("checking_credentials_path", path=str(path))
        if path.exists() and path.is_file():
            logger.info("credentials_file_found", path=str(path))
            return path
        else:
            logger.debug("credentials_path_not_found", path=str(path))

    logger.warning(
        "no_credentials_files_found",
        searched_paths=self.config.storage.storage_paths,
    )
    return None

load async

load()

Load credentials from storage.

Returns:

Type Description
ClaudeCredentials | None

Credentials if found and valid, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from storage.

    Returns:
        Credentials if found and valid, None otherwise
    """
    try:
        return await self.storage.load()
    except Exception as e:
        logger.error("credentials_load_failed", error=str(e))
        return None

save async

save(credentials)

Save credentials to storage.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Source code in ccproxy/services/credentials/manager.py
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to storage.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise
    """
    try:
        return await self.storage.save(credentials)
    except Exception as e:
        logger.error("credentials_save_failed", error=str(e))
        return False

login async

login()

Perform OAuth login and save credentials.

Returns:

Type Description
ClaudeCredentials

New credentials from login

Raises:

Type Description
OAuthLoginError

If login fails

Source code in ccproxy/services/credentials/manager.py
async def login(self) -> ClaudeCredentials:
    """Perform OAuth login and save credentials.

    Returns:
        New credentials from login

    Raises:
        OAuthLoginError: If login fails
    """
    if self._oauth_client is None:
        raise RuntimeError("OAuth client not initialized")
    credentials = await self._oauth_client.login()

    # Fetch and save user profile after successful login
    try:
        profile = await self._oauth_client.fetch_user_profile(
            credentials.claude_ai_oauth.access_token
        )
        if profile:
            # Save profile data
            await self._save_account_profile(profile)

            # Update subscription type based on profile
            determined_subscription = self._determine_subscription_type(profile)
            credentials.claude_ai_oauth.subscription_type = determined_subscription

            logger.debug(
                "subscription_type_set", subscription_type=determined_subscription
            )
        else:
            logger.debug(
                "profile_fetch_skipped", context="login", reason="no_profile_data"
            )
    except Exception as e:
        logger.warning("profile_fetch_failed", context="login", error=str(e))
        # Continue with login even if profile fetch fails

    await self.save(credentials)
    return credentials

get_valid_credentials async

get_valid_credentials()

Get valid credentials, refreshing if necessary.

Returns:

Type Description
ClaudeCredentials

Valid credentials

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

CredentialsExpiredError

If credentials expired and refresh fails

Source code in ccproxy/services/credentials/manager.py
async def get_valid_credentials(self) -> ClaudeCredentials:
    """Get valid credentials, refreshing if necessary.

    Returns:
        Valid credentials

    Raises:
        CredentialsNotFoundError: If no credentials found
        CredentialsExpiredError: If credentials expired and refresh fails
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError("No credentials found. Please login first.")

    # Check if token needs refresh
    oauth_token = credentials.claude_ai_oauth
    should_refresh = self._should_refresh_token(oauth_token)

    if should_refresh:
        async with self._refresh_lock:
            # Re-check if refresh is still needed after acquiring lock
            # Another request might have already refreshed the token
            credentials = await self.load()
            if not credentials:
                raise CredentialsNotFoundError(
                    "No credentials found. Please login first."
                )

            oauth_token = credentials.claude_ai_oauth
            should_refresh = self._should_refresh_token(oauth_token)

            if should_refresh:
                logger.info(
                    "token_refresh_start", reason="expired_or_expiring_soon"
                )
                try:
                    credentials = await self._refresh_token_with_profile(
                        credentials
                    )
                except Exception as e:
                    logger.error(
                        "token_refresh_failed", error=str(e), exc_info=True
                    )
                    if oauth_token.is_expired:
                        raise CredentialsExpiredError(
                            "Token expired and refresh failed. Please login again."
                        ) from e
                    # If not expired yet but refresh failed, return existing token
                    logger.warning(
                        "token_refresh_fallback",
                        reason="refresh_failed_but_token_not_expired",
                    )

    return credentials

get_access_token async

get_access_token()

Get valid access token, refreshing if necessary.

Returns:

Type Description
str

Access token string

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

CredentialsExpiredError

If credentials expired and refresh fails

Source code in ccproxy/services/credentials/manager.py
async def get_access_token(self) -> str:
    """Get valid access token, refreshing if necessary.

    Returns:
        Access token string

    Raises:
        CredentialsNotFoundError: If no credentials found
        CredentialsExpiredError: If credentials expired and refresh fails
    """
    credentials = await self.get_valid_credentials()
    return credentials.claude_ai_oauth.access_token

refresh_token async

refresh_token()

Refresh the access token without checking expiration.

This method directly refreshes the token regardless of whether it's expired. Useful for force-refreshing tokens or testing.

Returns:

Type Description
ClaudeCredentials

Updated credentials with new token

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

RuntimeError

If OAuth client not initialized

ValueError

If no refresh token available

Exception

If token refresh fails

Source code in ccproxy/services/credentials/manager.py
async def refresh_token(self) -> ClaudeCredentials:
    """Refresh the access token without checking expiration.

    This method directly refreshes the token regardless of whether it's expired.
    Useful for force-refreshing tokens or testing.

    Returns:
        Updated credentials with new token

    Raises:
        CredentialsNotFoundError: If no credentials found
        RuntimeError: If OAuth client not initialized
        ValueError: If no refresh token available
        Exception: If token refresh fails
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError("No credentials found. Please login first.")

    logger.info("token_refresh_start", reason="forced")
    return await self._refresh_token_with_profile(credentials)

fetch_user_profile async

fetch_user_profile()

Fetch user profile information.

Returns:

Type Description
UserProfile | None

UserProfile if successful, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def fetch_user_profile(self) -> UserProfile | None:
    """Fetch user profile information.

    Returns:
        UserProfile if successful, None otherwise
    """
    try:
        credentials = await self.get_valid_credentials()
        if self._oauth_client is None:
            raise RuntimeError("OAuth client not initialized")
        profile = await self._oauth_client.fetch_user_profile(
            credentials.claude_ai_oauth.access_token,
        )
        return profile
    except Exception as e:
        logger.error(
            "user_profile_fetch_failed",
            error=str(e),
            exc_info=True,
        )
        return None

get_account_profile async

get_account_profile()

Get saved account profile information.

Returns:

Type Description
UserProfile | None

UserProfile if available, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def get_account_profile(self) -> UserProfile | None:
    """Get saved account profile information.

    Returns:
        UserProfile if available, None otherwise
    """
    return await self._load_account_profile()

validate async

validate()

Validate current credentials.

Returns:

Type Description
ValidationResult

ValidationResult with credentials status and details

Source code in ccproxy/services/credentials/manager.py
async def validate(self) -> ValidationResult:
    """Validate current credentials.

    Returns:
        ValidationResult with credentials status and details
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError()

    return ValidationResult(
        valid=True,
        expired=credentials.claude_ai_oauth.is_expired,
        credentials=credentials,
        path=self.storage.get_location(),
    )

logout async

logout()

Delete stored credentials.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Source code in ccproxy/services/credentials/manager.py
async def logout(self) -> bool:
    """Delete stored credentials.

    Returns:
        True if deleted successfully, False otherwise
    """
    try:
        # Delete both credentials and account profile
        success = await self.storage.delete()
        await self._delete_account_profile()
        return success
    except Exception as e:
        logger.error("credentials_delete_failed", error=str(e), exc_info=True)
        return False

get_access_token async

get_access_token(auth_manager)

Get access token from authenticated manager.

Parameters:

Name Type Description Default
auth_manager Annotated[AuthManager, Depends(require_auth)]

Authentication manager

required

Returns:

Type Description
str

Access token string

Raises:

Type Description
HTTPException

If token retrieval fails

Source code in ccproxy/auth/dependencies.py
async def get_access_token(
    auth_manager: Annotated[AuthManager, Depends(require_auth)],
) -> str:
    """Get access token from authenticated manager.

    Args:
        auth_manager: Authentication manager

    Returns:
        Access token string

    Raises:
        HTTPException: If token retrieval fails
    """
    try:
        return await auth_manager.get_access_token()
    except AuthenticationError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

get_auth_manager async

get_auth_manager(credentials=None)

Get authentication manager with fallback strategy.

Try bearer token first, then fall back to credentials.

Parameters:

Name Type Description Default
credentials Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_scheme)]

HTTP authorization credentials

None

Returns:

Type Description
AuthManager

AuthManager instance

Raises:

Type Description
HTTPException

If no valid authentication available

Source code in ccproxy/auth/dependencies.py
async def get_auth_manager(
    credentials: Annotated[
        HTTPAuthorizationCredentials | None, Depends(bearer_scheme)
    ] = None,
) -> AuthManager:
    """Get authentication manager with fallback strategy.

    Try bearer token first, then fall back to credentials.

    Args:
        credentials: HTTP authorization credentials

    Returns:
        AuthManager instance

    Raises:
        HTTPException: If no valid authentication available
    """
    # Import here to avoid circular imports
    from ccproxy.config.settings import get_settings

    settings = get_settings()
    return await _get_auth_manager_with_settings(credentials, settings)

get_bearer_auth_manager async

get_bearer_auth_manager(credentials)

Get bearer token authentication manager.

Parameters:

Name Type Description Default
credentials Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_scheme)]

HTTP authorization credentials

required

Returns:

Type Description
AuthManager

BearerTokenAuthManager instance

Raises:

Type Description
HTTPException

If no valid bearer token provided

Source code in ccproxy/auth/dependencies.py
async def get_bearer_auth_manager(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_scheme)],
) -> AuthManager:
    """Get bearer token authentication manager.

    Args:
        credentials: HTTP authorization credentials

    Returns:
        BearerTokenAuthManager instance

    Raises:
        HTTPException: If no valid bearer token provided
    """
    if not credentials or not credentials.credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Bearer token required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return BearerTokenAuthManager(credentials.credentials)

get_credentials_auth_manager async

get_credentials_auth_manager()

Get credentials-based authentication manager.

Returns:

Type Description
AuthManager

CredentialsAuthManager instance

Source code in ccproxy/auth/dependencies.py
async def get_credentials_auth_manager() -> AuthManager:
    """Get credentials-based authentication manager.

    Returns:
        CredentialsAuthManager instance
    """
    return CredentialsAuthManager()

require_auth async

require_auth(auth_manager)

Require authentication for endpoint.

Parameters:

Name Type Description Default
auth_manager Annotated[AuthManager, Depends(get_auth_manager)]

Authentication manager

required

Returns:

Type Description
AuthManager

AuthManager instance

Raises:

Type Description
HTTPException

If authentication fails

Source code in ccproxy/auth/dependencies.py
async def require_auth(
    auth_manager: Annotated[AuthManager, Depends(get_auth_manager)],
) -> AuthManager:
    """Require authentication for endpoint.

    Args:
        auth_manager: Authentication manager

    Returns:
        AuthManager instance

    Raises:
        HTTPException: If authentication fails
    """
    try:
        if not await auth_manager.is_authenticated():
            raise AuthenticationRequiredError("Authentication required")
        return auth_manager
    except AuthenticationError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e