Skip to content

ccproxy.core.plugins

ccproxy.core.plugins

CCProxy Plugin System public API (minimal re-exports).

This module exposes the common symbols used by plugins and app code while keeping imports straightforward to avoid circular dependencies.

AuthCommandSpec dataclass

AuthCommandSpec(
    command_name, description, handler, options=dict()
)

Specification for auth commands.

FormatAdapterSpec dataclass

FormatAdapterSpec(
    from_format,
    to_format,
    adapter_factory,
    priority=100,
    description="",
)

Specification for format adapter registration.

format_pair property

format_pair

Get the format pair tuple.

HookSpec dataclass

HookSpec(hook_class, kwargs=dict())

Specification for plugin hooks.

MiddlewareLayer

Bases: IntEnum

Middleware layers for ordering.

MiddlewareSpec dataclass

MiddlewareSpec(
    middleware_class, priority=APPLICATION, kwargs=dict()
)

Specification for plugin middleware.

PluginContext

PluginContext()

Context provided to plugin runtime during initialization.

Source code in ccproxy/core/plugins/declaration.py
def __init__(self) -> None:
    """Initialize plugin context.

    Every public service slot is created here with a value of ``None``;
    nothing in this constructor populates them. The only non-empty state
    set up is the private ``_service_map``, which is filled by
    ``_initialize_service_map()`` as the last step.
    """
    # Application settings
    self.settings: Settings | None = None
    self.http_client: httpx.AsyncClient | None = None
    self.logger: structlog.BoundLogger | None = None
    self.scheduler: Scheduler | None = None
    self.config: BaseModel | None = None
    self.cli_detection_service: CLIDetectionService | None = None
    self.plugin_registry: PluginRegistry | None = None

    # Core app and hook system
    self.app: FastAPI | None = None
    self.hook_registry: HookRegistry | None = None
    self.hook_manager: HookManager | None = None

    # Observability and streaming
    self.request_tracer: IRequestTracer | None = None
    self.streaming_handler: StreamingMetrics | None = None
    self.metrics: IMetricsCollector | None = None

    # Provider-specific (mostly typed as Any, so no static type is enforced)
    self.adapter: BaseAdapter | None = None
    self.detection_service: Any = None
    self.credentials_manager: Any = None
    self.oauth_registry: OAuthRegistry | None = None
    self.http_pool_manager: Any = None
    self.service_container: Any = None
    self.auth_provider: Any = None
    self.token_manager: Any = None
    self.storage: Any = None

    # Format conversion and model-name mapping helpers
    self.format_registry: Any = None
    self.model_mapper: Any = None

    # Testing/utilities
    self.proxy_service: Any = None

    # Internal service mapping for type-safe access (type -> attribute name);
    # must be populated last, after all attributes above exist
    self._service_map: dict[type[Any], str] = {}
    self._initialize_service_map()

get_service

get_service(service_type)

Get a service instance by type with proper type safety.

Parameters:

Name Type Description Default
service_type type[T]

The type of service to retrieve

required

Returns:

Type Description
T

The service instance

Raises:

Type Description
ValueError

If the service is not available

Source code in ccproxy/core/plugins/declaration.py
def get_service(self, service_type: type[T]) -> T:
    """Look up a service on this context by its type.

    Resolution happens in two stages: first an exact-type table built from
    the currently populated core attributes, then an ``isinstance`` scan
    over every public attribute of the context.

    Args:
        service_type: The type of service to retrieve

    Returns:
        The service instance

    Raises:
        ValueError: If the service is not available
    """
    # The table is rebuilt on every call so it reflects current attribute values
    known: dict[type[Any], Any] = {}

    settings = self.settings
    if settings is not None:
        known[type(settings)] = settings

    client = self.http_client
    if client is not None:
        known[httpx.AsyncClient] = client

    bound_logger = self.logger
    if bound_logger is not None:
        known[structlog.BoundLogger] = bound_logger

    plugin_config = self.config
    if plugin_config is not None:
        # Reachable both by its concrete class and by the generic base
        known[type(plugin_config)] = plugin_config
        known[BaseModel] = plugin_config

    # Stage 1: exact type match
    try:
        return known[service_type]  # type: ignore[no-any-return]
    except KeyError:
        pass

    # Stage 2: first public, non-None attribute that is an instance of the type
    for name in dir(self):
        if name.startswith("_"):
            continue
        candidate = getattr(self, name)
        if candidate is None:
            continue
        if isinstance(candidate, service_type):
            return candidate  # type: ignore[no-any-return]

    # Nothing matched
    label = getattr(service_type, "__name__", str(service_type))
    raise ValueError(f"Service {label} not available in plugin context")

get

get(key_or_type, default=None)

Get service by type (new) or by string key (backward compatibility).

Parameters:

Name Type Description Default
key_or_type type[T] | str

Service type for type-safe access or string key for compatibility

required
default Any

Default value for string-based access (ignored for type-safe access)

None

Returns:

Type Description
T | Any

Service instance for type-safe access, or attribute value for string access

Source code in ccproxy/core/plugins/declaration.py
def get(self, key_or_type: type[T] | str, default: Any = None) -> T | Any:
    """Fetch a service either by type or by attribute name.

    Args:
        key_or_type: A service type (resolved through ``get_service``) or a
            string attribute name (legacy access path)
        default: Fallback returned for string lookups when the attribute is
            missing; not used for type lookups

    Returns:
        The matching service instance for a type lookup, otherwise the
        attribute value (or ``default``) for a string lookup
    """
    # Anything that is not a string is treated as a type-based lookup
    if not isinstance(key_or_type, str):
        return self.get_service(key_or_type)

    # Legacy path: plain attribute access by name
    return getattr(self, key_or_type, default)

get_attr

get_attr(key, default=None)

Get attribute by string name - for backward compatibility.

Parameters:

Name Type Description Default
key str

String attribute name

required
default Any

Default value if attribute not found

None

Returns:

Type Description
Any

Attribute value or default

Source code in ccproxy/core/plugins/declaration.py
def get_attr(self, key: str, default: Any = None) -> Any:
    """Read an attribute of the context by name (legacy string access).

    Args:
        key: String attribute name
        default: Value handed back when the attribute does not exist

    Returns:
        The attribute's value, or ``default`` when lookup fails
    """
    try:
        return getattr(self, key)
    except AttributeError:
        return default

keys

keys()

Backward compatibility: Return list of available service keys.

Source code in ccproxy/core/plugins/declaration.py
def keys(self) -> list[str]:
    """Backward compatibility: Return list of available service keys.

    A key is any public attribute (no leading underscore) whose current
    value is neither ``None`` nor callable. Names are taken from
    ``dir(self)``, so they come back in the order ``dir`` yields them.

    Returns:
        Names of the populated, non-callable public attributes
    """
    available: list[str] = []
    for attr in dir(self):
        if attr.startswith("_"):
            continue
        # Read each attribute exactly once so that properties (or other
        # descriptors) are not evaluated twice per call.
        value = getattr(self, attr)
        if value is not None and not callable(value):
            available.append(attr)
    return available

PluginManifest dataclass

PluginManifest(
    name,
    version,
    description="",
    dependencies=list(),
    is_provider=False,
    provides=list(),
    requires=list(),
    optional_requires=list(),
    middleware=list(),
    routes=list(),
    tasks=list(),
    hooks=list(),
    auth_commands=list(),
    config_class=None,
    tool_accumulator_class=None,
    oauth_client_factory=None,
    oauth_provider_factory=None,
    token_manager_factory=None,
    oauth_config_class=None,
    oauth_routes=list(),
    format_adapters=list(),
    requires_format_adapters=list(),
    cli_commands=list(),
    cli_arguments=list(),
)

Complete static declaration of a plugin's capabilities.

This manifest is created at module import time and contains all static information needed to integrate the plugin into the application.

validate_dependencies

validate_dependencies(available_plugins)

Validate that all dependencies are available.

Parameters:

Name Type Description Default
available_plugins set[str]

Set of available plugin names

required

Returns:

Type Description
list[str]

List of missing dependencies

Source code in ccproxy/core/plugins/declaration.py
def validate_dependencies(self, available_plugins: set[str]) -> list[str]:
    """Report which declared plugin dependencies are not available.

    Args:
        available_plugins: Set of available plugin names

    Returns:
        The entries of ``self.dependencies`` that are absent from
        ``available_plugins``, in declaration order (empty when all are met)
    """
    unmet = [
        plugin_name
        for plugin_name in self.dependencies
        if plugin_name not in available_plugins
    ]
    return unmet

validate_service_dependencies

validate_service_dependencies(available_services)

Validate that required services are available.

Parameters:

Name Type Description Default
available_services set[str]

Set of available service names

required

Returns:

Type Description
list[str]

List of missing required services

Source code in ccproxy/core/plugins/declaration.py
def validate_service_dependencies(self, available_services: set[str]) -> list[str]:
    """Report which required services are not available.

    Args:
        available_services: Set of available service names

    Returns:
        The entries of ``self.requires`` that are absent from
        ``available_services``, in declaration order
    """
    return [
        service_name
        for service_name in self.requires
        if service_name not in available_services
    ]

get_sorted_middleware

get_sorted_middleware()

Get middleware sorted by priority.

Source code in ccproxy/core/plugins/declaration.py
def get_sorted_middleware(self) -> list[MiddlewareSpec]:
    """Return a new, sorted list of this manifest's middleware specs.

    Ordering relies on the specs' own comparison; ``self.middleware`` itself
    is left untouched.
    """
    ordered = list(self.middleware)
    ordered.sort()
    return ordered

validate_format_adapter_requirements

validate_format_adapter_requirements(available_adapters)

Validate that required format adapters are available.

Source code in ccproxy/core/plugins/declaration.py
def validate_format_adapter_requirements(
    self, available_adapters: set[FormatPair]
) -> list[FormatPair]:
    """Report which required format adapters are not available.

    Args:
        available_adapters: Format pairs that are currently available

    Returns:
        The entries of ``self.requires_format_adapters`` missing from
        ``available_adapters``, in declaration order
    """
    unmet: list[FormatPair] = []
    for pair in self.requires_format_adapters:
        if pair in available_adapters:
            continue
        unmet.append(pair)
    return unmet

PluginRuntimeProtocol

Bases: Protocol

Protocol for plugin runtime instances.

initialize async

initialize(context)

Initialize the plugin with runtime context.

Source code in ccproxy/core/plugins/declaration.py
async def initialize(self, context: PluginContext) -> None:
    """Initialize the plugin with runtime context.

    Protocol member: the body is intentionally ``...``; conforming runtime
    classes supply the behavior.

    Args:
        context: Plugin context passed to the runtime
    """
    ...

shutdown async

shutdown()

Cleanup on shutdown.

Source code in ccproxy/core/plugins/declaration.py
async def shutdown(self) -> None:
    """Cleanup on shutdown.

    Protocol member: the body is intentionally ``...``; conforming runtime
    classes supply the behavior.
    """
    ...

validate async

validate()

Validate plugin is ready.

Source code in ccproxy/core/plugins/declaration.py
async def validate(self) -> bool:
    """Validate plugin is ready.

    Protocol member: the body is intentionally ``...``; conforming runtime
    classes supply the behavior.

    Returns:
        A boolean readiness result (exact meaning is defined by implementers)
    """
    ...

health_check async

health_check()

Perform health check.

Source code in ccproxy/core/plugins/declaration.py
async def health_check(self) -> dict[str, Any]:
    """Perform health check.

    Protocol member: the body is intentionally ``...``; conforming runtime
    classes supply the behavior.

    Returns:
        A string-keyed dictionary; its schema is not defined by this protocol
    """
    ...

RouteSpec dataclass

RouteSpec(router, prefix, tags=list(), dependencies=list())

Specification for plugin routes.

TaskSpec dataclass

TaskSpec(
    task_name,
    task_type,
    task_class,
    interval_seconds,
    enabled=True,
    kwargs=dict(),
)

Specification for scheduled tasks.

BaseProviderPluginFactory

BaseProviderPluginFactory()

Bases: ProviderPluginFactory

Base factory for provider plugins that eliminates common boilerplate.

This class uses class attributes for plugin configuration and implements common methods that all provider factories share. Subclasses only need to define class attributes and override methods that need custom behavior.

Required class attributes to be defined by subclasses: - plugin_name: str - plugin_description: str - runtime_class: type[ProviderPluginRuntime] - adapter_class: type[BaseAdapter] - config_class: type[BaseSettings]

Optional class attributes with defaults: - plugin_version: str = "1.0.0" - detection_service_class: type | None = None - credentials_manager_class: type | None = None - router: APIRouter | None = None - route_prefix: str = "/api" - dependencies: list[str] = [] - optional_requires: list[str] = [] - tasks: list[TaskSpec] = []

Source code in ccproxy/core/plugins/factories.py
def __init__(self) -> None:
    """Assemble the plugin manifest from the subclass's class attributes.

    Steps, in order: validate the class attributes, check that
    ``runtime_class`` derives from ``ProviderPluginRuntime``, turn each entry
    of ``routers`` into a ``RouteSpec`` (calling router factories where
    needed), and build the ``PluginManifest``.

    Raises:
        TypeError: If ``runtime_class`` is not a ``ProviderPluginRuntime``
            subclass
    """
    self._validate_class_attributes()

    # Deferred import: importing this at module level would be circular
    from .runtime import ProviderPluginRuntime

    runtime_is_valid = issubclass(self.runtime_class, ProviderPluginRuntime)
    if not runtime_is_valid:
        raise TypeError(
            f"runtime_class {self.runtime_class.__name__} must be a subclass of ProviderPluginRuntime"
        )

    # Convert the declared routers into route specs
    routes = []
    for spec in self.routers:
        target = spec.router
        # A callable that is not itself an APIRouter is a factory: invoke it
        # to obtain the real router instance.
        is_factory = callable(target) and not isinstance(target, APIRouter)
        if is_factory:
            target = target()

        route = RouteSpec(
            router=cast(APIRouter, target),
            prefix=spec.prefix,
            tags=spec.tags or [],
            dependencies=spec.dependencies,
        )
        routes.append(route)

    # List-valued class attributes are copied so the manifest does not share
    # list objects with the class.
    # Format adapter specification validation is deferred to runtime
    # when settings are available via dependency injection.
    #
    # The parent __init__ is deliberately not called: ProviderPluginFactory
    # would override runtime_class with ProviderPluginRuntime.
    self.manifest = PluginManifest(
        name=self.plugin_name,
        version=self.plugin_version,
        description=self.plugin_description,
        is_provider=True,
        config_class=self.config_class,
        tool_accumulator_class=self.tool_accumulator_class,
        dependencies=self.dependencies.copy(),
        optional_requires=self.optional_requires.copy(),
        routes=routes,
        tasks=self.tasks.copy(),
        format_adapters=self.format_adapters.copy(),
        requires_format_adapters=self.requires_format_adapters.copy(),
        cli_commands=self.cli_commands.copy(),
        cli_arguments=self.cli_arguments.copy(),
    )
    self.runtime_class = self.__class__.runtime_class

validate_format_adapters_with_settings

validate_format_adapters_with_settings(settings)

Validate format adapter specifications (feature flags removed).

Source code in ccproxy/core/plugins/factories.py
def validate_format_adapters_with_settings(self, settings: "Settings") -> None:
    """Validate format adapter specifications (feature flags removed).

    Delegates entirely to ``_validate_format_adapter_specs``; the
    ``settings`` argument is accepted but not read by this method.

    Args:
        settings: Settings instance (unused here)
    """
    self._validate_format_adapter_specs()

create_runtime

create_runtime()

Create runtime instance using the configured runtime class.

Source code in ccproxy/core/plugins/factories.py
def create_runtime(self) -> Any:
    """Instantiate the configured runtime class with this factory's manifest.

    Returns:
        The new runtime instance (typed as ``Any``)
    """
    runtime = self.runtime_class(self.manifest)
    return cast(Any, runtime)

create_adapter async

create_adapter(context)

Create adapter instance with explicit dependencies.

This method extracts services from context and creates the adapter with explicit dependency injection. Subclasses can override this method if they need custom adapter creation logic.

Parameters:

Name Type Description Default
context PluginContext

Plugin context

required

Returns:

Type Description
BaseAdapter

Adapter instance

Source code in ccproxy/core/plugins/factories.py
async def create_adapter(self, context: PluginContext) -> BaseAdapter:
    """Create adapter instance with explicit dependencies.

    This method extracts services from context and creates the adapter
    with explicit dependency injection. Subclasses can override this
    method if they need custom adapter creation logic.

    Two construction paths exist:

    * ``BaseHTTPAdapter`` subclasses receive the full, fixed set of keyword
      arguments (with null-object fallbacks for tracer, metrics and
      streaming handler).
    * Any other adapter only receives the keyword arguments that its
      ``__init__`` signature names and for which a non-``None`` value is
      available.

    Args:
        context: Plugin context

    Returns:
        Adapter instance

    Raises:
        RuntimeError: If the context has no service container, or an HTTP
            adapter is requested without an HTTP pool manager
    """
    # Extract services from context (one-time extraction)
    http_pool_manager: HTTPPoolManager | None = cast(
        "HTTPPoolManager | None", context.get("http_pool_manager")
    )
    request_tracer: IRequestTracer | None = context.get("request_tracer")
    metrics: IMetricsCollector | None = context.get("metrics")
    streaming_handler: StreamingMetrics | None = context.get("streaming_handler")
    hook_manager = context.get("hook_manager")

    # Auth and detection services that may have been created by the factory
    auth_manager = context.get("credentials_manager")
    detection_service = context.get("detection_service")

    # Plugin config, if one was provided
    config = context.get("config")

    # The service container supplies the standardized adapter dependencies
    service_container = context.get("service_container")
    if not service_container:
        raise RuntimeError("Service container is required for adapter services")

    adapter_dependencies = service_container.get_adapter_dependencies(metrics)
    format_registry = adapter_dependencies["format_registry"]
    model_mapper = context.get("model_mapper")

    if issubclass(self.adapter_class, BaseHTTPAdapter):
        # HTTP adapters require http_pool_manager
        if not http_pool_manager:
            raise RuntimeError(
                f"HTTP pool manager required for {self.adapter_class.__name__} but not available in context"
            )

        # Ensure config is provided for HTTP adapters
        if config is None and self.manifest.config_class:
            config = self.manifest.config_class()

        # HTTP adapters always get the complete keyword set; optional
        # observability services fall back to null-object implementations.
        adapter_kwargs: dict[str, Any] = {
            "config": config,
            "auth_manager": auth_manager,
            "detection_service": detection_service,
            "http_pool_manager": http_pool_manager,
            "request_tracer": request_tracer or NullRequestTracer(),
            "metrics": metrics or NullMetricsCollector(),
            "streaming_handler": streaming_handler or NullStreamingHandler(),
            "hook_manager": hook_manager,
            "format_registry": format_registry,
            "context": context,
            "model_mapper": model_mapper,
        }
        if self.tool_accumulator_class:
            adapter_kwargs["tool_accumulator_class"] = self.tool_accumulator_class

        return cast(BaseAdapter, self.adapter_class(**adapter_kwargs))

    # Non-HTTP adapters (like ClaudeSDK) have different dependencies, so the
    # kwargs are derived from the adapter's constructor signature.
    params = inspect.signature(self.adapter_class.__init__).parameters

    # Only fetch an http_client from the pool when the adapter asks for one
    client_for_non_http: httpx.AsyncClient | None = None
    if http_pool_manager and "http_client" in params:
        client_for_non_http = await http_pool_manager.get_client()

    # Services that can be injected, keyed by constructor parameter name
    param_mapping: dict[str, Any] = {
        "config": config,
        "http_client": client_for_non_http,
        "http_pool_manager": http_pool_manager,
        "auth_manager": auth_manager,
        "detection_service": detection_service,
        "session_manager": context.get("session_manager"),
        "request_tracer": request_tracer,
        "metrics": metrics,
        "streaming_handler": streaming_handler,
        "hook_manager": hook_manager,
        "format_registry": format_registry,
        "context": context,
        "model_mapper": model_mapper,
    }

    non_http_adapter_kwargs: dict[str, Any] = {}
    if self.tool_accumulator_class:
        non_http_adapter_kwargs["tool_accumulator_class"] = (
            self.tool_accumulator_class
        )

    for param_name, param in params.items():
        # Parameters with no known service (including self / **kwargs) are
        # left for the adapter's own defaults.
        if param_name in ("self", "kwargs") or param_name not in param_mapping:
            continue

        value = param_mapping[param_name]
        if value is not None:
            non_http_adapter_kwargs[param_name] = value
        elif (
            param_name == "config"
            and param.default is inspect.Parameter.empty
            and self.manifest.config_class
        ):
            # Config is None but required: build a default instance
            non_http_adapter_kwargs["config"] = self.manifest.config_class()

    return cast(BaseAdapter, self.adapter_class(**non_http_adapter_kwargs))

create_detection_service

create_detection_service(context)

Create detection service instance if class is configured.

Parameters:

Name Type Description Default
context PluginContext

Plugin context

required

Returns:

Type Description
Any

Detection service instance or None if no class configured

Source code in ccproxy/core/plugins/factories.py
def create_detection_service(self, context: PluginContext) -> Any:
    """Build the detection service when a detection class is configured.

    Args:
        context: Plugin context; read for ``settings`` and
            ``cli_detection_service``

    Returns:
        Detection service instance or None if no class configured
    """
    service_cls = self.detection_service_class
    if service_cls is None:
        return None

    settings = context.get("settings")
    if settings is None:
        # No settings on the context: fall back to a freshly built default
        from ccproxy.config.settings import Settings

        settings = Settings()

    return service_cls(settings, context.get("cli_detection_service"))

create_credentials_manager async

create_credentials_manager(context)

Resolve credentials manager via the shared auth registry.

Source code in ccproxy/core/plugins/factories.py
async def create_credentials_manager(self, context: PluginContext) -> Any:
    """Resolve the credentials manager through the shared auth registry.

    Args:
        context: Plugin context; read for ``service_container`` and passed to
            ``get_auth_manager_name``

    Returns:
        The manager the registry resolves for the effective auth manager
        name, or ``None`` when no name is configured, no registry is
        reachable, or the registry has nothing for that name (the latter two
        cases are logged as warnings)
    """
    requested_name = self.get_auth_manager_name(context)

    container = context.get("service_container")
    registry = (
        container.get_auth_manager_registry()
        if container and hasattr(container, "get_auth_manager_registry")
        else None
    )

    if not requested_name:
        return None

    if not registry:
        logger.warning(
            "auth_manager_registry_unavailable",
            plugin=self.manifest.name,
            auth_manager_name=requested_name,
            category="auth",
        )
        return None

    manager = await registry.get(requested_name)
    if manager:
        return manager

    # Distinguish an explicit override that failed to resolve from the
    # plugin's own default name simply not being registered.
    is_unresolved_override = (
        self.auth_manager_name and requested_name != self.auth_manager_name
    )
    event = (
        "auth_manager_override_not_resolved"
        if is_unresolved_override
        else "auth_manager_not_registered"
    )
    logger.warning(
        event,
        plugin=self.manifest.name,
        auth_manager_name=requested_name,
        category="auth",
    )
    return None

get_auth_manager_name

get_auth_manager_name(context)

Get auth manager name, allowing config override.

Parameters:

Name Type Description Default
context PluginContext

Plugin context containing config

required

Returns:

Type Description
str | None

Auth manager name or None if not configured

Source code in ccproxy/core/plugins/factories.py
def get_auth_manager_name(self, context: PluginContext) -> str | None:
    """Determine the effective auth manager name for this plugin.

    A truthy ``auth_manager`` attribute on the context's config takes
    precedence; otherwise the factory's own ``auth_manager_name`` is used.

    Args:
        context: Plugin context containing config

    Returns:
        Auth manager name or None if not configured
    """
    plugin_config = getattr(context, "config", None)
    if plugin_config:
        override = getattr(plugin_config, "auth_manager", None)
        if override:
            # Coerce to str in case the config stores a non-string value
            return str(override)

    # No usable override: fall back to the plugin default
    return self.auth_manager_name

create_context

create_context(service_container)

Create context with provider-specific components.

This method provides a hook for subclasses to customize context creation. The default implementation builds the base context and, when the plugin config is a ProviderConfig with model mappings, attaches a ModelMapper to it.

Parameters:

Name Type Description Default
service_container ServiceContainer

Core services container

required

Returns:

Type Description
PluginContext

Plugin context

Source code in ccproxy/core/plugins/factories.py
def create_context(self, service_container: "ServiceContainer") -> PluginContext:
    """Create context with provider-specific components.

    This method provides a hook for subclasses to customize context creation.
    The default implementation builds the base context via the parent class
    and, when the context's config is a ``ProviderConfig`` with non-empty
    ``model_mappings``, attaches a ``ModelMapper`` built from those mappings
    as ``context.model_mapper``.

    Args:
        service_container: Core services container

    Returns:
        Plugin context
    """
    context = super().create_context(service_container)
    config = context.get("config", None)
    # Only provider configs carry model mappings; other configs are left alone
    if isinstance(config, ProviderConfig) and config.model_mappings:
        context.model_mapper = ModelMapper(config.model_mappings)
    return context

PluginRegistry

PluginRegistry()

Registry for managing plugin factories and runtime instances.

Source code in ccproxy/core/plugins/factories.py
def __init__(self) -> None:
    """Initialize plugin registry.

    All containers start empty; they are filled by the registration and
    resolution methods of this class.
    """
    # Plugin factories keyed by plugin (manifest) name
    self.factories: dict[str, PluginFactory] = {}
    # Runtime instances keyed by name (presumably plugin name — TODO confirm)
    self.runtimes: dict[str, Any] = {}
    # Plugin names in initialization order
    self.initialization_order: list[str] = []

    # Service management
    self._services: dict[str, Any] = {}  # service_name -> service instance
    self._service_providers: dict[str, str] = {}  # service_name -> plugin_name

register_service

register_service(
    service_name, service_instance, provider_plugin
)

Register a service provided by a plugin.

Parameters:

Name Type Description Default
service_name str

Name of the service

required
service_instance Any

Service instance

required
provider_plugin str

Name of the plugin providing the service

required
Source code in ccproxy/core/plugins/factories.py
def register_service(
    self, service_name: str, service_instance: Any, provider_plugin: str
) -> None:
    """Record a service instance together with the plugin that provides it.

    Registering a name that already exists logs a warning and then replaces
    both the instance and the recorded provider.

    Args:
        service_name: Name of the service
        service_instance: Service instance
        provider_plugin: Name of the plugin providing the service
    """
    is_duplicate = service_name in self._services
    if is_duplicate:
        current_owner = self._service_providers[service_name]
        logger.warning(
            "service_already_registered",
            service=service_name,
            existing_provider=current_owner,
            new_provider=provider_plugin,
        )

    # Last registration wins
    self._services[service_name] = service_instance
    self._service_providers[service_name] = provider_plugin

get_service

get_service(service_name, service_type=None)

Get a service by name with optional type checking.

Parameters:

Name Type Description Default
service_name str

Name of the service

required
service_type type[T] | None

Optional expected service type

None

Returns:

Type Description
T | None

Service instance or None if not found

Source code in ccproxy/core/plugins/factories.py
def get_service(
    self, service_name: str, service_type: type[T] | None = None
) -> T | None:
    """Get a service by name with optional type checking.

    Args:
        service_name: Name of the service
        service_type: Optional expected service type

    Returns:
        Service instance, or None if no service is registered under that
        name or the registered instance is not of ``service_type`` (a
        mismatch is also logged as a warning)
    """
    service = self._services.get(service_name)

    # Nothing registered, or no type constraint requested: return as-is.
    # Explicit ``is None`` checks (instead of truthiness) ensure that falsy
    # services such as 0, "" or an empty container are still type-checked.
    if service is None or service_type is None:
        return service

    if isinstance(service, service_type):
        return service

    logger.warning(
        "service_type_mismatch",
        service=service_name,
        expected_type=service_type,
        actual_type=type(service),
    )
    return None

has_service

has_service(service_name)

Check if a service is registered.

Parameters:

Name Type Description Default
service_name str

Name of the service

required

Returns:

Type Description
bool

True if service is registered

Source code in ccproxy/core/plugins/factories.py
def has_service(self, service_name: str) -> bool:
    """Tell whether a service has been registered under the given name.

    Args:
        service_name: Name of the service

    Returns:
        True when the name is present in the registry's service table
    """
    is_registered = service_name in self._services
    return is_registered

get_required_services

get_required_services(plugin_name)

Get required and optional services for a plugin.

Parameters:

Name Type Description Default
plugin_name str

Name of the plugin

required

Returns:

Type Description
tuple[list[str], list[str]]

Tuple of (required_services, optional_services)

Source code in ccproxy/core/plugins/factories.py
def get_required_services(self, plugin_name: str) -> tuple[list[str], list[str]]:
    """Return the service requirements declared in a plugin's manifest.

    The plugin is looked up with direct indexing, so an unregistered name
    raises ``KeyError``.

    Args:
        plugin_name: Name of the plugin

    Returns:
        Tuple of (required_services, optional_services)
    """
    factory = self.factories[plugin_name]
    manifest = factory.get_manifest()
    required, optional = manifest.requires, manifest.optional_requires
    return required, optional

register_factory

register_factory(factory)

Register a plugin factory.

Parameters:

Name Type Description Default
factory PluginFactory

Plugin factory to register

required
Source code in ccproxy/core/plugins/factories.py
def register_factory(self, factory: PluginFactory) -> None:
    """Add a plugin factory to the registry, keyed by its manifest name.

    Args:
        factory: Plugin factory to register

    Raises:
        ValueError: If a factory with the same manifest name is already
            registered
    """
    plugin_name = factory.get_manifest().name

    # Names must be unique; a second registration is rejected outright
    if plugin_name in self.factories:
        raise ValueError(f"Plugin {plugin_name} already registered")

    self.factories[plugin_name] = factory

get_factory

get_factory(name)

Get a plugin factory by name.

Parameters:

Name Type Description Default
name str

Plugin name

required

Returns:

Type Description
PluginFactory | None

Plugin factory or None

Source code in ccproxy/core/plugins/factories.py
def get_factory(self, name: str) -> PluginFactory | None:
    """Look up a registered plugin factory.

    Args:
        name: Plugin name

    Returns:
        The factory registered under ``name``, or None when there is none
    """
    factory = self.factories.get(name)
    return factory

get_all_manifests

get_all_manifests()

Get all registered plugin manifests.

Returns:

Type Description
dict[str, PluginManifest]

Dictionary mapping plugin names to manifests

Source code in ccproxy/core/plugins/factories.py
def get_all_manifests(self) -> dict[str, PluginManifest]:
    """Collect the manifest of every registered plugin factory.

    Returns:
        Dictionary mapping plugin names to manifests, in registration order
    """
    manifests: dict[str, PluginManifest] = {}
    for plugin_name, factory in self.factories.items():
        manifests[plugin_name] = factory.get_manifest()
    return manifests

resolve_dependencies

resolve_dependencies(settings)

Resolve plugin dependencies and return initialization order.

Skips plugins with missing hard dependencies or required services instead of failing the entire plugin system. Logs skipped plugins and continues with the rest.

Parameters:

Name Type Description Default
settings Settings

Settings instance

required

Returns:

Type Description
list[str]

List of plugin names in initialization order

Source code in ccproxy/core/plugins/factories.py
def resolve_dependencies(self, settings: "Settings") -> list[str]:
    """Resolve plugin dependencies and return initialization order.

    Skips plugins with missing hard dependencies or required services
    instead of failing the entire plugin system. Logs skipped plugins
    and continues with the rest.

    The resolution runs in four phases:

    1. Drop plugins whose format adapter requirements are not met.
    2. Repeatedly drop plugins whose plugin dependencies or required
       services are no longer available, until nothing changes.
    3. Add an implicit edge from each consumer to one provider of every
       service it requires, so providers initialize first.
    4. Topologically sort the remaining plugins (Kahn's algorithm);
       anything left unsorted is part of a cycle and is skipped.

    The implicit provider edges are kept in a local graph only; the
    plugin manifests themselves are never modified, so repeated calls
    see the same declared dependencies.

    Args:
        settings: Settings instance (not consulted by this method; kept
            for interface compatibility)

    Returns:
        List of plugin names in initialization order. The same list is
        stored on ``self.initialization_order``.
    """
    manifests = self.get_all_manifests()

    # Start with all plugins available
    available = set(manifests)
    skipped: dict[str, str] = {}

    # Validate format adapter dependencies (latest behavior)
    missing_format_adapters = self._validate_format_adapter_requirements()
    if missing_format_adapters:
        for plugin_name, missing in missing_format_adapters.items():
            logger.error(
                "plugin_missing_format_adapters",
                plugin=plugin_name,
                missing_adapters=missing,
                category="format",
            )
            # Remove plugins with missing format adapter requirements
            available.discard(plugin_name)
            skipped[plugin_name] = f"missing format adapters: {missing}"

    # Iteratively prune plugins with unsatisfied dependencies or services.
    # Removing one plugin can invalidate others, hence the fixed-point loop.
    while True:
        removed_this_pass: set[str] = set()

        # Compute services provided by currently available plugins
        available_services = {
            service for name in available for service in manifests[name].provides
        }

        for name in sorted(available):
            manifest = manifests[name]

            # Check plugin dependencies
            missing_plugins = [
                dep for dep in manifest.dependencies if dep not in available
            ]
            if missing_plugins:
                removed_this_pass.add(name)
                skipped[name] = f"missing plugin dependencies: {missing_plugins}"
                continue

            # Check required services
            missing_services = manifest.validate_service_dependencies(
                available_services
            )
            if missing_services:
                removed_this_pass.add(name)
                skipped[name] = f"missing required services: {missing_services}"

        if not removed_this_pass:
            break

        # Remove the failing plugins and repeat until stable
        available -= removed_this_pass

    # Index providers per service. Iterating names in sorted order means the
    # first entry of each list is the lexicographically first provider,
    # which is the stable choice when several plugins provide a service.
    providers_by_service: dict[str, list[str]] = {}
    for name in sorted(available):
        for service in manifests[name].provides:
            providers_by_service.setdefault(service, []).append(name)

    # Build the dependency graph restricted to available plugins. Declared
    # dependencies are copied into a fresh list so that the implicit
    # consumer -> provider edges never leak back into the manifest.
    deps: dict[str, list[str]] = {}
    for name in available:
        manifest = manifests[name]
        plugin_deps = [dep for dep in manifest.dependencies if dep in available]
        for required_service in manifest.requires:
            providers = providers_by_service.get(required_service)
            if not providers:
                continue
            provider = providers[0]
            if provider != name and provider not in plugin_deps:
                plugin_deps.append(provider)
        deps[name] = plugin_deps

    # Kahn's algorithm for topological sort over remaining plugins
    in_degree: dict[str, int] = {name: len(deps[name]) for name in available}
    dependents: dict[str, list[str]] = {name: [] for name in available}
    for name, dlist in deps.items():
        for dep in dlist:
            dependents[dep].append(name)

    # Initialize queue with nodes having zero in-degree; keeping the queue
    # sorted makes the resulting order deterministic.
    queue = sorted(name for name, deg in in_degree.items() if deg == 0)

    order: list[str] = []
    while queue:
        node = queue.pop(0)
        order.append(node)
        for consumer in dependents[node]:
            in_degree[consumer] -= 1
            if in_degree[consumer] == 0:
                queue.append(consumer)
        queue.sort()

    # Any nodes not in order are part of cycles; skip them
    ordered = set(order)
    cyclic = [name for name in available if name not in ordered]
    if cyclic:
        for name in cyclic:
            skipped[name] = "circular dependency"
        logger.error(
            "plugin_dependency_cycle_detected",
            skipped=cyclic,
            category="plugin",
        )

    # Final initialization order excludes skipped and cyclic plugins
    self.initialization_order = order

    if skipped:
        logger.warning(
            "plugins_skipped_due_to_missing_dependencies",
            skipped=skipped,
            category="plugin",
        )

    return order

create_runtime async

create_runtime(name, service_container)

Create and initialize a plugin runtime.

Parameters:

Name Type Description Default
name str

Plugin name

required
service_container ServiceContainer

Service container with all available services

required

Returns:

Type Description
Any

Initialized plugin runtime

Raises:

Type Description
ValueError

If plugin not found

Source code in ccproxy/core/plugins/factories.py
async def create_runtime(
    self, name: str, service_container: "ServiceContainer"
) -> Any:
    """Build, initialize and cache the runtime for one plugin.

    An already-created runtime is returned as-is. Otherwise the factory
    creates the runtime and its context, factory-type-specific components
    are attached to the context, and the runtime is initialized with it.

    Args:
        name: Plugin name
        service_container: Service container with all available services

    Returns:
        Initialized plugin runtime

    Raises:
        ValueError: If plugin not found
    """
    factory = self.get_factory(name)
    if not factory:
        raise ValueError(f"Plugin {name} not found")

    # Reuse the existing runtime when this plugin was already created.
    try:
        return self.runtimes[name]
    except KeyError:
        pass

    plugin_runtime = factory.create_runtime()
    ctx = factory.create_context(service_container)

    # Auth provider plugins: build auth components first so registries are
    # ready before anything else looks at them.
    if isinstance(factory, AuthProviderPluginFactory):
        ctx.auth_provider = factory.create_auth_provider(ctx)
        ctx.token_manager = factory.create_token_manager()
        ctx.storage = factory.create_storage()
        self._register_auth_manager_with_registry(factory, ctx)

    # Provider plugins: additional components (may depend on auth registry).
    # Detection service and credentials manager come before the adapter
    # because the adapter may depend on them.
    if isinstance(factory, ProviderPluginFactory):
        ctx.detection_service = factory.create_detection_service(ctx)
        ctx.credentials_manager = await factory.create_credentials_manager(ctx)
        ctx.adapter = await factory.create_adapter(ctx)

    await plugin_runtime.initialize(ctx)

    # Cache only after successful initialization.
    self.runtimes[name] = plugin_runtime
    return plugin_runtime

initialize_all async

initialize_all(service_container)

Initialize all registered plugins with format adapter support.

Parameters:

Name Type Description Default
service_container ServiceContainer

Service container with all available services

required
Source code in ccproxy/core/plugins/factories.py
async def initialize_all(self, service_container: "ServiceContainer") -> None:
    """Initialize all registered plugins with format adapter support.

    Resolves the initialization order, registers manifest-declared format
    adapters, then creates each plugin runtime in order. A plugin whose
    runtime creation raises is logged and skipped; the remaining plugins
    are still initialized.

    Args:
        service_container: Service container with all available services
    """

    # Resolve dependencies and get initialization order
    settings = service_container.settings
    order = self.resolve_dependencies(settings)

    # Consolidated discovery summary at INFO
    logger.info(
        "plugins_discovered", count=len(order), names=order, category="plugin"
    )

    # Register format adapters from manifests in first pass (latest behavior)
    # NOTE(review): this iterates every registered manifest, including
    # plugins that resolve_dependencies left out of `order` — confirm that
    # registering adapters for skipped plugins is intended.
    format_registry = service_container.get_format_registry()
    manifests = self.get_all_manifests()
    for name, manifest in manifests.items():
        if manifest.format_adapters:
            await format_registry.register_from_manifest(manifest, name)
            logger.debug(
                "plugin_format_adapters_registered_from_manifest",
                plugin=name,
                adapter_count=len(manifest.format_adapters),
                category="format",
            )

    # Auth managers are registered when auth provider contexts are constructed

    initialized: list[str] = []
    for name in order:
        try:
            await self.create_runtime(name, service_container)
            initialized.append(name)
        except Exception as e:
            # One failing plugin must not prevent the others from starting.
            logger.warning(
                "plugin_initialization_failed",
                plugin=name,
                error=str(e),
                exc_info=e,
                category="plugin",
            )

    # Registry entries are available immediately; log consolidated summary
    skipped = [n for n in order if n not in initialized]
    logger.info(
        "plugins_initialized",
        count=len(initialized),
        names=initialized,
        skipped=skipped,
        category="plugin",
    )

    # Emit a single hooks summary at the end. The summary is purely
    # informational, so a failure here is logged but never propagated.
    try:
        hook_registry = service_container.get_hook_registry()
        totals: dict[str, int] = {
            event_name: len(hooks)
            for event_name, hooks in hook_registry.list().items()
        }
        logger.info(
            "hooks_registered",
            total_events=len(totals),
            by_event_counts=totals,
        )
    except Exception as e:
        logger.debug(
            "hooks_summary_failed",
            error=str(e),
            exc_info=e,
            category="plugin",
        )

shutdown_all async

shutdown_all()

Shutdown all plugin runtimes in reverse initialization order.

Source code in ccproxy/core/plugins/factories.py
async def shutdown_all(self) -> None:
    """Shut down every created runtime, last-initialized first.

    A runtime whose ``shutdown`` raises is logged and does not stop the
    remaining runtimes from being shut down. All runtimes are dropped
    from the registry afterwards.
    """
    for plugin_name in reversed(self.initialization_order):
        # Plugins that never got a runtime have nothing to shut down.
        if plugin_name not in self.runtimes:
            continue

        plugin_runtime = self.runtimes[plugin_name]
        try:
            await plugin_runtime.shutdown()
        except Exception as exc:
            logger.error(
                "plugin_shutdown_failed",
                plugin=plugin_name,
                error=str(exc),
                exc_info=exc,
                category="plugin",
            )

    self.runtimes.clear()

get_runtime

get_runtime(name)

Get a plugin runtime by name.

Parameters:

Name Type Description Default
name str

Plugin name

required

Returns:

Type Description
Any | None

Plugin runtime or None

Source code in ccproxy/core/plugins/factories.py
def get_runtime(self, name: str) -> Any | None:
    """Look up the runtime created for a plugin.

    Args:
        name: Plugin name

    Returns:
        The stored runtime, or None when no runtime is stored under
        ``name``.
    """
    created = self.runtimes
    return created.get(name, None)

list_plugins

list_plugins()

List all registered plugin names.

Returns:

Type Description
list[str]

List of plugin names

Source code in ccproxy/core/plugins/factories.py
def list_plugins(self) -> list[str]:
    """Return the names of all registered plugins.

    Returns:
        A new list of plugin names, in registration-mapping order
    """
    return [plugin_name for plugin_name in self.factories]

list_provider_plugins

list_provider_plugins()

List all registered provider plugin names.

Returns:

Type Description
list[str]

List of provider plugin names

Source code in ccproxy/core/plugins/factories.py
def list_provider_plugins(self) -> list[str]:
    """Return the names of registered plugins whose manifest is a provider.

    Returns:
        List of provider plugin names
    """
    provider_names: list[str] = []
    for plugin_name, plugin_factory in self.factories.items():
        if plugin_factory.get_manifest().is_provider:
            provider_names.append(plugin_name)
    return provider_names

AuthProviderPluginFactory

AuthProviderPluginFactory(manifest)

Bases: BasePluginFactory

Factory for authentication provider plugins.

Auth provider plugins provide OAuth authentication flows and token management without directly proxying requests to API providers.

Parameters:

Name Type Description Default
manifest PluginManifest

Plugin manifest

required
Source code in ccproxy/core/plugins/interfaces.py
def __init__(self, manifest: PluginManifest):
    """Set up the factory to produce ``AuthProviderPluginRuntime`` instances.

    Args:
        manifest: Plugin manifest

    Raises:
        ValueError: If the manifest is not marked as a provider.
    """
    # Imported here rather than at module level to avoid a circular
    # dependency at module load time.
    from .runtime import AuthProviderPluginRuntime

    super().__init__(manifest, AuthProviderPluginRuntime)

    # Auth providers are a type of provider, so the manifest must say so.
    if manifest.is_provider:
        return

    raise ValueError(
        f"Plugin {manifest.name} must be marked as provider for AuthProviderPluginFactory"
    )

create_context

create_context(service_container)

Create context with auth provider-specific components.

Parameters:

Name Type Description Default
service_container ServiceContainer

Core services container

required

Returns:

Type Description
PluginContext

Plugin context with auth provider components

Source code in ccproxy/core/plugins/interfaces.py
def create_context(self, service_container: "ServiceContainer") -> PluginContext:
    """Create the plugin context for an auth provider plugin.

    This level adds nothing to the base context; the auth components
    themselves are typically created by the specific plugin factory
    implementation.

    Args:
        service_container: Service container with all available services

    Returns:
        Plugin context with auth provider components
    """
    return super().create_context(service_container)

get_auth_manager_registry_name

get_auth_manager_registry_name()

Return registry key used for this auth manager.

Source code in ccproxy/core/plugins/interfaces.py
def get_auth_manager_registry_name(self) -> str:
    """Return the key under which this plugin's auth manager is registered.

    A non-empty ``auth_manager_name`` attribute on the factory wins;
    otherwise the manifest name is used.
    """
    override = getattr(self, "auth_manager_name", None)
    if override:
        return override
    return self.manifest.name

create_auth_provider abstractmethod

create_auth_provider(context=None)

Create the OAuth provider for this auth plugin.

Parameters:

Name Type Description Default
context PluginContext | None

Optional plugin context for initialization

None

Returns:

Type Description
OAuthProviderProtocol

OAuth provider instance implementing OAuthProviderProtocol

Source code in ccproxy/core/plugins/interfaces.py
@abstractmethod
def create_auth_provider(
    self, context: PluginContext | None = None
) -> OAuthProviderProtocol:
    """Create the OAuth provider for this auth plugin.

    Abstract: concrete auth plugin factories must implement this.

    Args:
        context: Optional plugin context for initialization

    Returns:
        OAuth provider instance implementing OAuthProviderProtocol
    """
    ...

create_token_manager

create_token_manager()

Create the token manager for this auth plugin.

Returns:

Type Description
BaseTokenManager[Any] | None

Token manager instance or None if not needed

Source code in ccproxy/core/plugins/interfaces.py
def create_token_manager(self) -> BaseTokenManager[Any] | None:
    """Create the token manager for this auth plugin.

    The implementation here supplies no token manager; subclasses can
    override it to return one.

    Returns:
        Token manager instance or None if not needed
    """
    return None

create_storage

create_storage()

Create the storage implementation for this auth plugin.

Returns:

Type Description
TokenStorage[Any] | None

Storage instance or None if using default

Source code in ccproxy/core/plugins/interfaces.py
def create_storage(self) -> TokenStorage[Any] | None:
    """Create the storage implementation for this auth plugin.

    The implementation here supplies no storage; subclasses can override
    it to return one.

    Returns:
        Storage instance or None if using default
    """
    return None

BasePluginFactory

BasePluginFactory(manifest, runtime_class)

Bases: PluginFactory

Base implementation of plugin factory.

This class provides common functionality for creating plugin runtime instances from manifests.

Parameters:

Name Type Description Default
manifest PluginManifest

Plugin manifest

required
runtime_class type[Any]

Runtime class to instantiate

required
Source code in ccproxy/core/plugins/interfaces.py
def __init__(self, manifest: PluginManifest, runtime_class: type[Any]) -> None:
    """Initialize factory with manifest and runtime class.

    Args:
        manifest: Plugin manifest; stored as ``self.manifest``
        runtime_class: Runtime class to instantiate; stored as
            ``self.runtime_class``
    """
    self.manifest = manifest
    self.runtime_class = runtime_class

get_manifest

get_manifest()

Get the plugin manifest.

Source code in ccproxy/core/plugins/interfaces.py
def get_manifest(self) -> PluginManifest:
    """Get the plugin manifest.

    Returns:
        The manifest this factory was constructed with
    """
    return self.manifest

create_runtime

create_runtime()

Create a runtime instance.

Source code in ccproxy/core/plugins/interfaces.py
def create_runtime(self) -> Any:
    """Instantiate the configured runtime class with this factory's manifest.

    Returns:
        A new runtime instance
    """
    runtime_cls = self.runtime_class
    return runtime_cls(self.manifest)

create_context

create_context(service_container)

Create base context for plugin initialization.

Parameters:

Name Type Description Default
service_container ServiceContainer

Service container with all available services

required

Returns:

Type Description
PluginContext

Plugin context with base services

Source code in ccproxy/core/plugins/interfaces.py
def create_context(self, service_container: "ServiceContainer") -> PluginContext:
    """Assemble the base plugin context from the service container.

    Fields that the container cannot supply (metrics, scheduler, plugin
    registry) are left as None for later assignment. When the manifest
    declares a config class, the plugin's config is built from the
    plugin-specific settings entry, or from defaults if there is none.

    Args:
        service_container: Service container with all available services

    Returns:
        Plugin context with base services

    Raises:
        Exception: Whatever the config class raises while being
            instantiated or validated; it is logged and re-raised.
    """
    ctx = PluginContext()

    # Core services
    ctx.settings = service_container.settings
    ctx.http_pool_manager = service_container.get_pool_manager()
    ctx.logger = structlog.get_logger().bind(plugin=self.manifest.name)

    # Explicit dependency injection services
    ctx.request_tracer = service_container.get_request_tracer()
    streaming_handler = service_container.get_streaming_handler()
    ctx.streaming_handler = cast("StreamingMetrics", streaming_handler)
    ctx.metrics = None  # Will be set by plugins if needed

    # CLI detection service
    ctx.cli_detection_service = service_container.get_cli_detection_service()

    # Not available in ServiceContainer; filled in from app state if needed
    ctx.scheduler = None
    ctx.plugin_registry = None

    # OAuth registry for auth providers
    ctx.oauth_registry = service_container.get_oauth_registry()

    # Hook registry and manager
    ctx.hook_registry = service_container.get_hook_registry()

    # Runtime helpers are optional: fall back to None when the container
    # does not have them (or the hook manager module cannot be imported).
    try:
        from ccproxy.core.plugins.hooks.manager import HookManager

        ctx.hook_manager = service_container.get_service(HookManager)
    except (ValueError, ImportError):
        ctx.hook_manager = None

    try:
        ctx.app = service_container.get_service(FastAPI)
    except ValueError:
        ctx.app = None

    # Expose the container itself
    ctx.service_container = service_container

    # Plugin-specific config: ServiceContainer has no get_plugin_config, so
    # the raw entry is read from settings directly.
    config_cls = self.manifest.config_class
    if config_cls:
        raw_config = service_container.settings.plugins.get(self.manifest.name)

        try:
            # No explicit config -> instantiate defaults; otherwise validate
            # (even an empty dict) so model defaults are honored.
            ctx.config = (
                config_cls()
                if raw_config is None
                else config_cls.model_validate(raw_config)
            )
        except Exception as exc:  # pragma: no cover - defensive safety
            logger.warning(
                "plugin_config_initialization_failed",
                plugin=self.manifest.name,
                error=str(exc),
            )
            raise

    # Format registry
    ctx.format_registry = service_container.get_format_registry()

    return ctx

PluginFactory

Bases: ABC

Abstract factory for creating plugin runtime instances.

Each plugin must provide a factory that knows how to create its runtime instance from its manifest.

get_manifest abstractmethod

get_manifest()

Get the plugin manifest with static declarations.

Returns:

Type Description
PluginManifest

Plugin manifest

Source code in ccproxy/core/plugins/interfaces.py
@abstractmethod
def get_manifest(self) -> PluginManifest:
    """Get the plugin manifest with static declarations.

    Abstract: every concrete factory must implement this.

    Returns:
        Plugin manifest
    """
    ...

create_runtime abstractmethod

create_runtime()

Create a runtime instance for this plugin.

Returns:

Type Description
Any

Plugin runtime instance

Source code in ccproxy/core/plugins/interfaces.py
@abstractmethod
def create_runtime(self) -> Any:
    """Create a runtime instance for this plugin.

    Abstract: every concrete factory must implement this.

    Returns:
        Plugin runtime instance
    """
    ...

create_context abstractmethod

create_context(core_services)

Create the context for plugin initialization.

Parameters:

Name Type Description Default
core_services ServiceContainer

Core services container

required

Returns:

Type Description
PluginContext

Plugin context with required services

Source code in ccproxy/core/plugins/interfaces.py
@abstractmethod
def create_context(self, core_services: "ServiceContainer") -> PluginContext:
    """Create the context for plugin initialization.

    Abstract: every concrete factory must implement this.

    Args:
        core_services: Core services container

    Returns:
        Plugin context with required services
    """
    ...

ProviderPluginFactory

ProviderPluginFactory(manifest)

Bases: BasePluginFactory

Factory for provider plugins.

Provider plugins require additional components like adapters and detection services that must be created during initialization.

Parameters:

Name Type Description Default
manifest PluginManifest

Plugin manifest

required
Source code in ccproxy/core/plugins/interfaces.py
def __init__(self, manifest: PluginManifest):
    """Set up the factory to produce ``ProviderPluginRuntime`` instances.

    Args:
        manifest: Plugin manifest

    Raises:
        ValueError: If the manifest is not marked as a provider.
    """
    # Imported here rather than at module level to avoid a circular
    # dependency at module load time.
    from .runtime import ProviderPluginRuntime

    super().__init__(manifest, ProviderPluginRuntime)

    # Only provider manifests may use this factory.
    if manifest.is_provider:
        return

    raise ValueError(
        f"Plugin {manifest.name} is not marked as provider but using ProviderPluginFactory"
    )

create_context

create_context(service_container)

Create context with provider-specific components.

Parameters:

Name Type Description Default
service_container ServiceContainer

Core services container

required

Returns:

Type Description
PluginContext

Plugin context with provider components

Source code in ccproxy/core/plugins/interfaces.py
def create_context(self, service_container: "ServiceContainer") -> PluginContext:
    """Create the plugin context for a provider plugin.

    This level adds nothing to the base context; the adapter and
    detection service are typically created by the specific plugin
    factory implementation.

    Args:
        service_container: Service container with all available services

    Returns:
        Plugin context with provider components
    """
    return super().create_context(service_container)

create_adapter abstractmethod async

create_adapter(context)

Create the adapter for this provider.

Parameters:

Name Type Description Default
context PluginContext

Plugin context

required

Returns:

Type Description
Any

Provider adapter instance

Source code in ccproxy/core/plugins/interfaces.py
@abstractmethod
async def create_adapter(self, context: PluginContext) -> Any:
    """Create the adapter for this provider.

    Abstract coroutine: concrete provider factories must implement this.

    Args:
        context: Plugin context

    Returns:
        Provider adapter instance
    """
    ...

create_detection_service abstractmethod

create_detection_service(context)

Create the detection service for this provider.

Parameters:

Name Type Description Default
context PluginContext

Plugin context

required

Returns:

Type Description
DetectionServiceProtocol | None

Detection service instance or None

Source code in ccproxy/core/plugins/interfaces.py
@abstractmethod
def create_detection_service(
    self, context: PluginContext
) -> DetectionServiceProtocol | None:
    """Create the detection service for this provider.

    Abstract: concrete provider factories must implement this.

    Args:
        context: Plugin context

    Returns:
        Detection service instance or None
    """
    ...

create_credentials_manager abstractmethod async

create_credentials_manager(context)

Create the credentials manager for this provider.

Parameters:

Name Type Description Default
context PluginContext

Plugin context

required

Returns:

Type Description
BaseTokenManager[Any] | None

Credentials manager instance or None

Source code in ccproxy/core/plugins/interfaces.py
@abstractmethod
async def create_credentials_manager(
    self, context: PluginContext
) -> BaseTokenManager[Any] | None:
    """Create the credentials manager for this provider.

    Abstract coroutine: concrete provider factories must implement this.

    Args:
        context: Plugin context

    Returns:
        Credentials manager instance or None
    """
    ...

SystemPluginFactory

SystemPluginFactory(manifest)

Bases: BasePluginFactory

Factory for system plugins.

Parameters:

Name Type Description Default
manifest PluginManifest

Plugin manifest

required
Source code in ccproxy/core/plugins/interfaces.py
def __init__(self, manifest: PluginManifest):
    """Set up the factory to produce ``SystemPluginRuntime`` instances.

    Args:
        manifest: Plugin manifest

    Raises:
        ValueError: If the manifest is marked as a provider.
    """
    # Imported here rather than at module level to avoid a circular
    # dependency at module load time.
    from .runtime import SystemPluginRuntime

    super().__init__(manifest, SystemPluginRuntime)

    # System plugins must not be provider plugins.
    if not manifest.is_provider:
        return

    raise ValueError(
        f"Plugin {manifest.name} is marked as provider but using SystemPluginFactory"
    )

CoreMiddlewareSpec dataclass

CoreMiddlewareSpec(
    middleware_class,
    priority=APPLICATION,
    kwargs=dict(),
    source="core",
)

Bases: MiddlewareSpec

Specification for core application middleware.

Extends MiddlewareSpec with a source field to distinguish between core and plugin middleware.

MiddlewareManager

MiddlewareManager()

Manages middleware registration and ordering.

Source code in ccproxy/core/plugins/middleware.py
def __init__(self) -> None:
    """Initialize middleware manager with an empty list of specs."""
    # Specs are kept in registration order; ordering by priority happens
    # on read in get_ordered_middleware.
    self.middleware_specs: list[CoreMiddlewareSpec] = []

add_core_middleware

add_core_middleware(
    middleware_class, priority=APPLICATION, **kwargs
)

Add core application middleware.

Parameters:

Name Type Description Default
middleware_class type[BaseHTTPMiddleware]

Middleware class

required
priority int

Priority for ordering

APPLICATION
**kwargs Any

Additional middleware arguments

{}
Source code in ccproxy/core/plugins/middleware.py
def add_core_middleware(
    self,
    middleware_class: type[BaseHTTPMiddleware],
    priority: int = MiddlewareLayer.APPLICATION,
    **kwargs: Any,
) -> None:
    """Register a middleware owned by the core application.

    The middleware is recorded with ``source="core"``.

    Args:
        middleware_class: Middleware class
        priority: Priority for ordering
        **kwargs: Additional middleware arguments
    """
    self.middleware_specs.append(
        CoreMiddlewareSpec(
            middleware_class=middleware_class,
            priority=priority,
            kwargs=kwargs,
            source="core",
        )
    )

add_plugin_middleware

add_plugin_middleware(plugin_name, specs)

Add middleware from a plugin.

Parameters:

Name Type Description Default
plugin_name str

Name of the plugin

required
specs list[MiddlewareSpec]

List of middleware specifications

required
Source code in ccproxy/core/plugins/middleware.py
def add_plugin_middleware(
    self, plugin_name: str, specs: list[MiddlewareSpec]
) -> None:
    """Register every middleware spec contributed by one plugin.

    Each spec is recorded with the plugin name as its source, and a
    trace-level log entry is emitted per middleware.

    Args:
        plugin_name: Name of the plugin
        specs: List of middleware specifications
    """
    for plugin_spec in specs:
        middleware_cls = plugin_spec.middleware_class
        priority = plugin_spec.priority

        self.middleware_specs.append(
            CoreMiddlewareSpec(
                middleware_class=middleware_cls,
                priority=priority,
                kwargs=plugin_spec.kwargs,
                source=plugin_name,
            )
        )
        logger.trace(
            "plugin_middleware_added",
            plugin=plugin_name,
            middleware=middleware_cls.__name__,
            priority=priority,
            category="middleware",
        )

get_ordered_middleware

get_ordered_middleware()

Get all middleware sorted by priority.

Returns:

Type Description
list[CoreMiddlewareSpec]

List of middleware specs sorted by priority (lower first)

Source code in ccproxy/core/plugins/middleware.py
def get_ordered_middleware(self) -> list[CoreMiddlewareSpec]:
    """Return the registered middleware specs in priority order.

    The registered list itself is left untouched; a new list is returned.

    Returns:
        List of middleware specs sorted by priority (lower first)
    """

    def _sort_key(spec):
        # Priority first (lower values first); within the same priority,
        # core middleware precedes plugin middleware, and plugin
        # middleware is ordered by source name.
        is_plugin = spec.source != "core"
        return (spec.priority, is_plugin, spec.source)

    ordered = list(self.middleware_specs)
    ordered.sort(key=_sort_key)
    return ordered

apply_to_app

apply_to_app(app)

Apply all middleware to the FastAPI app in correct order.

Note: Middleware in FastAPI/Starlette is applied in reverse order (last added runs first), so we add them in reverse priority order.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application

required
Source code in ccproxy/core/plugins/middleware.py
def apply_to_app(self, app: FastAPI) -> None:
    """Add every registered middleware to the FastAPI app.

    Note: Middleware in FastAPI/Starlette is applied in reverse order
    (last added runs first), so the specs are added in reverse priority
    order. A middleware that fails to be added is logged and skipped;
    the rest are still applied.

    Args:
        app: FastAPI application
    """
    applied: list[dict[str, Any]] = []
    failed: list[dict[str, Any]] = []

    # Walk from the end of the priority-sorted list so that the first
    # spec in that list is the last one added, and therefore runs first.
    for spec in reversed(self.get_ordered_middleware()):
        try:
            app.add_middleware(spec.middleware_class, **spec.kwargs)  # type: ignore[arg-type]
            success_entry = {
                "name": spec.middleware_class.__name__,
                "priority": spec.priority,
                "source": spec.source,
            }
            applied.append(success_entry)
        except Exception as exc:
            failure_entry = {
                "name": spec.middleware_class.__name__,
                "source": spec.source,
                "error": str(exc),
            }
            failed.append(failure_entry)
            logger.error(
                "middleware_application_failed",
                middleware=spec.middleware_class.__name__,
                source=spec.source,
                error=str(exc),
                exc_info=exc,
                category="middleware",
            )

    # One aggregated summary, emitted only if something was applied.
    if applied:
        logger.info(
            "middleware_stack_configured",
            applied=len(applied),
            failed=len(failed),
            middleware=[entry["name"] for entry in applied],
            category="middleware",
        )

get_middleware_summary

get_middleware_summary()

Get a summary of registered middleware.

Returns:

Type Description
dict[str, Any]

Dictionary with middleware statistics and order

Source code in ccproxy/core/plugins/middleware.py
def get_middleware_summary(self) -> dict[str, Any]:
    """Summarize the registered middleware.

    Returns:
        Dictionary with the total count, the core/plugin split (an entry
        counts as core when its ``source`` equals ``"core"``), and one
        descriptor per middleware in the order given by
        ``get_ordered_middleware()``
    """
    specs = self.get_ordered_middleware()

    def describe(spec: Any) -> dict[str, Any]:
        # One descriptor per middleware; the layer label comes from its priority
        return {
            "name": spec.middleware_class.__name__,
            "priority": spec.priority,
            "layer": self._get_layer_name(spec.priority),
            "source": spec.source,
        }

    return {
        "total": len(specs),
        "core": sum(1 for spec in specs if spec.source == "core"),
        "plugins": sum(1 for spec in specs if spec.source != "core"),
        "order": [describe(spec) for spec in specs],
    }

AuthProviderPluginRuntime

AuthProviderPluginRuntime(manifest)

Bases: BasePluginRuntime

Runtime for authentication provider plugins.

Auth provider plugins provide OAuth authentication flows and token management for various API providers without directly proxying requests.

Parameters:

Name Type Description Default
manifest PluginManifest

Plugin manifest with static declarations

required
Source code in ccproxy/core/plugins/runtime.py
def __init__(self, manifest: PluginManifest) -> None:
    """Initialize auth provider plugin runtime.

    Runs the base runtime initializer with the manifest, then declares the
    auth-specific component slots. All of them are ``None`` at construction
    time.

    Args:
        manifest: Plugin manifest with static declarations
    """
    super().__init__(manifest)
    # Auth-specific components, unset until something assigns them.
    # They are typed as Any; the trailing note names the intended protocol.
    self.auth_provider: Any | None = None  # OAuthProviderProtocol
    self.token_manager: Any | None = None
    self.storage: Any | None = None

BasePluginRuntime

BasePluginRuntime(manifest)

Bases: PluginRuntimeProtocol

Base implementation of plugin runtime.

This class provides common functionality for all plugin runtimes. Specific plugin types (system, provider) can extend this base class.

Parameters:

Name Type Description Default
manifest PluginManifest

Plugin manifest with static declarations

required
Source code in ccproxy/core/plugins/runtime.py
def __init__(self, manifest: PluginManifest) -> None:
    """Initialize runtime with manifest.

    Only stores state; no plugin work happens here. The runtime context is
    attached later by ``initialize()``.

    Args:
        manifest: Plugin manifest with static declarations
    """
    self.manifest = manifest
    # Stays None until initialize() stores the runtime context
    self.context: PluginContext | None = None
    # Lifecycle flag: set to True by initialize(), back to False by shutdown()
    self.initialized = False

name property

name

Plugin name from manifest.

version property

version

Plugin version from manifest.

initialize async

initialize(context)

Initialize the plugin with runtime context.

Parameters:

Name Type Description Default
context PluginContext

Runtime context with services and configuration

required
Source code in ccproxy/core/plugins/runtime.py
async def initialize(self, context: PluginContext) -> None:
    """Attach the runtime context and run the plugin's initialization hook.

    Calling this on a runtime that is already initialized only logs a
    warning; the stored context is left untouched and the hook is not
    run again.

    Args:
        context: Runtime context with services and configuration
    """
    if self.initialized:
        logger.warning(
            "plugin_already_initialized", plugin=self.name, category="plugin"
        )
    else:
        # The context is stored first so the subclass hook can read it
        self.context = context
        await self._on_initialize()

        # The flag is only set once the hook has finished without raising
        self.initialized = True
        logger.debug(
            "plugin_initialized",
            plugin=self.name,
            version=self.version,
            category="plugin",
        )

shutdown async

shutdown()

Cleanup on shutdown.

Source code in ccproxy/core/plugins/runtime.py
async def shutdown(self) -> None:
    """Run the plugin's cleanup hook and mark the runtime as not initialized.

    Does nothing when the runtime is not currently initialized.
    """
    if self.initialized:
        # Subclass-specific cleanup runs before the flag is cleared
        await self._on_shutdown()

        self.initialized = False
        logger.info("plugin_shutdown", plugin=self.name, category="plugin")

validate async

validate()

Validate plugin is ready.

Returns:

Type Description
bool

True if plugin is ready, False otherwise

Source code in ccproxy/core/plugins/runtime.py
async def validate(self) -> bool:
    """Report whether the plugin is ready for use.

    Returns:
        False when the runtime has not been initialized; otherwise the
        result of the subclass validation hook
    """
    if self.initialized:
        # Initialized: the final verdict belongs to the subclass hook
        return await self._on_validate()

    # An uninitialized plugin is never ready
    return False

health_check async

health_check()

Perform health check.

Returns:

Type Description
dict[str, Any]

Health check result following IETF format

Source code in ccproxy/core/plugins/runtime.py
async def health_check(self) -> dict[str, Any]:
    """Run the plugin health check and build a report dictionary.

    Any exception raised while validating or collecting details is logged
    and turned into a failing report instead of propagating.

    Returns:
        Health check result following IETF format: ``status`` is "pass" or
        "fail"; a normal report carries ``details``, an error report carries
        ``output`` with the exception text
    """

    def component_type() -> str:
        # Shared by the normal and the error report
        return "provider_plugin" if self.manifest.is_provider else "system_plugin"

    try:
        # Readiness decides pass/fail; the details are supplied by subclasses
        ready = await self.validate()
        extra = await self._get_health_details()

        outcome = "pass" if ready else "fail"
        return {
            "status": outcome,
            "componentId": self.name,
            "componentType": component_type(),
            "version": self.version,
            "details": extra,
        }
    except Exception as exc:
        logger.error(
            "plugin_health_check_failed",
            plugin=self.name,
            error=str(exc),
            exc_info=exc,
            category="plugin",
        )
        return {
            "status": "fail",
            "componentId": self.name,
            "componentType": component_type(),
            "version": self.version,
            "output": str(exc),
        }

ProviderPluginRuntime

ProviderPluginRuntime(manifest)

Bases: BasePluginRuntime

Runtime for provider plugins.

Provider plugins proxy requests to external API providers and require additional components like adapters and detection services.

Parameters:

Name Type Description Default
manifest PluginManifest

Plugin manifest with static declarations

required
Source code in ccproxy/core/plugins/runtime.py
def __init__(self, manifest: PluginManifest) -> None:
    """Initialize provider plugin runtime.

    Runs the base runtime initializer with the manifest, then declares the
    provider-specific component slots. All of them are ``None`` at
    construction time.

    Args:
        manifest: Plugin manifest with static declarations
    """
    super().__init__(manifest)
    # Provider-specific components, unset until something assigns them.
    # They are typed as Any; the trailing note names the intended type.
    self.adapter: Any | None = None  # BaseAdapter
    self.detection_service: Any | None = None
    self.credentials_manager: Any | None = None

SystemPluginRuntime

SystemPluginRuntime(manifest)

Bases: BasePluginRuntime

Runtime for system plugins (non-provider plugins).

System plugins provide functionality like logging, monitoring, permissions, etc., but don't proxy to external providers.

Source code in ccproxy/core/plugins/runtime.py
def __init__(self, manifest: PluginManifest) -> None:
    """Initialize runtime with manifest.

    Only stores state; no plugin work happens here.

    Args:
        manifest: Plugin manifest with static declarations
    """
    self.manifest = manifest
    # No runtime context is attached at construction time
    self.context: PluginContext | None = None
    # Lifecycle flag; a freshly constructed runtime is not initialized
    self.initialized = False

factory_type_name

factory_type_name(factory)

Return a stable type name for a plugin factory.

Returns one of: "auth_provider", "provider", "system", or "plugin" (fallback).

Source code in ccproxy/core/plugins/interfaces.py
def factory_type_name(factory: PluginFactory) -> str:
    """Map a plugin factory to a stable, human-readable type label.

    Returns one of: "auth_provider", "provider", "system", or "plugin" (fallback).
    """
    try:
        # Checked top to bottom. The order must be preserved: if these
        # factory classes are related by inheritance, the first match wins.
        known_types = (
            (AuthProviderPluginFactory, "auth_provider"),
            (ProviderPluginFactory, "provider"),
            (SystemPluginFactory, "system"),
        )
        for factory_cls, label in known_types:
            if isinstance(factory, factory_cls):
                return label
    except Exception:
        # Best effort: a failing type check degrades to the generic label
        pass
    return "plugin"

load_cli_plugins

load_cli_plugins(
    settings, auth_provider=None, allow_plugins=None
)

Load filtered plugins for CLI operations.

This function creates a lightweight plugin registry for CLI commands that:

- Includes only CLI-safe plugins (marked with cli_safe = True)
- Optionally includes a specific auth provider plugin if requested
- Excludes heavy provider plugins that cause DuckDB locks, task manager errors, etc.

Parameters:

Name Type Description Default
settings Any

Application settings

required
auth_provider str | None

Name of auth provider to include (e.g., "codex", "claude-api")

None
allow_plugins list[str] | None

Additional plugins to explicitly allow (beyond cli_safe ones)

None

Returns:

Type Description
PluginRegistry

Filtered PluginRegistry containing only CLI-appropriate plugins

Source code in ccproxy/core/plugins/loader.py
def load_cli_plugins(
    settings: Any,
    auth_provider: str | None = None,
    allow_plugins: list[str] | None = None,
) -> PluginRegistry:
    """Build a reduced plugin registry suitable for CLI commands.

    The registry is assembled from all discovered factories in three steps:
    - every factory with a truthy ``cli_safe`` attribute is kept
    - the plugin backing ``auth_provider`` is added when it can be resolved
    - names listed in ``allow_plugins`` are added when they were discovered

    Everything else is left out, which keeps heavy provider plugins (the
    ones that cause DuckDB locks, task manager errors, etc.) away from CLI
    operations.

    Args:
        settings: Application settings
        auth_provider: Name of auth provider to include (e.g., "codex", "claude-api")
        allow_plugins: Additional plugins to explicitly allow (beyond cli_safe ones)

    Returns:
        Filtered PluginRegistry containing only CLI-appropriate plugins
    """
    discovered: dict[str, PluginFactory] = discover_and_load_plugins(settings)

    # Baseline selection: factories that opt in through ``cli_safe``
    selected: dict[str, PluginFactory] = {
        name: factory
        for name, factory in discovered.items()
        if getattr(factory, "cli_safe", False)
    }

    # Optionally pull in the plugin behind the requested auth provider
    if auth_provider:
        resolved = _resolve_auth_provider_plugin_name(auth_provider)
        if resolved and resolved in discovered:
            selected[resolved] = discovered[resolved]
        else:
            known_auth_providers = [
                name
                for name, factory in discovered.items()
                if isinstance(factory, AuthProviderPluginFactory)
            ]
            logger.warning(
                "auth_provider_not_found",
                provider=auth_provider,
                resolved_name=resolved,
                available_auth_providers=known_auth_providers,
            )

    # Explicit allow-list: undiscovered names are skipped, and entries that
    # are already selected keep their position
    for extra in allow_plugins or ():
        if extra in discovered:
            selected.setdefault(extra, discovered[extra])

    registry = PluginRegistry()
    for factory in selected.values():
        registry.register_factory(factory)

    cli_safe_total = sum(
        1 for factory in discovered.values() if getattr(factory, "cli_safe", False)
    )
    logger.debug(
        "cli_plugin_system_loaded",
        total_available=len(discovered),
        cli_safe_count=cli_safe_total,
        loaded_count=len(selected),
        loaded_plugins=list(selected),
        auth_provider=auth_provider,
        allow_plugins=allow_plugins or [],
        category="plugin",
    )

    return registry

load_plugin_system

load_plugin_system(settings)

Discover plugins and build a registry + middleware manager.

This function is the single entry point to set up the plugin layer for the application factory. It avoids scattering discovery/registry logic.

Parameters:

Name Type Description Default
settings Settings

Application settings (with plugin config)

required

Returns:

Type Description
tuple[PluginRegistry, MiddlewareManager]

Tuple of (PluginRegistry, MiddlewareManager)

Source code in ccproxy/core/plugins/loader.py
def load_plugin_system(settings: Settings) -> tuple[PluginRegistry, MiddlewareManager]:
    """Discover all plugins and return a populated registry plus a middleware manager.

    Intended as the one place where the application factory sets up the
    plugin layer, so discovery and registry wiring are not duplicated
    elsewhere.

    Args:
        settings: Application settings (with plugin config)

    Returns:
        Tuple of (PluginRegistry, MiddlewareManager)
    """
    # Discovery (filesystem + entry points) is delegated to the existing helper
    discovered: dict[str, PluginFactory] = discover_and_load_plugins(settings)

    # Every discovered factory is registered; nothing is filtered here
    registry = PluginRegistry()
    for factory in discovered.values():
        registry.register_factory(factory)

    # Nothing is registered on the manager here: plugins contribute their
    # middleware through manifests during app creation and at runtime
    manager = MiddlewareManager()

    logger.debug(
        "plugin_system_loaded",
        factory_count=len(discovered),
        plugins=list(discovered),
        category="plugin",
    )

    return registry, manager

setup_default_middleware

setup_default_middleware(manager)

Setup default core middleware.

Parameters:

Name Type Description Default
manager MiddlewareManager

Middleware manager

required
Source code in ccproxy/core/plugins/middleware.py
def setup_default_middleware(manager: MiddlewareManager) -> None:
    """Register the built-in core middleware on the given manager.

    Three middleware classes are added: request ID, hooks, and header
    normalization. Their priorities are expressed relative to the
    ``MiddlewareLayer`` values.

    Args:
        manager: Middleware manager
    """
    # Imported inside the function rather than at module import time
    # NOTE(review): presumably to avoid a circular import - confirm
    from ccproxy.api.middleware.hooks import HooksMiddleware
    from ccproxy.api.middleware.normalize_headers import NormalizeHeadersMiddleware
    from ccproxy.api.middleware.request_id import RequestIDMiddleware

    # Request ID sits ahead of the security layer (lowest priority value) so
    # that it can set context for everything else
    request_id_priority = MiddlewareLayer.SECURITY - 50
    manager.add_core_middleware(RequestIDMiddleware, priority=request_id_priority)

    # Hooks come right after the request ID and before all other middleware,
    # so they see every request
    hooks_priority = MiddlewareLayer.SECURITY - 40
    manager.add_core_middleware(HooksMiddleware, priority=hooks_priority)

    # Access logging is currently switched off; to re-enable it, register
    # AccessLogMiddleware with priority=MiddlewareLayer.OBSERVABILITY.

    # Header normalization (strip unsafe headers, ensure the server header)
    # is registered at the routing layer's priority
    manager.add_core_middleware(
        NormalizeHeadersMiddleware,  # type: ignore[arg-type]
        priority=MiddlewareLayer.ROUTING,
    )

    logger.debug("default_middleware_configured", category="middleware")