Skip to content

ccproxy.core.async_task_manager

ccproxy.core.async_task_manager

Centralized async task management for lifecycle control and resource cleanup.

This module provides a centralized task manager that tracks all spawned async tasks, handles proper cancellation on shutdown, and provides exception handling for background tasks to prevent resource leaks and unhandled exceptions.

TaskInfo

TaskInfo(
    task,
    name,
    created_at,
    creator=None,
    cleanup_callback=None,
)

Information about a managed task.

Source code in ccproxy/core/async_task_manager.py
def __init__(
    self,
    task: asyncio.Task[Any],
    name: str,
    created_at: float,
    creator: str | None = None,
    cleanup_callback: Callable[[], None] | None = None,
):
    self.task = task
    self.name = name
    self.created_at = created_at
    self.creator = creator
    self.cleanup_callback = cleanup_callback
    self.task_id = str(uuid.uuid4())

age_seconds property

age_seconds

Get the age of the task in seconds.

is_done property

is_done

Check if the task is done.

is_cancelled property

is_cancelled

Check if the task was cancelled.

get_exception

get_exception()

Get the exception if the task failed.

Source code in ccproxy/core/async_task_manager.py
def get_exception(self) -> BaseException | None:
    """Get the exception if the task failed."""
    if self.task.done() and not self.task.cancelled():
        try:
            return self.task.exception()
        except asyncio.InvalidStateError:
            return None
    return None

AsyncTaskManager

AsyncTaskManager(
    cleanup_interval=30.0,
    shutdown_timeout=30.0,
    max_tasks=1000,
)

Centralized manager for async tasks with lifecycle control.

This class provides: - Task registration and tracking - Automatic cleanup of completed tasks - Graceful shutdown with cancellation - Exception handling for background tasks - Task monitoring and statistics

Parameters:

Name Type Description Default
cleanup_interval float

Interval for cleaning up completed tasks (seconds)

30.0
shutdown_timeout float

Timeout for graceful shutdown (seconds)

30.0
max_tasks int

Maximum number of tasks to track (prevents memory leaks)

1000
Source code in ccproxy/core/async_task_manager.py
def __init__(
    self,
    cleanup_interval: float = 30.0,
    shutdown_timeout: float = 30.0,
    max_tasks: int = 1000,
):
    """Initialize the task manager.

    Args:
        cleanup_interval: Interval for cleaning up completed tasks (seconds)
        shutdown_timeout: Timeout for graceful shutdown (seconds)
        max_tasks: Maximum number of tasks to track (prevents memory leaks)
    """
    self.cleanup_interval = cleanup_interval
    self.shutdown_timeout = shutdown_timeout
    self.max_tasks = max_tasks

    self._tasks: dict[str, TaskInfo] = {}
    self._lock = asyncio.Lock()
    self._shutdown_event = asyncio.Event()
    self._cleanup_task: asyncio.Task[None] | None = None
    self._started = False

is_started property

is_started

Check if the task manager is started.

start async

start()

Start the task manager and its cleanup task.

Source code in ccproxy/core/async_task_manager.py
async def start(self) -> None:
    """Start the task manager and its cleanup task."""
    if self._started:
        logger.warning("task_manager_already_started")
        return

    self._started = True
    logger.debug("task_manager_starting", cleanup_interval=self.cleanup_interval)

    # Start cleanup task
    self._cleanup_task = asyncio.create_task(
        self._cleanup_loop(), name="task_manager_cleanup"
    )

    logger.debug("task_manager_started")

stop async

stop()

Stop the task manager and cancel all managed tasks.

Source code in ccproxy/core/async_task_manager.py
async def stop(self) -> None:
    """Stop the task manager and cancel all managed tasks."""
    if not self._started:
        return

    logger.debug("task_manager_stopping", active_tasks=len(self._tasks))
    self._shutdown_event.set()

    # Stop cleanup task first
    if self._cleanup_task and not self._cleanup_task.done():
        self._cleanup_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._cleanup_task

    # Cancel all managed tasks
    await self._cancel_all_tasks()

    # Clear task registry
    async with self._lock:
        self._tasks.clear()

    self._started = False
    logger.debug("task_manager_stopped")

create_task async

create_task(
    coro, *, name=None, creator=None, cleanup_callback=None
)

Create a managed task.

Parameters:

Name Type Description Default
coro Awaitable[T]

Coroutine to execute

required
name str | None

Optional name for the task (auto-generated if None)

None
creator str | None

Optional creator identifier for debugging

None
cleanup_callback Callable[[], None] | None

Optional callback to run when task completes

None

Returns:

Type Description
Task[T]

The created task

Raises:

Type Description
RuntimeError

If task manager is not started or has too many tasks

Source code in ccproxy/core/async_task_manager.py
async def create_task(
    self,
    coro: Awaitable[T],
    *,
    name: str | None = None,
    creator: str | None = None,
    cleanup_callback: Callable[[], None] | None = None,
) -> asyncio.Task[T]:
    """Create a managed task.

    Args:
        coro: Coroutine to execute
        name: Optional name for the task (auto-generated if None)
        creator: Optional creator identifier for debugging
        cleanup_callback: Optional callback to run when task completes

    Returns:
        The created task

    Raises:
        RuntimeError: If task manager is not started or has too many tasks
    """
    if not self._started:
        raise RuntimeError("Task manager is not started")

    # Check task limit
    if len(self._tasks) >= self.max_tasks:
        logger.warning(
            "task_manager_at_capacity",
            current_tasks=len(self._tasks),
            max_tasks=self.max_tasks,
        )
        # Clean up completed tasks to make room
        await self._cleanup_completed_tasks()

        if len(self._tasks) >= self.max_tasks:
            raise RuntimeError(f"Task manager at capacity ({self.max_tasks} tasks)")

    # Generate name if not provided
    if name is None:
        name = f"managed_task_{len(self._tasks)}"

    # Create the task with exception handling
    task = asyncio.create_task(
        self._wrap_with_exception_handling(coro, name),
        name=name,
    )

    # Register the task
    task_info = TaskInfo(
        task=task,
        name=name,
        created_at=time.time(),
        creator=creator,
        cleanup_callback=cleanup_callback,
    )

    async with self._lock:
        self._tasks[task_info.task_id] = task_info

    # Add done callback for automatic cleanup
    task.add_done_callback(lambda t: self._schedule_cleanup_callback(task_info))

    logger.debug(
        "task_created",
        task_id=task_info.task_id,
        task_name=name,
        creator=creator,
        total_tasks=len(self._tasks),
    )

    return task

get_task_stats async

get_task_stats()

Get statistics about managed tasks.

Source code in ccproxy/core/async_task_manager.py
async def get_task_stats(self) -> dict[str, Any]:
    """Get statistics about managed tasks."""
    async with self._lock:
        active_tasks = sum(1 for t in self._tasks.values() if not t.is_done)
        cancelled_tasks = sum(1 for t in self._tasks.values() if t.is_cancelled)
        failed_tasks = sum(
            1
            for t in self._tasks.values()
            if t.is_done and not t.is_cancelled and t.get_exception()
        )

        return {
            "total_tasks": len(self._tasks),
            "active_tasks": active_tasks,
            "cancelled_tasks": cancelled_tasks,
            "failed_tasks": failed_tasks,
            "completed_tasks": len(self._tasks) - active_tasks,
            "started": self._started,
            "max_tasks": self.max_tasks,
        }

list_active_tasks async

list_active_tasks()

Get list of active tasks with details.

Source code in ccproxy/core/async_task_manager.py
async def list_active_tasks(self) -> list[dict[str, Any]]:
    """Get list of active tasks with details."""
    active_tasks = []

    async with self._lock:
        for task_info in self._tasks.values():
            if not task_info.is_done:
                active_tasks.append(
                    {
                        "task_id": task_info.task_id,
                        "name": task_info.name,
                        "creator": task_info.creator,
                        "age_seconds": task_info.age_seconds,
                        "created_at": task_info.created_at,
                    }
                )

    return active_tasks

create_managed_task async

create_managed_task(
    coro,
    *,
    name=None,
    creator=None,
    cleanup_callback=None,
    container=None,
    task_manager=None,
)

Create a managed task using the dependency-injected task manager.

Parameters:

Name Type Description Default
coro Awaitable[T]

Coroutine to execute

required
name str | None

Optional name for the task

None
creator str | None

Optional creator identifier

None
cleanup_callback Callable[[], None] | None

Optional cleanup callback

None
container Optional[ServiceContainer]

Optional service container for resolving the task manager

None
task_manager Optional[AsyncTaskManager]

Optional explicit task manager instance

None

Returns:

Type Description
Task[T]

The created managed task

Source code in ccproxy/core/async_task_manager.py
async def create_managed_task(
    coro: Awaitable[T],
    *,
    name: str | None = None,
    creator: str | None = None,
    cleanup_callback: Callable[[], None] | None = None,
    container: Optional["ServiceContainer"] = None,
    task_manager: Optional["AsyncTaskManager"] = None,
) -> asyncio.Task[T]:
    """Create a managed task using the dependency-injected task manager.

    Args:
        coro: Coroutine to execute
        name: Optional name for the task
        creator: Optional creator identifier
        cleanup_callback: Optional cleanup callback
        container: Optional service container for resolving the task manager
        task_manager: Optional explicit task manager instance

    Returns:
        The created managed task
    """

    manager = _resolve_task_manager(container=container, task_manager=task_manager)
    return await manager.create_task(
        coro, name=name, creator=creator, cleanup_callback=cleanup_callback
    )

start_task_manager async

start_task_manager(*, container=None, task_manager=None)

Start the dependency-injected task manager.

Source code in ccproxy/core/async_task_manager.py
async def start_task_manager(
    *,
    container: Optional["ServiceContainer"] = None,
    task_manager: Optional["AsyncTaskManager"] = None,
) -> None:
    """Start the dependency-injected task manager."""

    manager = _resolve_task_manager(container=container, task_manager=task_manager)
    await manager.start()

stop_task_manager async

stop_task_manager(*, container=None, task_manager=None)

Stop the dependency-injected task manager.

Source code in ccproxy/core/async_task_manager.py
async def stop_task_manager(
    *,
    container: Optional["ServiceContainer"] = None,
    task_manager: Optional["AsyncTaskManager"] = None,
) -> None:
    """Stop the dependency-injected task manager."""

    manager = _resolve_task_manager(container=container, task_manager=task_manager)
    await manager.stop()

create_fire_and_forget_task

create_fire_and_forget_task(
    coro,
    *,
    name=None,
    creator=None,
    container=None,
    task_manager=None,
)

Create a fire-and-forget managed task from a synchronous context.

This function schedules a coroutine to run as a managed task without needing to await it. Useful for calling from synchronous functions that need to schedule background work.

Parameters:

Name Type Description Default
coro Awaitable[T]

Coroutine to execute

required
name str | None

Optional name for the task

None
creator str | None

Optional creator identifier

None
container Optional[ServiceContainer]

Optional service container to resolve the task manager

None
task_manager Optional[AsyncTaskManager]

Optional explicit task manager instance

None
Source code in ccproxy/core/async_task_manager.py
def create_fire_and_forget_task(
    coro: Awaitable[T],
    *,
    name: str | None = None,
    creator: str | None = None,
    container: Optional["ServiceContainer"] = None,
    task_manager: Optional["AsyncTaskManager"] = None,
) -> None:
    """Create a fire-and-forget managed task from a synchronous context.

    This function schedules a coroutine to run as a managed task without
    needing to await it. Useful for calling from synchronous functions
    that need to schedule background work.

    Args:
        coro: Coroutine to execute
        name: Optional name for the task
        creator: Optional creator identifier
        container: Optional service container to resolve the task manager
        task_manager: Optional explicit task manager instance
    """

    manager = _resolve_task_manager(container=container, task_manager=task_manager)

    if not manager.is_started:
        # If task manager isn't started, fall back to regular asyncio.create_task
        logger.warning(
            "task_manager_not_started_fire_and_forget",
            name=name,
            creator=creator,
        )
        asyncio.create_task(coro, name=name)  # type: ignore[arg-type]
        return

    # Schedule the task creation as a fire-and-forget operation
    async def _create_managed_task() -> None:
        try:
            await manager.create_task(coro, name=name, creator=creator)
        except Exception as e:
            logger.error(
                "fire_and_forget_task_creation_failed",
                name=name,
                creator=creator,
                error=str(e),
                exc_info=True,
            )

    # Use asyncio.create_task to schedule the managed task creation
    asyncio.create_task(_create_managed_task(), name=f"create_{name or 'unnamed'}")