Skip to content

ccproxy.auth.oauth.flows

ccproxy.auth.oauth.flows

OAuth flow engines for CLI authentication.

CLICallbackServer

CLICallbackServer(port, callback_path='/callback')

Temporary HTTP server for handling OAuth callbacks in CLI flows.

Parameters:

Name Type Description Default
port int

Port to bind the server to

required
callback_path str

Path to handle OAuth callbacks

'/callback'
Source code in ccproxy/auth/oauth/flows.py
def __init__(self, port: int, callback_path: str = "/callback") -> None:
    """Initialize the callback server.

    Args:
        port: Port to bind the server to
        callback_path: Path to handle OAuth callbacks
    """
    self.port = port
    self.callback_path = callback_path
    self.server: Any = None
    self._server_task: asyncio.Task[Any] | None = None
    self.callback_received = False
    self.callback_data: dict[str, Any] = {}
    self.callback_future: asyncio.Future[dict[str, Any]] | None = None

start async

start()

Start the callback server.

Source code in ccproxy/auth/oauth/flows.py
async def start(self) -> None:
    """Start the callback server."""
    import uvicorn

    # Create minimal ASGI app
    async def app(scope: dict[str, Any], receive: Any, send: Any) -> None:
        if scope["type"] == "http" and scope["path"] == self.callback_path:
            await self._handle_callback(scope, receive, send)
        else:
            # 404 for other paths
            await send(
                {
                    "type": "http.response.start",
                    "status": 404,
                    "headers": [[b"content-type", b"text/plain"]],
                }
            )
            await send(
                {
                    "type": "http.response.body",
                    "body": b"Not Found",
                }
            )

    # Create server config
    config = uvicorn.Config(
        app=app,
        host="localhost",
        port=self.port,
        log_level="error",  # Suppress uvicorn logs
    )

    # Create and start server
    self.server = uvicorn.Server(config)

    # Start server in background task with error handling
    async def _serve_with_error_handling() -> None:
        try:
            await self.server.serve()
        except (OSError, SystemExit) as e:
            # Uvicorn calls sys.exit(1) on startup errors, convert to PortBindError
            if isinstance(e, SystemExit):
                raise PortBindError(
                    f"Failed to start callback server on port {self.port}"
                ) from e
            elif e.errno == 48:  # Address already in use
                raise PortBindError(
                    f"Port {self.port} is already in use. Please close other applications using this port."
                ) from e
            else:
                raise PortBindError(
                    f"Failed to start callback server on port {self.port}: {e}"
                ) from e

    self._server_task = asyncio.create_task(_serve_with_error_handling())

    # Wait briefly and check if server started successfully
    await asyncio.sleep(0.1)
    if self._server_task.done():
        # Server failed to start, re-raise the exception
        await self._server_task

    logger.debug(
        "cli_callback_server_started", port=self.port, path=self.callback_path
    )

stop async

stop()

Stop the callback server.

Source code in ccproxy/auth/oauth/flows.py
async def stop(self) -> None:
    """Stop the callback server."""
    if self.server:
        self.server.should_exit = True
        if hasattr(self, "_server_task") and self._server_task is not None:
            try:
                await asyncio.wait_for(self._server_task, timeout=2.0)
            except TimeoutError:
                self._server_task.cancel()
        self.server = None
        logger.debug("cli_callback_server_stopped", port=self.port)

wait_for_callback async

wait_for_callback(expected_state=None, timeout=300)

Wait for OAuth callback with optional state validation.

Parameters:

Name Type Description Default
expected_state str | None

Expected OAuth state parameter for validation

None
timeout float

Timeout in seconds

300

Returns:

Type Description
dict[str, Any]

Callback data dictionary

Raises:

Type Description
TimeoutError

If callback is not received within timeout

ValueError

If state validation fails

Source code in ccproxy/auth/oauth/flows.py
async def wait_for_callback(
    self, expected_state: str | None = None, timeout: float = 300
) -> dict[str, Any]:
    """Wait for OAuth callback with optional state validation.

    Args:
        expected_state: Expected OAuth state parameter for validation
        timeout: Timeout in seconds

    Returns:
        Callback data dictionary

    Raises:
        asyncio.TimeoutError: If callback is not received within timeout
        ValueError: If state validation fails
    """
    self.callback_future = asyncio.Future()

    try:
        # Wait for callback with timeout
        callback_data = await asyncio.wait_for(
            self.callback_future, timeout=timeout
        )

        # Validate state if provided
        if expected_state and expected_state != "manual":
            received_state = callback_data.get("state")
            if received_state != expected_state:
                raise ValueError(
                    f"OAuth state mismatch: expected {expected_state}, got {received_state}"
                )

        # Check for OAuth errors
        if "error" in callback_data:
            error = callback_data.get("error")
            error_description = callback_data.get(
                "error_description", "No description provided"
            )
            raise ValueError(f"OAuth error: {error} - {error_description}")

        # Ensure we have an authorization code
        if "code" not in callback_data:
            raise ValueError("No authorization code received in callback")

        return callback_data

    except TimeoutError:
        logger.error("cli_callback_timeout", timeout=timeout, port=self.port)
        raise TimeoutError(f"No OAuth callback received within {timeout} seconds")

BrowserFlow

Browser-based OAuth flow with callback server.

run async

run(provider, no_browser, save_path=None)

Execute browser OAuth flow with fallback handling.

Source code in ccproxy/auth/oauth/flows.py
async def run(
    self,
    provider: OAuthProviderProtocol,
    no_browser: bool,
    save_path: str | Path | None = None,
) -> Any:
    """Execute browser OAuth flow with fallback handling."""
    cli_config = provider.cli

    # Try provider's fixed port
    try:
        callback_server = CLICallbackServer(
            cli_config.callback_port, cli_config.callback_path
        )
        await callback_server.start()
    except PortBindError as e:
        # Offer manual fallback for fixed-port providers
        if cli_config.fixed_redirect_uri:
            console.print(
                f"[yellow]Port {cli_config.callback_port} unavailable. Try --manual mode.[/yellow]"
            )
            raise AuthProviderError(
                f"Required port {cli_config.callback_port} unavailable"
            ) from e
        raise

    try:
        # Generate OAuth parameters with PKCE if supported
        state = secrets.token_urlsafe(32)
        code_verifier = None
        if provider.supports_pkce:
            code_verifier = (
                base64.urlsafe_b64encode(secrets.token_bytes(32))
                .decode("utf-8")
                .rstrip("=")
            )

        # Use fixed redirect URI or construct from config
        redirect_uri = (
            cli_config.fixed_redirect_uri
            or f"http://localhost:{cli_config.callback_port}{cli_config.callback_path}"
        )

        # Get authorization URL
        auth_url = await provider.get_authorization_url(
            state, code_verifier, redirect_uri
        )

        # Always show URL and QR code for fallback
        console.print(f"[bold]Visit: {auth_url}[/bold]")
        render_qr_code(auth_url)

        # Try to open browser unless explicitly disabled
        if not no_browser:
            try:
                webbrowser.open(auth_url)
                console.print("[dim]Opening browser...[/dim]")
            except Exception:
                console.print(
                    "[yellow]Could not open browser automatically[/yellow]"
                )

        # Wait for callback with timeout and state validation
        try:
            callback_data = await callback_server.wait_for_callback(
                state, timeout=300
            )
            credentials = await provider.handle_callback(
                callback_data["code"], state, code_verifier, redirect_uri
            )
            return await provider.save_credentials(credentials, save_path)
        except TimeoutError:
            # Fallback to manual code entry if callback times out
            console.print(
                "[yellow]Callback timed out. You can enter the code manually.[/yellow]"
            )
            if cli_config.supports_manual_code:
                # Use provider-specific manual redirect URI or fallback to OOB
                manual_redirect_uri = (
                    cli_config.manual_redirect_uri or "urn:ietf:wg:oauth:2.0:oob"
                )
                manual_auth_url = await provider.get_authorization_url(
                    state, code_verifier, manual_redirect_uri
                )
                console.print(f"[bold]Manual URL: {manual_auth_url}[/bold]")

                import typer

                raw_code = typer.prompt("Enter the authorization code")

                # Parse the code - some providers (like Claude) return code#state format
                # Extract the code and state parts
                code_parts = raw_code.split("#")
                code = code_parts[0].strip()

                # If there's a state in the input (Claude format), use it instead of our generated state
                if len(code_parts) > 1 and code_parts[1].strip():
                    actual_state = code_parts[1].strip()
                else:
                    actual_state = state

                credentials = await provider.handle_callback(
                    code, actual_state, code_verifier, manual_redirect_uri
                )
                return await provider.save_credentials(credentials, save_path)
            else:
                raise
    finally:
        await callback_server.stop()

DeviceCodeFlow

OAuth device code flow for headless environments.

run async

run(provider, save_path=None)

Execute device code flow with polling.

Source code in ccproxy/auth/oauth/flows.py
async def run(
    self, provider: OAuthProviderProtocol, save_path: str | Path | None = None
) -> Any:
    """Execute device code flow with polling."""
    (
        device_code,
        user_code,
        verification_uri,
        expires_in,
    ) = await provider.start_device_flow()

    console.print(f"[bold green]Visit: {verification_uri}[/bold green]")
    console.print(f"[bold green]Enter code: {user_code}[/bold green]")
    render_qr_code(verification_uri)  # QR code for mobile

    # Poll for completion with timeout
    with console.status("Waiting for authorization..."):
        credentials = await provider.complete_device_flow(
            device_code, 5, expires_in
        )

    return await provider.save_credentials(credentials, save_path)

ManualCodeFlow

Manual authorization code entry for restricted environments.

run async

run(provider, save_path=None)

Execute manual code entry flow.

Source code in ccproxy/auth/oauth/flows.py
async def run(
    self, provider: OAuthProviderProtocol, save_path: str | Path | None = None
) -> Any:
    """Execute manual code entry flow."""
    # Generate state for manual flow
    state = secrets.token_urlsafe(32)
    code_verifier = None
    if provider.supports_pkce:
        code_verifier = (
            base64.urlsafe_b64encode(secrets.token_bytes(32))
            .decode("utf-8")
            .rstrip("=")
        )

    # Get provider-specific manual redirect URI or fallback to OOB
    manual_redirect_uri = (
        provider.cli.manual_redirect_uri or "urn:ietf:wg:oauth:2.0:oob"
    )

    # Get authorization URL for manual entry
    auth_url = await provider.get_authorization_url(
        state, code_verifier, manual_redirect_uri
    )

    console.print(f"[bold green]Visit: {auth_url}[/bold green]")
    render_qr_code(auth_url)

    # Prompt for manual code entry
    raw_code = typer.prompt("[bold]Enter the authorization code[/bold]").strip()

    # Parse the code - some providers (like Claude) return code#state format
    # Extract the code and state parts
    code_parts = raw_code.split("#")
    code = code_parts[0].strip()

    # If there's a state in the input (Claude format), use it instead of our generated state
    if len(code_parts) > 1 and code_parts[1].strip():
        actual_state = code_parts[1].strip()
    else:
        actual_state = state

    # Use the provider's handle_callback method instead of exchange_manual_code
    # to properly handle state validation
    credentials = await provider.handle_callback(
        code, actual_state, code_verifier, manual_redirect_uri
    )
    return await provider.save_credentials(credentials, save_path)

render_qr_code

render_qr_code(url)

Render QR code for URL when TTY supports it.

Source code in ccproxy/auth/oauth/flows.py
def render_qr_code(url: str) -> None:
    """Render QR code for URL when TTY supports it."""
    if not sys.stdout.isatty():
        return

    try:
        import qrcode  # type: ignore[import-untyped]

        qr = qrcode.QRCode(border=1)
        qr.add_data(url)
        qr.print_ascii(invert=True)
        console.print("[dim]Scan QR code with mobile device[/dim]")
    except ImportError:
        # QR code library not available - graceful degradation
        pass