Skip to content

ccproxy.scheduler.tasks

ccproxy.scheduler.tasks

Base scheduled task classes and task implementations.

BaseScheduledTask

BaseScheduledTask(
    name,
    interval_seconds,
    enabled=True,
    max_backoff_seconds=300.0,
    jitter_factor=0.25,
)

Bases: ABC

Abstract base class for all scheduled tasks.

Provides common functionality for task lifecycle management, error handling, and exponential backoff for failed executions.

Parameters:

Name Type Description Default
name str

Human-readable task name

required
interval_seconds float

Interval between task executions in seconds

required
enabled bool

Whether the task is enabled

True
max_backoff_seconds float

Maximum backoff delay for failed tasks

300.0
jitter_factor float

Jitter factor for backoff randomization (0.0-1.0)

0.25
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    max_backoff_seconds: float = 300.0,
    jitter_factor: float = 0.25,
):
    """
    Initialize scheduled task.

    Args:
        name: Human-readable task name
        interval_seconds: Interval between task executions in seconds
        enabled: Whether the task is enabled
        max_backoff_seconds: Maximum backoff delay for failed tasks
        jitter_factor: Jitter factor for backoff randomization (0.0-1.0)
    """
    self.name = name
    self.interval_seconds = max(1.0, interval_seconds)
    self.enabled = enabled
    self.max_backoff_seconds = max_backoff_seconds
    self.jitter_factor = min(1.0, max(0.0, jitter_factor))

    self._consecutive_failures = 0
    self._last_run_time: float = 0
    self._running = False
    self._task: asyncio.Task[Any] | None = None

is_running property

is_running

Check if the task is currently running.

consecutive_failures property

consecutive_failures

Get the number of consecutive failures.

last_run_time property

last_run_time

Get the timestamp of the last execution.

run abstractmethod async

run()

Execute the scheduled task.

Returns:

Type Description
bool

True if execution was successful, False otherwise

Source code in ccproxy/scheduler/tasks.py
@abstractmethod
async def run(self) -> bool:
    """
    Execute the scheduled task.

    Returns:
        True if execution was successful, False otherwise
    """
    pass

setup async

setup()

Perform any setup required before task execution starts.

Called once when the task is first started. Override if needed. Default implementation does nothing.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """
    Perform any setup required before task execution starts.

    Called once when the task is first started. Override if needed.
    Default implementation does nothing.
    """
    # Default implementation - subclasses can override if needed
    return

cleanup async

cleanup()

Perform any cleanup required after task execution stops.

Called once when the task is stopped. Override if needed. Default implementation does nothing.

Source code in ccproxy/scheduler/tasks.py
async def cleanup(self) -> None:
    """
    Perform any cleanup required after task execution stops.

    Called once when the task is stopped. Override if needed.
    Default implementation does nothing.
    """
    # Default implementation - subclasses can override if needed
    return

calculate_next_delay

calculate_next_delay()

Calculate the delay before the next task execution.

Returns exponential backoff delay for failed tasks, or normal interval for successful tasks, with optional jitter.

Returns:

Type Description
float

Delay in seconds before next execution

Source code in ccproxy/scheduler/tasks.py
def calculate_next_delay(self) -> float:
    """
    Calculate the delay before the next task execution.

    Returns exponential backoff delay for failed tasks, or normal interval
    for successful tasks, with optional jitter.

    Returns:
        Delay in seconds before next execution
    """
    if self._consecutive_failures == 0:
        base_delay = self.interval_seconds
    else:
        # Exponential backoff: interval * (2 ^ failures)
        base_delay = self.interval_seconds * (2**self._consecutive_failures)
        base_delay = min(base_delay, self.max_backoff_seconds)

    # Add jitter to prevent thundering herd
    if self.jitter_factor > 0:
        jitter = base_delay * self.jitter_factor * (random.random() - 0.5)
        base_delay += jitter

    return max(1.0, base_delay)

start async

start()

Start the scheduled task execution loop.

Source code in ccproxy/scheduler/tasks.py
async def start(self) -> None:
    """Start the scheduled task execution loop."""
    if self._running or not self.enabled:
        return

    self._running = True
    logger.debug("task_starting", task_name=self.name)

    try:
        await self.setup()
        self._task = asyncio.create_task(self._run_loop())
        logger.debug("task_started", task_name=self.name)
    except Exception as e:
        self._running = False
        logger.error(
            "task_start_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise

stop async

stop()

Stop the scheduled task execution loop.

Source code in ccproxy/scheduler/tasks.py
async def stop(self) -> None:
    """Stop the scheduled task execution loop."""
    if not self._running:
        return

    self._running = False
    logger.debug("task_stopping", task_name=self.name)

    # Cancel the running task
    if self._task and not self._task.done():
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    try:
        await self.cleanup()
        logger.debug("task_stopped", task_name=self.name)
    except Exception as e:
        logger.error(
            "task_cleanup_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )

get_status

get_status()

Get current task status information.

Returns:

Type Description
dict[str, Any]

Dictionary with task status details

Source code in ccproxy/scheduler/tasks.py
def get_status(self) -> dict[str, Any]:
    """
    Get current task status information.

    Returns:
        Dictionary with task status details
    """
    return {
        "name": self.name,
        "enabled": self.enabled,
        "running": self.is_running,
        "interval_seconds": self.interval_seconds,
        "consecutive_failures": self.consecutive_failures,
        "last_run_time": self.last_run_time,
        "next_delay": self.calculate_next_delay() if self.is_running else None,
    }

PushgatewayTask

PushgatewayTask(
    name,
    interval_seconds,
    enabled=True,
    max_backoff_seconds=300.0,
)

Bases: BaseScheduledTask

Task for pushing metrics to Pushgateway periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between pushgateway operations

required
enabled bool

Whether task is enabled

True
max_backoff_seconds float

Maximum backoff delay for failures

300.0
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    max_backoff_seconds: float = 300.0,
):
    """
    Initialize pushgateway task.

    Args:
        name: Task name
        interval_seconds: Interval between pushgateway operations
        enabled: Whether task is enabled
        max_backoff_seconds: Maximum backoff delay for failures
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
        max_backoff_seconds=max_backoff_seconds,
    )
    self._metrics_instance: Any | None = None

setup async

setup()

Initialize metrics instance for pushgateway operations.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """Initialize metrics instance for pushgateway operations."""
    try:
        from ccproxy.observability.metrics import get_metrics

        self._metrics_instance = get_metrics()
        logger.debug("pushgateway_task_setup_complete", task_name=self.name)
    except Exception as e:
        logger.error(
            "pushgateway_task_setup_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise

run async

run()

Execute pushgateway metrics push.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Execute pushgateway metrics push."""
    try:
        if not self._metrics_instance:
            logger.warning("pushgateway_no_metrics_instance", task_name=self.name)
            return False

        if not self._metrics_instance.is_pushgateway_enabled():
            logger.debug("pushgateway_disabled", task_name=self.name)
            return True  # Not an error, just disabled

        success = bool(self._metrics_instance.push_to_gateway())

        if success:
            logger.debug("pushgateway_push_success", task_name=self.name)
        else:
            logger.warning("pushgateway_push_failed", task_name=self.name)

        return success

    except Exception as e:
        logger.error(
            "pushgateway_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

StatsPrintingTask

StatsPrintingTask(name, interval_seconds, enabled=True)

Bases: BaseScheduledTask

Task for printing stats summary periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between stats printing

required
enabled bool

Whether task is enabled

True
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
):
    """
    Initialize stats printing task.

    Args:
        name: Task name
        interval_seconds: Interval between stats printing
        enabled: Whether task is enabled
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
    )
    self._stats_collector_instance: Any | None = None
    self._metrics_instance: Any | None = None

setup async

setup()

Initialize stats collector and metrics instances.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """Initialize stats collector and metrics instances."""
    try:
        from ccproxy.config.settings import get_settings
        from ccproxy.observability.metrics import get_metrics
        from ccproxy.observability.stats_printer import get_stats_collector

        self._metrics_instance = get_metrics()
        settings = get_settings()
        self._stats_collector_instance = get_stats_collector(
            settings=settings.observability,
            metrics_instance=self._metrics_instance,
        )
        logger.debug("stats_printing_task_setup_complete", task_name=self.name)
    except Exception as e:
        logger.error(
            "stats_printing_task_setup_failed",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        raise

run async

run()

Execute stats printing.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Execute stats printing."""
    try:
        if not self._stats_collector_instance:
            logger.warning("stats_printing_no_collector", task_name=self.name)
            return False

        await self._stats_collector_instance.print_stats()
        logger.debug("stats_printing_success", task_name=self.name)
        return True

    except Exception as e:
        logger.error(
            "stats_printing_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

PricingCacheUpdateTask

PricingCacheUpdateTask(
    name,
    interval_seconds,
    enabled=True,
    force_refresh_on_startup=False,
    pricing_updater=None,
)

Bases: BaseScheduledTask

Task for updating pricing cache periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between pricing updates

required
enabled bool

Whether task is enabled

True
force_refresh_on_startup bool

Whether to force refresh on first run

False
pricing_updater Any | None

Injected pricing updater instance

None
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    force_refresh_on_startup: bool = False,
    pricing_updater: Any | None = None,
):
    """
    Initialize pricing cache update task.

    Args:
        name: Task name
        interval_seconds: Interval between pricing updates
        enabled: Whether task is enabled
        force_refresh_on_startup: Whether to force refresh on first run
        pricing_updater: Injected pricing updater instance
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
    )
    self.force_refresh_on_startup = force_refresh_on_startup
    self._pricing_updater = pricing_updater
    self._first_run = True

setup async

setup()

Initialize pricing updater instance if not injected.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """Initialize pricing updater instance if not injected."""
    if self._pricing_updater is None:
        try:
            from ccproxy.config.pricing import PricingSettings
            from ccproxy.pricing.cache import PricingCache
            from ccproxy.pricing.updater import PricingUpdater

            # Create pricing components with dependency injection
            settings = PricingSettings()
            cache = PricingCache(settings)
            self._pricing_updater = PricingUpdater(cache, settings)
            logger.debug("pricing_update_task_setup_complete", task_name=self.name)
        except Exception as e:
            logger.error(
                "pricing_update_task_setup_failed",
                task_name=self.name,
                error=str(e),
                error_type=type(e).__name__,
            )
            raise
    else:
        logger.debug(
            "pricing_update_task_using_injected_updater", task_name=self.name
        )

run async

run()

Execute pricing cache update.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Execute pricing cache update."""
    try:
        if not self._pricing_updater:
            logger.warning("pricing_update_no_updater", task_name=self.name)
            return False

        # Force refresh on first run if configured
        force_refresh = self._first_run and self.force_refresh_on_startup
        self._first_run = False

        if force_refresh:
            logger.info("pricing_update_force_refresh_startup", task_name=self.name)
            refresh_result = await self._pricing_updater.force_refresh()
            success = bool(refresh_result)
        else:
            # Regular update check
            pricing_data = await self._pricing_updater.get_current_pricing(
                force_refresh=False
            )
            success = pricing_data is not None

        if success:
            logger.debug("pricing_update_success", task_name=self.name)
        else:
            logger.warning("pricing_update_failed", task_name=self.name)

        return success

    except Exception as e:
        logger.error(
            "pricing_update_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

PoolStatsTask

PoolStatsTask(
    name, interval_seconds, enabled=True, pool_manager=None
)

Bases: BaseScheduledTask

Task for displaying pool statistics periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between stats display

required
enabled bool

Whether task is enabled

True
pool_manager Any | None

Injected pool manager instance

None
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    pool_manager: Any | None = None,
):
    """
    Initialize pool stats task.

    Args:
        name: Task name
        interval_seconds: Interval between stats display
        enabled: Whether task is enabled
        pool_manager: Injected pool manager instance
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
    )
    self._pool_manager = pool_manager

setup async

setup()

Initialize pool manager instance if not injected.

Source code in ccproxy/scheduler/tasks.py
async def setup(self) -> None:
    """Initialize pool manager instance if not injected."""
    if self._pool_manager is None:
        logger.warning(
            "pool_stats_task_no_manager",
            task_name=self.name,
            message="Pool manager not injected, task will be disabled",
        )

run async

run()

Display pool statistics.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Display pool statistics."""
    try:
        if not self._pool_manager:
            return True  # Not an error, just no pool manager available

        # Get general pool stats (if available)
        general_pool = getattr(self._pool_manager, "_pool", None)
        general_stats = None
        if general_pool:
            general_stats = general_pool.get_stats()

        # Get session pool stats
        session_pool = getattr(self._pool_manager, "_session_pool", None)
        session_stats = None
        if session_pool:
            session_stats = await session_pool.get_stats()

        # Log pool statistics
        logger.debug(
            "pool_stats_report",
            task_name=self.name,
            general_pool={
                "enabled": bool(general_pool),
                "total_clients": general_stats.total_clients
                if general_stats
                else 0,
                "available_clients": general_stats.available_clients
                if general_stats
                else 0,
                "active_clients": general_stats.active_clients
                if general_stats
                else 0,
                "connections_created": general_stats.connections_created
                if general_stats
                else 0,
                "connections_closed": general_stats.connections_closed
                if general_stats
                else 0,
                "acquire_count": general_stats.acquire_count
                if general_stats
                else 0,
                "release_count": general_stats.release_count
                if general_stats
                else 0,
                "health_check_failures": general_stats.health_check_failures
                if general_stats
                else 0,
            }
            if general_pool
            else None,
            session_pool={
                "enabled": session_stats.get("enabled", False)
                if session_stats
                else False,
                "total_sessions": session_stats.get("total_sessions", 0)
                if session_stats
                else 0,
                "active_sessions": session_stats.get("active_sessions", 0)
                if session_stats
                else 0,
                "max_sessions": session_stats.get("max_sessions", 0)
                if session_stats
                else 0,
                "total_messages": session_stats.get("total_messages", 0)
                if session_stats
                else 0,
                "session_ttl": session_stats.get("session_ttl", 0)
                if session_stats
                else 0,
            }
            if session_pool
            else None,
        )

        return True

    except Exception as e:
        logger.error(
            "pool_stats_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False

VersionUpdateCheckTask

VersionUpdateCheckTask(
    name,
    interval_seconds,
    enabled=True,
    version_check_cache_ttl_hours=1.0,
    *,
    skip_first_scheduled_run=True,
)

Bases: BaseScheduledTask

Task for checking version updates periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between version checks

required
enabled bool

Whether task is enabled

True
version_check_cache_ttl_hours float

Maximum cache age (hours) used at startup before contacting GitHub

1.0
skip_first_scheduled_run bool

If True, first scheduled loop execution is skipped

True
Source code in ccproxy/scheduler/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    version_check_cache_ttl_hours: float = 1.0,
    *,
    skip_first_scheduled_run: bool = True,
):
    """
    Initialize version update check task.

    Args:
        name: Task name
        interval_seconds: Interval between version checks
        enabled: Whether task is enabled
        version_check_cache_ttl_hours: Maximum cache age (hours) used at startup before contacting GitHub
        skip_first_scheduled_run: If True, first scheduled loop execution is skipped
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
    )
    self.version_check_cache_ttl_hours = version_check_cache_ttl_hours
    # Mark first scheduled execution; allow skipping to avoid duplicate run after startup
    self._first_run = True
    self._skip_first_run = skip_first_scheduled_run

run async

run()

Execute version update check.

Source code in ccproxy/scheduler/tasks.py
async def run(self) -> bool:
    """Execute version update check."""
    try:
        logger.debug(
            "version_check_task_run_start",
            task_name=self.name,
            first_run=self._first_run,
        )
        from datetime import datetime

        from ccproxy.utils.version_checker import (
            VersionCheckState,
            fetch_latest_github_version,
            get_current_version,
            get_version_check_state_path,
            load_check_state,
            save_check_state,
        )

        state_path = get_version_check_state_path()
        current_time = datetime.now(UTC)

        # Skip first scheduled run to avoid duplicate check after startup
        if self._first_run and self._skip_first_run:
            self._first_run = False
            logger.debug(
                "version_check_first_run_skipped",
                task_name=self.name,
                message="Skipping first scheduled run since startup check already completed",
            )
            return True

        # Determine freshness window using configured cache TTL
        # Applies to both startup and scheduled runs to avoid unnecessary network calls
        max_age_hours = self.version_check_cache_ttl_hours

        # Load previous state if available
        prev_state: VersionCheckState | None = await load_check_state(state_path)
        latest_version: str | None = None
        source: str | None = None

        # If we have a recent state within the freshness window, avoid network call
        if prev_state is not None:
            age_hours = (
                current_time - prev_state.last_check_at
            ).total_seconds() / 3600.0
            if age_hours < max_age_hours:
                logger.debug(
                    "version_check_cache_fresh",
                    task_name=self.name,
                    age_hours=round(age_hours, 3),
                    max_age_hours=max_age_hours,
                )
                latest_version = prev_state.latest_version_found
                source = "cache"
            else:
                logger.debug(
                    "version_check_cache_stale",
                    task_name=self.name,
                    age_hours=round(age_hours, 3),
                    max_age_hours=max_age_hours,
                )

        # Fetch only if we don't have a fresh cached version
        if latest_version is None:
            latest_version = await fetch_latest_github_version()
            if latest_version is None:
                logger.warning("version_check_fetch_failed", task_name=self.name)
                return False
            # Persist refreshed state
            new_state = VersionCheckState(
                last_check_at=current_time,
                latest_version_found=latest_version,
            )
            await save_check_state(state_path, new_state)
            source = "network"
        else:
            # Ensure state file at least exists; if it didn't, we wouldn't be here
            pass

        # Compare versions and log result
        current_version = get_current_version()
        self._log_version_comparison(current_version, latest_version, source=source)

        # Mark first run as complete
        if self._first_run:
            self._first_run = False

        return True

    except Exception as e:
        logger.error(
            "version_check_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
        )
        return False