Skip to content

ccproxy.core.plugins.hooks

ccproxy.core.plugins.hooks

Hook system for CCProxy.

This package provides a flexible, event-driven hook system that enables metrics collection, analytics, logging, and custom provider behaviors without modifying core code.

Key components: - HookEvent: Enumeration of all supported events - HookContext: Context data passed to hooks - Hook: Protocol for hook implementations - HookRegistry: Registry for managing hooks - HookManager: Manager for executing hooks - BackgroundHookThreadManager: Background thread manager for async hook execution

Hook

Bases: Protocol

Base hook protocol

name property

name

Hook name for debugging

events property

events

Events this hook listens to

priority property

priority

Hook execution priority (0-1000, lower executes first).

Default is 500 (middle priority) for backward compatibility. See HookLayer enum for standard priority values.

HookContext dataclass

HookContext(
    event,
    timestamp,
    data,
    metadata,
    request=None,
    response=None,
    provider=None,
    plugin=None,
    error=None,
)

Context passed to all hooks

HookEvent

Bases: str, Enum

Event types that can trigger hooks

HookManager

HookManager(registry, background_manager=None)

Manages hook execution with error isolation and async/sync support.

The HookManager is responsible for emitting events to registered hooks and ensuring that hook failures don't crash the system. It handles both async and sync hooks by running sync hooks in a thread pool.

Parameters:

Name Type Description Default
registry HookRegistry

The hook registry to get hooks from

required
background_manager BackgroundHookThreadManager | None

Optional background thread manager for fire-and-forget execution

None
Source code in ccproxy/core/plugins/hooks/manager.py
def __init__(
    self,
    registry: HookRegistry,
    background_manager: BackgroundHookThreadManager | None = None,
):
    """Initialize the hook manager.

    Args:
        registry: The hook registry to get hooks from
        background_manager: Optional background thread manager for fire-and-forget execution
    """
    self._registry = registry
    self._background_manager = background_manager
    self._logger = structlog.get_logger(__name__)

emit async

emit(event, data=None, fire_and_forget=True, **kwargs)

Emit an event to all registered hooks.

Creates a HookContext with the provided data and emits it to all hooks registered for the given event. Handles errors gracefully to ensure one failing hook doesn't affect others.

Parameters:

Name Type Description Default
event HookEvent

The event to emit

required
data dict[str, Any] | None

Optional data dictionary to include in context

None
fire_and_forget bool

If True, execute hooks in background thread (default)

True
**kwargs Any

Additional context fields (request, response, provider, etc.)

{}
Source code in ccproxy/core/plugins/hooks/manager.py
async def emit(
    self,
    event: HookEvent,
    data: dict[str, Any] | None = None,
    fire_and_forget: bool = True,
    **kwargs: Any,
) -> None:
    """Emit an event to all registered hooks.

    Creates a HookContext with the provided data and emits it to all
    hooks registered for the given event. Handles errors gracefully
    to ensure one failing hook doesn't affect others.

    Args:
        event: The event to emit
        data: Optional data dictionary to include in context
        fire_and_forget: If True, execute hooks in background thread (default)
        **kwargs: Additional context fields (request, response, provider, etc.)
    """
    context = HookContext(
        event=event,
        timestamp=datetime.utcnow(),
        data=data or {},
        metadata={},
        **kwargs,
    )

    if fire_and_forget and self._background_manager:
        # Execute in background thread - non-blocking
        self._background_manager.emit_async(context, self._registry)
        return
    elif fire_and_forget and not self._background_manager:
        # No background manager available, log warning and fall back to sync
        self._logger.warning(
            "fire_and_forget_requested_but_no_background_manager_available"
        )
    # Fall through to synchronous execution

    # Synchronous execution (legacy behavior)
    hooks = self._registry.get(event)
    if not hooks:
        return

    # Log execution order if debug logging enabled
    self._logger.debug(
        "hook_execution_order",
        hook_event=event.value if hasattr(event, "value") else str(event),
        hooks=[
            {"name": h.name, "priority": getattr(h, "priority", 500)} for h in hooks
        ],
    )

    # Execute all hooks in priority order, catching errors
    for hook in hooks:
        try:
            await self._execute_hook(hook, context)
        except Exception as e:
            self._logger.error(
                "hook_execution_failed",
                hook=hook.name,
                hook_event=event.value if hasattr(event, "value") else str(event),
                priority=getattr(hook, "priority", 500),
                error=str(e),
            )

emit_with_context async

emit_with_context(context, fire_and_forget=True)

Emit an event using a pre-built HookContext.

This is useful when you need to build the context with specific metadata before emitting the event.

Parameters:

Name Type Description Default
context HookContext

The HookContext to emit

required
fire_and_forget bool

If True, execute hooks in background thread (default)

True
Source code in ccproxy/core/plugins/hooks/manager.py
async def emit_with_context(
    self, context: HookContext, fire_and_forget: bool = True
) -> None:
    """Emit an event using a pre-built HookContext.

    This is useful when you need to build the context with specific metadata
    before emitting the event.

    Args:
        context: The HookContext to emit
        fire_and_forget: If True, execute hooks in background thread (default)
    """
    if fire_and_forget and self._background_manager:
        # Execute in background thread - non-blocking
        self._background_manager.emit_async(context, self._registry)
        return
    elif fire_and_forget and not self._background_manager:
        # No background manager available, log warning and fall back to sync
        self._logger.warning(
            "fire_and_forget_requested_but_no_background_manager_available"
        )
    # Fall through to synchronous execution

    # Synchronous execution (legacy behavior)
    hooks = self._registry.get(context.event)
    if not hooks:
        return

    # Log execution order if debug logging enabled
    self._logger.debug(
        "hook_execution_order",
        hook_event=context.event.value
        if hasattr(context.event, "value")
        else str(context.event),
        hooks=[
            {"name": h.name, "priority": getattr(h, "priority", 500)} for h in hooks
        ],
    )

    # Execute all hooks in priority order, catching errors
    for hook in hooks:
        try:
            await self._execute_hook(hook, context)
        except Exception as e:
            self._logger.error(
                "hook_execution_failed",
                hook=hook.name,
                hook_event=context.event.value
                if hasattr(context.event, "value")
                else str(context.event),
                priority=getattr(hook, "priority", 500),
                error=str(e),
            )

shutdown

shutdown()

Shutdown the background hook processing.

This method should be called during application shutdown to ensure proper cleanup of the background thread.

Source code in ccproxy/core/plugins/hooks/manager.py
def shutdown(self) -> None:
    """Shutdown the background hook processing.

    This method should be called during application shutdown to ensure
    proper cleanup of the background thread.
    """
    if self._background_manager:
        self._background_manager.stop()

HookRegistry

HookRegistry()

Central registry for all hooks with priority-based ordering.

Source code in ccproxy/core/plugins/hooks/registry.py
def __init__(self) -> None:
    # Use SortedList for automatic priority ordering
    # Key function sorts by (priority, registration_order)
    self._hooks: dict[HookEvent, Any] = defaultdict(
        lambda: SortedList(
            key=lambda h: (
                getattr(h, "priority", 500),
                self._registration_order.get(h, 0),
            )
        )
    )
    self._registration_order: dict[Hook, int] = {}
    self._next_order = 0
    self._logger = structlog.get_logger(__name__)
    # Batch logging for registration/unregistration
    self._pending_registrations: list[tuple[str, str, int]] = []
    self._pending_unregistrations: list[tuple[str, str]] = []

register

register(hook)

Register a hook for its events with priority ordering

Source code in ccproxy/core/plugins/hooks/registry.py
def register(self, hook: Hook) -> None:
    """Register a hook for its events with priority ordering"""
    priority = getattr(
        hook, "priority", 500
    )  # Default priority for backward compatibility

    # Track registration order for stable sorting
    if hook not in self._registration_order:
        self._registration_order[hook] = self._next_order
        self._next_order += 1

    events_registered = []
    for event in hook.events:
        self._hooks[event].add(hook)
        event_name = event.value if hasattr(event, "value") else str(event)
        events_registered.append(event_name)
        # Log individual registrations at DEBUG level
        # self._logger.debug(
        #     "hook_registered",
        #     name=hook.name,
        #     hook_event=event_name,
        #     priority=priority,
        # )

    # Log summary at DEBUG; a global summary will be logged elsewhere at INFO
    if len(events_registered) > 0:
        self._logger.debug(
            "hook_registered",
            name=hook.name,
            events=events_registered,
            event_count=len(events_registered),
            priority=priority,
        )

unregister

unregister(hook)

Remove a hook from all events

Source code in ccproxy/core/plugins/hooks/registry.py
def unregister(self, hook: Hook) -> None:
    """Remove a hook from all events"""
    events_unregistered = []
    for event in hook.events:
        try:
            self._hooks[event].remove(hook)
            event_name = event.value if hasattr(event, "value") else str(event)
            events_unregistered.append(event_name)
            # Log individual unregistrations at DEBUG level
            # self._logger.debug(
            #     "hook_unregistered",
            #     name=hook.name,
            #     hook_event=event_name,
            # )
        except ValueError:
            pass  # Hook not in list, ignore

    # Log summary at INFO level only if multiple events
    if len(events_unregistered) > 1:
        self._logger.info(
            "hook_unregistered_summary",
            name=hook.name,
            events=events_unregistered,
            event_count=len(events_unregistered),
        )
    elif events_unregistered:
        # Single event - log at DEBUG level to reduce verbosity
        self._logger.debug(
            "hook_unregistered_single",
            name=hook.name,
            hook_event=events_unregistered[0],
        )

    # Clean up registration order tracking
    if hook in self._registration_order:
        del self._registration_order[hook]

get

get(event)

Get all hooks for an event in priority order

Source code in ccproxy/core/plugins/hooks/registry.py
def get(self, event: HookEvent) -> list[Hook]:
    """Get all hooks for an event in priority order"""
    return list(self._hooks.get(event, []))

list

list()

Get summary of all registered hooks organized by event.

Returns:

Type Description
dict[str, list[dict[str, Any]]]

Dictionary mapping event names to lists of hook info

Source code in ccproxy/core/plugins/hooks/registry.py
def list(self) -> dict[str, list[dict[str, Any]]]:
    """Get summary of all registered hooks organized by event.

    Returns:
        Dictionary mapping event names to lists of hook info
    """
    summary = {}
    for event, hooks in self._hooks.items():
        event_name = event.value if hasattr(event, "value") else str(event)
        summary[event_name] = [
            {
                "name": hook.name,
                "priority": getattr(hook, "priority", 500),
            }
            for hook in hooks
        ]
    return summary

has

has(event)

Check if any hook is registered for the event.

Source code in ccproxy/core/plugins/hooks/registry.py
def has(self, event: HookEvent) -> bool:
    """Check if any hook is registered for the event."""
    hooks = self._hooks.get(event)
    return bool(hooks and len(hooks) > 0)

clear

clear()

Clear all registered hooks and reset ordering (testing or shutdown).

Source code in ccproxy/core/plugins/hooks/registry.py
def clear(self) -> None:
    """Clear all registered hooks and reset ordering (testing or shutdown)."""
    self._hooks.clear()
    self._registration_order.clear()
    self._next_order = 0

BackgroundHookThreadManager

BackgroundHookThreadManager()

Manages a dedicated async thread for hook execution.

Source code in ccproxy/core/plugins/hooks/thread_manager.py
def __init__(self) -> None:
    """Initialize the background thread manager."""
    self._loop: asyncio.AbstractEventLoop | None = None
    self._thread: threading.Thread | None = None
    self._queue: asyncio.Queue[tuple[HookTask, Any]] | None = None
    self._shutdown_event: asyncio.Event | None = None
    self._running = False
    self._logger = logger.bind(component="background_hook_thread")
    # Signals when the background loop and its resources are ready
    self._ready_event: threading.Event | None = None

start

start()

Start the background thread with its own event loop.

Source code in ccproxy/core/plugins/hooks/thread_manager.py
def start(self) -> None:
    """Start the background thread with its own event loop."""
    if self._running:
        return

    # Create readiness event so callers can safely enqueue without sleeps
    self._ready_event = threading.Event()

    self._thread = threading.Thread(
        target=self._run_background_loop, name="hook-background-thread", daemon=True
    )
    self._thread.start()

    # Block briefly until the background loop has initialized its resources
    if self._ready_event and not self._ready_event.wait(timeout=1.0):
        self._logger.warning("background_hook_thread_startup_timeout")
    self._running = True

    self._logger.debug("background_hook_thread_started")

stop

stop(timeout=5.0)

Gracefully shutdown the background thread.

Source code in ccproxy/core/plugins/hooks/thread_manager.py
def stop(self, timeout: float = 5.0) -> None:
    """Gracefully shutdown the background thread."""
    if not self._running:
        return

    self._logger.debug("stopping_background_hook_thread")

    # Signal shutdown to the background loop
    if self._loop and self._shutdown_event:
        self._loop.call_soon_threadsafe(self._shutdown_event.set)

    # Wait for thread to complete
    if self._thread:
        self._thread.join(timeout=timeout)
        if self._thread.is_alive():
            self._logger.warning("background_thread_shutdown_timeout")

    self._running = False
    self._loop = None
    self._thread = None
    self._queue = None
    self._shutdown_event = None
    self._ready_event = None

    self._logger.debug("background_hook_thread_stopped")

emit_async

emit_async(context, registry)

Queue a hook task for background execution.

Parameters:

Name Type Description Default
context HookContext

Hook context to execute

required
registry Any

Hook registry to get hooks from

required
Source code in ccproxy/core/plugins/hooks/thread_manager.py
def emit_async(self, context: HookContext, registry: Any) -> None:
    """Queue a hook task for background execution.

    Args:
        context: Hook context to execute
        registry: Hook registry to get hooks from
    """
    if not self._running:
        self.start()

    if not self._loop or not self._queue:
        self._logger.warning("background_thread_not_ready_dropping_task")
        return

    task = HookTask(context=context)

    # Add task to queue in a thread-safe way
    try:
        self._loop.call_soon_threadsafe(self._add_task_to_queue, task, registry)
    except Exception as e:
        self._logger.error("failed_to_queue_hook_task", error=str(e))