Skip to content

ccproxy.plugins.pricing.tasks

ccproxy.plugins.pricing.tasks

Pricing plugin scheduled tasks.

BaseScheduledTask

BaseScheduledTask(
    name,
    interval_seconds,
    enabled=True,
    max_backoff_seconds=300.0,
    jitter_factor=0.25,
)

Bases: ABC

Abstract base class for all scheduled tasks.

Provides common functionality for task lifecycle management, error handling, and exponential backoff for failed executions.

Parameters:

Name Type Description Default
name str

Human-readable task name

required
interval_seconds float

Interval between task executions in seconds

required
enabled bool

Whether the task is enabled

True
max_backoff_seconds float

Maximum backoff delay for failed tasks

300.0
jitter_factor float

Jitter factor for backoff randomization (0.0-1.0)

0.25
Source code in ccproxy/plugins/pricing/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    enabled: bool = True,
    max_backoff_seconds: float = 300.0,
    jitter_factor: float = 0.25,
):
    """
    Initialize scheduled task.

    Args:
        name: Human-readable task name
        interval_seconds: Interval between task executions in seconds
        enabled: Whether the task is enabled
        max_backoff_seconds: Maximum backoff delay for failed tasks
        jitter_factor: Jitter factor for backoff randomization (0.0-1.0)
    """
    self.name = name
    self.interval_seconds = max(1.0, interval_seconds)
    self.enabled = enabled
    self.max_backoff_seconds = max_backoff_seconds
    self.jitter_factor = min(1.0, max(0.0, jitter_factor))

    # Task state
    self._task: asyncio.Task[None] | None = None
    self._stop_event = asyncio.Event()
    self._consecutive_failures = 0
    self._last_success_time: float | None = None
    self._next_run_time: float | None = None

run abstractmethod async

run()

Execute the task logic.

Returns:

Type Description
bool

True if task completed successfully, False otherwise

Source code in ccproxy/plugins/pricing/tasks.py
@abstractmethod
async def run(self) -> bool:
    """
    Execute the task logic.

    Returns:
        True if task completed successfully, False otherwise
    """

setup async

setup()

Optional setup hook called before the task starts running.

Override this method to perform any initialization required by the task.

Source code in ccproxy/plugins/pricing/tasks.py
async def setup(self) -> None:  # noqa: B027
    """
    Optional setup hook called before the task starts running.

    Override this method to perform any initialization required by the task.
    """
    pass

teardown async

teardown()

Optional teardown hook called when the task stops.

Override this method to perform any cleanup required by the task.

Source code in ccproxy/plugins/pricing/tasks.py
async def teardown(self) -> None:  # noqa: B027
    """
    Optional teardown hook called when the task stops.

    Override this method to perform any cleanup required by the task.
    """
    pass

start async

start()

Start the scheduled task.

Source code in ccproxy/plugins/pricing/tasks.py
async def start(self) -> None:
    """Start the scheduled task."""
    if not self.enabled:
        logger.info("scheduled_task_disabled", task_name=self.name)
        return

    if self._task and not self._task.done():
        logger.warning("scheduled_task_already_running", task_name=self.name)
        return

    self._stop_event.clear()
    self._task = await create_managed_task(
        self._task_loop(), name=f"scheduled_task_{self.name}"
    )

stop async

stop(timeout=10.0)

Stop the scheduled task.

Source code in ccproxy/plugins/pricing/tasks.py
async def stop(self, timeout: float = 10.0) -> None:
    """Stop the scheduled task."""
    if not self._task:
        return

    logger.info("scheduled_task_stopping", task_name=self.name)

    # Signal stop
    self._stop_event.set()

    # Wait for task to complete
    try:
        await asyncio.wait_for(self._task, timeout=timeout)
    except TimeoutError:
        logger.warning(
            "scheduled_task_stop_timeout", task_name=self.name, timeout=timeout
        )
        if not self._task.done():
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task

    self._task = None

is_running

is_running()

Check if task is currently running.

Source code in ccproxy/plugins/pricing/tasks.py
def is_running(self) -> bool:
    """Check if task is currently running."""
    return self._task is not None and not self._task.done()

get_status

get_status()

Get current task status information.

Source code in ccproxy/plugins/pricing/tasks.py
def get_status(self) -> dict[str, Any]:
    """Get current task status information."""
    now = time.time()
    return {
        "name": self.name,
        "enabled": self.enabled,
        "running": self.is_running(),
        "consecutive_failures": self._consecutive_failures,
        "last_success_time": self._last_success_time,
        "last_success_ago_seconds": (
            now - self._last_success_time if self._last_success_time else None
        ),
        "next_run_time": self._next_run_time,
        "next_run_in_seconds": (
            self._next_run_time - now if self._next_run_time else None
        ),
        "interval_seconds": self.interval_seconds,
    }

PricingCacheUpdateTask

PricingCacheUpdateTask(
    name,
    interval_seconds,
    pricing_service,
    enabled=True,
    force_refresh_on_startup=False,
)

Bases: BaseScheduledTask

Task for updating pricing cache periodically.

Parameters:

Name Type Description Default
name str

Task name

required
interval_seconds float

Interval between pricing updates

required
pricing_service PricingService

Pricing service instance

required
enabled bool

Whether task is enabled

True
force_refresh_on_startup bool

Whether to force refresh on first run

False
Source code in ccproxy/plugins/pricing/tasks.py
def __init__(
    self,
    name: str,
    interval_seconds: float,
    pricing_service: PricingService,
    enabled: bool = True,
    force_refresh_on_startup: bool = False,
):
    """
    Initialize pricing cache update task.

    Args:
        name: Task name
        interval_seconds: Interval between pricing updates
        pricing_service: Pricing service instance
        enabled: Whether task is enabled
        force_refresh_on_startup: Whether to force refresh on first run
    """
    super().__init__(
        name=name,
        interval_seconds=interval_seconds,
        enabled=enabled,
    )
    self.pricing_service = pricing_service
    self.force_refresh_on_startup = force_refresh_on_startup
    self._first_run = True

run async

run()

Execute pricing cache update.

Source code in ccproxy/plugins/pricing/tasks.py
async def run(self) -> bool:
    """Execute pricing cache update."""
    try:
        if not self.pricing_service.config.enabled:
            logger.debug("pricing_service_disabled", task_name=self.name)
            return True  # Not a failure, just disabled

        # Force refresh on first run if configured
        force_refresh = self._first_run and self.force_refresh_on_startup
        self._first_run = False

        if force_refresh:
            logger.info("pricing_update_force_refresh_startup", task_name=self.name)
            success = await self.pricing_service.force_refresh_pricing()
        else:
            # Regular update check
            pricing_data = await self.pricing_service.get_current_pricing(
                force_refresh=False
            )
            success = pricing_data is not None

        if success:
            logger.debug("pricing_update_success", task_name=self.name)
        else:
            logger.warning("pricing_update_failed", task_name=self.name)

        return success

    except Exception as e:
        logger.error(
            "pricing_update_task_error",
            task_name=self.name,
            error=str(e),
            error_type=type(e).__name__,
            exc_info=e,
        )
        return False