ccproxy.scheduler¶
ccproxy.scheduler
¶
Scheduler system for periodic tasks.
This module provides a generic, extensible scheduler for managing periodic tasks in the CCProxy API. It provides a centralized system that supports:
- Generic task scheduling with configurable intervals
- Task registration and discovery via registry pattern
- Graceful startup and shutdown with FastAPI integration
- Error handling with exponential backoff
- Structured logging and monitoring
Key components: - Scheduler: Core scheduler engine for task management - BaseScheduledTask: Abstract base class for all scheduled tasks - TaskRegistry: Dynamic task registration and discovery system
Scheduler
¶
Scheduler for managing multiple periodic tasks.
Provides centralized management of scheduled tasks with: - Dynamic task registration and configuration - Graceful startup and shutdown - Task monitoring and status reporting - Error handling and recovery
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_concurrent_tasks
|
int
|
Maximum number of tasks to run concurrently |
10
|
graceful_shutdown_timeout
|
float
|
Timeout for graceful shutdown in seconds |
30.0
|
task_registry
|
TaskRegistry | None
|
Task registry instance (uses global if None) |
None
|
Source code in ccproxy/scheduler/core.py
start
async
¶
Start the scheduler and all enabled tasks.
Source code in ccproxy/scheduler/core.py
stop
async
¶
Stop the scheduler and all running tasks.
Source code in ccproxy/scheduler/core.py
add_task
async
¶
Add and start a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name
|
str
|
Unique name for this task instance |
required |
task_type
|
str
|
Type of task (must be registered in task registry) |
required |
**task_kwargs
|
Any
|
Additional arguments to pass to task constructor |
{}
|
Raises:
Type | Description |
---|---|
TaskRegistrationError
|
If task type is not registered |
SchedulerError
|
If task name already exists or task creation fails |
Source code in ccproxy/scheduler/core.py
remove_task
async
¶
Remove and stop a task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name
|
str
|
Name of task to remove |
required |
Raises:
Type | Description |
---|---|
TaskNotFoundError
|
If task does not exist |
Source code in ccproxy/scheduler/core.py
get_task
¶
Get a task instance by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name
|
str
|
Name of task to retrieve |
required |
Returns:
Type | Description |
---|---|
BaseScheduledTask
|
Task instance |
Raises:
Type | Description |
---|---|
TaskNotFoundError
|
If task does not exist |
Source code in ccproxy/scheduler/core.py
list_tasks
¶
get_task_status
¶
Get status information for a specific task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task_name
|
str
|
Name of task |
required |
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Task status dictionary |
Raises:
Type | Description |
---|---|
TaskNotFoundError
|
If task does not exist |
Source code in ccproxy/scheduler/core.py
get_scheduler_status
¶
Get overall scheduler status information.
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Scheduler status dictionary |
Source code in ccproxy/scheduler/core.py
TaskRegistry
¶
Registry for managing scheduled task registration and discovery.
Provides a centralized way to register and retrieve scheduled tasks, enabling dynamic task management and configuration.
Source code in ccproxy/scheduler/registry.py
register
¶
Register a scheduled task class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Unique name for the task |
required |
task_class
|
type[BaseScheduledTask]
|
Task class that inherits from BaseScheduledTask |
required |
Raises:
Type | Description |
---|---|
TaskRegistrationError
|
If task name is already registered or invalid |
Source code in ccproxy/scheduler/registry.py
unregister
¶
Unregister a scheduled task.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Name of the task to unregister |
required |
Raises:
Type | Description |
---|---|
TaskRegistrationError
|
If task is not registered |
Source code in ccproxy/scheduler/registry.py
get
¶
Get a registered task class by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Name of the task to retrieve |
required |
Returns:
Type | Description |
---|---|
type[BaseScheduledTask]
|
Task class |
Raises:
Type | Description |
---|---|
TaskRegistrationError
|
If task is not registered |
Source code in ccproxy/scheduler/registry.py
list_tasks
¶
is_registered
¶
clear
¶
get_registry_info
¶
Get information about the current registry state.
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Dictionary with registry information |
Source code in ccproxy/scheduler/registry.py
BaseScheduledTask
¶
BaseScheduledTask(
name,
interval_seconds,
enabled=True,
max_backoff_seconds=300.0,
jitter_factor=0.25,
)
Bases: ABC
Abstract base class for all scheduled tasks.
Provides common functionality for task lifecycle management, error handling, and exponential backoff for failed executions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Human-readable task name |
required |
interval_seconds
|
float
|
Interval between task executions in seconds |
required |
enabled
|
bool
|
Whether the task is enabled |
True
|
max_backoff_seconds
|
float
|
Maximum backoff delay for failed tasks |
300.0
|
jitter_factor
|
float
|
Jitter factor for backoff randomization (0.0-1.0) |
0.25
|
Source code in ccproxy/scheduler/tasks.py
run
abstractmethod
async
¶
Execute the scheduled task.
Returns:
Type | Description |
---|---|
bool
|
True if execution was successful, False otherwise |
setup
async
¶
Perform any setup required before task execution starts.
Called once when the task is first started. Override if needed. Default implementation does nothing.
Source code in ccproxy/scheduler/tasks.py
cleanup
async
¶
Perform any cleanup required after task execution stops.
Called once when the task is stopped. Override if needed. Default implementation does nothing.
Source code in ccproxy/scheduler/tasks.py
calculate_next_delay
¶
Calculate the delay before the next task execution.
Returns exponential backoff delay for failed tasks, or normal interval for successful tasks, with optional jitter.
Returns:
Type | Description |
---|---|
float
|
Delay in seconds before next execution |
Source code in ccproxy/scheduler/tasks.py
start
async
¶
Start the scheduled task execution loop.
Source code in ccproxy/scheduler/tasks.py
stop
async
¶
Stop the scheduled task execution loop.
Source code in ccproxy/scheduler/tasks.py
get_status
¶
Get current task status information.
Returns:
Type | Description |
---|---|
dict[str, Any]
|
Dictionary with task status details |
Source code in ccproxy/scheduler/tasks.py
PricingCacheUpdateTask
¶
PricingCacheUpdateTask(
name,
interval_seconds,
enabled=True,
force_refresh_on_startup=False,
pricing_updater=None,
)
Bases: BaseScheduledTask
Task for updating pricing cache periodically.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Task name |
required |
interval_seconds
|
float
|
Interval between pricing updates |
required |
enabled
|
bool
|
Whether task is enabled |
True
|
force_refresh_on_startup
|
bool
|
Whether to force refresh on first run |
False
|
pricing_updater
|
Any | None
|
Injected pricing updater instance |
None
|
Source code in ccproxy/scheduler/tasks.py
setup
async
¶
Initialize pricing updater instance if not injected.
Source code in ccproxy/scheduler/tasks.py
run
async
¶
Execute pricing cache update.
Source code in ccproxy/scheduler/tasks.py
PushgatewayTask
¶
Bases: BaseScheduledTask
Task for pushing metrics to Pushgateway periodically.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Task name |
required |
interval_seconds
|
float
|
Interval between pushgateway operations |
required |
enabled
|
bool
|
Whether task is enabled |
True
|
max_backoff_seconds
|
float
|
Maximum backoff delay for failures |
300.0
|
Source code in ccproxy/scheduler/tasks.py
setup
async
¶
Initialize metrics instance for pushgateway operations.
Source code in ccproxy/scheduler/tasks.py
run
async
¶
Execute pushgateway metrics push.
Source code in ccproxy/scheduler/tasks.py
StatsPrintingTask
¶
Bases: BaseScheduledTask
Task for printing stats summary periodically.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Task name |
required |
interval_seconds
|
float
|
Interval between stats printing |
required |
enabled
|
bool
|
Whether task is enabled |
True
|
Source code in ccproxy/scheduler/tasks.py
setup
async
¶
Initialize stats collector and metrics instances.
Source code in ccproxy/scheduler/tasks.py
run
async
¶
Execute stats printing.