Skip to content

ccproxy.core.plugins.hooks.thread_manager

ccproxy.core.plugins.hooks.thread_manager

Background thread manager for async hook execution.

HookTask dataclass

HookTask(
    context,
    task_id=(lambda: str(uuid4()))(),
    created_at=utcnow(),
)

Represents a hook execution task.

BackgroundHookThreadManager

BackgroundHookThreadManager()

Manages a dedicated background thread, running its own event loop, for asynchronous hook execution.

Source code in ccproxy/core/plugins/hooks/thread_manager.py
def __init__(self) -> None:
    """Create an idle manager; no thread is started here.

    Every thread/loop handle begins as ``None``. ``start()`` creates the
    thread and the readiness event, and ``stop()`` resets all of them back
    to ``None``.
    """
    self._logger = logger.bind(component="background_hook_thread")
    self._running = False

    # Thread-side handles: the worker thread itself, plus the flag used to
    # signal that the background loop and its resources are ready.
    self._thread: threading.Thread | None = None
    self._ready_event: threading.Event | None = None

    # Loop-side handles (presumably assigned by the background thread once
    # its event loop exists -- that code is not part of this method).
    self._loop: asyncio.AbstractEventLoop | None = None
    self._queue: asyncio.Queue[tuple[HookTask, Any]] | None = None
    self._shutdown_event: asyncio.Event | None = None

start

start()

Start the background thread with its own event loop.

Source code in ccproxy/core/plugins/hooks/thread_manager.py
def start(self) -> None:
    """Launch the daemon hook thread and wait briefly for it to be ready.

    Returns immediately when the manager is already marked as running.
    Otherwise a new readiness event and a daemon thread targeting
    ``_run_background_loop`` are created, and the caller blocks for at most
    one second on the readiness event. A timeout is only logged as a
    warning; the manager is marked as running either way.
    """
    if self._running:
        return

    # New readiness flag for this start, so callers can enqueue without
    # resorting to sleeps.
    self._ready_event = threading.Event()

    worker = threading.Thread(
        name="hook-background-thread",
        target=self._run_background_loop,
        daemon=True,
    )
    self._thread = worker
    worker.start()

    # Give the background loop a short window to initialise its resources.
    ready = self._ready_event
    if ready is not None:
        signalled = ready.wait(timeout=1.0)
        if not signalled:
            self._logger.warning("background_hook_thread_startup_timeout")

    self._running = True
    self._logger.debug("background_hook_thread_started")

stop

stop(timeout=5.0)

Gracefully shutdown the background thread.

Source code in ccproxy/core/plugins/hooks/thread_manager.py
def stop(self, timeout: float = 5.0) -> None:
    """Gracefully shutdown the background thread.

    Schedules the shutdown event to be set on the background loop, waits up
    to ``timeout`` seconds for the thread to exit, and then clears all
    thread/loop handles and the running flag so ``start()`` can run again.
    Does nothing when the manager is not marked as running.

    Args:
        timeout: Maximum number of seconds to wait for the thread to join.
    """
    if not self._running:
        return

    self._logger.debug("stopping_background_hook_thread")

    # Signal shutdown to the background loop
    if self._loop and self._shutdown_event:
        try:
            self._loop.call_soon_threadsafe(self._shutdown_event.set)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError when the loop has
            # already been closed. There is nothing left to signal then,
            # but the teardown below must still run; otherwise _running
            # would stay True and the manager could never be restarted.
            self._logger.debug("background_loop_already_closed")

    # Wait for thread to complete
    if self._thread:
        self._thread.join(timeout=timeout)
        if self._thread.is_alive():
            self._logger.warning("background_thread_shutdown_timeout")

    # Drop every handle, even if the join timed out
    self._running = False
    self._loop = None
    self._thread = None
    self._queue = None
    self._shutdown_event = None
    self._ready_event = None

    self._logger.debug("background_hook_thread_stopped")

emit_async

emit_async(context, registry)

Queue a hook task for background execution.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `HookContext` | Hook context to execute | *required* |
| `registry` | `Any` | Hook registry to get hooks from | *required* |

Source code in ccproxy/core/plugins/hooks/thread_manager.py
def emit_async(self, context: HookContext, registry: Any) -> None:
    """Hand a hook context over to the background thread.

    The background thread is started first if the manager is not yet
    running. If the loop or queue is still unavailable after that, the
    task is dropped with a warning instead of raising. Failures while
    scheduling the enqueue callback are logged and swallowed.

    Args:
        context: Hook context to execute
        registry: Hook registry to get hooks from
    """
    if not self._running:
        self.start()

    # Both the loop and the queue are needed before anything can be queued.
    if not (self._loop and self._queue):
        self._logger.warning("background_thread_not_ready_dropping_task")
        return

    hook_task = HookTask(context=context)

    # The enqueue itself runs on the background loop; call_soon_threadsafe
    # is the thread-safe way to schedule it from the calling thread.
    try:
        self._loop.call_soon_threadsafe(
            self._add_task_to_queue, hook_task, registry
        )
    except Exception as exc:
        self._logger.error("failed_to_queue_hook_task", error=str(exc))