Skip to content

ccproxy.api.services.permission_service

ccproxy.api.services.permission_service

Permission service for handling permission requests without UI dependencies.

PermissionRequest

Bases: BaseModel

Represents a tool permission request.

is_expired

is_expired()

Check if the request has expired.

Source code in ccproxy/models/permissions.py
def is_expired(self) -> bool:
    """Report whether a still-pending request has passed its expiry time.

    Returns:
        True only when the status is PENDING and the current UTC time is
        later than ``expires_at``. Any non-pending status yields False.
    """
    if self.status == PermissionStatus.PENDING:
        # Both timestamps go through _normalize_datetimes before comparing
        current, deadline = self._normalize_datetimes(
            datetime.now(UTC), self.expires_at
        )
        return current > deadline

    return False

time_remaining

time_remaining()

Get time remaining in seconds.

Source code in ccproxy/models/permissions.py
def time_remaining(self) -> int:
    """Return the whole seconds left before a pending request expires.

    Returns:
        0 when the request is not PENDING or its expiry has passed;
        otherwise the remaining time truncated to an integer.
    """
    if self.status != PermissionStatus.PENDING:
        return 0

    current, deadline = self._normalize_datetimes(datetime.now(UTC), self.expires_at)
    seconds_left = int((deadline - current).total_seconds())
    # Clamp so an already-elapsed deadline never reports a negative value
    return seconds_left if seconds_left > 0 else 0

resolve

resolve(allowed)

Resolve the request.

Source code in ccproxy/models/permissions.py
def resolve(self, allowed: bool) -> None:
    """Mark a pending request as allowed or denied and wake any waiters.

    Args:
        allowed: True to set the status to ALLOWED, False for DENIED.

    Raises:
        ValueError: If the request is not in the PENDING status.
    """
    if self.status != PermissionStatus.PENDING:
        raise ValueError(f"Cannot resolve request in {self.status} status")

    if allowed:
        self.status = PermissionStatus.ALLOWED
    else:
        self.status = PermissionStatus.DENIED
    self.resolved_at = datetime.now(UTC)

    # Setting the event releases coroutines blocked on _resolved_event.wait()
    self._resolved_event.set()

PermissionService

PermissionService(timeout_seconds=30)

Service for managing permission requests without UI dependencies.

Source code in ccproxy/api/services/permission_service.py
def __init__(self, timeout_seconds: int = 30):
    """Create an empty service with no requests and no subscribers.

    Args:
        timeout_seconds: Stored as the service's request timeout.
    """
    # Configuration
    self._timeout_seconds = timeout_seconds

    # Registries: requests keyed by string, plus subscriber queues
    self._requests: dict[str, PermissionRequest] = {}
    self._event_queues: list[asyncio.Queue[dict[str, Any]]] = []

    # Lifecycle and synchronization state
    self._lock = asyncio.Lock()
    self._shutdown = False
    self._expiry_task: asyncio.Task[None] | None = None

request_permission async

request_permission(tool_name, input)

Create a new permission request.

Parameters:

Name Type Description Default
tool_name str

Name of the tool requesting permission

required
input dict[str, str]

Input parameters for the tool

required

Returns:

Type Description
str

Permission request ID

Raises:

Type Description
ValueError

If tool_name is empty or input is None

Source code in ccproxy/api/services/permission_service.py
async def request_permission(self, tool_name: str, input: dict[str, str]) -> str:
    """Create a new permission request.

    Args:
        tool_name: Name of the tool requesting permission
        input: Input parameters for the tool

    Returns:
        Permission request ID

    Raises:
        ValueError: If tool_name is empty or input is None
    """
    # Input validation
    if not tool_name or not tool_name.strip():
        raise ValueError("Tool name cannot be empty")
    if input is None:
        raise ValueError("Input parameters cannot be None")

    # Sanitize input - ensure all values are strings
    sanitized_input = {k: str(v) for k, v in input.items()}

    now = datetime.now(UTC)
    request = PermissionRequest(
        tool_name=tool_name.strip(),
        input=sanitized_input,
        created_at=now,
        expires_at=now + timedelta(seconds=self._timeout_seconds),
    )

    async with self._lock:
        self._requests[request.id] = request

    logger.info(
        "permission_request_created",
        request_id=request.id,
        tool_name=tool_name,
    )

    event = PermissionEvent(
        type=EventType.PERMISSION_REQUEST,
        request_id=request.id,
        tool_name=request.tool_name,
        input=request.input,
        created_at=request.created_at.isoformat(),
        expires_at=request.expires_at.isoformat(),
        timeout_seconds=self._timeout_seconds,
    )
    await self._emit_event(event.model_dump(mode="json"))

    return request.id

get_status async

get_status(request_id)

Get the status of a permission request.

Parameters:

Name Type Description Default
request_id str

ID of the permission request

required

Returns:

Type Description
PermissionStatus | None

Status of the request or None if not found

Source code in ccproxy/api/services/permission_service.py
async def get_status(self, request_id: str) -> PermissionStatus | None:
    """Look up the current status of a permission request.

    A request that reports itself as expired is switched to EXPIRED
    before its status is returned.

    Args:
        request_id: ID of the permission request

    Returns:
        Status of the request or None if not found
    """
    async with self._lock:
        entry = self._requests.get(request_id)
        if entry:
            # Lazily record the expiry the first time it is observed
            if entry.is_expired():
                entry.status = PermissionStatus.EXPIRED
            return entry.status

        return None

get_request async

get_request(request_id)

Get a permission request by ID.

Parameters:

Name Type Description Default
request_id str

ID of the permission request

required

Returns:

Type Description
PermissionRequest | None

The request or None if not found

Source code in ccproxy/api/services/permission_service.py
async def get_request(self, request_id: str) -> PermissionRequest | None:
    """Fetch the stored permission request for an ID.

    Args:
        request_id: ID of the permission request

    Returns:
        The request or None if not found
    """
    async with self._lock:
        found = self._requests.get(request_id)
    return found

resolve async

resolve(request_id, allowed)

Manually resolve a permission request.

Parameters:

Name Type Description Default
request_id str

ID of the permission request

required
allowed bool

Whether to allow or deny the request

required

Returns:

Type Description
bool

True if resolved successfully, False if not found or already resolved

Raises:

Type Description
ValueError

If request_id is empty

Source code in ccproxy/api/services/permission_service.py
async def resolve(self, request_id: str, allowed: bool) -> bool:
    """Manually resolve a permission request.

    Surrounding whitespace in ``request_id`` is ignored. The stripped ID
    is used for the lookup, the log entry, and the emitted event.

    Args:
        request_id: ID of the permission request
        allowed: Whether to allow or deny the request

    Returns:
        True if resolved successfully, False if not found or already resolved

    Raises:
        ValueError: If request_id is empty
    """
    # Input validation
    if not request_id or not request_id.strip():
        raise ValueError("Request ID cannot be empty")

    # Normalize once: previously only the lookup used the stripped ID, so
    # the log line and the resolution event could carry an ID that
    # matches no stored request.
    request_id = request_id.strip()

    async with self._lock:
        request = self._requests.get(request_id)
        if not request or request.status != PermissionStatus.PENDING:
            return False

        try:
            request.resolve(allowed)
        except ValueError:
            # The request rejected the resolution; report it as unresolved
            return False

    logger.info(
        "permission_request_resolved",
        request_id=request_id,
        tool_name=request.tool_name,
        allowed=allowed,
    )

    # Emit resolution event
    event = PermissionEvent(
        type=EventType.PERMISSION_RESOLVED,
        request_id=request_id,
        allowed=allowed,
        resolved_at=request.resolved_at.isoformat()
        if request.resolved_at
        else None,
    )
    await self._emit_event(event.model_dump(mode="json"))

    return True

subscribe_to_events async

subscribe_to_events()

Subscribe to permission events.

Returns:

Type Description
Queue[dict[str, Any]]

An async queue that will receive events

Source code in ccproxy/api/services/permission_service.py
async def subscribe_to_events(self) -> asyncio.Queue[dict[str, Any]]:
    """Register a new subscriber queue for permission events.

    Returns:
        A fresh async queue that has been added to the service's
        subscriber list.
    """
    async with self._lock:
        subscriber: asyncio.Queue[dict[str, Any]] = asyncio.Queue()
        self._event_queues.append(subscriber)
    return subscriber

unsubscribe_from_events async

unsubscribe_from_events(queue)

Unsubscribe from permission events.

Parameters:

Name Type Description Default
queue Queue[dict[str, Any]]

The queue to unsubscribe

required
Source code in ccproxy/api/services/permission_service.py
async def unsubscribe_from_events(
    self, queue: asyncio.Queue[dict[str, Any]]
) -> None:
    """Remove a subscriber queue from the service.

    A queue that is not currently subscribed is ignored.

    Args:
        queue: The queue to unsubscribe
    """
    async with self._lock:
        try:
            self._event_queues.remove(queue)
        except ValueError:
            # Queue was never subscribed or has already been removed
            pass

get_pending_requests async

get_pending_requests()

Get all pending permission requests.

Returns:

Type Description
list[PermissionRequest]

List of pending requests

Source code in ccproxy/api/services/permission_service.py
async def get_pending_requests(self) -> list[PermissionRequest]:
    """Get all pending permission requests.

    While scanning, any request that reports itself as expired is
    switched to EXPIRED and left out of the result.

    Returns:
        List of pending requests
    """
    async with self._lock:
        pending: list[PermissionRequest] = []
        # A plain loop is kept because the scan also updates expired entries
        for request in self._requests.values():
            if request.is_expired():
                request.status = PermissionStatus.EXPIRED
            elif request.status == PermissionStatus.PENDING:
                pending.append(request)
        return pending

wait_for_permission async

wait_for_permission(request_id, timeout_seconds=None)

Wait for a permission request to be resolved.

This method efficiently blocks until the permission is resolved (allowed/denied/expired) or the timeout is reached using an event-driven approach.

Parameters:

Name Type Description Default
request_id str

ID of the permission request to wait for

required
timeout_seconds int | None

Optional timeout in seconds. If None, uses request expiration time

None

Returns:

Type Description
PermissionStatus

The final status of the permission request

Raises:

Type Description
TimeoutError

If timeout is reached before resolution

PermissionNotFoundError

If request ID is not found

Source code in ccproxy/api/services/permission_service.py
async def wait_for_permission(
    self, request_id: str, timeout_seconds: int | None = None
) -> PermissionStatus:
    """Block until a permission request leaves the pending state.

    The wait is event-driven: it awaits the request's resolution event
    instead of polling. If the wait times out while the request is still
    pending, the request is marked EXPIRED before the error is raised.

    Args:
        request_id: ID of the permission request to wait for
        timeout_seconds: Optional timeout in seconds. If None, the request's
            own remaining time is used

    Returns:
        The final status of the permission request

    Raises:
        asyncio.TimeoutError: If timeout is reached before resolution
        PermissionNotFoundError: If request ID is not found
    """
    async with self._lock:
        tracked = self._requests.get(request_id)
        if not tracked:
            raise PermissionNotFoundError(request_id)

        # Nothing to wait for when the request is already settled
        if tracked.status != PermissionStatus.PENDING:
            return tracked.status

    # The lock is released here so resolvers can make progress during the wait
    if timeout_seconds is None:
        timeout_seconds = tracked.time_remaining()

    try:
        await asyncio.wait_for(
            tracked._resolved_event.wait(), timeout=timeout_seconds
        )
    except TimeoutError as timeout_err:
        logger.warning(
            "permission_wait_timeout",
            request_id=request_id,
            timeout_seconds=timeout_seconds,
        )
        async with self._lock:
            # Only expire a request that is still pending
            if tracked.status == PermissionStatus.PENDING:
                tracked.status = PermissionStatus.EXPIRED
                # Wake any other waiters: the request is now settled (as expired)
                tracked._resolved_event.set()
        raise TimeoutError(
            f"Confirmation wait timeout after {timeout_seconds:.1f}s"
        ) from timeout_err

    # Event fired: read the settled status, falling back to EXPIRED if
    # get_status returns nothing
    final_status = await self.get_status(request_id)
    return final_status or PermissionStatus.EXPIRED

get_permission_service

get_permission_service()

Get the global permission service instance.

Source code in ccproxy/api/services/permission_service.py
def get_permission_service() -> PermissionService:
    """Return the module-wide PermissionService, creating it on first use."""
    global _permission_service
    if _permission_service is not None:
        return _permission_service

    # First call: build the instance with default settings and store it
    _permission_service = PermissionService()
    return _permission_service