Skip to content

ccproxy.scheduler

ccproxy.scheduler

Scheduler system for periodic tasks.

This module provides a generic, extensible scheduler for managing periodic tasks in the CCProxy API. It provides a centralized system that supports:

  • Generic task scheduling with configurable intervals
  • Task registration and discovery via registry pattern
  • Graceful startup and shutdown with FastAPI integration
  • Error handling with exponential backoff
  • Structured logging and monitoring

Key components: - Scheduler: Core scheduler engine for task management - BaseScheduledTask: Abstract base class for all scheduled tasks - TaskRegistry: Dynamic task registration and discovery system

Scheduler

Scheduler(
    max_concurrent_tasks=10,
    graceful_shutdown_timeout=30.0,
    task_registry=None,
)

Scheduler for managing multiple periodic tasks.

Provides centralized management of scheduled tasks with: - Dynamic task registration and configuration - Graceful startup and shutdown - Task monitoring and status reporting - Error handling and recovery

Parameters:

Name Type Description Default
max_concurrent_tasks int

Maximum number of tasks to run concurrently

10
graceful_shutdown_timeout float

Timeout for graceful shutdown in seconds

30.0
task_registry TaskRegistry | None

Task registry instance (uses global if None)

None
Source code in ccproxy/scheduler/core.py
def __init__(
    self,
    max_concurrent_tasks: int = 10,
    graceful_shutdown_timeout: float = 30.0,
    task_registry: TaskRegistry | None = None,
):
    """
    Initialize the scheduler.

    Args:
        max_concurrent_tasks: Maximum number of tasks to run concurrently
        graceful_shutdown_timeout: Timeout for graceful shutdown in seconds
        task_registry: Task registry instance (uses global if None)
    """
    self.max_concurrent_tasks = max_concurrent_tasks
    self.graceful_shutdown_timeout = graceful_shutdown_timeout
    self.task_registry = task_registry or get_task_registry()

    self._running = False
    self._tasks: dict[str, BaseScheduledTask] = {}
    self._semaphore: asyncio.Semaphore | None = None

is_running property

is_running

Check if the scheduler is running.

task_count property

task_count

Get the number of managed tasks.

start async

start()

Start the scheduler and all enabled tasks.

Source code in ccproxy/scheduler/core.py
async def start(self) -> None:
    """Start the scheduler and all enabled tasks."""
    if self._running:
        logger.warning("scheduler_already_running")
        return

    self._running = True
    self._semaphore = asyncio.Semaphore(self.max_concurrent_tasks)

    logger.info(
        "scheduler_starting",
        max_concurrent_tasks=self.max_concurrent_tasks,
        registered_tasks=self.task_registry.list_tasks(),
    )

    try:
        # No automatic task creation - tasks must be explicitly added
        logger.info(
            "scheduler_started",
            active_tasks=len(self._tasks),
            running_tasks=[
                name for name, task in self._tasks.items() if task.is_running
            ],
        )
    except Exception as e:
        self._running = False
        logger.error(
            "scheduler_start_failed",
            error=str(e),
            error_type=type(e).__name__,
        )
        raise SchedulerError(f"Failed to start scheduler: {e}") from e

stop async

stop()

Stop the scheduler and all running tasks.

Source code in ccproxy/scheduler/core.py
async def stop(self) -> None:
    """Stop the scheduler and all running tasks."""
    if not self._running:
        return

    self._running = False
    logger.info("scheduler_stopping", active_tasks=len(self._tasks))

    # Stop all tasks
    stop_tasks = []
    for task_name, task in self._tasks.items():
        if task.is_running:
            logger.debug("stopping_task", task_name=task_name)
            stop_tasks.append(task.stop())

    if stop_tasks:
        try:
            # Wait for all tasks to stop gracefully
            await asyncio.wait_for(
                asyncio.gather(*stop_tasks, return_exceptions=True),
                timeout=self.graceful_shutdown_timeout,
            )
            logger.info("scheduler_stopped_gracefully")
        except TimeoutError:
            logger.warning(
                "scheduler_shutdown_timeout",
                timeout=self.graceful_shutdown_timeout,
            )
            # Tasks should have cancelled themselves, but log the issue
            for task_name, task in self._tasks.items():
                if task.is_running:
                    logger.warning(
                        "task_still_running_after_shutdown", task_name=task_name
                    )
        except Exception as e:
            logger.error(
                "scheduler_shutdown_error",
                error=str(e),
                error_type=type(e).__name__,
            )
            raise SchedulerShutdownError(
                f"Error during scheduler shutdown: {e}"
            ) from e

    self._tasks.clear()
    logger.info("scheduler_stopped")

add_task async

add_task(task_name, task_type, **task_kwargs)

Add and start a task.

Parameters:

Name Type Description Default
task_name str

Unique name for this task instance

required
task_type str

Type of task (must be registered in task registry)

required
**task_kwargs Any

Additional arguments to pass to task constructor

{}

Raises:

Type Description
TaskRegistrationError

If task type is not registered

SchedulerError

If task name already exists or task creation fails

Source code in ccproxy/scheduler/core.py
async def add_task(
    self,
    task_name: str,
    task_type: str,
    **task_kwargs: Any,
) -> None:
    """
    Add and start a task.

    Args:
        task_name: Unique name for this task instance
        task_type: Type of task (must be registered in task registry)
        **task_kwargs: Additional arguments to pass to task constructor

    Raises:
        TaskRegistrationError: If task type is not registered
        SchedulerError: If task name already exists or task creation fails
    """
    if task_name in self._tasks:
        raise SchedulerError(f"Task '{task_name}' already exists")

    if not self.task_registry.is_registered(task_type):
        raise TaskRegistrationError(f"Task type '{task_type}' is not registered")

    try:
        # Get task class and create instance
        task_class = self.task_registry.get(task_type)
        task_instance = task_class(name=task_name, **task_kwargs)

        # Add to our tasks dict
        self._tasks[task_name] = task_instance

        # Start the task if scheduler is running and task is enabled
        if self._running and task_instance.enabled:
            await task_instance.start()
            logger.info(
                "task_added_and_started",
                task_name=task_name,
                task_type=task_type,
            )
        else:
            logger.info(
                "task_added_not_started",
                task_name=task_name,
                task_type=task_type,
                scheduler_running=self._running,
                task_enabled=task_instance.enabled,
            )

    except Exception as e:
        # Clean up if task was partially added
        if task_name in self._tasks:
            del self._tasks[task_name]

        logger.error(
            "task_add_failed",
            task_name=task_name,
            task_type=task_type,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise SchedulerError(f"Failed to add task '{task_name}': {e}") from e

remove_task async

remove_task(task_name)

Remove and stop a task.

Parameters:

Name Type Description Default
task_name str

Name of task to remove

required

Raises:

Type Description
TaskNotFoundError

If task does not exist

Source code in ccproxy/scheduler/core.py
async def remove_task(self, task_name: str) -> None:
    """
    Remove and stop a task.

    Args:
        task_name: Name of task to remove

    Raises:
        TaskNotFoundError: If task does not exist
    """
    if task_name not in self._tasks:
        raise TaskNotFoundError(f"Task '{task_name}' does not exist")

    task = self._tasks[task_name]

    try:
        if task.is_running:
            await task.stop()

        del self._tasks[task_name]
        logger.info("task_removed", task_name=task_name)

    except Exception as e:
        logger.error(
            "task_remove_failed",
            task_name=task_name,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise SchedulerError(f"Failed to remove task '{task_name}': {e}") from e

get_task

get_task(task_name)

Get a task instance by name.

Parameters:

Name Type Description Default
task_name str

Name of task to retrieve

required

Returns:

Type Description
BaseScheduledTask

Task instance

Raises:

Type Description
TaskNotFoundError

If task does not exist

Source code in ccproxy/scheduler/core.py
def get_task(self, task_name: str) -> BaseScheduledTask:
    """
    Get a task instance by name.

    Args:
        task_name: Name of task to retrieve

    Returns:
        Task instance

    Raises:
        TaskNotFoundError: If task does not exist
    """
    if task_name not in self._tasks:
        raise TaskNotFoundError(f"Task '{task_name}' does not exist")

    return self._tasks[task_name]

list_tasks

list_tasks()

Get list of all task names.

Returns:

Type Description
list[str]

List of task names

Source code in ccproxy/scheduler/core.py
def list_tasks(self) -> list[str]:
    """
    Get list of all task names.

    Returns:
        List of task names
    """
    return list(self._tasks.keys())

get_task_status

get_task_status(task_name)

Get status information for a specific task.

Parameters:

Name Type Description Default
task_name str

Name of task

required

Returns:

Type Description
dict[str, Any]

Task status dictionary

Raises:

Type Description
TaskNotFoundError

If task does not exist

Source code in ccproxy/scheduler/core.py
def get_task_status(self, task_name: str) -> dict[str, Any]:
    """
    Get status information for a specific task.

    Args:
        task_name: Name of task

    Returns:
        Task status dictionary

    Raises:
        TaskNotFoundError: If task does not exist
    """
    if task_name not in self._tasks:
        raise TaskNotFoundError(f"Task '{task_name}' does not exist")

    return self._tasks[task_name].get_status()

get_scheduler_status

get_scheduler_status()

Get overall scheduler status information.

Returns:

Type Description
dict[str, Any]

Scheduler status dictionary

Source code in ccproxy/scheduler/core.py
def get_scheduler_status(self) -> dict[str, Any]:
    """
    Get overall scheduler status information.

    Returns:
        Scheduler status dictionary
    """
    running_tasks = [name for name, task in self._tasks.items() if task.is_running]

    return {
        "running": self._running,
        "total_tasks": len(self._tasks),
        "running_tasks": len(running_tasks),
        "max_concurrent_tasks": self.max_concurrent_tasks,
        "graceful_shutdown_timeout": self.graceful_shutdown_timeout,
        "task_names": list(self._tasks.keys()),
        "running_task_names": running_tasks,
        "registered_task_types": self.task_registry.list_tasks(),
    }

TaskRegistry

TaskRegistry()

Registry for managing scheduled task registration and discovery.

Provides a centralized way to register and retrieve scheduled tasks, enabling dynamic task management and configuration.

Source code in ccproxy/scheduler/registry.py
def __init__(self) -> None:
    """Initialize the task registry."""
    self._tasks: dict[str, type[BaseScheduledTask]] = {}

register

register(name, task_class)

Register a scheduled task class.

Parameters:

Name Type Description Default
name str

Unique name for the task

required
task_class type[BaseScheduledTask]

Task class that inherits from BaseScheduledTask

required

Raises:

Type Description
TaskRegistrationError

If task name is already registered or invalid

Source code in ccproxy/scheduler/registry.py
def register(self, name: str, task_class: type[BaseScheduledTask]) -> None:
    """
    Register a scheduled task class.

    Args:
        name: Unique name for the task
        task_class: Task class that inherits from BaseScheduledTask

    Raises:
        TaskRegistrationError: If task name is already registered or invalid
    """
    if name in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is already registered")

    if not issubclass(task_class, BaseScheduledTask):
        raise TaskRegistrationError(
            f"Task class for '{name}' must inherit from BaseScheduledTask"
        )

    self._tasks[name] = task_class
    logger.debug("task_registered", task_name=name, task_class=task_class.__name__)

unregister

unregister(name)

Unregister a scheduled task.

Parameters:

Name Type Description Default
name str

Name of the task to unregister

required

Raises:

Type Description
TaskRegistrationError

If task is not registered

Source code in ccproxy/scheduler/registry.py
def unregister(self, name: str) -> None:
    """
    Unregister a scheduled task.

    Args:
        name: Name of the task to unregister

    Raises:
        TaskRegistrationError: If task is not registered
    """
    if name not in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is not registered")

    del self._tasks[name]
    logger.debug("task_unregistered", task_name=name)

get

get(name)

Get a registered task class by name.

Parameters:

Name Type Description Default
name str

Name of the task to retrieve

required

Returns:

Type Description
type[BaseScheduledTask]

Task class

Raises:

Type Description
TaskRegistrationError

If task is not registered

Source code in ccproxy/scheduler/registry.py
def get(self, name: str) -> type[BaseScheduledTask]:
    """
    Get a registered task class by name.

    Args:
        name: Name of the task to retrieve

    Returns:
        Task class

    Raises:
        TaskRegistrationError: If task is not registered
    """
    if name not in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is not registered")

    return self._tasks[name]

list_tasks

list_tasks()

Get list of all registered task names.

Returns:

Type Description
list[str]

List of registered task names

Source code in ccproxy/scheduler/registry.py
def list_tasks(self) -> list[str]:
    """
    Get list of all registered task names.

    Returns:
        List of registered task names
    """
    return list(self._tasks.keys())

is_registered

is_registered(name)

Check if a task is registered.

Parameters:

Name Type Description Default
name str

Task name to check

required

Returns:

Type Description
bool

True if task is registered, False otherwise

Source code in ccproxy/scheduler/registry.py
def is_registered(self, name: str) -> bool:
    """
    Check if a task is registered.

    Args:
        name: Task name to check

    Returns:
        True if task is registered, False otherwise
    """
    return name in self._tasks

clear

clear()

Clear all registered tasks.

Source code in ccproxy/scheduler/registry.py
def clear(self) -> None:
    """Clear all registered tasks."""
    self._tasks.clear()
    logger.debug("task_registry_cleared")

get_registry_info

get_registry_info()

Get information about the current registry state.

Returns:

Type Description
dict[str, Any]

Dictionary with registry information

Source code in ccproxy/scheduler/registry.py
def get_registry_info(self) -> dict[str, Any]:
    """
    Get information about the current registry state.

    Returns:
        Dictionary with registry information
    """
    return {
        "total_tasks": len(self._tasks),
        "registered_tasks": list(self._tasks.keys()),
        "task_classes": {name: cls.__name__ for name, cls in self._tasks.items()},
    }

BaseScheduledTask

BaseScheduledTask(
    name,
    interval_seconds,
    enabled=True,
    max_backoff_seconds=300.0,
    jitter_factor=0.25,
)

Bases: ABC

Abstract base class for all scheduled tasks.

Provides common functionality for task lifecycle management, error handling, and exponential backoff for failed executions.

Parameters:

Name Type Description Default
name str

Human-readable task name

required
interval_seconds float

Interval between task executions in seconds

required
enabled bool

Whether the task is enabled

True
max_backoff_seconds float

Maximum backoff delay for failed tasks

300.0
jitter_factor float

Jitter factor for backoff randomization (0.0-1.0)

0.25
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    max_backoff_seconds: float = 300.0,
    jitter_factor: float = 0.25,
):
    """
    Initialize scheduled task.

    Args:
        name: Human-readable task name
        interval_seconds: Interval between task executions in seconds
        enabled: Whether the task is enabled
        max_backoff_seconds: Maximum backoff delay for failed tasks
        jitter_factor: Jitter factor for backoff randomization (0.0-1.0)
    """
    self.name = name
    self.interval_seconds = max(1.0, interval_seconds)
    self.enabled = enabled
    self.max_backoff_seconds = max_backoff_seconds
    self.jitter_factor = min(1.0, max(0.0, jitter_factor))

    self._consecutive_failures = 0
    self._last_run_time: float = 0
    self._running = False
    self._task: asyncio.Task[Any] | None = None

is_running property

is_running

Check if the task is currently running.

consecutive_failures property

consecutive_failures

Get the number of consecutive failures.

last_run_time property

last_run_time

Get the timestamp of the last execution.

run abstractmethod async

run()

Execute the scheduled task.

Returns:

Type Description
bool

True if execution was successful, False otherwise

Source code in ccproxy/scheduler/tasks.py
@abstractmethod
async def run(self) -> bool:
    """
    Execute the scheduled task.

    Returns:
        True if execution was successful, False otherwise
    """
    pass

setup async

setup()

Perform any setup required before task execution starts.

Called once when the task is first started. Override if needed. Default implementation does nothing.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """
    Perform any setup required before task execution starts.

    Called once when the task is first started. Override if needed.
    Default implementation does nothing.
    """
    # Default implementation - subclasses can override if needed
    return

cleanup async

cleanup()

Perform any cleanup required after task execution stops.

Called once when the task is stopped. Override if needed. Default implementation does nothing.

Source code in ccproxy/scheduler/tasks.py
async def cleanup(self) -> None:
    """
    Perform any cleanup required after task execution stops.

    Called once when the task is stopped. Override if needed.
    Default implementation does nothing.
    """
    # Default implementation - subclasses can override if needed
    return

calculate_next_delay

calculate_next_delay()

Calculate the delay before the next task execution.

Returns exponential backoff delay for failed tasks, or normal interval for successful tasks, with optional jitter.

Returns:

Type Description
float

Delay in seconds before next execution

Source code in ccproxy/scheduler/tasks.py
def calculate_next_delay(self) -> float:
    """
    Calculate the delay before the next task execution.

    Returns exponential backoff delay for failed tasks, or normal interval
    for successful tasks, with optional jitter.

    Returns:
        Delay in seconds before next execution
    """
    if self._consecutive_failures == 0:
        base_delay = self.interval_seconds
    else:
        # Exponential backoff: interval * (2 ^ failures)
        base_delay = self.interval_seconds * (2**self._consecutive_failures)
        base_delay = min(base_delay, self.max_backoff_seconds)

    # Add jitter to prevent thundering herd
    if self.jitter_factor > 0:
        jitter = base_delay * self.jitter_factor * (random.random() - 0.5)
        base_delay += jitter

    return max(1.0, base_delay)

start async

start()

Start the scheduled task execution loop.

Source code in ccproxy/scheduler/tasks.py
async def start(self) -> None:
    """Start the scheduled task execution loop."""
    if self._running or not self.enabled:
        return

    self._running = True
    logger.debug("task_starting", task_name=self.name)

    try:
        await self.setup()
        self._task = asyncio.create_task(self._run_loop())
        logger.debug("task_started", task_name=self.name)
    except Exception as e:
        self._running = False
        logger.error(
            "task_start_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise

stop async

stop()

Stop the scheduled task execution loop.

Source code in ccproxy/scheduler/tasks.py
async def stop(self) -> None:
    """Stop the scheduled task execution loop."""
    if not self._running:
        return

    self._running = False
    logger.debug("task_stopping", task_name=self.name)

    # Cancel the running task
    if self._task and not self._task.done():
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    try:
        await self.cleanup()
        logger.debug("task_stopped", task_name=self.name)
    except Exception as e:
        logger.error(
            "task_cleanup_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )

get_status

get_status()

Get current task status information.

Returns:

Type Description
dict[str, Any]

Dictionary with task status details

Source code in ccproxy/scheduler/tasks.py
def get_status(self) -> dict[str, Any]:
    """
    Get current task status information.

    Returns:
        Dictionary with task status details
    """
    return {
        "name": self.name,
        "enabled": self.enabled,
        "running": self.is_running,
        "interval_seconds": self.interval_seconds,
        "consecutive_failures": self.consecutive_failures,
        "last_run_time": self.last_run_time,
        "next_delay": self.calculate_next_delay() if self.is_running else None,
    }

PricingCacheUpdateTask

PricingCacheUpdateTask(
    name,
    interval_seconds,
    enabled=True,
    force_refresh_on_startup=False,
    pricing_updater=None,
)

Bases: BaseScheduledTask

Task for updating pricing cache periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between pricing updates

required
enabled bool

Whether task is enabled

True
force_refresh_on_startup bool

Whether to force refresh on first run

False
pricing_updater Any | None

Injected pricing updater instance

None
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    force_refresh_on_startup: bool = False,
    pricing_updater: Any | None = None,
):
    """
    Initialize pricing cache update task.

    Args:
        name: Task name
        interval_seconds: Interval between pricing updates
        enabled: Whether task is enabled
        force_refresh_on_startup: Whether to force refresh on first run
        pricing_updater: Injected pricing updater instance
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
    )
    self.force_refresh_on_startup = force_refresh_on_startup
    self._pricing_updater = pricing_updater
    self._first_run = True

setup async

setup()

Initialize pricing updater instance if not injected.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """Initialize pricing updater instance if not injected."""
    if self._pricing_updater is None:
        try:
            from ccproxy.config.pricing import PricingSettings
            from ccproxy.pricing.cache import PricingCache
            from ccproxy.pricing.updater import PricingUpdater

            # Create pricing components with dependency injection
            settings = PricingSettings()
            cache = PricingCache(settings)
            self._pricing_updater = PricingUpdater(cache, settings)
            logger.debug("pricing_update_task_setup_complete", task_name=self.name)
        except Exception as e:
            logger.error(
                "pricing_update_task_setup_failed",
                task_name=self.name,
                error=str(e),
                error_type=type(e).__name__,
            )
            raise
    else:
        logger.debug(
            "pricing_update_task_using_injected_updater", task_name=self.name
        )

run async

run()

Execute pricing cache update.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Execute pricing cache update."""
    try:
        if not self._pricing_updater:
            logger.warning("pricing_update_no_updater", task_name=self.name)
            return False

        # Force refresh on first run if configured
        force_refresh = self._first_run and self.force_refresh_on_startup
        self._first_run = False

        if force_refresh:
            logger.info("pricing_update_force_refresh_startup", task_name=self.name)
            refresh_result = await self._pricing_updater.force_refresh()
            success = bool(refresh_result)
        else:
            # Regular update check
            pricing_data = await self._pricing_updater.get_current_pricing(
                force_refresh=False
            )
            success = pricing_data is not None

        if success:
            logger.debug("pricing_update_success", task_name=self.name)
        else:
            logger.warning("pricing_update_failed", task_name=self.name)

        return success

    except Exception as e:
        logger.error(
            "pricing_update_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

PushgatewayTask

PushgatewayTask(
    name,
    interval_seconds,
    enabled=True,
    max_backoff_seconds=300.0,
)

Bases: BaseScheduledTask

Task for pushing metrics to Pushgateway periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between pushgateway operations

required
enabled bool

Whether task is enabled

True
max_backoff_seconds float

Maximum backoff delay for failures

300.0
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    max_backoff_seconds: float = 300.0,
):
    """
    Initialize pushgateway task.

    Args:
        name: Task name
        interval_seconds: Interval between pushgateway operations
        enabled: Whether task is enabled
        max_backoff_seconds: Maximum backoff delay for failures
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
        max_backoff_seconds=max_backoff_seconds,
    )
    self._metrics_instance: Any | None = None

setup async

setup()

Initialize metrics instance for pushgateway operations.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """Initialize metrics instance for pushgateway operations."""
    try:
        from ccproxy.observability.metrics import get_metrics

        self._metrics_instance = get_metrics()
        logger.debug("pushgateway_task_setup_complete", task_name=self.name)
    except Exception as e:
        logger.error(
            "pushgateway_task_setup_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise

run async

run()

Execute pushgateway metrics push.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Execute pushgateway metrics push."""
    try:
        if not self._metrics_instance:
            logger.warning("pushgateway_no_metrics_instance", task_name=self.name)
            return False

        if not self._metrics_instance.is_pushgateway_enabled():
            logger.debug("pushgateway_disabled", task_name=self.name)
            return True  # Not an error, just disabled

        success = bool(self._metrics_instance.push_to_gateway())

        if success:
            logger.debug("pushgateway_push_success", task_name=self.name)
        else:
            logger.warning("pushgateway_push_failed", task_name=self.name)

        return success

    except Exception as e:
        logger.error(
            "pushgateway_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

StatsPrintingTask

StatsPrintingTask(name, interval_seconds, enabled=True)

Bases: BaseScheduledTask

Task for printing stats summary periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between stats printing

required
enabled bool

Whether task is enabled

True
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
):
    """
    Initialize stats printing task.

    Args:
        name: Task name
        interval_seconds: Interval between stats printing
        enabled: Whether task is enabled
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
    )
    self._stats_collector_instance: Any | None = None
    self._metrics_instance: Any | None = None

setup async

setup()

Initialize stats collector and metrics instances.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """Initialize stats collector and metrics instances."""
    try:
        from ccproxy.config.settings import get_settings
        from ccproxy.observability.metrics import get_metrics
        from ccproxy.observability.stats_printer import get_stats_collector

        self._metrics_instance = get_metrics()
        settings = get_settings()
        self._stats_collector_instance = get_stats_collector(
            settings=settings.observability,
            metrics_instance=self._metrics_instance,
        )
        logger.debug("stats_printing_task_setup_complete", task_name=self.name)
    except Exception as e:
        logger.error(
            "stats_printing_task_setup_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise

run async

run()

Execute stats printing.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Execute stats printing."""
    try:
        if not self._stats_collector_instance:
            logger.warning("stats_printing_no_collector", task_name=self.name)
            return False

        await self._stats_collector_instance.print_stats()
        logger.debug("stats_printing_success", task_name=self.name)
        return True

    except Exception as e:
        logger.error(
            "stats_printing_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False