Skip to content

ccproxy.cli.commands.permission_handler

ccproxy.cli.commands.permission_handler

CLI command for handling confirmation requests via SSE stream.

SSEConfirmationHandler

SSEConfirmationHandler(
    api_url,
    terminal_handler,
    ui=True,
    auth_token=None,
    auto_reconnect=True,
)

Handles confirmation requests received via SSE stream.

Source code in ccproxy/cli/commands/permission_handler.py
def __init__(
    self,
    api_url: str,
    terminal_handler: ConfirmationHandlerProtocol,
    ui: bool = True,
    auth_token: str | None = None,
    auto_reconnect: bool = True,
):
    """Store connection settings and set up empty request-tracking state.

    Args:
        api_url: Base URL of the API server; trailing ``/`` characters are
            stripped before it is stored.
        terminal_handler: Handler object kept on the instance for later use.
        ui: Stored as ``self.ui``.
        auth_token: Optional token stored as ``self.auth_token``.
        auto_reconnect: Stored as ``self.auto_reconnect``.
    """
    # Caller-supplied configuration.
    self.terminal_handler = terminal_handler
    self.api_url = api_url.rstrip("/")
    self.auth_token = auth_token
    self.auto_reconnect = auto_reconnect
    self.ui = ui

    # Fixed retry policy: attempt limit and backoff bounds (base/max delay).
    self.max_retries = 5
    self.base_delay, self.max_delay = 1.0, 60.0

    # No HTTP client is created here.
    # NOTE(review): presumably assigned when the handler is entered as an
    # async context manager - confirm against __aenter__.
    self.client: httpx.AsyncClient | None = None

    # Bookkeeping containers, all starting empty.
    # NOTE(review): keys look like request IDs - confirm against the handlers.
    self._resolved_by_us: set[str] = set()
    self._resolved_requests: dict[str, tuple[bool, str]] = {}
    self._ongoing_requests: dict[str, asyncio.Task[bool]] = {}

handle_event async

handle_event(event_type, data)

Handle an SSE event by dispatching to specific handlers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| event_type | str | Type of the event | required |
| data | dict[str, Any] | Event data | required |
Source code in ccproxy/cli/commands/permission_handler.py
async def handle_event(self, event_type: str, data: dict[str, Any]) -> None:
    """Route a single SSE event to the coroutine registered for its type.

    ``ping`` events are dropped without any further work. An event type with
    no registered handler is reported through an ``unhandled_sse_event``
    warning and otherwise ignored.

    Args:
        event_type: Type of the event
        data: Event data
    """
    if event_type == "ping":
        return

    # Imported lazily, only once a non-ping event actually needs it.
    from ccproxy.models.permissions import EventType

    dispatch = {
        EventType.PERMISSION_REQUEST.value: self._handle_permission_request,
        EventType.PERMISSION_RESOLVED.value: self._handle_permission_resolved,
    }

    callback = dispatch.get(event_type)
    if not callback:
        logger.warning("unhandled_sse_event", event_type=event_type)
        return

    await callback(data)

send_response async

send_response(request_id, allowed)

Send a confirmation response to the API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request_id | str | ID of the confirmation request | required |
| allowed | bool | Whether to allow or deny | required |
Source code in ccproxy/cli/commands/permission_handler.py
async def send_response(self, request_id: str, allowed: bool) -> None:
    """POST the allow/deny decision for one request to the API.

    The outcome is only logged, never raised: HTTP 200 is logged as sent,
    HTTP 409 as already resolved, any other status as a failure, and any
    exception raised while sending is logged with its traceback.

    Args:
        request_id: ID of the confirmation request
        allowed: Whether to allow or deny
    """
    client = self.client
    if not client:
        logger.error("send_response_no_client", request_id=request_id)
        return

    try:
        reply = await client.post(
            f"{self.api_url}/permissions/{request_id}/respond",
            json={"allowed": allowed},
        )
        status = reply.status_code

        if status not in (200, 409):
            # Unexpected status: keep the response body for diagnosis.
            logger.error(
                "permission_response_failed",
                request_id=request_id,
                status_code=status,
                response=reply.text,
            )
        elif status == 409:
            # Already resolved by another handler
            logger.info(
                "permission_already_resolved",
                request_id=request_id,
                status_code=status,
            )
        else:
            logger.info(
                "permission_response_sent",
                request_id=request_id,
                allowed=allowed,
            )

    except Exception as exc:
        logger.error(
            "permission_response_error",
            request_id=request_id,
            error=str(exc),
            exc_info=True,
        )

parse_sse_stream async

parse_sse_stream(response)

Parse SSE events from the response stream.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| response | Response | The httpx response with streaming content | required |

Yields:

| Type | Description |
| --- | --- |
| AsyncIterator[tuple[str, dict[str, Any]]] | Tuples of (event_type, data) |

Source code in ccproxy/cli/commands/permission_handler.py
async def parse_sse_stream(
    self, response: httpx.Response
) -> AsyncIterator[tuple[str, dict[str, Any]]]:
    """Split a streamed text response into SSE events and decode their data.

    Text chunks are accumulated, CRLF is normalised to LF, and every block
    terminated by a blank line is treated as one event. Within a block only
    ``event:`` and ``data:`` lines are considered; multiple ``data:`` lines
    are joined with a single space and decoded as JSON. Blocks without data
    are skipped, and blocks whose data is not valid JSON are logged as
    ``sse_parse_error`` and skipped. Text left over when the stream ends
    without a terminating blank line is not emitted.

    Args:
        response: The httpx response with streaming content

    Yields:
        Tuples of (event_type, data)
    """
    pending = ""
    async for piece in response.aiter_text():
        # Normalise the whole accumulated text so a CRLF that was split
        # across two chunks is still collapsed once both halves are present.
        pending = (pending + piece).replace("\r\n", "\n")

        while True:
            raw_event, separator, remainder = pending.partition("\n\n")
            if not separator:
                # No complete event yet; wait for more data.
                break
            pending = remainder

            if not raw_event.strip():
                continue

            kind = "message"  # event type used when no "event:" line is present
            payload_parts = []
            for raw_line in raw_event.split("\n"):
                field = raw_line.strip()
                if field.startswith("event:"):
                    kind = field[len("event:"):].strip()
                elif field.startswith("data:"):
                    payload_parts.append(field[len("data:"):].strip())

            if not payload_parts:
                continue

            try:
                payload = json.loads(" ".join(payload_parts))
                yield kind, payload
            except json.JSONDecodeError as exc:
                logger.error(
                    "sse_parse_error",
                    event_type=kind,
                    data=" ".join(payload_parts),
                    error=str(exc),
                )

run async

run()

Run the SSE client with reconnection logic.

Source code in ccproxy/cli/commands/permission_handler.py
async def run(self) -> None:
    """Consume the permission stream, reconnecting or retrying as configured.

    Returns immediately (after logging ``run_no_client``) when no HTTP client
    is set. Otherwise the stream at ``{api_url}/permissions/stream`` is
    handled in a loop:

    * When the stream ends without an exception, the loop reconnects after a
      one-second pause if ``auto_reconnect`` is set, and stops otherwise.
    * Connection and timeout errors are retried with exponential backoff
      (``base_delay`` doubled per attempt, capped at ``max_delay``).
    * ``KeyboardInterrupt`` stops the loop.

    Raises:
        typer.Exit: With code 1 once more than ``max_retries`` consecutive
            connection failures occurred, or on any other exception.
    """
    if not self.client:
        logger.error("run_no_client")
        return

    stream_url = f"{self.api_url}/permissions/stream"

    logger.info(
        "connecting_to_permission_stream",
        url=stream_url,
    )
    print(f"Connecting to confirmation stream at {stream_url}...")

    retry_count = 0
    while retry_count <= self.max_retries:
        try:
            await self._connect_and_handle_stream(stream_url)

            # Reaching this point means the stream ended without raising.
            if not self.auto_reconnect:
                print("Connection closed. Exiting (auto-reconnect disabled).")
                break

            # A clean close starts a fresh retry budget.
            retry_count = 0
            print("Connection closed. Reconnecting...")
            await asyncio.sleep(1.0)  # Brief pause before reconnecting

        except KeyboardInterrupt:
            logger.info("permission_handler_shutdown_requested")
            break

        except (
            httpx.ConnectError,
            httpx.TimeoutException,
            httpx.ReadTimeout,
        ) as exc:
            retry_count += 1
            if retry_count > self.max_retries:
                logger.error(
                    "connection_failed_max_retries",
                    max_retries=self.max_retries,
                )
                raise typer.Exit(1) from None

            # Exponential backoff, capped at max_delay.
            delay = min(self.base_delay * (2 ** (retry_count - 1)), self.max_delay)

            logger.warning(
                "connection_failed_retrying",
                attempt=retry_count,
                max_retries=self.max_retries,
                retry_delay=delay,
                error=str(exc),
            )

            print(
                f"Connection failed (attempt {retry_count}/{self.max_retries}). Retrying in {delay}s..."
            )

            await asyncio.sleep(delay)

        except Exception as exc:
            logger.error("sse_client_error", error=str(exc), exc_info=True)
            raise typer.Exit(1) from exc

connect

connect(
    api_url=Option(
        None,
        "--api-url",
        "-u",
        help="API server URL (defaults to settings)",
    ),
    no_ui=Option(False, "--no-ui", help="Disable UI mode"),
    verbose=Option(
        0,
        "-v",
        "--verbose",
        count=True,
        help="Increase verbosity (-v for INFO, -vv for DEBUG)",
    ),
    auth_token=Option(
        None,
        "--auth-token",
        "-t",
        help="Bearer token for API authentication (overrides config)",
        envvar="CCPROXY_AUTH_TOKEN",
    ),
    no_reconnect=Option(
        False,
        "--no-reconnect",
        help="Disable automatic reconnection when connection is lost",
    ),
)

Connect to the API server and handle confirmation requests.

This command connects to the CCProxy API server via Server-Sent Events and handles permission confirmation requests in the terminal.

Source code in ccproxy/cli/commands/permission_handler.py
@app.command()
def connect(
    api_url: str | None = typer.Option(
        None,
        "--api-url",
        "-u",
        help="API server URL (defaults to settings)",
    ),
    no_ui: bool = typer.Option(False, "--no-ui", help="Disable UI mode"),
    verbose: int = typer.Option(
        0,
        "-v",
        "--verbose",
        count=True,
        help="Increase verbosity (-v for INFO, -vv for DEBUG)",
    ),
    auth_token: str | None = typer.Option(
        None,
        "--auth-token",
        "-t",
        help="Bearer token for API authentication (overrides config)",
        envvar="CCPROXY_AUTH_TOKEN",
    ),
    no_reconnect: bool = typer.Option(
        False,
        "--no-reconnect",
        help="Disable automatic reconnection when connection is lost",
    ),
) -> None:
    """Connect to the API server and handle confirmation requests.

    This command connects to the CCProxy API server via Server-Sent Events
    and handles permission confirmation requests in the terminal.

    """
    # NOTE: the docstring above is left untouched on purpose; typer shows it
    # as the command's help text, so it is user-facing output.

    # Configure logging level based on verbosity
    # Handle case where verbose might be OptionInfo (in tests) or int (runtime)
    verbose_count = verbose if isinstance(verbose, int) else 0

    if verbose_count >= 2:
        log_level = logging.DEBUG
    elif verbose_count >= 1:
        log_level = logging.INFO
    else:
        log_level = logging.WARNING

    # Configure root logger level
    logging.basicConfig(level=log_level)

    # Configure structlog to respect the same log level
    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(log_level),
    )

    settings = get_settings()

    # Use provided URL or default from settings
    if not api_url:
        api_url = f"http://{settings.server.host}:{settings.server.port}"

    # Determine auth token: CLI arg > config setting > None
    token = auth_token or settings.security.auth_token

    # The Textual handler is always used as the terminal handler; --no-ui is
    # only forwarded to SSEConfirmationHandler as its `ui` argument below.
    terminal_handler: ConfirmationHandlerProtocol = TextualPermissionHandler()

    async def run_handler() -> None:
        """Run the handler with proper resource management."""
        async with SSEConfirmationHandler(
            api_url,
            terminal_handler,
            not no_ui,
            auth_token=token,
            auto_reconnect=not no_reconnect,
        ) as sse_handler:
            await sse_handler.run()

    # Run the async handler
    try:
        asyncio.run(run_handler())
    except KeyboardInterrupt:
        logger.info("permission_handler_stopped")
    except typer.Exit:
        # run() signals a deliberate exit with typer.Exit after logging the
        # reason itself. typer.Exit is an Exception subclass, so without this
        # clause it would fall into the generic handler below and be logged a
        # second time as an unexpected error with a traceback.
        raise
    except Exception as e:
        logger.error("permission_handler_error", error=str(e), exc_info=True)
        raise typer.Exit(1) from e