Skip to content

ccproxy.scheduler.registry

ccproxy.scheduler.registry

Task registry for dynamic task registration and discovery.

TaskRegistry

TaskRegistry()

Registry for managing scheduled task registration and discovery.

Provides a centralized way to register and retrieve scheduled tasks, enabling dynamic task management and configuration.

Source code in ccproxy/scheduler/registry.py
def __init__(self) -> None:
    """Initialize the task registry."""
    self._tasks: dict[str, type[BaseScheduledTask]] = {}

register

register(name, task_class)

Register a scheduled task class.

Parameters:

Name Type Description Default
name str

Unique name for the task

required
task_class type[BaseScheduledTask]

Task class that inherits from BaseScheduledTask

required

Raises:

Type Description
TaskRegistrationError

If task name is already registered or invalid

Source code in ccproxy/scheduler/registry.py
def register(self, name: str, task_class: type[BaseScheduledTask]) -> None:
    """
    Register a scheduled task class.

    Args:
        name: Unique name for the task
        task_class: Task class that inherits from BaseScheduledTask

    Raises:
        TaskRegistrationError: If task name is already registered or invalid
    """
    if name in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is already registered")

    if not issubclass(task_class, BaseScheduledTask):
        raise TaskRegistrationError(
            f"Task class for '{name}' must inherit from BaseScheduledTask"
        )

    self._tasks[name] = task_class
    logger.debug("task_registered", task_name=name, task_class=task_class.__name__)

unregister

unregister(name)

Unregister a scheduled task.

Parameters:

Name Type Description Default
name str

Name of the task to unregister

required

Raises:

Type Description
TaskRegistrationError

If task is not registered

Source code in ccproxy/scheduler/registry.py
def unregister(self, name: str) -> None:
    """
    Unregister a scheduled task.

    Args:
        name: Name of the task to unregister

    Raises:
        TaskRegistrationError: If task is not registered
    """
    if name not in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is not registered")

    del self._tasks[name]
    logger.debug("task_unregistered", task_name=name)

get

get(name)

Get a registered task class by name.

Parameters:

Name Type Description Default
name str

Name of the task to retrieve

required

Returns:

Type Description
type[BaseScheduledTask]

Task class

Raises:

Type Description
TaskRegistrationError

If task is not registered

Source code in ccproxy/scheduler/registry.py
def get(self, name: str) -> type[BaseScheduledTask]:
    """
    Get a registered task class by name.

    Args:
        name: Name of the task to retrieve

    Returns:
        Task class

    Raises:
        TaskRegistrationError: If task is not registered
    """
    if name not in self._tasks:
        raise TaskRegistrationError(f"Task '{name}' is not registered")

    return self._tasks[name]

list_tasks

list_tasks()

Get list of all registered task names.

Returns:

Type Description
list[str]

List of registered task names

Source code in ccproxy/scheduler/registry.py
def list_tasks(self) -> list[str]:
    """
    Get list of all registered task names.

    Returns:
        List of registered task names
    """
    return list(self._tasks.keys())

is_registered

is_registered(name)

Check if a task is registered.

Parameters:

Name Type Description Default
name str

Task name to check

required

Returns:

Type Description
bool

True if task is registered, False otherwise

Source code in ccproxy/scheduler/registry.py
def is_registered(self, name: str) -> bool:
    """
    Check if a task is registered.

    Args:
        name: Task name to check

    Returns:
        True if task is registered, False otherwise
    """
    return name in self._tasks

clear

clear()

Clear all registered tasks.

Source code in ccproxy/scheduler/registry.py
def clear(self) -> None:
    """Clear all registered tasks."""
    self._tasks.clear()
    logger.debug("task_registry_cleared")

get_registry_info

get_registry_info()

Get information about the current registry state.

Returns:

Type Description
dict[str, Any]

Dictionary with registry information

Source code in ccproxy/scheduler/registry.py
def get_registry_info(self) -> dict[str, Any]:
    """
    Get information about the current registry state.

    Returns:
        Dictionary with registry information
    """
    return {
        "total_tasks": len(self._tasks),
        "registered_tasks": list(self._tasks.keys()),
        "task_classes": {name: cls.__name__ for name, cls in self._tasks.items()},
    }

get_task_registry

get_task_registry()

Get the global task registry instance.

Returns:

Type Description
TaskRegistry

Global TaskRegistry instance

Source code in ccproxy/scheduler/registry.py
def get_task_registry() -> TaskRegistry:
    """
    Get the global task registry instance.

    Returns:
        Global TaskRegistry instance
    """
    global _global_registry

    if _global_registry is None:
        _global_registry = TaskRegistry()

    return _global_registry

register_task

register_task(name, task_class)

Register a task in the global registry.

Parameters:

Name Type Description Default
name str

Unique name for the task

required
task_class type[BaseScheduledTask]

Task class that inherits from BaseScheduledTask

required
Source code in ccproxy/scheduler/registry.py
def register_task(name: str, task_class: type[BaseScheduledTask]) -> None:
    """
    Register a task in the global registry.

    Args:
        name: Unique name for the task
        task_class: Task class that inherits from BaseScheduledTask
    """
    registry = get_task_registry()
    registry.register(name, task_class)