Skip to content

ccproxy.auth.oauth.base

ccproxy.auth.oauth.base

Base OAuth client with common PKCE flow implementation.

BaseOAuthClient

BaseOAuthClient(
    client_id,
    redirect_uri,
    base_url,
    scopes,
    storage=None,
    http_client=None,
    hook_manager=None,
    settings=None,
)

Bases: ABC, Generic[CredentialsT]

Abstract base class for OAuth PKCE flow implementations.

Parameters:

Name Type Description Default
client_id str

OAuth client ID

required
redirect_uri str

OAuth callback redirect URI

required
base_url str

OAuth provider base URL

required
scopes list[str]

List of OAuth scopes to request

required
storage TokenStorage[CredentialsT] | None

Optional token storage backend

None
http_client AsyncClient | None

Optional HTTP client (for request tracing support)

None
hook_manager Any | None

Optional hook manager for emitting events

None
settings Settings | None

Optional settings for HTTP client configuration

None
Source code in ccproxy/auth/oauth/base.py
def __init__(
    self,
    client_id: str,
    redirect_uri: str,
    base_url: str,
    scopes: list[str],
    storage: TokenStorage[CredentialsT] | None = None,
    http_client: httpx.AsyncClient | None = None,
    hook_manager: Any | None = None,
    settings: Settings | None = None,
):
    """Initialize OAuth client with common parameters.

    Args:
        client_id: OAuth client ID
        redirect_uri: OAuth callback redirect URI
        base_url: OAuth provider base URL
        scopes: List of OAuth scopes to request
        storage: Optional token storage backend
        http_client: Optional HTTP client (for request tracing support)
        hook_manager: Optional hook manager for emitting events
        settings: Optional settings for HTTP client configuration
    """
    self.client_id = client_id
    self.redirect_uri = redirect_uri
    self.base_url = base_url
    self.scopes = scopes
    self.storage = storage
    self.hook_manager = hook_manager

    # Always have an HTTP client
    if http_client:
        self.http_client = http_client
        self._owns_http_client = False  # Don't close provided client
        logger.debug(
            "oauth_client_using_provided_http_client",
            http_client_id=id(http_client),
            has_hooks=hasattr(http_client, "hook_manager")
            and http_client.hook_manager is not None,
            hook_manager_id=id(hook_manager) if hook_manager else None,
        )
    else:
        # Create client with hook support if hook_manager is provided
        self.http_client = HTTPClientFactory.create_client(
            settings=settings,
            timeout_connect=10.0,
            timeout_read=30.0,
            http2=True,
            hook_manager=hook_manager,  # Pass hook manager to client
        )
        self._owns_http_client = True  # We own it, close on cleanup
        logger.debug(
            "oauth_client_created_new_http_client",
            http_client_id=id(self.http_client),
            has_hooks=hasattr(self.http_client, "hook_manager")
            and self.http_client.hook_manager is not None,
            hook_manager_id=id(hook_manager) if hook_manager else None,
        )

    self._callback_server: asyncio.Task[None] | None = None
    self._auth_complete = asyncio.Event()
    self._auth_result: Any | None = None
    self._auth_error: str | None = None

close async

close()

Close resources if we own them.

Source code in ccproxy/auth/oauth/base.py
async def close(self) -> None:
    """Close resources if we own them."""
    if self._owns_http_client and self.http_client:
        await self.http_client.aclose()

parse_token_response abstractmethod async

parse_token_response(data)

Parse provider-specific token response into credentials.

Parameters:

Name Type Description Default
data dict[str, Any]

Raw token response from provider

required

Returns:

Type Description
CredentialsT

Provider-specific credentials object

Source code in ccproxy/auth/oauth/base.py
@abstractmethod
async def parse_token_response(self, data: dict[str, Any]) -> CredentialsT:
    """Parse provider-specific token response into credentials.

    Args:
        data: Raw token response from provider

    Returns:
        Provider-specific credentials object
    """
    pass

get_custom_auth_params

get_custom_auth_params()

Get provider-specific authorization parameters.

Override this to add custom parameters to auth URL.

Returns:

Type Description
dict[str, str]

Dictionary of custom parameters (empty by default)

Source code in ccproxy/auth/oauth/base.py
def get_custom_auth_params(self) -> dict[str, str]:
    """Get provider-specific authorization parameters.

    Override this to add custom parameters to auth URL.

    Returns:
        Dictionary of custom parameters (empty by default)
    """
    return {}

get_custom_token_params

get_custom_token_params()

Get provider-specific token exchange parameters.

Override this to add custom parameters to token request.

Returns:

Type Description
dict[str, str]

Dictionary of custom parameters (empty by default)

Source code in ccproxy/auth/oauth/base.py
def get_custom_token_params(self) -> dict[str, str]:
    """Get provider-specific token exchange parameters.

    Override this to add custom parameters to token request.

    Returns:
        Dictionary of custom parameters (empty by default)
    """
    return {}

get_custom_headers

get_custom_headers()

Get provider-specific HTTP headers.

Override this to add custom headers to requests.

Returns:

Type Description
dict[str, str]

Dictionary of custom headers (empty by default)

Source code in ccproxy/auth/oauth/base.py
def get_custom_headers(self) -> dict[str, str]:
    """Get provider-specific HTTP headers.

    Override this to add custom headers to requests.

    Returns:
        Dictionary of custom headers (empty by default)
    """
    return {}

authenticate async

authenticate(code_verifier=None, state=None)

Start OAuth authentication flow.

Parameters:

Name Type Description Default
code_verifier str | None

Optional pre-generated PKCE verifier

None
state str | None

Optional pre-generated state parameter

None

Returns:

Type Description
tuple[str, str, str]

Tuple of (auth_url, code_verifier, state)

Source code in ccproxy/auth/oauth/base.py
async def authenticate(
    self, code_verifier: str | None = None, state: str | None = None
) -> tuple[str, str, str]:
    """Start OAuth authentication flow.

    Args:
        code_verifier: Optional pre-generated PKCE verifier
        state: Optional pre-generated state parameter

    Returns:
        Tuple of (auth_url, code_verifier, state)
    """
    # Generate PKCE parameters if not provided
    if not code_verifier:
        code_verifier, code_challenge = self._generate_pkce_pair()
    else:
        # Calculate challenge from provided verifier
        challenge_bytes = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = (
            base64.urlsafe_b64encode(challenge_bytes).decode().rstrip("=")
        )

    # Generate state if not provided
    if not state:
        state = self._generate_state()

    # Build authorization URL
    auth_url = self._build_auth_url(code_challenge, state)

    logger.info(
        "oauth_flow_started",
        provider=self.__class__.__name__,
        has_storage=bool(self.storage),
        scopes=self.scopes,
    )

    return auth_url, code_verifier, state

handle_callback async

handle_callback(code, state, code_verifier)

Handle OAuth callback and exchange code for tokens.

Parameters:

Name Type Description Default
code str

Authorization code from callback

required
state str

State parameter from callback

required
code_verifier str

PKCE code verifier

required

Returns:

Type Description
CredentialsT

Provider-specific credentials object

Raises:

Type Description
OAuthError

If callback handling fails

Source code in ccproxy/auth/oauth/base.py
async def handle_callback(
    self, code: str, state: str, code_verifier: str
) -> CredentialsT:
    """Handle OAuth callback and exchange code for tokens.

    Args:
        code: Authorization code from callback
        state: State parameter from callback
        code_verifier: PKCE code verifier

    Returns:
        Provider-specific credentials object

    Raises:
        OAuthError: If callback handling fails
    """
    try:
        # Exchange code for tokens
        token_response = await self._exchange_code_for_tokens(
            code, code_verifier, state
        )

        # Parse provider-specific response
        credentials: CredentialsT = await self.parse_token_response(token_response)

        # Save to storage if available
        if self.storage:
            success = await self.storage.save(credentials)
            if not success:
                logger.warning(
                    "credentials_save_failed", provider=self.__class__.__name__
                )

        logger.info(
            "oauth_callback_success",
            provider=self.__class__.__name__,
            has_refresh_token=bool(token_response.get("refresh_token")),
        )

        return credentials

    except OAuthTokenRefreshError:
        raise
    except Exception as e:
        logger.error(
            "oauth_callback_error",
            provider=self.__class__.__name__,
            error=str(e),
            exc_info=e,
        )
        raise OAuthError(f"OAuth callback failed: {e}") from e