Skip to content

ccproxy.cli.commands.auth

ccproxy.cli.commands.auth

Authentication and credential management commands.

discover_oauth_providers async

discover_oauth_providers(container)

Return available OAuth providers discovered via the plugin loader.

Source code in ccproxy/cli/commands/auth.py
async def discover_oauth_providers(
    container: ServiceContainer,
) -> dict[str, tuple[str, str]]:
    """Discover the OAuth providers exposed by registered plugin factories.

    Loads the plugin system from the container's settings and keeps only the
    auth-provider factories whose plugin name is recognised here.

    Returns:
        Mapping of CLI provider name to ``(auth_type, description)``. Any
        failure during discovery is logged at debug level and the providers
        collected so far (possibly none) are returned.
    """
    # plugin name -> (CLI provider name, human readable description)
    known_plugins = {
        "oauth_claude": ("claude-api", "Claude API OAuth"),
        "oauth_codex": ("codex", "OpenAI Codex OAuth"),
        "copilot": ("copilot", "GitHub Copilot OAuth"),
    }
    discovered: dict[str, tuple[str, str]] = {}
    try:
        settings = container.get_service(Settings)
        # For discovery, we can load all plugins temporarily since we don't initialize them
        from ccproxy.core.plugins import load_plugin_system

        registry, _ = load_plugin_system(settings)
        for plugin_name, plugin_factory in registry.factories.items():
            from ccproxy.core.plugins import AuthProviderPluginFactory

            if not isinstance(plugin_factory, AuthProviderPluginFactory):
                continue
            entry = known_plugins.get(plugin_name)
            if entry is not None:
                cli_name, description = entry
                discovered[cli_name] = ("oauth", description)
    except Exception as exc:
        logger.debug(
            "discover_oauth_providers_failed", error=str(exc), exc_info=exc
        )
    return discovered

get_oauth_provider_choices

get_oauth_provider_choices()

Get list of available OAuth provider names for CLI choices.

Source code in ccproxy/cli/commands/auth.py
def get_oauth_provider_choices() -> list[str]:
    """Return the names of the discoverable OAuth providers for CLI choices."""
    # Runs the async discovery to completion on a fresh event loop.
    discovered = asyncio.run(discover_oauth_providers(_get_service_container()))
    return [*discovered]

get_oauth_client_for_provider async

get_oauth_client_for_provider(
    provider, registry, container
)

Get OAuth client for the specified provider.

Source code in ccproxy/cli/commands/auth.py
async def get_oauth_client_for_provider(
    provider: str,
    registry: OAuthRegistry,
    container: ServiceContainer,
) -> Any:
    """Resolve a provider by name and return its ``client`` attribute.

    Raises:
        ValueError: If the provider cannot be resolved, or if the resolved
            provider has no truthy ``client`` attribute.
    """
    resolved = await get_oauth_provider_for_name(provider, registry, container)
    if resolved:
        client = getattr(resolved, "client", None)
        if client:
            return client
        raise ValueError(f"Provider '{provider}' does not implement OAuth client")
    raise ValueError(f"Provider '{provider}' not found")

check_provider_credentials async

check_provider_credentials(provider, registry, container)

Check if provider has valid stored credentials.

Source code in ccproxy/cli/commands/auth.py
async def check_provider_credentials(
    provider: str,
    registry: OAuthRegistry,
    container: ServiceContainer,
) -> dict[str, Any]:
    """Report whether the named provider can load stored credentials.

    Returns:
        A dict with keys ``has_credentials``, ``expired``, ``path`` and
        ``credentials``. ``expired`` is simply the negation of
        ``has_credentials``; ``path`` and ``credentials`` are always ``None``.
        An unknown provider or any error while loading yields the
        "no credentials" result rather than raising.
    """

    def _result(found: bool) -> dict[str, Any]:
        # Single place that defines the shape of the returned status dict.
        return {
            "has_credentials": found,
            "expired": not found,
            "path": None,
            "credentials": None,
        }

    try:
        resolved = await get_oauth_provider_for_name(provider, registry, container)
        if not resolved:
            return _result(False)

        stored = await resolved.load_credentials()
        return _result(stored is not None)
    except Exception as exc:
        # Pick the log event by failure type; the outcome is the same for all.
        if isinstance(exc, AttributeError):
            event = "credentials_check_missing_attribute"
        elif isinstance(exc, FileNotFoundError):
            event = "credentials_file_not_found"
        else:
            event = "credentials_check_failed"
        logger.debug(event, provider=provider, error=str(exc), exc_info=exc)
        return _result(False)

list_providers

list_providers()

List all available OAuth providers.

Source code in ccproxy/cli/commands/auth.py
@app.command(name="providers")
def list_providers() -> None:
    """List all available OAuth providers."""
    _ensure_logging_configured()
    toolkit = get_rich_toolkit()
    toolkit.print("[bold cyan]Available OAuth Providers[/bold cyan]", centered=True)
    toolkit.print_line()

    try:
        discovered = asyncio.run(discover_oauth_providers(_get_service_container()))

        if discovered:
            table = Table(
                title="OAuth Providers",
                title_style="bold white",
                show_header=True,
                header_style="bold cyan",
                box=box.ROUNDED,
            )
            # (column header, column style) in display order
            for header, column_style in (
                ("Provider", "cyan"),
                ("Auth Type", "white"),
                ("Description", "dim"),
            ):
                table.add_column(header, style=column_style)

            for provider_name, details in discovered.items():
                auth_type, description = details
                table.add_row(provider_name, auth_type, description)

            console.print(table)
        else:
            toolkit.print("No OAuth providers found", tag="warning")

    except Exception as exc:
        # Every failure exits with code 1; only the message prefix differs.
        if isinstance(exc, ImportError):
            prefix = "Plugin import error"
        elif isinstance(exc, AttributeError):
            prefix = "Plugin configuration error"
        else:
            prefix = "Error listing providers"
        toolkit.print(f"{prefix}: {exc}", tag="error")
        raise typer.Exit(1) from exc

login_command

login_command(
    provider,
    no_browser=False,
    manual=False,
    output_file=None,
    force=False,
)

Login to a provider using OAuth authentication.

Source code in ccproxy/cli/commands/auth.py
@app.command(name="login")
def login_command(
    provider: Annotated[
        str,
        typer.Argument(
            help="Provider to authenticate with (claude-api, codex, copilot)"
        ),
    ],
    no_browser: Annotated[
        bool,
        typer.Option("--no-browser", help="Don't automatically open browser for OAuth"),
    ] = False,
    manual: Annotated[
        bool,
        typer.Option(
            "--manual", "-m", help="Skip callback server and enter code manually"
        ),
    ] = False,
    output_file: Annotated[
        Path | None,
        typer.Option(
            "--file",
            help="Write credentials to this path instead of the default storage",
        ),
    ] = None,
    force: Annotated[
        bool,
        typer.Option(
            "--force",
            help="Overwrite existing credential file when using --file",
        ),
    ] = False,
) -> None:
    """Login to a provider using OAuth authentication."""
    # NOTE: the docstring above is kept short on purpose; Typer shows it as the
    # command's help text.
    #
    # Overview:
    #   1. Validate --file/--force and prepare the target directory.
    #   2. Resolve the provider; exit 1 with the list of known providers if
    #      it cannot be found.
    #   3. Run one flow: manual (--manual), device (when the provider prefers
    #      and supports it) or browser (default). If the callback port cannot
    #      be bound, fall back to manual entry when the provider supports it.
    #   4. Exit 1 on any failure, 2 on Ctrl-C.
    _ensure_logging_configured()
    # Capture plugin-injected CLI args for potential use by auth providers
    try:
        from ccproxy.cli.helpers import get_plugin_cli_args

        _ = get_plugin_cli_args()
        # Currently not used directly here, but available to providers
    except Exception as exc:
        # Best effort only: login must keep working if this lookup fails.
        logger.debug("plugin_cli_args_unavailable", error=str(exc))
    toolkit = get_rich_toolkit()

    if force and output_file is None:
        toolkit.print("--force can only be used together with --file", tag="error")
        raise typer.Exit(1)

    custom_path: Path | None = None
    if output_file is not None:
        custom_path = output_file.expanduser()
        try:
            custom_path = custom_path.resolve()
        except FileNotFoundError:
            # Path.resolve() on some platforms raises when parents missing; fallback to absolute()
            custom_path = custom_path.absolute()

        if custom_path.exists() and custom_path.is_dir():
            toolkit.print(
                f"Target path '{custom_path}' is a directory. Provide a file path.",
                tag="error",
            )
            raise typer.Exit(1)

        if custom_path.exists() and not force:
            toolkit.print(
                f"Credential file '{custom_path}' already exists. Use --force to overwrite.",
                tag="error",
            )
            raise typer.Exit(1)

        try:
            custom_path.parent.mkdir(parents=True, exist_ok=True)
        except Exception as exc:
            toolkit.print(
                f"Failed to create directory '{custom_path.parent}': {exc}",
                tag="error",
            )
            raise typer.Exit(1) from exc

    provider = provider.strip().lower()
    display_name = provider.replace("_", "-").title()

    toolkit.print(
        f"[bold cyan]OAuth Login - {display_name}[/bold cyan]",
        centered=True,
    )
    toolkit.print_line()

    custom_path_str = str(custom_path) if custom_path else None

    try:
        container = _get_service_container()
        registry = container.get_oauth_registry()
        oauth_provider = asyncio.run(
            get_oauth_provider_for_name(provider, registry, container)
        )

        if not oauth_provider:
            providers = asyncio.run(discover_oauth_providers(container))
            available = ", ".join(providers.keys()) if providers else "none"
            toolkit.print(
                f"Provider '{provider}' not found. Available: {available}",
                tag="error",
            )
            raise typer.Exit(1)

        # Get CLI configuration from provider
        cli_config = oauth_provider.cli

        # Flow engine selection with fallback logic
        flow_engine: ManualCodeFlow | DeviceCodeFlow | BrowserFlow
        try:
            # The port-bind fallback lives INSIDE this outer try so that a
            # timeout, abort, provider or network error raised by the fallback
            # manual flow is reported by the dedicated handlers below instead
            # of surfacing as a generic "Error during login".
            try:
                with _temporary_disable_provider_storage(
                    oauth_provider, disable=custom_path is not None
                ):
                    if manual:
                        # Manual mode requested
                        if not cli_config.supports_manual_code:
                            raise AuthProviderError(
                                f"Provider '{provider}' doesn't support manual code entry"
                            )
                        flow_engine = ManualCodeFlow()
                        success = asyncio.run(
                            flow_engine.run(oauth_provider, save_path=custom_path_str)
                        )

                    elif (
                        cli_config.preferred_flow == FlowType.device
                        and cli_config.supports_device_flow
                    ):
                        # Device flow preferred and supported
                        flow_engine = DeviceCodeFlow()
                        success = asyncio.run(
                            flow_engine.run(oauth_provider, save_path=custom_path_str)
                        )

                    else:
                        # Browser flow (default)
                        flow_engine = BrowserFlow()
                        success = asyncio.run(
                            flow_engine.run(
                                oauth_provider,
                                no_browser=no_browser,
                                save_path=custom_path_str,
                            )
                        )

            except PortBindError as e:
                # Port binding failed - offer manual fallback
                if not cli_config.supports_manual_code:
                    console.print(
                        f"[red]Port {cli_config.callback_port} unavailable and manual mode not supported[/red]"
                    )
                    raise typer.Exit(1) from e

                console.print(
                    "[yellow]Port binding failed. Falling back to manual mode.[/yellow]"
                )
                with _temporary_disable_provider_storage(
                    oauth_provider, disable=custom_path is not None
                ):
                    flow_engine = ManualCodeFlow()
                    success = asyncio.run(
                        flow_engine.run(oauth_provider, save_path=custom_path_str)
                    )

        except AuthTimedOutError as e:
            console.print("[red]Authentication timed out[/red]")
            raise typer.Exit(1) from e

        except AuthUserAbortedError as e:
            console.print("[yellow]Authentication cancelled by user[/yellow]")
            raise typer.Exit(1) from e

        except AuthProviderError as e:
            console.print(f"[red]Authentication failed: {e}[/red]")
            raise typer.Exit(1) from e

        except NetworkError as e:
            console.print(f"[red]Network error: {e}[/red]")
            raise typer.Exit(1) from e

        if success:
            console.print("[green]✓[/green] Authentication successful!")
            if custom_path:
                console.print(
                    f"[dim]Credentials saved to {custom_path}[/dim]",
                )
        else:
            console.print("[red]✗[/red] Authentication failed")
            raise typer.Exit(1)

    except KeyboardInterrupt:
        console.print("\n[yellow]Login cancelled by user.[/yellow]")
        raise typer.Exit(2) from None
    except ImportError as e:
        toolkit.print(f"Plugin import error: {e}", tag="error")
        raise typer.Exit(1) from e
    except typer.Exit:
        # Re-raise typer exits untouched; they would otherwise be caught by
        # the generic handler below and reported a second time.
        raise
    except Exception as e:
        toolkit.print(f"Error during login: {e}", tag="error")
        logger.error("login_command_error", error=str(e), exc_info=e)
        raise typer.Exit(1) from e

refresh_command

refresh_command(provider, credential_file=None)

Refresh stored credentials using the provider's refresh token.

Source code in ccproxy/cli/commands/auth.py
@app.command(name="refresh")
def refresh_command(
    provider: Annotated[
        str, typer.Argument(help="Provider to refresh (claude-api, codex, copilot)")
    ],
    credential_file: Annotated[
        Path | None,
        typer.Option(
            "--file",
            help="Refresh credentials stored at this path instead of the default storage",
        ),
    ] = None,
) -> None:
    """Refresh stored credentials using the provider's refresh token."""
    # Thin CLI entry point: set up logging, then hand off to the shared
    # refresh routine with the arguments exactly as received.
    _ensure_logging_configured()
    _refresh_provider_tokens(provider, credential_file)

renew_command

renew_command(provider, credential_file=None)

Alias for refresh.

Source code in ccproxy/cli/commands/auth.py
@app.command(name="renew")
def renew_command(
    provider: Annotated[str, typer.Argument(help="Alias for refresh command")],
    credential_file: Annotated[
        Path | None,
        typer.Option(
            "--file",
            help="Refresh credentials stored at this path instead of the default storage",
        ),
    ] = None,
) -> None:
    """Alias for refresh."""
    # Delegate to the refresh command, which configures logging and then runs
    # the shared refresh routine - the same two steps, in the same order.
    refresh_command(provider, credential_file)

status_command

status_command(
    provider, detailed=False, credential_file=None
)

Check authentication status and info for specified provider.

Source code in ccproxy/cli/commands/auth.py
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
@app.command(name="status")
def status_command(
    provider: Annotated[
        str,
        typer.Argument(help="Provider to check status (claude-api, codex)"),
    ],
    detailed: Annotated[
        bool,
        typer.Option("--detailed", "-d", help="Show detailed credential information"),
    ] = False,
    credential_file: Annotated[
        Path | None,
        typer.Option(
            "--file",
            help=("Read credentials from this path instead of the default storage"),
        ),
    ] = None,
) -> None:
    """Check authentication status and info for specified provider."""
    # NOTE: the docstring above is kept short on purpose; Typer shows it as the
    # command's help text.
    #
    # Overview:
    #   1. Resolve the provider; exit 1 with the known providers if not found.
    #   2. Load credentials (optionally from --file) and, when using default
    #      storage, try to obtain a token snapshot from the provider's manager.
    #   3. Build a profile dict from whichever profile API the provider exposes.
    #   4. Merge token metadata into the profile and render it; with
    #      --detailed also print an access-token preview.
    _ensure_logging_configured()
    toolkit = get_rich_toolkit()

    credential_path = _normalize_credentials_file_option(
        toolkit, credential_file, require_exists=False
    )
    credential_missing = bool(credential_path and not credential_path.exists())
    load_kwargs: dict[str, Any] = {}
    if credential_path is not None:
        load_kwargs["custom_path"] = credential_path

    provider = provider.strip().lower()
    display_name = provider.replace("_", "-").title()

    toolkit.print(
        f"[bold cyan]{display_name} Authentication Status[/bold cyan]",
        centered=True,
    )
    toolkit.print_line()

    try:
        container = _get_service_container()
        registry = container.get_oauth_registry()
        oauth_provider = asyncio.run(
            get_oauth_provider_for_name(provider, registry, container)
        )
        if not oauth_provider:
            providers = asyncio.run(discover_oauth_providers(container))
            available = ", ".join(providers.keys()) if providers else "none"
            expected = _expected_plugin_class_name(provider)
            toolkit.print(
                f"Provider '{provider}' not found. Available: {available}. Expected plugin class '{expected}'.",
                tag="error",
            )
            raise typer.Exit(1)

        profile_info = None
        credentials = None
        snapshot: TokenSnapshot | None = None

        if oauth_provider:
            try:
                # Delegate to provider; providers may internally use their managers
                credentials = asyncio.run(
                    oauth_provider.load_credentials(**load_kwargs)
                )

                if credential_missing and not credentials:
                    toolkit.print(
                        f"Credential file '{credential_path}' not found.",
                        tag="warning",
                    )

                # Optionally obtain a token manager via provider API (if exposed)
                manager = None
                if credential_path is None:
                    try:
                        if hasattr(oauth_provider, "create_token_manager"):
                            manager = asyncio.run(oauth_provider.create_token_manager())
                        elif hasattr(oauth_provider, "get_token_manager"):
                            mgr = oauth_provider.get_token_manager()  # may be sync
                            # If coroutine, run it; else use directly
                            if hasattr(mgr, "__await__"):
                                manager = asyncio.run(mgr)
                            else:
                                manager = mgr
                    except Exception as e:
                        logger.debug("token_manager_unavailable", error=str(e))

                if manager and hasattr(manager, "get_token_snapshot"):
                    with contextlib.suppress(Exception):
                        result = manager.get_token_snapshot()
                        if asyncio.iscoroutine(result):
                            snapshot = asyncio.run(result)
                        else:
                            snapshot = cast(TokenSnapshot | None, result)

                if not snapshot and credentials:
                    snapshot = _token_snapshot_from_credentials(credentials, provider)

                if credentials:
                    # Bound up front: the diagnostics further down read `quick`
                    # for every provider, but only the non-codex branch used
                    # to assign it, so codex hit an UnboundLocalError there.
                    quick = None
                    if provider == "codex":
                        standard_profile = None
                        if hasattr(oauth_provider, "get_standard_profile"):
                            with contextlib.suppress(Exception):
                                standard_profile = asyncio.run(
                                    oauth_provider.get_standard_profile(credentials)
                                )
                        if not standard_profile and hasattr(
                            oauth_provider,
                            "_extract_standard_profile",
                        ):
                            with contextlib.suppress(Exception):
                                standard_profile = (
                                    oauth_provider._extract_standard_profile(
                                        credentials
                                    )
                                )
                        if standard_profile is not None:
                            try:
                                profile_info = standard_profile.model_dump(
                                    exclude={"raw_profile_data"}
                                )
                            except Exception:
                                profile_info = {
                                    "provider": provider,
                                    "authenticated": True,
                                }
                        else:
                            profile_info = {"provider": provider, "authenticated": True}
                    else:
                        # Prefer provider-supplied quick profile methods if available
                        if credential_path is None and hasattr(
                            oauth_provider, "get_unified_profile_quick"
                        ):
                            with contextlib.suppress(Exception):
                                quick = asyncio.run(
                                    oauth_provider.get_unified_profile_quick()
                                )
                        if (
                            credential_path is None
                            and (not quick or quick == {})
                            and hasattr(oauth_provider, "get_unified_profile")
                        ):
                            with contextlib.suppress(Exception):
                                quick = asyncio.run(
                                    oauth_provider.get_unified_profile()
                                )
                        if quick and isinstance(quick, dict) and quick != {}:
                            profile_info = quick
                            try:
                                prov = (
                                    profile_info.get("provider_type")
                                    or profile_info.get("provider")
                                    or ""
                                ).lower()
                                extras = (
                                    profile_info.get("extras")
                                    if isinstance(profile_info.get("extras"), dict)
                                    else None
                                )
                                if (
                                    prov in {"claude-api", "claude_api", "claude"}
                                    and extras
                                ):
                                    account = (
                                        extras.get("account", {})
                                        if isinstance(extras.get("account"), dict)
                                        else {}
                                    )
                                    org = (
                                        extras.get("organization", {})
                                        if isinstance(extras.get("organization"), dict)
                                        else {}
                                    )
                                    if account.get("has_claude_max") is True:
                                        profile_info["subscription_type"] = "max"
                                        profile_info["subscription_status"] = "active"
                                    elif account.get("has_claude_pro") is True:
                                        profile_info["subscription_type"] = "pro"
                                        profile_info["subscription_status"] = "active"
                                    features = {}
                                    if isinstance(account.get("has_claude_max"), bool):
                                        features["claude_max"] = account.get(
                                            "has_claude_max"
                                        )
                                    if isinstance(account.get("has_claude_pro"), bool):
                                        features["claude_pro"] = account.get(
                                            "has_claude_pro"
                                        )
                                    if features:
                                        profile_info["features"] = {
                                            **features,
                                            **(profile_info.get("features") or {}),
                                        }
                                    if org.get("name") and not profile_info.get(
                                        "organization_name"
                                    ):
                                        profile_info["organization_name"] = org.get(
                                            "name"
                                        )
                                    if not profile_info.get("organization_role"):
                                        profile_info["organization_role"] = "member"
                            except Exception:
                                # Enrichment is best effort; the base profile
                                # is still rendered without it.
                                pass
                        else:
                            standard_profile = None
                            if hasattr(oauth_provider, "get_standard_profile"):
                                with contextlib.suppress(Exception):
                                    standard_profile = asyncio.run(
                                        oauth_provider.get_standard_profile(credentials)
                                    )
                            if standard_profile is not None:
                                try:
                                    profile_info = standard_profile.model_dump(
                                        exclude={"raw_profile_data"}
                                    )
                                except Exception:
                                    profile_info = {
                                        "provider": provider,
                                        "authenticated": True,
                                    }
                            else:
                                profile_info = {
                                    "provider": provider,
                                    "authenticated": True,
                                }

                    if profile_info is not None and "provider" not in profile_info:
                        profile_info["provider"] = provider

                    # Debug-only diagnostics: explain why common profile fields
                    # are absent. Never allowed to affect the command's result.
                    try:
                        prov_dbg = (
                            profile_info.get("provider_type")
                            or profile_info.get("provider")
                            or ""
                        ).lower()
                        missing = []
                        for f in (
                            "subscription_type",
                            "organization_name",
                            "display_name",
                        ):
                            if not profile_info.get(f):
                                missing.append(f)
                        if missing:
                            reasons: list[str] = []
                            qextra = (
                                quick.get("extras") if isinstance(quick, dict) else None
                            )
                            if prov_dbg in {"codex", "openai"}:
                                auth_claims = None
                                if isinstance(qextra, dict):
                                    auth_claims = qextra.get(
                                        "https://api.openai.com/auth"
                                    )
                                if not auth_claims:
                                    reasons.append("missing_openai_auth_claims")
                                else:
                                    if "chatgpt_plan_type" not in auth_claims:
                                        reasons.append("plan_type_not_in_claims")
                                    orgs = (
                                        auth_claims.get("organizations")
                                        if isinstance(auth_claims, dict)
                                        else None
                                    )
                                    if not orgs:
                                        reasons.append("no_organizations_in_claims")
                                has_id_token = bool(
                                    snapshot and snapshot.extras.get("id_token_present")
                                )
                                if not has_id_token:
                                    reasons.append("no_id_token_available")
                            elif prov_dbg in {"claude", "claude-api", "claude_api"}:
                                if not (
                                    isinstance(qextra, dict) and qextra.get("account")
                                ):
                                    reasons.append("missing_claude_account_extras")
                            if reasons:
                                logger.debug(
                                    "profile_fields_missing",
                                    provider=prov_dbg,
                                    missing_fields=missing,
                                    reasons=reasons,
                                )
                    except Exception as e:
                        # Still best effort, but no longer silent: a swallowed
                        # error here is what hid the unbound `quick` bug.
                        logger.debug("profile_diagnostics_failed", error=str(e))

            except Exception as e:
                logger.debug(f"{provider}_status_error", error=str(e), exc_info=e)

        # The block above may have failed after loading credentials but before
        # building a snapshot, so derive one here if it is still missing.
        token_snapshot = snapshot
        if not token_snapshot and credentials:
            token_snapshot = _token_snapshot_from_credentials(credentials, provider)

        if token_snapshot:
            # Ensure we surface token metadata in the rendered profile table
            if not profile_info:
                profile_info = {
                    "provider_type": token_snapshot.provider or provider,
                    "authenticated": True,
                }

            if token_snapshot.expires_at:
                profile_info["token_expires_at"] = token_snapshot.expires_at

            profile_info["has_refresh_token"] = token_snapshot.has_refresh_token()
            profile_info["has_access_token"] = token_snapshot.has_access_token()

            has_id_token = bool(
                token_snapshot.extras.get("id_token_present")
                or token_snapshot.extras.get("has_id_token")
            )
            if not has_id_token and credentials and hasattr(credentials, "id_token"):
                with contextlib.suppress(Exception):
                    has_id_token = bool(credentials.id_token)
            profile_info["has_id_token"] = has_id_token

            if token_snapshot.scopes and not profile_info.get("scopes"):
                profile_info["scopes"] = list(token_snapshot.scopes)

        if profile_info:
            console.print("[green]✓[/green] Authenticated with valid credentials")

            if "provider_type" not in profile_info and "provider" in profile_info:
                try:
                    profile_info["provider_type"] = str(
                        profile_info["provider"]
                    ).replace("_", "-")
                except Exception:
                    profile_info["provider_type"] = (
                        str(profile_info["provider"])
                        if profile_info.get("provider")
                        else None
                    )

            _render_profile_table(profile_info, title="Account Information")
            _render_profile_features(profile_info)

            if detailed and token_snapshot:
                preview = token_snapshot.access_token_preview()
                if preview:
                    console.print(f"\n  Token: [dim]{preview}[/dim]")
        else:
            console.print("[red]✗[/red] Not authenticated or provider not found")
            console.print(f"  Run 'ccproxy auth login {provider}' to authenticate")

    except typer.Exit:
        # typer.Exit derives from RuntimeError, so without this clause the
        # "provider not found" exit above was caught by `except Exception`
        # below and reported again as "Error checking status: 1".
        raise
    except ImportError as e:
        console.print(f"[red]✗[/red] Failed to import required modules: {e}")
        raise typer.Exit(1) from e
    except AttributeError as e:
        console.print(f"[red]✗[/red] Configuration or plugin error: {e}")
        raise typer.Exit(1) from e
    except Exception as e:
        console.print(f"[red]✗[/red] Error checking status: {e}")
        raise typer.Exit(1) from e

logout_command

logout_command(provider)

Logout and remove stored credentials for specified provider.

Source code in ccproxy/cli/commands/auth.py
# CLI entry point for `ccproxy auth logout <provider>`.
#
# Flow: normalise the provider name, resolve the OAuth provider, check that
# credentials exist, ask for confirmation, then remove the credentials through
# the provider's storage (or by saving ``None`` when no storage hook exists).
#
# Exits with status 1 (typer.Exit) when the provider is unknown, credential
# removal fails, or an unexpected error occurs. Returns normally when there is
# nothing to remove or the user declines the confirmation prompt.
#
# NOTE: the docstring is kept to a single line on purpose: Typer uses it as the
# command's help text.
@app.command(name="logout")
def logout_command(
    provider: Annotated[
        str, typer.Argument(help="Provider to logout from (claude-api, codex)")
    ],
) -> None:
    """Logout and remove stored credentials for specified provider."""
    _ensure_logging_configured()
    toolkit = get_rich_toolkit()

    # Provider names are matched case-insensitively and without padding.
    provider = provider.strip().lower()

    toolkit.print(f"[bold cyan]{provider.title()} Logout[/bold cyan]", centered=True)
    toolkit.print_line()

    try:
        container = _get_service_container()
        registry = container.get_oauth_registry()
        oauth_provider = asyncio.run(
            get_oauth_provider_for_name(provider, registry, container)
        )

        if not oauth_provider:
            # Unknown provider: list what discovery can see to help the user.
            providers = asyncio.run(discover_oauth_providers(container))
            available = ", ".join(providers.keys()) if providers else "none"
            expected = _expected_plugin_class_name(provider)
            toolkit.print(
                f"Provider '{provider}' not found. Available: {available}. Expected plugin class '{expected}'.",
                tag="error",
            )
            raise typer.Exit(1)

        # Best effort: any failure to load is treated as "no credentials".
        existing_creds = None
        with contextlib.suppress(Exception):
            existing_creds = asyncio.run(oauth_provider.load_credentials())

        if not existing_creds:
            console.print("[yellow]No credentials found. Already logged out.[/yellow]")
            return

        confirm = typer.confirm(
            "Are you sure you want to logout and remove credentials?"
        )
        if not confirm:
            console.print("Logout cancelled.")
            return

        success = False
        try:
            # Prefer an explicit storage-level removal hook; fall back to
            # overwriting the stored credentials with None.
            storage = oauth_provider.get_storage()
            if storage and hasattr(storage, "delete"):
                success = asyncio.run(storage.delete())
            elif storage and hasattr(storage, "clear"):
                success = asyncio.run(storage.clear())
            else:
                success = asyncio.run(oauth_provider.save_credentials(None))
        except Exception as e:
            # Deliberately swallowed: the failure is reported below via `success`.
            logger.debug("logout_error", error=str(e), exc_info=e)

        if success:
            toolkit.print(f"Successfully logged out from {provider}!", tag="success")
            console.print("Credentials have been removed.")
        else:
            toolkit.print("Failed to remove credentials", tag="error")
            raise typer.Exit(1)

    except (typer.Exit, typer.Abort):
        # Exit/Abort derive from RuntimeError, so without this clause the
        # generic handler below would catch our own control-flow exits and
        # print a spurious "Error during logout: 1" after the real message.
        raise
    except FileNotFoundError:
        toolkit.print("No credentials found to remove.", tag="warning")
    except OSError as e:
        toolkit.print(f"Failed to remove credential files: {e}", tag="error")
        raise typer.Exit(1) from e
    except ImportError as e:
        toolkit.print(f"Failed to import required modules: {e}", tag="error")
        raise typer.Exit(1) from e
    except Exception as e:
        toolkit.print(f"Error during logout: {e}", tag="error")
        raise typer.Exit(1) from e

get_oauth_provider_for_name async

get_oauth_provider_for_name(provider, registry, container)

Get OAuth provider instance for the specified provider name.

Source code in ccproxy/cli/commands/auth.py
async def get_oauth_provider_for_name(
    provider: str,
    registry: OAuthRegistry,
    container: ServiceContainer,
) -> Any:
    """Resolve the OAuth provider instance registered under ``provider``.

    The registry is consulted first. When it has no truthy entry for the
    name, ``_lazy_register_oauth_provider`` is awaited with the same
    arguments and its result is returned instead.

    Returns:
        The provider instance, or ``None`` when neither lookup yields a
        truthy value.
    """
    registered = registry.get(provider)
    if registered:
        return registered

    lazily_registered = await _lazy_register_oauth_provider(
        provider, registry, container
    )
    return lazily_registered if lazily_registered else None