Skip to content

ccproxy.plugins.credential_balancer.manager

ccproxy.plugins.credential_balancer.manager

Credential rotation manager for the credential balancer plugin.

CredentialEntry dataclass

CredentialEntry(
    config,
    manager,
    max_failures,
    cooldown_seconds,
    logger,
    _failure_count=0,
    _disabled_until=None,
)

Wrapper for an AuthManager with failure tracking and cooldown logic.

label property

label

Return a stable label for this credential entry.

get_access_token async

get_access_token()

Get access token from the composed manager.

Returns:

Type Description
str

Access token string

Raises:

Type Description
AuthenticationError

If no valid token available

Source code in ccproxy/plugins/credential_balancer/manager.py
async def get_access_token(self) -> str:
    """Fetch an access token from the wrapped manager.

    The wrapped manager is queried while this entry's lock is held.

    Returns:
        The access token string produced by the wrapped manager.

    Raises:
        AuthenticationError: If no valid token available
    """
    async with self._lock:
        token = await self.manager.get_access_token()
    return token

get_access_token_with_refresh async

get_access_token_with_refresh()

Get access token with automatic refresh if supported.

Returns:

Type Description
str

Access token string

Raises:

Type Description
AuthenticationError

If no valid token available

Source code in ccproxy/plugins/credential_balancer/manager.py
async def get_access_token_with_refresh(self) -> str:
    """Fetch an access token, preferring the manager's refresh-aware getter.

    The lookup runs while this entry's lock is held. If the wrapped manager
    exposes ``get_access_token_with_refresh`` it is used; otherwise the plain
    ``get_access_token`` is called instead.

    Returns:
        The access token string produced by the wrapped manager.

    Raises:
        AuthenticationError: If no valid token available
    """
    async with self._lock:
        # Managers without refresh support only offer the basic getter.
        if not hasattr(self.manager, "get_access_token_with_refresh"):
            return await self.manager.get_access_token()
        # Refresh-capable manager: let it renew the token if it needs to.
        return await self.manager.get_access_token_with_refresh()  # type: ignore

is_authenticated async

is_authenticated()

Check if manager has valid authentication.

Returns:

Type Description
bool

True if authenticated, False otherwise

Source code in ccproxy/plugins/credential_balancer/manager.py
async def is_authenticated(self) -> bool:
    """Report whether the wrapped manager currently holds valid authentication.

    The wrapped manager is queried while this entry's lock is held. Any
    exception raised during the check is treated as "not authenticated".

    Returns:
        The wrapped manager's answer, or False if the check raised.
    """
    authenticated = False
    try:
        async with self._lock:
            authenticated = await self.manager.is_authenticated()
    except Exception:
        # Best-effort probe: a failing check counts as unauthenticated.
        authenticated = False
    return authenticated

mark_failure

mark_failure()

Record a failure and potentially disable this credential.

Source code in ccproxy/plugins/credential_balancer/manager.py
def mark_failure(self) -> None:
    """Count one more failure and disable the credential at the threshold.

    Once the failure count is at or above ``max_failures`` the credential is
    disabled: until ``time.monotonic() + cooldown_seconds`` when a positive
    cooldown is configured, otherwise indefinitely (infinite deadline).
    Every call at or past the threshold sets the deadline again.
    """
    self._failure_count += 1
    self.logger.debug(
        "credential_balancer_failure_recorded",
        credential=self.label,
        failures=self._failure_count,
    )

    # Below the threshold there is nothing more to do.
    if self._failure_count < self.max_failures:
        return

    self._disabled_until = (
        time.monotonic() + self.cooldown_seconds
        if self.cooldown_seconds > 0
        else float("inf")  # no cooldown configured: stay disabled
    )
    self.logger.warning(
        "credential_balancer_credential_disabled",
        credential=self.label,
        cooldown_seconds=self.cooldown_seconds,
        failures=self._failure_count,
    )

reset_failures

reset_failures()

Reset failure count and re-enable this credential.

Source code in ccproxy/plugins/credential_balancer/manager.py
def reset_failures(self) -> None:
    """Clear the failure counter and any disable deadline.

    A debug event is emitted only when there was state to clear (a non-zero
    failure count or a truthy disable deadline).
    """
    had_state = bool(self._failure_count or self._disabled_until)
    if had_state:
        self.logger.debug(
            "credential_balancer_failure_reset",
            credential=self.label,
        )
    self._failure_count, self._disabled_until = 0, None

is_disabled

is_disabled(now)

Check if this credential is currently disabled.

Parameters:

Name Type Description Default
now float

Current monotonic time

required

Returns:

Type Description
bool

True if disabled, False if available

Source code in ccproxy/plugins/credential_balancer/manager.py
def is_disabled(self, now: float) -> bool:
    """Tell whether this credential is disabled at time ``now``.

    When a finite disable deadline has been reached, the deadline and the
    failure counter are cleared as a side effect and the credential is
    reported as available again.

    Args:
        now: Current monotonic time

    Returns:
        True if disabled, False if available
    """
    deadline = self._disabled_until
    if deadline is None:
        return False
    # An infinite deadline never expires.
    if deadline == float("inf"):
        return True

    expired = now >= deadline
    if not expired:
        return True

    # Cooldown is over: re-enable and start counting failures from zero.
    self.logger.debug(
        "credential_balancer_cooldown_expired",
        credential=self.label,
    )
    self._disabled_until = None
    self._failure_count = 0
    return False

CredentialBalancerTokenManager

CredentialBalancerTokenManager(
    config, entries, *, logger=None
)

Bases: AuthManager

Auth manager that rotates across multiple credential sources.

Parameters:

Name Type Description Default
config CredentialPoolConfig

Pool configuration

required
entries list[CredentialEntry]

List of credential entries with composed managers

required
logger TraceBoundLogger | None

Optional logger for this manager

None
Source code in ccproxy/plugins/credential_balancer/manager.py
def __init__(
    self,
    config: CredentialPoolConfig,
    entries: list[CredentialEntry],
    *,
    logger: TraceBoundLogger | None = None,
) -> None:
    """Set up the balancer around an already-built list of entries.

    Args:
        config: Pool configuration
        entries: List of credential entries with composed managers
        logger: Optional logger for this manager; when omitted, a plugin
            logger for this module is obtained instead
    """
    # Configuration-derived settings.
    self._config = config
    self._strategy = config.strategy
    self._failure_codes = set(config.failure_status_codes)

    # Logger bound with the pool's identifying fields.
    base_logger = logger if logger else get_plugin_logger(__name__)
    self._logger = base_logger.bind(
        manager=config.manager_name,
        provider=config.provider,
    )

    # Credential entries and the selection cursors.
    self._entries = entries
    self._active_index = 0
    self._next_index = 0

    # Two separate locks plus the per-request state map.
    self._lock = asyncio.Lock()
    self._state_lock = asyncio.Lock()
    self._request_states: dict[str, _RequestState] = {}

create async classmethod

create(config, factory=None, *, logger=None)

Async factory to create balancer with composed managers.

Parameters:

Name Type Description Default
config CredentialPoolConfig

Pool configuration

required
factory AuthManagerFactory | None

Auth manager factory for creating managers from sources

None
logger TraceBoundLogger | None

Optional logger for this manager

None

Returns:

Type Description
CredentialBalancerTokenManager

Initialized CredentialBalancerTokenManager instance

Source code in ccproxy/plugins/credential_balancer/manager.py
@classmethod
async def create(
    cls,
    config: CredentialPoolConfig,
    factory: AuthManagerFactory | None = None,
    *,
    logger: TraceBoundLogger | None = None,
) -> CredentialBalancerTokenManager:
    """Build a balancer by composing one manager per configured credential.

    Each credential in ``config.credentials`` is turned into a manager via
    the factory and wrapped in a ``CredentialEntry``. A credential whose
    manager cannot be created is logged and skipped; construction fails
    only if no credential could be loaded at all.

    Args:
        config: Pool configuration
        factory: Auth manager factory for creating managers from sources;
            a default ``AuthManagerFactory`` is created when omitted
        logger: Optional logger for this manager

    Returns:
        Initialized CredentialBalancerTokenManager instance

    Raises:
        AuthenticationError: If every configured credential failed to load
    """
    from ccproxy.plugins.credential_balancer.factory import AuthManagerFactory

    if factory is None:
        factory = AuthManagerFactory(logger=logger)

    pool_logger = (logger or get_plugin_logger(__name__)).bind(
        manager=config.manager_name,
        provider=config.provider,
    )

    loaded: list[CredentialEntry] = []
    skipped_labels: list[str] = []

    for source in config.credentials:
        try:
            composed = await factory.create_from_source(source, config.provider)
            loaded.append(
                CredentialEntry(
                    config=source,
                    manager=composed,
                    max_failures=config.max_failures_before_disable,
                    cooldown_seconds=config.cooldown_seconds,
                    logger=pool_logger.bind(credential=source.resolved_label),
                )
            )
        except AuthenticationError as auth_error:
            # Expected failure mode: a short warning, no stack trace.
            skipped = source.resolved_label
            pool_logger.warning(
                "credential_balancer_credential_skipped",
                credential=skipped,
                reason=str(auth_error),
                category="auth",
            )
            skipped_labels.append(skipped)
        except Exception as unexpected:
            # Anything else is logged as an error with its exception type.
            skipped = source.resolved_label
            pool_logger.error(
                "credential_balancer_credential_failed",
                credential=skipped,
                error=str(unexpected),
                error_type=type(unexpected).__name__,
                category="auth",
            )
            skipped_labels.append(skipped)

    # Summarise partial failures so operators can see what was dropped.
    if skipped_labels:
        pool_logger.warning(
            "credential_balancer_partial_initialization",
            total=len(config.credentials),
            failed=len(skipped_labels),
            succeeded=len(loaded),
            failed_labels=skipped_labels,
        )

    # A balancer with nothing to balance across is unusable.
    if not loaded:
        raise AuthenticationError(
            f"No valid credentials available for {config.manager_name}. "
            f"All {len(config.credentials)} credential(s) failed to load."
        )

    return cls(config, loaded, logger=logger)

get_access_token async

get_access_token()

Get access token from selected credential entry.

Returns:

Type Description
str

Access token string

Raises:

Type Description
AuthenticationError

If no valid token available

Source code in ccproxy/plugins/credential_balancer/manager.py
async def get_access_token(self) -> str:
    """Pick a credential entry and return its access token.

    On success the request is registered against the chosen entry. If an
    ``AuthenticationError`` is raised while doing so, the entry is marked
    as failed, the failure handler runs, and the error is re-raised.

    Returns:
        Access token string

    Raises:
        AuthenticationError: If no valid token available
    """
    chosen = await self._select_entry()
    try:
        access_token = await chosen.get_access_token()
        req_id = await self._register_request(chosen)
        self._logger.debug(
            "credential_balancer_token_selected",
            credential=chosen.label,
            request_id=req_id,
        )
    except AuthenticationError:
        # Record the failure, then let the caller see the error.
        chosen.mark_failure()
        await self._handle_entry_failure(chosen)
        raise
    return access_token

get_access_token_with_refresh async

get_access_token_with_refresh()

Get access token with automatic refresh if supported.

Returns:

Type Description
str

Access token string

Raises:

Type Description
AuthenticationError

If no valid token available

Source code in ccproxy/plugins/credential_balancer/manager.py
async def get_access_token_with_refresh(self) -> str:
    """Return an access token, retrying with a refresh on the active entry.

    The normal ``get_access_token`` path is tried first. If it raises
    ``AuthenticationError``, the active entry is selected and asked for a
    token with refresh. If that also raises ``AuthenticationError``, the
    error from the first attempt is the one raised.

    Returns:
        Access token string

    Raises:
        AuthenticationError: If no valid token available
    """
    try:
        return await self.get_access_token()
    except AuthenticationError as first_error:
        # Second chance: ask the active entry to refresh its token.
        active = await self._select_entry(require_active=True)
        try:
            refreshed = await active.get_access_token_with_refresh()
            req_id = await self._register_request(active)
            self._logger.debug(
                "credential_balancer_manual_refresh_succeeded",
                credential=active.label,
                request_id=req_id,
            )
            return refreshed
        except AuthenticationError:
            self._logger.debug(
                "credential_balancer_manual_refresh_failed",
                credential=active.label,
            )
            # Surface the original failure rather than the refresh error.
            raise first_error

is_authenticated async

is_authenticated()

Check if any credential is authenticated.

Returns:

Type Description
bool

True if at least one credential is authenticated, False otherwise

Source code in ccproxy/plugins/credential_balancer/manager.py
async def is_authenticated(self) -> bool:
    """Report whether the currently selected credential is authenticated.

    Returns:
        False when entry selection raises ``AuthenticationError``; otherwise
        the selected entry's own ``is_authenticated()`` result.
    """
    try:
        candidate = await self._select_entry()
    except AuthenticationError:
        # No entry could be selected.
        return False
    else:
        return await candidate.is_authenticated()

get_user_profile async

get_user_profile()

Get user profile (not available for balancer).

Returns:

Type Description
StandardProfileFields | None

None, as balancer aggregates multiple credentials

Source code in ccproxy/plugins/credential_balancer/manager.py
async def get_user_profile(self) -> StandardProfileFields | None:
    """Return no profile: the balancer does not expose a single user.

    Returns:
        Always None, as the balancer aggregates multiple credentials
    """
    # The balancer has no single profile to report.
    return None

get_profile_quick async

get_profile_quick()

Get profile information without I/O (for compatibility).

Returns:

Type Description
Any

None, as balancer doesn't maintain profile cache

Source code in ccproxy/plugins/credential_balancer/manager.py
async def get_profile_quick(self) -> Any:
    """Return cached profile information without I/O (compatibility stub).

    Returns:
        Always None, as the balancer doesn't maintain a profile cache
    """
    # Nothing is cached at the balancer level.
    return None

validate_credentials async

validate_credentials()

Validate that credentials are available and valid.

Returns:

Type Description
bool

True if valid credentials available, False otherwise

Source code in ccproxy/plugins/credential_balancer/manager.py
async def validate_credentials(self) -> bool:
    """Check credential validity by delegating to ``is_authenticated``.

    Returns:
        Whatever ``is_authenticated()`` reports for this balancer.
    """
    authenticated = await self.is_authenticated()
    return authenticated

get_provider_name

get_provider_name()

Get the provider name for this balancer.

Returns:

Type Description
str

Provider name string

Source code in ccproxy/plugins/credential_balancer/manager.py
def get_provider_name(self) -> str:
    """Return the provider configured for this balancer's pool.

    Returns:
        The ``provider`` value from the pool configuration.
    """
    provider = self._config.provider
    return provider

load_credentials async

load_credentials()

Load token snapshots from all credential entries.

Returns:

Type Description
dict[str, TokenSnapshot | None]

Dictionary mapping credential labels to their token snapshots

Source code in ccproxy/plugins/credential_balancer/manager.py
async def load_credentials(self) -> dict[str, TokenSnapshot | None]:
    """Load token snapshots from all credential entries.

    Each entry's manager is asked for a snapshot when it exposes
    ``get_token_snapshot``. The lookup is best-effort: a manager without
    that method, or one whose call raises, maps to None. Failures are
    logged at debug level so a broken credential can be diagnosed instead
    of disappearing silently.

    Returns:
        Dictionary mapping credential labels to their token snapshots
        (None where no snapshot could be obtained)
    """
    results: dict[str, TokenSnapshot | None] = {}
    for entry in self._entries:
        # Managers without snapshot support simply report None.
        if not hasattr(entry.manager, "get_token_snapshot"):
            results[entry.label] = None
            continue
        try:
            # Cast to avoid mypy errors with protocol; the target type is
            # quoted so the union is not evaluated at runtime on each call.
            get_snapshot = cast(Any, entry.manager).get_token_snapshot
            results[entry.label] = cast("TokenSnapshot | None", await get_snapshot())
        except Exception as exc:
            # Still best-effort, but leave a trace of what went wrong.
            self._logger.debug(
                "credential_balancer_snapshot_failed",
                credential=entry.label,
                error=str(exc),
                error_type=type(exc).__name__,
            )
            results[entry.label] = None
    return results

get_token_snapshot async

get_token_snapshot()

Get token snapshot from selected credential entry.

Returns:

Type Description
TokenSnapshot | None

TokenSnapshot if available, None otherwise

Source code in ccproxy/plugins/credential_balancer/manager.py
async def get_token_snapshot(self) -> TokenSnapshot | None:
    """Get token snapshot from selected credential entry.

    The lookup on the selected entry's manager is best-effort: a manager
    without ``get_token_snapshot``, or one whose call raises, yields None.
    Failures are logged at debug level so they can be diagnosed instead of
    disappearing silently. Errors raised while selecting the entry are not
    caught here.

    Returns:
        TokenSnapshot if available, None otherwise
    """
    entry = await self._select_entry()
    # Managers without snapshot support have nothing to report.
    if not hasattr(entry.manager, "get_token_snapshot"):
        return None
    try:
        # Cast to avoid mypy errors with protocol; the target type is
        # quoted so the union is not evaluated at runtime on each call.
        get_snapshot = cast(Any, entry.manager).get_token_snapshot
        return cast("TokenSnapshot | None", await get_snapshot())
    except Exception as exc:
        # Still best-effort, but leave a trace of what went wrong.
        self._logger.debug(
            "credential_balancer_snapshot_failed",
            credential=entry.label,
            error=str(exc),
            error_type=type(exc).__name__,
        )
        return None