Skip to content

ccproxy.services.credentials

ccproxy.services.credentials

Credentials management package.

CredentialsError

Bases: AuthenticationError

Base credentials error.

CredentialsExpiredError

Bases: CredentialsError

Credentials expired error.

CredentialsInvalidError

Bases: CredentialsError

Credentials are invalid or malformed.

CredentialsNotFoundError

Bases: CredentialsError

Credentials not found error.

CredentialsStorageError

Bases: CredentialsError

Error occurred during credentials storage operations.

OAuthCallbackError

Bases: OAuthError

OAuth callback failed.

OAuthError

Bases: AuthenticationError

Base OAuth error.

OAuthLoginError

Bases: OAuthError

OAuth login failed.

OAuthTokenRefreshError

Bases: OAuthError

OAuth token refresh failed.

AccountInfo

Bases: BaseModel

Account information from OAuth API.

email_address property

email_address

Compatibility property for email_address.

ClaudeCredentials

Bases: BaseModel

Claude credentials from the credentials file.

OAuthToken

Bases: BaseModel

OAuth token information from Claude credentials.

is_expired property

is_expired

Check if the token is expired.

expires_at_datetime property

expires_at_datetime

Get expiration as datetime object.

OrganizationInfo

Bases: BaseModel

Organization information from OAuth API.

UserProfile

Bases: BaseModel

User profile information from Anthropic OAuth API.

JsonFileStorage

JsonFileStorage(file_path)

Bases: TokenStorage

JSON file storage implementation for Claude credentials with keyring fallback.

Parameters:

Name Type Description Default
file_path Path

Path to the JSON credentials file

required
Source code in ccproxy/auth/storage/json_file.py
def __init__(self, file_path: Path):
    """Initialize JSON file storage.

    Args:
        file_path: Path to the JSON credentials file
    """
    self.file_path = file_path

load async

load()

Load credentials from JSON file .

Returns:

Type Description
ClaudeCredentials | None

Parsed credentials if found and valid, None otherwise

Raises:

Type Description
CredentialsInvalidError

If the JSON file is invalid

CredentialsStorageError

If there's an error reading the file

Source code in ccproxy/auth/storage/json_file.py
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from JSON file .

    Returns:
        Parsed credentials if found and valid, None otherwise

    Raises:
        CredentialsInvalidError: If the JSON file is invalid
        CredentialsStorageError: If there's an error reading the file
    """
    if not await self.exists():
        logger.debug("credentials_file_not_found", path=str(self.file_path))
        return None

    try:
        logger.debug(
            "credentials_load_start", source="file", path=str(self.file_path)
        )
        with self.file_path.open() as f:
            data = json.load(f)

        credentials = ClaudeCredentials.model_validate(data)
        logger.debug("credentials_load_completed", source="file")

        return credentials

    except json.JSONDecodeError as e:
        raise CredentialsInvalidError(
            f"Failed to parse credentials file {self.file_path}: {e}"
        ) from e
    except Exception as e:
        raise CredentialsStorageError(
            f"Error loading credentials from {self.file_path}: {e}"
        ) from e

save async

save(credentials)

Save credentials to both keyring and JSON file.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error writing the file

Source code in ccproxy/auth/storage/json_file.py
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to both keyring and JSON file.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise

    Raises:
        CredentialsStorageError: If there's an error writing the file
    """
    try:
        # Convert to dict with proper aliases
        data = credentials.model_dump(by_alias=True, mode="json")

        # Always save to file as well
        # Ensure parent directory exists
        self.file_path.parent.mkdir(parents=True, exist_ok=True)

        # Use atomic write: write to temp file then rename
        temp_path = self.file_path.with_suffix(".tmp")

        try:
            with temp_path.open("w") as f:
                json.dump(data, f, indent=2)

            # Set appropriate file permissions (read/write for owner only)
            temp_path.chmod(0o600)

            # Atomically replace the original file
            Path.replace(temp_path, self.file_path)

            logger.debug(
                "credentials_save_completed",
                source="file",
                path=str(self.file_path),
            )
            return True
        except Exception as e:
            raise
        finally:
            # Clean up temp file if it exists
            if temp_path.exists():
                with contextlib.suppress(Exception):
                    temp_path.unlink()

    except Exception as e:
        raise CredentialsStorageError(f"Error saving credentials: {e}") from e

exists async

exists()

Check if credentials file exists.

Returns:

Type Description
bool

True if file exists, False otherwise

Source code in ccproxy/auth/storage/json_file.py
async def exists(self) -> bool:
    """Check if credentials file exists.

    Returns:
        True if file exists, False otherwise
    """
    return self.file_path.exists() and self.file_path.is_file()

delete async

delete()

Delete credentials from both keyring and file.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Raises:

Type Description
CredentialsStorageError

If there's an error deleting the file

Source code in ccproxy/auth/storage/json_file.py
async def delete(self) -> bool:
    """Delete credentials from both keyring and file.

    Returns:
        True if deleted successfully, False otherwise

    Raises:
        CredentialsStorageError: If there's an error deleting the file
    """
    deleted = False

    # Delete from file
    try:
        if await self.exists():
            self.file_path.unlink()
            logger.debug(
                "credentials_delete_completed",
                source="file",
                path=str(self.file_path),
            )
            deleted = True
    except Exception as e:
        if not deleted:  # Only raise if we failed to delete from both
            raise CredentialsStorageError(f"Error deleting credentials: {e}") from e
        logger.debug("credentials_delete_partial", source="file", error=str(e))

    return deleted

get_location

get_location()

Get the storage location description.

Returns:

Type Description
str

Path to the JSON file with keyring info if available

Source code in ccproxy/auth/storage/json_file.py
def get_location(self) -> str:
    """Get the storage location description.

    Returns:
        Path to the JSON file with keyring info if available
    """
    return str(self.file_path)

CredentialsStorageBackend

Bases: ABC

Abstract interface for token storage operations.

load abstractmethod async

load()

Load credentials from storage.

Returns:

Type Description
ClaudeCredentials | None

Parsed credentials if found and valid, None otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from storage.

    Returns:
        Parsed credentials if found and valid, None otherwise
    """
    pass

save abstractmethod async

save(credentials)

Save credentials to storage.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to storage.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise
    """
    pass

exists abstractmethod async

exists()

Check if credentials exist in storage.

Returns:

Type Description
bool

True if credentials exist, False otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def exists(self) -> bool:
    """Check if credentials exist in storage.

    Returns:
        True if credentials exist, False otherwise
    """
    pass

delete abstractmethod async

delete()

Delete credentials from storage.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Source code in ccproxy/auth/storage/base.py
@abstractmethod
async def delete(self) -> bool:
    """Delete credentials from storage.

    Returns:
        True if deleted successfully, False otherwise
    """
    pass

get_location abstractmethod

get_location()

Get the storage location description.

Returns:

Type Description
str

Human-readable description of where credentials are stored

Source code in ccproxy/auth/storage/base.py
@abstractmethod
def get_location(self) -> str:
    """Get the storage location description.

    Returns:
        Human-readable description of where credentials are stored
    """
    pass

CredentialsConfig

Bases: BaseModel

Configuration for credentials management.

OAuthConfig

Bases: BaseModel

OAuth configuration settings.

CredentialsManager

CredentialsManager(
    config=None,
    storage=None,
    oauth_client=None,
    http_client=None,
)

Manager for Claude credentials with storage and OAuth support.

Parameters:

Name Type Description Default
config AuthSettings | None

Credentials configuration (uses defaults if not provided)

None
storage TokenStorage | None

Storage backend (uses JSON file storage if not provided)

None
oauth_client OAuthClient | None

OAuth client (creates one if not provided)

None
http_client AsyncClient | None

HTTP client for OAuth operations

None
Source code in ccproxy/services/credentials/manager.py
def __init__(
    self,
    config: AuthSettings | None = None,
    storage: CredentialsStorageBackend | None = None,
    oauth_client: OAuthClient | None = None,
    http_client: httpx.AsyncClient | None = None,
):
    """Initialize credentials manager.

    Args:
        config: Credentials configuration (uses defaults if not provided)
        storage: Storage backend (uses JSON file storage if not provided)
        oauth_client: OAuth client (creates one if not provided)
        http_client: HTTP client for OAuth operations
    """
    self.config = config or AuthSettings()
    self._storage = storage
    self._oauth_client = oauth_client
    self._http_client = http_client
    self._owns_http_client = http_client is None
    self._refresh_lock = asyncio.Lock()

    # Initialize OAuth client if not provided
    if self._oauth_client is None:
        self._oauth_client = OAuthClient(
            config=self.config.oauth,
        )

storage property

storage

Get the storage backend, creating default if needed.

find_credentials_file async

find_credentials_file()

Find existing credentials file in configured paths.

Returns:

Type Description
Path | None

Path to credentials file if found, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def find_credentials_file(self) -> Path | None:
    """Find existing credentials file in configured paths.

    Returns:
        Path to credentials file if found, None otherwise
    """
    for path_str in self.config.storage.storage_paths:
        path = Path(path_str).expanduser()
        logger.debug("checking_credentials_path", path=str(path))
        if path.exists() and path.is_file():
            logger.info("credentials_file_found", path=str(path))
            return path
        else:
            logger.debug("credentials_path_not_found", path=str(path))

    logger.warning(
        "no_credentials_files_found",
        searched_paths=self.config.storage.storage_paths,
    )
    return None

load async

load()

Load credentials from storage.

Returns:

Type Description
ClaudeCredentials | None

Credentials if found and valid, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def load(self) -> ClaudeCredentials | None:
    """Load credentials from storage.

    Returns:
        Credentials if found and valid, None otherwise
    """
    try:
        return await self.storage.load()
    except Exception as e:
        logger.error("credentials_load_failed", error=str(e))
        return None

save async

save(credentials)

Save credentials to storage.

Parameters:

Name Type Description Default
credentials ClaudeCredentials

Credentials to save

required

Returns:

Type Description
bool

True if saved successfully, False otherwise

Source code in ccproxy/services/credentials/manager.py
async def save(self, credentials: ClaudeCredentials) -> bool:
    """Save credentials to storage.

    Args:
        credentials: Credentials to save

    Returns:
        True if saved successfully, False otherwise
    """
    try:
        return await self.storage.save(credentials)
    except Exception as e:
        logger.error("credentials_save_failed", error=str(e))
        return False

login async

login()

Perform OAuth login and save credentials.

Returns:

Type Description
ClaudeCredentials

New credentials from login

Raises:

Type Description
OAuthLoginError

If login fails

Source code in ccproxy/services/credentials/manager.py
async def login(self) -> ClaudeCredentials:
    """Perform OAuth login and save credentials.

    Returns:
        New credentials from login

    Raises:
        OAuthLoginError: If login fails
    """
    if self._oauth_client is None:
        raise RuntimeError("OAuth client not initialized")
    credentials = await self._oauth_client.login()

    # Fetch and save user profile after successful login
    try:
        profile = await self._oauth_client.fetch_user_profile(
            credentials.claude_ai_oauth.access_token
        )
        if profile:
            # Save profile data
            await self._save_account_profile(profile)

            # Update subscription type based on profile
            determined_subscription = self._determine_subscription_type(profile)
            credentials.claude_ai_oauth.subscription_type = determined_subscription

            logger.debug(
                "subscription_type_set", subscription_type=determined_subscription
            )
        else:
            logger.debug(
                "profile_fetch_skipped", context="login", reason="no_profile_data"
            )
    except Exception as e:
        logger.warning("profile_fetch_failed", context="login", error=str(e))
        # Continue with login even if profile fetch fails

    await self.save(credentials)
    return credentials

get_valid_credentials async

get_valid_credentials()

Get valid credentials, refreshing if necessary.

Returns:

Type Description
ClaudeCredentials

Valid credentials

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

CredentialsExpiredError

If credentials expired and refresh fails

Source code in ccproxy/services/credentials/manager.py
async def get_valid_credentials(self) -> ClaudeCredentials:
    """Get valid credentials, refreshing if necessary.

    Returns:
        Valid credentials

    Raises:
        CredentialsNotFoundError: If no credentials found
        CredentialsExpiredError: If credentials expired and refresh fails
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError("No credentials found. Please login first.")

    # Check if token needs refresh
    oauth_token = credentials.claude_ai_oauth
    should_refresh = self._should_refresh_token(oauth_token)

    if should_refresh:
        async with self._refresh_lock:
            # Re-check if refresh is still needed after acquiring lock
            # Another request might have already refreshed the token
            credentials = await self.load()
            if not credentials:
                raise CredentialsNotFoundError(
                    "No credentials found. Please login first."
                )

            oauth_token = credentials.claude_ai_oauth
            should_refresh = self._should_refresh_token(oauth_token)

            if should_refresh:
                logger.info(
                    "token_refresh_start", reason="expired_or_expiring_soon"
                )
                try:
                    credentials = await self._refresh_token_with_profile(
                        credentials
                    )
                except Exception as e:
                    logger.error(
                        "token_refresh_failed", error=str(e), exc_info=True
                    )
                    if oauth_token.is_expired:
                        raise CredentialsExpiredError(
                            "Token expired and refresh failed. Please login again."
                        ) from e
                    # If not expired yet but refresh failed, return existing token
                    logger.warning(
                        "token_refresh_fallback",
                        reason="refresh_failed_but_token_not_expired",
                    )

    return credentials

get_access_token async

get_access_token()

Get valid access token, refreshing if necessary.

Returns:

Type Description
str

Access token string

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

CredentialsExpiredError

If credentials expired and refresh fails

Source code in ccproxy/services/credentials/manager.py
async def get_access_token(self) -> str:
    """Get valid access token, refreshing if necessary.

    Returns:
        Access token string

    Raises:
        CredentialsNotFoundError: If no credentials found
        CredentialsExpiredError: If credentials expired and refresh fails
    """
    credentials = await self.get_valid_credentials()
    return credentials.claude_ai_oauth.access_token

refresh_token async

refresh_token()

Refresh the access token without checking expiration.

This method directly refreshes the token regardless of whether it's expired. Useful for force-refreshing tokens or testing.

Returns:

Type Description
ClaudeCredentials

Updated credentials with new token

Raises:

Type Description
CredentialsNotFoundError

If no credentials found

RuntimeError

If OAuth client not initialized

ValueError

If no refresh token available

Exception

If token refresh fails

Source code in ccproxy/services/credentials/manager.py
async def refresh_token(self) -> ClaudeCredentials:
    """Refresh the access token without checking expiration.

    This method directly refreshes the token regardless of whether it's expired.
    Useful for force-refreshing tokens or testing.

    Returns:
        Updated credentials with new token

    Raises:
        CredentialsNotFoundError: If no credentials found
        RuntimeError: If OAuth client not initialized
        ValueError: If no refresh token available
        Exception: If token refresh fails
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError("No credentials found. Please login first.")

    logger.info("token_refresh_start", reason="forced")
    return await self._refresh_token_with_profile(credentials)

fetch_user_profile async

fetch_user_profile()

Fetch user profile information.

Returns:

Type Description
UserProfile | None

UserProfile if successful, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def fetch_user_profile(self) -> UserProfile | None:
    """Fetch user profile information.

    Returns:
        UserProfile if successful, None otherwise
    """
    try:
        credentials = await self.get_valid_credentials()
        if self._oauth_client is None:
            raise RuntimeError("OAuth client not initialized")
        profile = await self._oauth_client.fetch_user_profile(
            credentials.claude_ai_oauth.access_token,
        )
        return profile
    except Exception as e:
        logger.error(
            "user_profile_fetch_failed",
            error=str(e),
            exc_info=True,
        )
        return None

get_account_profile async

get_account_profile()

Get saved account profile information.

Returns:

Type Description
UserProfile | None

UserProfile if available, None otherwise

Source code in ccproxy/services/credentials/manager.py
async def get_account_profile(self) -> UserProfile | None:
    """Get saved account profile information.

    Returns:
        UserProfile if available, None otherwise
    """
    return await self._load_account_profile()

validate async

validate()

Validate current credentials.

Returns:

Type Description
ValidationResult

ValidationResult with credentials status and details

Source code in ccproxy/services/credentials/manager.py
async def validate(self) -> ValidationResult:
    """Validate current credentials.

    Returns:
        ValidationResult with credentials status and details
    """
    credentials = await self.load()
    if not credentials:
        raise CredentialsNotFoundError()

    return ValidationResult(
        valid=True,
        expired=credentials.claude_ai_oauth.is_expired,
        credentials=credentials,
        path=self.storage.get_location(),
    )

logout async

logout()

Delete stored credentials.

Returns:

Type Description
bool

True if deleted successfully, False otherwise

Source code in ccproxy/services/credentials/manager.py
async def logout(self) -> bool:
    """Delete stored credentials.

    Returns:
        True if deleted successfully, False otherwise
    """
    try:
        # Delete both credentials and account profile
        success = await self.storage.delete()
        await self._delete_account_profile()
        return success
    except Exception as e:
        logger.error("credentials_delete_failed", error=str(e), exc_info=True)
        return False

OAuthClient

OAuthClient(config=None)

OAuth client for handling Anthropic OAuth flows.

Parameters:

Name Type Description Default
config OAuthSettings | None

OAuth configuration, uses default if not provided

None
Source code in ccproxy/services/credentials/oauth_client.py
def __init__(self, config: OAuthSettings | None = None):
    """Initialize OAuth client.

    Args:
        config: OAuth configuration, uses default if not provided
    """
    self.config = config or OAuthConfig()

generate_pkce_pair

generate_pkce_pair()

Generate PKCE code verifier and challenge pair.

Returns:

Type Description
tuple[str, str]

Tuple of (code_verifier, code_challenge)

Source code in ccproxy/services/credentials/oauth_client.py
def generate_pkce_pair(self) -> tuple[str, str]:
    """Generate PKCE code verifier and challenge pair.

    Returns:
        Tuple of (code_verifier, code_challenge)
    """
    # Generate code verifier (43-128 characters, URL-safe)
    code_verifier = secrets.token_urlsafe(96)  # 128 base64url chars

    # For now, use plain method (Anthropic supports this)
    # In production, should use SHA256 method
    code_challenge = code_verifier

    return code_verifier, code_challenge

build_authorization_url

build_authorization_url(state, code_challenge)

Build authorization URL for OAuth flow.

Parameters:

Name Type Description Default
state str

State parameter for CSRF protection

required
code_challenge str

PKCE code challenge

required

Returns:

Type Description
str

Authorization URL

Source code in ccproxy/services/credentials/oauth_client.py
def build_authorization_url(self, state: str, code_challenge: str) -> str:
    """Build authorization URL for OAuth flow.

    Args:
        state: State parameter for CSRF protection
        code_challenge: PKCE code challenge

    Returns:
        Authorization URL
    """
    params = {
        "response_type": "code",
        "client_id": self.config.client_id,
        "redirect_uri": self.config.redirect_uri,
        "scope": " ".join(self.config.scopes),
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "plain",  # Using plain for simplicity
    }

    query_string = urllib.parse.urlencode(params)
    return f"{self.config.authorize_url}?{query_string}"

exchange_code_for_tokens async

exchange_code_for_tokens(authorization_code, code_verifier)

Exchange authorization code for access tokens.

Parameters:

Name Type Description Default
authorization_code str

Authorization code from callback

required
code_verifier str

PKCE code verifier

required

Returns:

Type Description
OAuthTokenResponse

Token response

Raises:

Type Description
HTTPError

If token exchange fails

Source code in ccproxy/services/credentials/oauth_client.py
async def exchange_code_for_tokens(
    self,
    authorization_code: str,
    code_verifier: str,
) -> OAuthTokenResponse:
    """Exchange authorization code for access tokens.

    Args:
        authorization_code: Authorization code from callback
        code_verifier: PKCE code verifier

    Returns:
        Token response

    Raises:
        httpx.HTTPError: If token exchange fails
    """
    token_request = OAuthTokenRequest(
        code=authorization_code,
        redirect_uri=self.config.redirect_uri,
        client_id=self.config.client_id,
        code_verifier=code_verifier,
    )

    headers = {
        "Content-Type": "application/json",
        "anthropic-beta": self.config.beta_version,
        "User-Agent": self.config.user_agent,
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            self.config.token_url,
            headers=headers,
            json=token_request.model_dump(),
            timeout=self.config.request_timeout,
        )

        if response.status_code != 200:
            _log_http_error_compact("Token exchange", response)
            response.raise_for_status()

        data = response.json()
        return OAuthTokenResponse.model_validate(data)

refresh_access_token async

refresh_access_token(refresh_token)

Refresh access token using refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token

required

Returns:

Type Description
OAuthTokenResponse

New token response

Raises:

Type Description
HTTPError

If token refresh fails

Source code in ccproxy/services/credentials/oauth_client.py
async def refresh_access_token(self, refresh_token: str) -> OAuthTokenResponse:
    """Refresh access token using refresh token.

    Args:
        refresh_token: Refresh token

    Returns:
        New token response

    Raises:
        httpx.HTTPError: If token refresh fails
    """
    refresh_request = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": self.config.client_id,
    }

    headers = {
        "Content-Type": "application/json",
        "anthropic-beta": self.config.beta_version,
        "User-Agent": self.config.user_agent,
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            self.config.token_url,
            headers=headers,
            json=refresh_request,
            timeout=self.config.request_timeout,
        )

        if response.status_code != 200:
            _log_http_error_compact("Token refresh", response)
            response.raise_for_status()

        data = response.json()
        return OAuthTokenResponse.model_validate(data)

refresh_token async

refresh_token(refresh_token)

Refresh token using refresh token - compatibility method for tests.

Parameters:

Name Type Description Default
refresh_token str

Refresh token

required

Returns:

Type Description
OAuthToken

New OAuth token

Raises:

Type Description
OAuthTokenRefreshError

If token refresh fails

Source code in ccproxy/services/credentials/oauth_client.py
async def refresh_token(self, refresh_token: str) -> "OAuthToken":
    """Refresh token using refresh token - compatibility method for tests.

    Args:
        refresh_token: Refresh token

    Returns:
        New OAuth token

    Raises:
        OAuthTokenRefreshError: If token refresh fails
    """
    from datetime import UTC, datetime

    from ccproxy.auth.exceptions import OAuthTokenRefreshError
    from ccproxy.auth.models import OAuthToken

    try:
        token_response = await self.refresh_access_token(refresh_token)

        expires_in = (
            token_response.expires_in if token_response.expires_in else 3600
        )

        # Convert to OAuthToken format expected by tests
        expires_at_ms = int((datetime.now(UTC).timestamp() + expires_in) * 1000)

        return OAuthToken(
            accessToken=token_response.access_token,
            refreshToken=token_response.refresh_token or refresh_token,
            expiresAt=expires_at_ms,
            scopes=token_response.scope.split() if token_response.scope else [],
            subscriptionType="pro",  # Default value
        )
    except Exception as e:
        raise OAuthTokenRefreshError(f"Token refresh failed: {e}") from e

fetch_user_profile async

fetch_user_profile(access_token)

Fetch user profile information using access token.

Parameters:

Name Type Description Default
access_token str

Valid OAuth access token

required

Returns:

Type Description
UserProfile | None

User profile information

Raises:

Type Description
HTTPError

If profile fetch fails

Source code in ccproxy/services/credentials/oauth_client.py
async def fetch_user_profile(self, access_token: str) -> UserProfile | None:
    """Fetch user profile information using access token.

    Args:
        access_token: Valid OAuth access token

    Returns:
        User profile information

    Raises:
        httpx.HTTPError: If profile fetch fails
    """
    from ccproxy.auth.models import UserProfile

    headers = {
        "Authorization": f"Bearer {access_token}",
        "anthropic-beta": self.config.beta_version,
        "User-Agent": self.config.user_agent,
        "Content-Type": "application/json",
    }

    # Use the profile url
    async with httpx.AsyncClient() as client:
        response = await client.get(
            self.config.profile_url,
            headers=headers,
            timeout=self.config.request_timeout,
        )

        if response.status_code == 404:
            # Userinfo endpoint not available - this is expected for some OAuth providers
            logger.debug(
                "userinfo_endpoint_unavailable", endpoint=self.config.profile_url
            )
            return None
        elif response.status_code != 200:
            _log_http_error_compact("Profile fetch", response)
            response.raise_for_status()

        data = response.json()
        logger.debug("user_profile_fetched", endpoint=self.config.profile_url)
        return UserProfile.model_validate(data)

login async

login()

Perform OAuth login flow.

Returns:

Type Description
ClaudeCredentials

ClaudeCredentials with OAuth token

Raises:

Type Description
OAuthLoginError

If login fails

OAuthCallbackError

If callback processing fails

Source code in ccproxy/services/credentials/oauth_client.py
async def login(self) -> ClaudeCredentials:
    """Perform OAuth login flow.

    Returns:
        ClaudeCredentials with OAuth token

    Raises:
        OAuthLoginError: If login fails
        OAuthCallbackError: If callback processing fails
    """
    # Generate state parameter for security
    state = secrets.token_urlsafe(32)

    # Generate PKCE parameters
    code_verifier = secrets.token_urlsafe(32)
    code_challenge = (
        base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
        .decode()
        .rstrip("=")
    )

    authorization_code = None
    error = None

    class OAuthCallbackHandler(BaseHTTPRequestHandler):
        def do_GET(self) -> None:  # noqa: N802
            nonlocal authorization_code, error

            # Ignore favicon requests
            if self.path == "/favicon.ico":
                self.send_response(404)
                self.end_headers()
                return

            parsed_url = urlparse(self.path)
            query_params = parse_qs(parsed_url.query)

            # Check state parameter
            received_state = query_params.get("state", [None])[0]

            if received_state != state:
                error = "Invalid state parameter"
                self.send_response(400)
                self.end_headers()
                self.wfile.write(b"Error: Invalid state parameter")
                return

            # Check for authorization code
            if "code" in query_params:
                authorization_code = query_params["code"][0]
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"Login successful! You can close this window.")
            elif "error" in query_params:
                error = query_params.get("error_description", ["Unknown error"])[0]
                self.send_response(400)
                self.end_headers()
                self.wfile.write(f"Error: {error}".encode())
            else:
                error = "No authorization code received"
                self.send_response(400)
                self.end_headers()
                self.wfile.write(b"Error: No authorization code received")

        def log_message(self, format: str, *args: Any) -> None:
            # Suppress HTTP server logs
            pass

    # Start local HTTP server for OAuth callback
    server = HTTPServer(
        ("localhost", self.config.callback_port), OAuthCallbackHandler
    )
    server_thread = Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    try:
        # Build authorization URL
        auth_params = {
            "response_type": "code",
            "client_id": self.config.client_id,
            "redirect_uri": self.config.redirect_uri,
            "scope": " ".join(self.config.scopes),
            "state": state,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }

        auth_url = (
            f"{self.config.authorize_url}?{urllib.parse.urlencode(auth_params)}"
        )

        logger.info("oauth_browser_opening", auth_url=auth_url)
        logger.info(
            "oauth_manual_url",
            message="If browser doesn't open, visit this URL",
            auth_url=auth_url,
        )

        # Open browser
        webbrowser.open(auth_url)

        # Wait for callback (with timeout)
        import time

        start_time = time.time()

        while authorization_code is None and error is None:
            if time.time() - start_time > self.config.callback_timeout:
                error = "Login timeout"
                break
            await asyncio.sleep(0.1)

        if error:
            raise OAuthCallbackError(f"OAuth callback failed: {error}")

        if not authorization_code:
            raise OAuthLoginError("No authorization code received")

        # Exchange authorization code for tokens
        token_data = {
            "grant_type": "authorization_code",
            "code": authorization_code,
            "redirect_uri": self.config.redirect_uri,
            "client_id": self.config.client_id,
            "code_verifier": code_verifier,
            "state": state,
        }

        headers = {
            "Content-Type": "application/json",
            "anthropic-beta": self.config.beta_version,
            "User-Agent": self.config.user_agent,
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.config.token_url,
                headers=headers,
                json=token_data,
                timeout=30.0,
            )

        if response.status_code == 200:
            result = response.json()

            # Calculate expires_at from expires_in
            expires_in = result.get("expires_in")
            expires_at = None
            if expires_in:
                expires_at = int(
                    (datetime.now(UTC).timestamp() + expires_in) * 1000
                )

            # Create credentials object
            oauth_data = {
                "accessToken": result.get("access_token"),
                "refreshToken": result.get("refresh_token"),
                "expiresAt": expires_at,
                "scopes": result.get("scope", "").split()
                if result.get("scope")
                else self.config.scopes,
                "subscriptionType": result.get("subscription_type", "unknown"),
            }

            credentials = ClaudeCredentials(claudeAiOauth=OAuthToken(**oauth_data))

            logger.info("oauth_login_completed", client_id=self.config.client_id)
            return credentials

        else:
            # Use compact logging for the error message
            import os

            verbose_api = (
                os.environ.get("CCPROXY_VERBOSE_API", "false").lower() == "true"
            )

            if verbose_api:
                error_detail = response.text
            else:
                response_text = response.text
                if len(response_text) > 200:
                    error_detail = f"{response_text[:100]}...{response_text[-50:]}"
                elif len(response_text) > 100:
                    error_detail = f"{response_text[:100]}..."
                else:
                    error_detail = response_text

            raise OAuthLoginError(
                f"Token exchange failed: {response.status_code} - {error_detail}"
            )

    except Exception as e:
        if isinstance(e, OAuthLoginError | OAuthCallbackError):
            raise
        raise OAuthLoginError(f"OAuth login failed: {e}") from e

    finally:
        # Stop the HTTP server
        server.shutdown()
        server_thread.join(timeout=1)