EnhancedTokenManager(
storage,
credentials_ttl=None,
refresh_grace_seconds=None,
)
Bases: BaseTokenManager[CredentialsT]
Enhanced token manager with automatic refresh capability.
Source code in ccproxy/auth/managers/base.py
| def __init__(
self,
storage: TokenStorage[CredentialsT],
credentials_ttl: float | None = None,
refresh_grace_seconds: float | None = None,
):
"""Initialize token manager.
Args:
storage: Token storage backend that matches the credential type
"""
self.storage = storage
self._auth_cache = AuthStatusCache(ttl=60.0) # 1 minute TTL for auth status
self._profile_cache: Any = None # For subclasses that cache profiles
# In-memory credentials cache to reduce file checks
self._credentials_cache: CredentialsT | None = None
self._credentials_loaded_at: float | None = None
# TTL for rechecking credentials from storage (config-driven)
# Prefer explicit parameter; fallback to environment; then default.
if credentials_ttl is not None:
try:
ttl_val = float(credentials_ttl)
self._credentials_ttl = ttl_val if ttl_val >= 0 else 30.0
except Exception:
self._credentials_ttl = 30.0
else:
env_val = os.getenv("AUTH__CREDENTIALS_TTL_SECONDS")
try:
self._credentials_ttl = float(env_val) if env_val is not None else 30.0
if self._credentials_ttl < 0:
self._credentials_ttl = 30.0
except Exception:
self._credentials_ttl = 30.0
# Grace period before expiry to trigger proactive refresh
if refresh_grace_seconds is not None:
try:
grace_val = float(refresh_grace_seconds)
self._refresh_grace_seconds = grace_val if grace_val >= 0 else 0.0
except Exception:
self._refresh_grace_seconds = 120.0
else:
env_grace = os.getenv("AUTH__REFRESH_GRACE_SECONDS")
try:
grace_val = float(env_grace) if env_grace is not None else 120.0
if grace_val < 0:
grace_val = 0.0
self._refresh_grace_seconds = grace_val
except Exception:
self._refresh_grace_seconds = 120.0
|
get_access_token_with_refresh
async
get_access_token_with_refresh()
Get valid access token, automatically refreshing if expired.
Returns:
| Type |
Description |
str | None
|
Access token if available and valid, None otherwise
|
Source code in ccproxy/auth/managers/base_enhanced.py
| async def get_access_token_with_refresh(self) -> str | None:
"""Get valid access token, automatically refreshing if expired.
Returns:
Access token if available and valid, None otherwise
"""
credentials = await self.load_credentials()
if not credentials:
logger.debug("no_credentials_found")
return None
# Check if token is expired
if self.should_refresh(credentials):
expires_in = self.seconds_until_expiration(credentials)
reason = "expired" if self.is_expired(credentials) else "expiring_soon"
logger.info(
"token_refresh_needed",
reason=reason,
expires_in=expires_in,
)
try:
refreshed = await self.refresh_token()
except Exception as exc: # pragma: no cover - defensive
logger.warning(
"token_refresh_exception", error=str(exc), category="auth"
)
raise OAuthTokenRefreshError("Token refresh failed") from exc
if refreshed:
logger.info("token_refreshed_successfully")
credentials = refreshed
else:
logger.warning("token_refresh_failed")
raise OAuthTokenRefreshError("Token refresh failed")
snapshot = self._safe_token_snapshot(credentials)
if snapshot and snapshot.access_token:
return snapshot.access_token
return None
|
ensure_valid_token
async
Ensure we have a valid (non-expired) token, refreshing if needed.
Returns:
| Type |
Description |
bool
|
True if we have a valid token (after refresh if needed), False otherwise
|
Source code in ccproxy/auth/managers/base_enhanced.py
| async def ensure_valid_token(self) -> bool:
"""Ensure we have a valid (non-expired) token, refreshing if needed.
Returns:
True if we have a valid token (after refresh if needed), False otherwise
"""
token = await self.get_access_token_with_refresh()
return token is not None
|