Skip to content

ccproxy.auth.oauth.router

ccproxy.auth.oauth.router

Central OAuth router that delegates to plugin providers.

This module provides unified OAuth endpoints that dynamically route to the appropriate plugin-based OAuth provider.

OAuthProvidersResponse

Bases: BaseModel

Response for listing OAuth providers.

OAuthLoginResponse

Bases: BaseModel

Response for OAuth login initiation.

OAuthErrorResponse

Bases: BaseModel

Response for OAuth errors.

list_oauth_providers async

list_oauth_providers(request)

List all available OAuth providers.

Returns:

Type Description
OAuthProvidersResponse

Dictionary of available OAuth providers with their information

Source code in ccproxy/auth/oauth/router.py
@oauth_router.get("/providers", response_model=OAuthProvidersResponse)
async def list_oauth_providers(request: Request) -> OAuthProvidersResponse:
    """List all available OAuth providers.

    Returns:
        Dictionary of available OAuth providers with their information
    """
    # Get registry from app state (app-scoped)
    registry = getattr(request.app.state, "oauth_registry", None)
    if registry is None:
        raise HTTPException(status_code=503, detail="OAuth registry not initialized")
    providers = registry.list()

    logger.info("oauth_providers_listed", count=len(providers), category="auth")

    return OAuthProvidersResponse(providers=providers)

initiate_oauth_login async

initiate_oauth_login(
    request,
    provider,
    redirect_uri=Query(
        None, description="Optional redirect URI override"
    ),
    scopes=Query(
        None,
        description="Optional scope override (comma-separated)",
    ),
)

Initiate OAuth login flow for a specific provider.

Parameters:

Name Type Description Default
provider str

Provider name (e.g., 'claude-api', 'codex')

required
redirect_uri str | None

Optional redirect URI override

Query(None, description='Optional redirect URI override')
scopes str | None

Optional scope override

Query(None, description='Optional scope override (comma-separated)')

Returns:

Type Description
RedirectResponse

Redirect to provider's authorization URL

Raises:

Type Description
HTTPException

If provider not found or error generating auth URL

Source code in ccproxy/auth/oauth/router.py
@oauth_router.get("/{provider}/login")
async def initiate_oauth_login(
    request: Request,
    provider: str,
    redirect_uri: str | None = Query(
        None, description="Optional redirect URI override"
    ),
    scopes: str | None = Query(
        None, description="Optional scope override (comma-separated)"
    ),
) -> RedirectResponse:
    """Initiate OAuth login flow for a specific provider.

    Args:
        provider: Provider name (e.g., 'claude-api', 'codex')
        redirect_uri: Optional redirect URI override
        scopes: Optional scope override

    Returns:
        Redirect to provider's authorization URL

    Raises:
        HTTPException: If provider not found or error generating auth URL
    """
    registry = getattr(request.app.state, "oauth_registry", None)
    if registry is None:
        raise HTTPException(status_code=503, detail="OAuth registry not initialized")
    oauth_provider = registry.get(provider)

    if not oauth_provider:
        logger.error("oauth_provider_not_found", provider=provider, category="auth")
        raise HTTPException(
            status_code=404,
            detail=f"OAuth provider '{provider}' not found",
        )

    # Generate OAuth state for CSRF protection
    state = secrets.token_urlsafe(32)

    # Generate PKCE code verifier if provider supports it
    code_verifier = None
    if oauth_provider.supports_pkce:
        # Generate PKCE pair
        code_verifier = (
            base64.urlsafe_b64encode(secrets.token_bytes(32))
            .decode("utf-8")
            .rstrip("=")
        )

    # Store OAuth session data
    session_manager = get_oauth_session_manager()
    session_data = {
        "provider": provider,
        "state": state,
        "redirect_uri": redirect_uri,
        "scopes": scopes.split(",") if scopes else None,
    }
    if code_verifier:
        session_data["code_verifier"] = code_verifier

    await session_manager.create_session(state, session_data)

    try:
        # Get authorization URL from provider
        auth_url = await oauth_provider.get_authorization_url(state, code_verifier)

        logger.info(
            "oauth_login_initiated",
            provider=provider,
            state=state,
            has_pkce=bool(code_verifier),
            category="auth",
        )

        # Redirect to provider's authorization page
        return RedirectResponse(url=auth_url, status_code=302)

    except Exception as e:
        logger.error(
            "oauth_login_error",
            provider=provider,
            error=str(e),
            exc_info=e,
            category="auth",
        )
        await session_manager.delete_session(state)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to initiate OAuth login: {str(e)}",
        ) from e

handle_oauth_callback async

handle_oauth_callback(
    provider,
    request,
    code=Query(None, description="Authorization code"),
    state=Query(None, description="OAuth state"),
    error=Query(None, description="OAuth error"),
    error_description=Query(
        None, description="Error description"
    ),
)

Handle OAuth callback from provider.

Parameters:

Name Type Description Default
provider str

Provider name

required
request Request

FastAPI request

required
code str | None

Authorization code from provider

Query(None, description='Authorization code')
state str | None

OAuth state for validation

Query(None, description='OAuth state')
error str | None

OAuth error code

Query(None, description='OAuth error')
error_description str | None

OAuth error description

Query(None, description='Error description')

Returns:

Type Description
HTMLResponse

HTML response with success or error message

Raises:

Type Description
HTTPException

If provider not found or callback handling fails

Source code in ccproxy/auth/oauth/router.py
@oauth_router.get("/{provider}/callback")
async def handle_oauth_callback(
    provider: str,
    request: Request,
    code: str | None = Query(None, description="Authorization code"),
    state: str | None = Query(None, description="OAuth state"),
    error: str | None = Query(None, description="OAuth error"),
    error_description: str | None = Query(None, description="Error description"),
) -> HTMLResponse:
    """Handle OAuth callback from provider.

    Args:
        provider: Provider name
        request: FastAPI request
        code: Authorization code from provider
        state: OAuth state for validation
        error: OAuth error code
        error_description: OAuth error description

    Returns:
        HTML response with success or error message

    Raises:
        HTTPException: If provider not found or callback handling fails
    """
    # Handle OAuth errors
    if error:
        logger.error(
            "oauth_callback_error",
            provider=provider,
            error=error,
            error_description=error_description,
            category="auth",
        )

        return OAuthTemplates.callback_error(
            error=error,
            error_description=error_description,
        )

    # Validate required parameters
    if not code or not state:
        logger.error(
            "oauth_callback_missing_params",
            provider=provider,
            has_code=bool(code),
            has_state=bool(state),
            category="auth",
        )
        return OAuthTemplates.error(
            error_message="No authorization code was received.",
            title="Missing Authorization Code",
            error_detail="The OAuth server did not provide an authorization code. Please try again.",
            status_code=400,
        )

    # Get OAuth session
    session_manager = get_oauth_session_manager()
    session_data = await session_manager.get_session(state)

    if not session_data:
        logger.error(
            "oauth_callback_invalid_state",
            provider=provider,
            state=state,
            category="auth",
        )
        return OAuthTemplates.error(
            error_message="The authentication state is invalid or has expired.",
            title="Invalid State",
            error_detail="This may indicate a CSRF attack or an expired authentication session. Please start the authentication process again.",
            status_code=400,
        )

    # Validate provider matches
    if session_data.get("provider") != provider:
        logger.error(
            "oauth_callback_provider_mismatch",
            expected=session_data.get("provider"),
            actual=provider,
            category="auth",
        )
        await session_manager.delete_session(state)
        return OAuthTemplates.error(
            error_message="Provider mismatch in OAuth callback",
        )

    # Get provider instance
    registry = getattr(request.app.state, "oauth_registry", None)
    if registry is None:
        raise HTTPException(status_code=503, detail="OAuth registry not initialized")
    oauth_provider = registry.get(provider)

    if not oauth_provider:
        logger.error("oauth_provider_not_found", provider=provider, category="auth")
        await session_manager.delete_session(state)
        raise HTTPException(
            status_code=404,
            detail=f"OAuth provider '{provider}' not found",
        )

    try:
        # Exchange code for tokens
        code_verifier = session_data.get("code_verifier")
        credentials = await oauth_provider.handle_callback(code, state, code_verifier)

        # Clean up session
        await session_manager.delete_session(state)

        logger.info(
            "oauth_callback_success",
            provider=provider,
            has_credentials=bool(credentials),
            category="auth",
        )

        # Return success page
        return OAuthTemplates.success(
            message="Authentication successful! You can close this window.",
        )

    except Exception as e:
        logger.error(
            "oauth_callback_exchange_error",
            provider=provider,
            error=str(e),
            exc_info=e,
            category="auth",
        )
        await session_manager.delete_session(state)

        return OAuthTemplates.error(
            error_message="Failed to exchange authorization code for tokens.",
            title="Token Exchange Failed",
            error_detail=str(e),
            status_code=500,
        )

refresh_oauth_token async

refresh_oauth_token(request, provider, refresh_token)

Refresh OAuth access token.

Parameters:

Name Type Description Default
provider str

Provider name

required
refresh_token str

Refresh token

required

Returns:

Type Description
JSONResponse

New token response

Raises:

Type Description
HTTPException

If provider not found or refresh fails

Source code in ccproxy/auth/oauth/router.py
@oauth_router.post("/{provider}/refresh")
async def refresh_oauth_token(
    request: Request,
    provider: str,
    refresh_token: str,
) -> JSONResponse:
    """Refresh OAuth access token.

    Args:
        provider: Provider name
        refresh_token: Refresh token

    Returns:
        New token response

    Raises:
        HTTPException: If provider not found or refresh fails
    """
    registry = getattr(request.app.state, "oauth_registry", None)
    if registry is None:
        raise HTTPException(status_code=503, detail="OAuth registry not initialized")
    oauth_provider = registry.get(provider)

    if not oauth_provider:
        logger.error("oauth_provider_not_found", provider=provider, category="auth")
        raise HTTPException(
            status_code=404,
            detail=f"OAuth provider '{provider}' not found",
        )

    try:
        new_tokens = await oauth_provider.refresh_access_token(refresh_token)

        logger.info("oauth_token_refreshed", provider=provider, category="auth")

        return JSONResponse(content=new_tokens, status_code=200)

    except Exception as e:
        logger.error(
            "oauth_refresh_error",
            provider=provider,
            error=str(e),
            exc_info=e,
            category="auth",
        )
        raise HTTPException(
            status_code=500,
            detail=f"Failed to refresh token: {str(e)}",
        ) from e

revoke_oauth_token async

revoke_oauth_token(request, provider, token)

Revoke an OAuth token.

Parameters:

Name Type Description Default
provider str

Provider name

required
token str

Token to revoke

required

Returns:

Type Description
Response

Empty response on success

Raises:

Type Description
HTTPException

If provider not found or revocation fails

Source code in ccproxy/auth/oauth/router.py
@oauth_router.post("/{provider}/revoke")
async def revoke_oauth_token(
    request: Request,
    provider: str,
    token: str,
) -> Response:
    """Revoke an OAuth token.

    Args:
        provider: Provider name
        token: Token to revoke

    Returns:
        Empty response on success

    Raises:
        HTTPException: If provider not found or revocation fails
    """
    registry = getattr(request.app.state, "oauth_registry", None)
    if registry is None:
        raise HTTPException(status_code=503, detail="OAuth registry not initialized")
    oauth_provider = registry.get(provider)

    if not oauth_provider:
        logger.error("oauth_provider_not_found", provider=provider, category="auth")
        raise HTTPException(
            status_code=404,
            detail=f"OAuth provider '{provider}' not found",
        )

    try:
        await oauth_provider.revoke_token(token)

        logger.info("oauth_token_revoked", provider=provider, category="auth")

        return Response(status_code=204)

    except Exception as e:
        logger.error(
            "oauth_revoke_error",
            provider=provider,
            error=str(e),
            exc_info=e,
            category="auth",
        )
        raise HTTPException(
            status_code=500,
            detail=f"Failed to revoke token: {str(e)}",
        ) from e