Skip to content

ccproxy.api.app

ccproxy.api.app

FastAPI application factory for CCProxy API Server with plugin system.

merge_router_tags

merge_router_tags(
    router, spec_tags=None, default_tags=None
)

Merge router tags with spec tags, removing duplicates while preserving order.

Parameters:

Name Type Description Default
router APIRouter

FastAPI router instance

required
spec_tags list[str] | None

Tags from route specification

None
default_tags list[str] | None

Fallback tags if no other tags exist

None

Returns:

Type Description
list[str | Enum] | None

Deduplicated list of tags, or None if no tags

Source code in ccproxy/api/app.py
def merge_router_tags(
    router: APIRouter,
    spec_tags: list[str] | None = None,
    default_tags: list[str] | None = None,
) -> list[str | Enum] | None:
    """Merge router tags with spec tags, removing duplicates while preserving order.

    Tags are compared by their effective string value: an ``Enum`` member is
    compared by its ``.value``, so it counts as a duplicate of the equivalent
    plain string. The first occurrence of each tag is the one that is kept.

    Args:
        router: FastAPI router instance; only its ``tags`` attribute is read
        spec_tags: Tags from route specification
        default_tags: Fallback tags, used only when neither the router nor
            the spec supplies any tag

    Returns:
        Deduplicated list of tags, or None if no tags
    """
    # Imported locally so the runtime isinstance check below does not depend
    # on how the module imports Enum (it may be a typing-only import).
    from enum import Enum

    candidates: list[str | Enum] = list(router.tags or []) + list(spec_tags or [])

    # Only use defaults if no other tags exist
    if not candidates:
        candidates = list(default_tags or [])
    if not candidates:
        return None

    # dict preserves insertion order, and setdefault keeps the first tag seen
    # for each key, so this deduplicates without reordering.
    first_seen: dict[str, str | Enum] = {}
    for tag in candidates:
        key = str(tag.value if isinstance(tag, Enum) else tag)
        first_seen.setdefault(key, tag)
    return list(first_seen.values())

setup_task_manager_startup async

setup_task_manager_startup(app, settings)

Start the async task manager.

Source code in ccproxy/api/app.py
async def setup_task_manager_startup(app: FastAPI, settings: Settings) -> None:
    """Launch the async task manager for this application.

    Args:
        app: FastAPI application; its ``state.service_container`` is passed
            to ``start_task_manager``
        settings: Application settings (not read by this function)
    """
    await start_task_manager(container=app.state.service_container)
    logger.debug("task_manager_startup_completed", category="lifecycle")

setup_task_manager_shutdown async

setup_task_manager_shutdown(app)

Stop the async task manager.

Source code in ccproxy/api/app.py
async def setup_task_manager_shutdown(app: FastAPI) -> None:
    """Halt the async task manager for this application.

    Args:
        app: FastAPI application; its ``state.service_container`` is passed
            to ``stop_task_manager``
    """
    await stop_task_manager(container=app.state.service_container)
    logger.debug("task_manager_shutdown_completed", category="lifecycle")

setup_service_container_shutdown async

setup_service_container_shutdown(app)

Close the service container and its resources.

Source code in ccproxy/api/app.py
async def setup_service_container_shutdown(app: FastAPI) -> None:
    """Shut down the service container attached to the app, if there is one.

    Args:
        app: FastAPI application; nothing happens when ``app.state`` has no
            ``service_container`` attribute
    """
    if not hasattr(app.state, "service_container"):
        return
    await app.state.service_container.shutdown()

initialize_plugins_startup async

initialize_plugins_startup(app, settings)

Initialize plugins during startup (runtime phase).

Source code in ccproxy/api/app.py
async def initialize_plugins_startup(app: FastAPI, settings: Settings) -> None:
    """Create the hook manager and initialize every registered plugin.

    Returns early, after logging, when ``settings.enable_plugins`` is falsy
    or when ``app.state`` carries no ``plugin_registry``.

    Args:
        app: FastAPI application holding ``service_container`` and
            ``plugin_registry`` on its ``state``
        settings: Application settings; only ``enable_plugins`` is read
    """
    if not settings.enable_plugins:
        logger.info("plugin_system_disabled", category="lifecycle")
        return

    if not hasattr(app.state, "plugin_registry"):
        logger.warning("plugin_registry_not_found", category="lifecycle")
        return

    registry: PluginRegistry = app.state.plugin_registry
    container: ServiceContainer = app.state.service_container

    # Build the hook manager from container-provided parts, then publish it
    # both on app.state and in the container.
    hooks = container.get_hook_registry()
    manager = HookManager(hooks, container.get_background_hook_thread_manager())
    app.state.hook_registry = hooks
    app.state.hook_manager = manager
    container.register_service(HookManager, instance=manager)

    # StreamingHandler receives HookManager at construction through its DI
    # factory, so nothing needs patching afterwards.

    # Create each plugin's context up front so plugins can adjust their
    # manifests with the container's services available. A failing plugin is
    # logged and skipped; the remaining plugins are still processed.
    for plugin_name, factory in registry.factories.items():
        try:
            factory.create_context(container)
        except Exception as exc:
            logger.warning(
                "plugin_context_creation_failed",
                plugin=plugin_name,
                error=str(exc),
                exc_info=exc,
                category="plugin",
            )

    await registry.initialize_all(container)

shutdown_plugins async

shutdown_plugins(app)

Shut down plugins.

Source code in ccproxy/api/app.py
async def shutdown_plugins(app: FastAPI) -> None:
    """Shut down all plugins through the plugin registry, if one is present.

    Args:
        app: FastAPI application; nothing happens when ``app.state`` has no
            ``plugin_registry`` attribute
    """
    if not hasattr(app.state, "plugin_registry"):
        return

    registry: PluginRegistry = app.state.plugin_registry
    await registry.shutdown_all()
    logger.debug("plugins_shutdown_completed", category="lifecycle")

shutdown_hook_system async

shutdown_hook_system(app)

Shut down the hook system and its background thread.

Source code in ccproxy/api/app.py
async def shutdown_hook_system(app: FastAPI) -> None:
    """Shut down the hook system and background thread.

    Calls ``shutdown()`` on the hook manager stored on ``app.state``, when
    one exists. Any failure is logged with its traceback and is not
    propagated to the caller.

    Args:
        app: FastAPI application whose ``state.hook_manager`` is shut down
    """
    try:
        # The hook manager is expected to stop its own background manager.
        hook_manager = getattr(app.state, "hook_manager", None)
        if hook_manager is not None:
            hook_manager.shutdown()

        logger.debug("hook_system_shutdown_completed", category="lifecycle")
    except Exception as e:
        # exc_info keeps the traceback, matching the other lifecycle
        # failure logs in this module.
        logger.error(
            "hook_system_shutdown_failed",
            error=str(e),
            exc_info=e,
            category="lifecycle",
        )

initialize_hooks_startup async

initialize_hooks_startup(app, settings)

Initialize hook system with plugins.

Source code in ccproxy/api/app.py
async def initialize_hooks_startup(app: FastAPI, settings: Settings) -> None:
    """Prepare the hook system, register plugin hooks and emit APP_STARTUP.

    Reuses the hook registry and manager already stored on ``app.state`` when
    both are present; otherwise builds them from the service container and
    stores them there. Hook registration failures and a failing startup
    event are logged and do not propagate.

    Args:
        app: FastAPI application carrying the service container and,
            optionally, a plugin registry on its ``state``
        settings: Application settings (not read by this function)
    """
    state = app.state

    if not (hasattr(state, "hook_registry") and hasattr(state, "hook_manager")):
        # NOTE(review): unlike initialize_plugins_startup, this fallback does
        # not register the HookManager with the service container - confirm
        # that is intended.
        container: ServiceContainer = state.service_container
        registry = container.get_hook_registry()
        manager = HookManager(
            registry, container.get_background_hook_thread_manager()
        )
        state.hook_registry = registry
        state.hook_manager = manager
    else:
        registry = state.hook_registry
        manager = state.hook_manager
        logger.debug("hook_system_already_created", category="lifecycle")

    # Instantiate and register every hook declared in the plugin manifests.
    if hasattr(state, "plugin_registry"):
        plugins: PluginRegistry = state.plugin_registry

        for plugin_name, factory in plugins.factories.items():
            for spec in factory.get_manifest().hooks:
                try:
                    registry.register(spec.hook_class(**spec.kwargs))
                    logger.debug(
                        "plugin_hook_registered",
                        plugin_name=plugin_name,
                        hook_class=spec.hook_class.__name__,
                        category="lifecycle",
                    )
                except Exception as exc:
                    # One broken hook must not block the remaining hooks.
                    logger.error(
                        "plugin_hook_registration_failed",
                        plugin_name=plugin_name,
                        hook_class=spec.hook_class.__name__,
                        error=str(exc),
                        exc_info=exc,
                        category="lifecycle",
                    )

    try:
        await manager.emit(HookEvent.APP_STARTUP, {"phase": "startup"})
    except Exception as exc:
        logger.error(
            "startup_hook_failed", error=str(exc), exc_info=exc, category="lifecycle"
        )

lifespan async

lifespan(app)

Application lifespan manager using component-based approach.

Source code in ccproxy/api/app.py
def _lifecycle_event_slug(component_name: str) -> str:
    """Convert a component display name into a snake_case log-event fragment."""
    return component_name.lower().replace(" ", "_")


def _log_lifecycle_failure(component_name: str, phase: str, exc: Exception) -> None:
    """Log a failed component startup/shutdown step without re-raising.

    ``OSError`` and its subclasses (``PermissionError`` included) produce an
    ``<slug>_<phase>_io_failed`` event; any other exception produces
    ``<slug>_<phase>_failed``.
    """
    outcome = "io_failed" if isinstance(exc, OSError) else "failed"
    logger.error(
        f"{_lifecycle_event_slug(component_name)}_{phase}_{outcome}",
        error=str(exc),
        component=component_name,
        exc_info=exc,
        category="lifecycle",
    )


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Application lifespan manager using component-based approach.

    Startup: runs the ``startup`` callable of every entry in
    ``LIFECYCLE_COMPONENTS`` in order. A failing component is logged and the
    remaining components are still started. A ``server_ready`` event is
    logged before control is yielded to the application.

    Shutdown: runs the ``shutdown`` callables of ``SHUTDOWN_ONLY_COMPONENTS``
    and then those of ``LIFECYCLE_COMPONENTS`` in reverse order, logging and
    continuing past failures. Shutdown sits in a ``finally`` block so it also
    runs when an exception or cancellation is delivered at the ``yield``.

    Args:
        app: FastAPI application; ``app.state.service_container`` must be set
    """
    service_container: ServiceContainer = app.state.service_container
    settings = service_container.get_service(Settings)
    logger.info(
        "server_starting",
        host=settings.server.host,
        port=settings.server.port,
        url=f"http://{settings.server.host}:{settings.server.port}",
        category="lifecycle",
    )
    # Demote granular config detail to DEBUG
    logger.debug(
        "server_configured",
        host=settings.server.host,
        port=settings.server.port,
        category="config",
    )

    for component in LIFECYCLE_COMPONENTS:
        if component["startup"]:
            component_name = component["name"]
            try:
                logger.debug(
                    f"starting_{_lifecycle_event_slug(component_name)}",
                    category="lifecycle",
                )
                await component["startup"](app, settings)
            except Exception as e:
                # Best effort: one failing component must not block the rest.
                _log_lifecycle_failure(component_name, "startup", e)

    # Safely derive feature flags from settings which may be models or dicts
    def _get_plugin_enabled(name: str) -> bool:
        plugins_cfg = getattr(settings, "plugins", None)
        if plugins_cfg is None:
            return False
        # dict-like
        if isinstance(plugins_cfg, dict):
            cfg = plugins_cfg.get(name)
            if isinstance(cfg, dict):
                return bool(cfg.get("enabled", False))
            try:
                return bool(getattr(cfg, "enabled", False))
            except Exception:
                return False
        # object-like
        try:
            sub = getattr(plugins_cfg, name, None)
            return bool(getattr(sub, "enabled", False))
        except Exception:
            return False

    def _get_auth_enabled() -> bool:
        auth_cfg = getattr(settings, "auth", None)
        if auth_cfg is None:
            return False
        if isinstance(auth_cfg, dict):
            return bool(auth_cfg.get("enabled", False))
        return bool(getattr(auth_cfg, "enabled", False))

    # Startup is complete; announce readiness before handing control over.
    logger.info(
        "server_ready",
        url=f"http://{settings.server.host}:{settings.server.port}",
        version=__version__,
        workers=settings.server.workers,
        reload=settings.server.reload,
        features_enabled={
            "plugins": bool(getattr(settings, "enable_plugins", False)),
            "metrics": _get_plugin_enabled("metrics"),
            "access": _get_plugin_enabled("access_log"),
            "auth": _get_auth_enabled(),
        },
        category="lifecycle",
    )

    try:
        yield
    finally:
        # Runs on normal exit and when an exception is thrown in at the yield.
        logger.debug("server_stop", category="lifecycle")

        for shutdown_component in SHUTDOWN_ONLY_COMPONENTS:
            if shutdown_component["shutdown"]:
                component_name = shutdown_component["name"]
                try:
                    logger.debug(
                        f"stopping_{_lifecycle_event_slug(component_name)}",
                        category="lifecycle",
                    )
                    await shutdown_component["shutdown"](app)
                except Exception as e:
                    _log_lifecycle_failure(component_name, "shutdown", e)

        # Tear down in the opposite order of startup.
        for component in reversed(LIFECYCLE_COMPONENTS):
            if component["shutdown"]:
                component_name = component["name"]
                try:
                    logger.debug(
                        f"stopping_{_lifecycle_event_slug(component_name)}",
                        category="lifecycle",
                    )
                    # The "Permission Service" shutdown callable is the only
                    # one invoked with settings as a second argument.
                    if component_name == "Permission Service":
                        await component["shutdown"](app, settings)  # type: ignore
                    else:
                        await component["shutdown"](app)  # type: ignore
                except Exception as e:
                    _log_lifecycle_failure(component_name, "shutdown", e)

get_app

get_app()

Create and return a FastAPI app instance built on a newly created service container.

Source code in ccproxy/api/app.py
def get_app() -> FastAPI:
    """Build and return a FastAPI app instance.

    Returns:
        The result of ``create_app`` called with the container produced by
        ``create_service_container()``
    """
    return create_app(create_service_container())