Skip to content

ccproxy.scheduler

ccproxy.scheduler

Scheduler system for periodic tasks.

This module provides a generic, extensible scheduler for managing periodic tasks in the CCProxy API. It provides a centralized system that supports:

  • Generic task scheduling with configurable intervals
  • Task registration and discovery via registry pattern
  • Graceful startup and shutdown with FastAPI integration
  • Error handling with exponential backoff
  • Structured logging and monitoring

Key components: - Scheduler: Core scheduler engine for task management - BaseScheduledTask: Abstract base class for all scheduled tasks - TaskRegistry: Dynamic task registration and discovery system

Scheduler

Scheduler(
    task_registry,
    max_concurrent_tasks=10,
    graceful_shutdown_timeout=30.0,
)

Scheduler for managing multiple periodic tasks.

Provides centralized management of scheduled tasks with: - Dynamic task registration and configuration - Graceful startup and shutdown - Task monitoring and status reporting - Error handling and recovery

Parameters:

Name Type Description Default
max_concurrent_tasks int

Maximum number of tasks to run concurrently

10
graceful_shutdown_timeout float

Timeout for graceful shutdown in seconds

30.0
task_registry TaskRegistry

Task registry instance (required)

required
Source code in ccproxy/scheduler/core.py
def __init__(
    self,
    task_registry: TaskRegistry,
    max_concurrent_tasks: int = 10,
    graceful_shutdown_timeout: float = 30.0,
):
    """
    Initialize the scheduler.

    Args:
        max_concurrent_tasks: Maximum number of tasks to run concurrently
        graceful_shutdown_timeout: Timeout for graceful shutdown in seconds
        task_registry: Task registry instance (required)
    """
    self.max_concurrent_tasks = max_concurrent_tasks
    self.graceful_shutdown_timeout = graceful_shutdown_timeout
    self.task_registry = task_registry

    self._running = False
    self._tasks: dict[str, BaseScheduledTask] = {}
    self._semaphore: asyncio.Semaphore | None = None

is_running property

is_running

Check if the scheduler is running.

task_count property

task_count

Get the number of managed tasks.

start async

start()

Start the scheduler and all enabled tasks.

Source code in ccproxy/scheduler/core.py
async def start(self) -> None:
    """Start the scheduler and all enabled tasks."""
    if self._running:
        logger.warning("scheduler_already_running")
        return

    self._running = True
    self._semaphore = asyncio.Semaphore(self.max_concurrent_tasks)

    logger.debug(
        "scheduler_starting",
        max_concurrent_tasks=self.max_concurrent_tasks,
        registered_tasks=self.task_registry.list(),
    )

    try:
        # No automatic task creation - tasks must be explicitly added
        logger.debug(
            "scheduler_started",
            active_tasks=len(self._tasks),
            running_tasks=[
                name for name, task in self._tasks.items() if task.is_running
            ],
        )
    except Exception as e:
        self._running = False
        logger.error(
            "scheduler_start_failed",
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )
        raise SchedulerError(f"Failed to start scheduler: {e}") from e

stop async

stop()

Stop the scheduler and all running tasks.

Source code in ccproxy/scheduler/core.py
async def stop(self) -> None:
    """Stop the scheduler and all running tasks."""
    if not self._running:
        return

    self._running = False
    logger.debug("scheduler_stopping", active_tasks=len(self._tasks))

    # Stop all tasks
    stop_tasks = []
    for task_name, task in self._tasks.items():
        if task.is_running:
            logger.debug("stopping_task", task_name=task_name)
            stop_tasks.append(task.stop())

    if stop_tasks:
        try:
            # Wait for all tasks to stop gracefully
            await asyncio.wait_for(
                asyncio.gather(*stop_tasks, return_exceptions=True),
                timeout=self.graceful_shutdown_timeout,
            )
            logger.debug("scheduler_stopped_gracefully")
        except TimeoutError:
            logger.warning(
                "scheduler_shutdown_timeout",
                timeout=self.graceful_shutdown_timeout,
            )
            # Tasks should have cancelled themselves, but log the issue
            for task_name, task in self._tasks.items():
                if task.is_running:
                    logger.warning(
                        "task_still_running_after_shutdown", task_name=task_name
                    )
        except Exception as e:
            logger.error(
                "scheduler_shutdown_error",
                error=str(e),
                error_type=type(e).__name__,
                exc_info=e,
            )
            raise SchedulerShutdownError(
                f"Error during scheduler shutdown: {e}"
            ) from e

    self._tasks.clear()
    logger.debug("scheduler_stopped")

add_task async

add_task(task_name, task_type, **task_kwargs)

Add and start a task.

Parameters:

Name Type Description Default
task_name str

Unique name for this task instance

required
task_type str

Type of task (must be registered in task registry)

required
**task_kwargs Any

Additional arguments to pass to task constructor

{}

Raises:

Type Description
TaskRegistrationError

If task type is not registered

SchedulerError

If task name already exists or task creation fails

Source code in ccproxy/scheduler/core.py
async def add_task(
    self,
    task_name: str,
    task_type: str,
    **task_kwargs: Any,
) -> None:
    """
    Add and start a task.

    Args:
        task_name: Unique name for this task instance
        task_type: Type of task (must be registered in task registry)
        **task_kwargs: Additional arguments to pass to task constructor

    Raises:
        TaskRegistrationError: If task type is not registered
        SchedulerError: If task name already exists or task creation fails
    """
    if task_name in self._tasks:
        raise SchedulerError(f"Task '{task_name}' already exists")

    if not self.task_registry.has(task_type):
        raise TaskRegistrationError(f"Task type '{task_type}' is not registered")

    try:
        # Get task class and create instance
        task_class = self.task_registry.get(task_type)
        task_instance = task_class(name=task_name, **task_kwargs)

        interval_value = task_kwargs.get("interval_seconds")
        if interval_value is not None:
            try:
                task_instance.interval_seconds = max(1.0, float(interval_value))
            except (TypeError, ValueError):
                logger.warning(
                    "task_interval_invalid",
                    task_name=task_name,
                    task_type=task_type,
                    interval_value=interval_value,
                )

        # Add to our tasks dict
        self._tasks[task_name] = task_instance

        # Start the task if scheduler is running and task is enabled
        if self._running and task_instance.enabled:
            await task_instance.start()
            logger.debug(
                "task_added_and_started",
                task_name=task_name,
                task_type=task_type,
            )
        else:
            logger.debug(
                "task_added_not_started",
                task_name=task_name,
                task_type=task_type,
                scheduler_running=self._running,
                task_enabled=task_instance.enabled,
            )

    except Exception as e:
        # Clean up if task was partially added
        if task_name in self._tasks:
            del self._tasks[task_name]

        logger.error(
            "task_add_failed",
            task_name=task_name,
            task_type=task_type,
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )
        raise SchedulerError(f"Failed to add task '{task_name}': {e}") from e

remove_task async

remove_task(task_name)

Remove and stop a task.

Parameters:

Name Type Description Default
task_name str

Name of task to remove

required

Raises:

Type Description
TaskNotFoundError

If task does not exist

Source code in ccproxy/scheduler/core.py
async def remove_task(self, task_name: str) -> None:
    """
    Remove and stop a task.

    Args:
        task_name: Name of task to remove

    Raises:
        TaskNotFoundError: If task does not exist
    """
    if task_name not in self._tasks:
        raise TaskNotFoundError(f"Task '{task_name}' does not exist")

    task = self._tasks[task_name]

    try:
        if task.is_running:
            await task.stop()

        del self._tasks[task_name]
        logger.info("task_removed", task_name=task_name)

    except Exception as e:
        logger.error(
            "task_remove_failed",
            task_name=task_name,
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )
        raise SchedulerError(f"Failed to remove task '{task_name}': {e}") from e

get_task

get_task(task_name)

Get a task instance by name.

Parameters:

Name Type Description Default
task_name str

Name of task to retrieve

required

Returns:

Type Description
BaseScheduledTask

Task instance

Raises:

Type Description
TaskNotFoundError

If task does not exist

Source code in ccproxy/scheduler/core.py
def get_task(self, task_name: str) -> BaseScheduledTask:
    """
    Get a task instance by name.

    Args:
        task_name: Name of task to retrieve

    Returns:
        Task instance

    Raises:
        TaskNotFoundError: If task does not exist
    """
    if task_name not in self._tasks:
        raise TaskNotFoundError(f"Task '{task_name}' does not exist")

    return self._tasks[task_name]

list_tasks

list_tasks()

Get list of all task names.

Returns:

Type Description
list[str]

List of task names

Source code in ccproxy/scheduler/core.py
def list_tasks(self) -> list[str]:
    """
    Get list of all task names.

    Returns:
        List of task names
    """
    return list(self._tasks.keys())

get_task_status

get_task_status(task_name)

Get status information for a specific task.

Parameters:

Name Type Description Default
task_name str

Name of task

required

Returns:

Type Description
dict[str, Any]

Task status dictionary

Raises:

Type Description
TaskNotFoundError

If task does not exist

Source code in ccproxy/scheduler/core.py
def get_task_status(self, task_name: str) -> dict[str, Any]:
    """
    Get status information for a specific task.

    Args:
        task_name: Name of task

    Returns:
        Task status dictionary

    Raises:
        TaskNotFoundError: If task does not exist
    """
    if task_name not in self._tasks:
        raise TaskNotFoundError(f"Task '{task_name}' does not exist")

    return self._tasks[task_name].get_status()

get_scheduler_status

get_scheduler_status()

Get overall scheduler status information.

Returns:

Type Description
dict[str, Any]

Scheduler status dictionary

Source code in ccproxy/scheduler/core.py
def get_scheduler_status(self) -> dict[str, Any]:
    """
    Get overall scheduler status information.

    Returns:
        Scheduler status dictionary
    """
    running_tasks = [name for name, task in self._tasks.items() if task.is_running]

    return {
        "running": self._running,
        "total_tasks": len(self._tasks),
        "running_tasks": len(running_tasks),
        "max_concurrent_tasks": self.max_concurrent_tasks,
        "graceful_shutdown_timeout": self.graceful_shutdown_timeout,
        "task_names": list(self._tasks.keys()),
        "running_task_names": running_tasks,
        "registered_task_types": self.task_registry.list(),
    }

TaskRegistry

TaskRegistry()

Registry for managing scheduled task registration and discovery.

Provides a centralized way to register and retrieve scheduled tasks, enabling dynamic task management and configuration.

Source code in ccproxy/scheduler/registry.py
def __init__(self) -> None:
    """Initialize the task registry."""
    self._tasks: dict[str, type[BaseScheduledTask]] = {}

register

register(name, task_class)

Register a scheduled task class.

Parameters:

Name Type Description Default
name str

Unique name for the task

required
task_class type[BaseScheduledTask]

Task class that inherits from BaseScheduledTask

required

Raises:

Type Description
TaskRegistrationError

If task name is already registered or invalid

Source code in ccproxy/scheduler/registry.py
def register(self, name: str, task_class: type[BaseScheduledTask]) -> None:
    """
    Register a scheduled task class.

    Args:
        name: Unique name for the task
        task_class: Task class that inherits from BaseScheduledTask

    Raises:
        TaskRegistrationError: If task name is already registered or invalid
    """
    if name in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is already registered")

    if not issubclass(task_class, BaseScheduledTask):
        raise TaskRegistrationError(
            f"Task class for '{name}' must inherit from BaseScheduledTask"
        )

    self._tasks[name] = task_class
    logger.debug("task_registered", task_name=name, task_class=task_class.__name__)

unregister

unregister(name)

Unregister a scheduled task.

Parameters:

Name Type Description Default
name str

Name of the task to unregister

required

Raises:

Type Description
TaskRegistrationError

If task is not registered

Source code in ccproxy/scheduler/registry.py
def unregister(self, name: str) -> None:
    """
    Unregister a scheduled task.

    Args:
        name: Name of the task to unregister

    Raises:
        TaskRegistrationError: If task is not registered
    """
    if name not in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is not registered")

    del self._tasks[name]
    logger.debug("task_unregistered", task_name=name)

get

get(name)

Get a registered task class by name.

Parameters:

Name Type Description Default
name str

Name of the task to retrieve

required

Returns:

Type Description
type[BaseScheduledTask]

Task class

Raises:

Type Description
TaskRegistrationError

If task is not registered

Source code in ccproxy/scheduler/registry.py
def get(self, name: str) -> type[BaseScheduledTask]:
    """
    Get a registered task class by name.

    Args:
        name: Name of the task to retrieve

    Returns:
        Task class

    Raises:
        TaskRegistrationError: If task is not registered
    """
    if name not in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is not registered")

    return self._tasks[name]

list

list()

Get list of all registered task names.

Returns:

Type Description
list[str]

List of registered task names

Source code in ccproxy/scheduler/registry.py
def list(self) -> list[str]:
    """
    Get list of all registered task names.

    Returns:
        List of registered task names
    """
    return list(self._tasks.keys())

has

has(name)

Check if a task is registered.

Parameters:

Name Type Description Default
name str

Task name to check

required

Returns:

Type Description
bool

True if task is registered, False otherwise

Source code in ccproxy/scheduler/registry.py
def has(self, name: str) -> bool:
    """
    Check if a task is registered.

    Args:
        name: Task name to check

    Returns:
        True if task is registered, False otherwise
    """
    return name in self._tasks

clear

clear()

Clear all registered tasks.

Source code in ccproxy/scheduler/registry.py
def clear(self) -> None:
    """Clear all registered tasks."""
    self._tasks.clear()
    logger.debug("task_registry_cleared")

info

info()

Get information about the current registry state.

Returns:

Type Description
dict[str, Any]

Dictionary with registry information

Source code in ccproxy/scheduler/registry.py
def info(self) -> dict[str, Any]:
    """
    Get information about the current registry state.

    Returns:
        Dictionary with registry information
    """
    return {
        "total_tasks": len(self._tasks),
        "registered_tasks": list(self._tasks.keys()),
        "task_classes": {name: cls.__name__ for name, cls in self._tasks.items()},
    }

BaseScheduledTask

BaseScheduledTask(
    name,
    interval_seconds,
    enabled=True,
    max_backoff_seconds=300.0,
    jitter_factor=0.25,
)

Bases: ABC

Abstract base class for all scheduled tasks.

Provides common functionality for task lifecycle management, error handling, and exponential backoff for failed executions.

Parameters:

Name Type Description Default
name str

Human-readable task name

required
interval_seconds float

Interval between task executions in seconds

required
enabled bool

Whether the task is enabled

True
max_backoff_seconds float

Maximum backoff delay for failed tasks

300.0
jitter_factor float

Jitter factor for backoff randomization (0.0-1.0)

0.25
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    max_backoff_seconds: float = 300.0,
    jitter_factor: float = 0.25,
):
    """
    Initialize scheduled task.

    Args:
        name: Human-readable task name
        interval_seconds: Interval between task executions in seconds
        enabled: Whether the task is enabled
        max_backoff_seconds: Maximum backoff delay for failed tasks
        jitter_factor: Jitter factor for backoff randomization (0.0-1.0)
    """
    self.name = name
    self.interval_seconds = max(1.0, interval_seconds)
    self.enabled = enabled
    self.max_backoff_seconds = max_backoff_seconds
    self.jitter_factor = min(1.0, max(0.0, jitter_factor))

    self._consecutive_failures = 0
    self._last_run_time: float = 0
    self._running = False
    self._task: asyncio.Task[Any] | None = None
    self._stop_complete: asyncio.Event | None = None

is_running property

is_running

Check if the task is currently running.

consecutive_failures property

consecutive_failures

Get the number of consecutive failures.

last_run_time property

last_run_time

Get the timestamp of the last execution.

run abstractmethod async

run()

Execute the scheduled task.

Returns:

Type Description
bool

True if execution was successful, False otherwise

Source code in ccproxy/scheduler/tasks.py
@abstractmethod
async def run(self) -> bool:
    """
    Execute the scheduled task.

    Returns:
        True if execution was successful, False otherwise
    """
    pass

setup async

setup()

Perform any setup required before task execution starts.

Called once when the task is first started. Override if needed. Default implementation does nothing.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """
    Perform any setup required before task execution starts.

    Called once when the task is first started. Override if needed.
    Default implementation does nothing.
    """
    # Default implementation - subclasses can override if needed
    return

cleanup async

cleanup()

Perform any cleanup required after task execution stops.

Called once when the task is stopped. Override if needed. Default implementation does nothing.

Source code in ccproxy/scheduler/tasks.py
async def cleanup(self) -> None:
    """
    Perform any cleanup required after task execution stops.

    Called once when the task is stopped. Override if needed.
    Default implementation does nothing.
    """
    # Default implementation - subclasses can override if needed
    return

calculate_next_delay

calculate_next_delay()

Calculate the delay before the next task execution.

Returns exponential backoff delay for failed tasks, or normal interval for successful tasks, with optional jitter.

Returns:

Type Description
float

Delay in seconds before next execution

Source code in ccproxy/scheduler/tasks.py
def calculate_next_delay(self) -> float:
    """
    Calculate the delay before the next task execution.

    Returns exponential backoff delay for failed tasks, or normal interval
    for successful tasks, with optional jitter.

    Returns:
        Delay in seconds before next execution
    """
    if self._consecutive_failures == 0:
        base_delay = self.interval_seconds
    else:
        # Exponential backoff: interval * (2 ^ failures)
        base_delay = self.interval_seconds * (2**self._consecutive_failures)
        base_delay = min(base_delay, self.max_backoff_seconds)

    # Add jitter to prevent thundering herd
    if self.jitter_factor > 0:
        jitter = base_delay * self.jitter_factor * (random.random() - 0.5)
        base_delay += jitter

    return max(1.0, base_delay)

start async

start()

Start the scheduled task execution loop.

Source code in ccproxy/scheduler/tasks.py
async def start(self) -> None:
    """Start the scheduled task execution loop."""
    if self._running or not self.enabled:
        return

    self._running = True
    self._stop_complete = asyncio.Event()
    logger.debug("task_starting", task_name=self.name)

    try:
        await self.setup()
        self._task = await create_managed_task(
            self._run_loop(),
            name=f"scheduled_task_{self.name}",
            creator="BaseScheduledTask",
        )
        logger.debug("task_started", task_name=self.name)
    except SchedulerError as e:
        self._running = False
        logger.error(
            "task_start_scheduler_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )
        raise
    except Exception as e:
        self._running = False
        logger.error(
            "task_start_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )
        raise

stop async

stop()

Stop the scheduled task execution loop.

Source code in ccproxy/scheduler/tasks.py
async def stop(self) -> None:
    """Stop the scheduled task execution loop."""
    if not self._running:
        return

    self._running = False
    logger.debug("task_stopping", task_name=self.name)

    # Cancel the running task and wait for it to complete
    if self._task and not self._task.done():
        self._task.cancel()
        try:
            # Wait for the task to complete cancellation
            await self._task
        except asyncio.CancelledError:
            # Expected when task is cancelled
            pass
        except Exception as e:
            logger.warning(
                "task_stop_unexpected_error",
                task_name=self.name,
                error=str(e),
                error_type=type(e).__name__,
            )

    # Ensure the task reference is cleared
    self._task = None

    # Wait for the completion event to be signaled
    if self._stop_complete is not None:
        try:
            await asyncio.wait_for(self._stop_complete.wait(), timeout=1.0)
        except TimeoutError:
            logger.warning(
                "task_stop_completion_timeout",
                task_name=self.name,
                message="Task stop completion event not signaled within timeout",
            )

    try:
        await self.cleanup()
        logger.debug("task_stopped", task_name=self.name)
    except SchedulerError as e:
        logger.error(
            "task_cleanup_scheduler_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )
    except Exception as e:
        logger.error(
            "task_cleanup_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )

get_status

get_status()

Get current task status information.

Returns:

Type Description
dict[str, Any]

Dictionary with task status details

Source code in ccproxy/scheduler/tasks.py
def get_status(self) -> dict[str, Any]:
    """
    Get current task status information.

    Returns:
        Dictionary with task status details
    """
    return {
        "name": self.name,
        "enabled": self.enabled,
        "running": self.is_running,
        "interval_seconds": self.interval_seconds,
        "consecutive_failures": self.consecutive_failures,
        "last_run_time": self.last_run_time,
        "next_delay": self.calculate_next_delay() if self.is_running else None,
    }