Skip to content

ccproxy.plugins.claude_sdk.session_pool

ccproxy.plugins.claude_sdk.session_pool

Session-aware connection pool for persistent Claude SDK connections.

SessionPool

SessionPool(config=None)

Manages persistent Claude SDK connections by session.

Source code in ccproxy/plugins/claude_sdk/session_pool.py
def __init__(self, config: SessionPoolSettings | None = None):
    """Set up an empty pool.

    Args:
        config: Pool settings. When omitted (or falsy), a default
            ``SessionPoolSettings()`` is created.
    """
    # Fall back to default settings when the caller supplies none
    if not config:
        config = SessionPoolSettings()
    self.config = config

    # Concurrency / lifecycle state
    self._lock = asyncio.Lock()
    self._shutdown = False
    self.cleanup_task: asyncio.Task[None] | None = None

    # Session clients keyed by session ID
    self.sessions: dict[str, SessionClient] = {}

start async

start()

Start the session pool and cleanup task.

Source code in ccproxy/plugins/claude_sdk/session_pool.py
async def start(self) -> None:
    """Launch the background cleanup task if the pool is enabled.

    Does nothing when ``config.enabled`` is false.
    """
    settings = self.config
    if settings.enabled:
        logger.debug(
            "session_pool_starting",
            max_sessions=settings.max_sessions,
            ttl=settings.session_ttl,
            cleanup_interval=settings.cleanup_interval,
        )

        # Keep a handle on the task so stop() can cancel it later
        cleanup_coro = self._cleanup_loop()
        self.cleanup_task = await create_managed_task(
            cleanup_coro,
            name="session_pool_cleanup",
            creator="SessionPool",
        )

stop async

stop()

Stop the session pool and clean up all sessions.

Source code in ccproxy/plugins/claude_sdk/session_pool.py
async def stop(self) -> None:
    """Shut the pool down.

    Sets the shutdown flag, cancels the cleanup task (if one was started),
    then disconnects every pooled session and empties the pool.
    """
    self._shutdown = True

    # Cancel the background task and wait for it to finish unwinding
    if background := self.cleanup_task:
        background.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await background

    async with self._lock:
        pending = [client.disconnect() for client in self.sessions.values()]

        # return_exceptions=True: one failing disconnect must not abort the rest
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        self.sessions.clear()

    logger.debug("session_pool_stopped")

get_session_client async

get_session_client(session_id, options)

Get or create the session client for the given session_id, ensuring it is connected before it is returned.

Source code in ccproxy/plugins/claude_sdk/session_pool.py
async def get_session_client(
    self, session_id: str, options: ClaudeAgentOptions
) -> SessionClient:
    """Return the pooled client for ``session_id``, creating it if needed.

    The client is looked up (or created) and connected while the pool lock
    is held.

    Args:
        session_id: Identifier of the session to fetch or create.
        options: Options forwarded to ``_get_or_create_session``.

    Returns:
        The session client for ``session_id``.
    """
    pool_config = self.config
    logger.debug(
        "session_pool_get_client_start",
        session_id=session_id,
        pool_enabled=pool_config.enabled,
        current_sessions=len(self.sessions),
        max_sessions=pool_config.max_sessions,
        session_exists=session_id in self.sessions,
    )

    # Check the pool is enabled before taking the lock
    self._validate_pool_enabled(session_id)

    async with self._lock:
        client = await self._get_or_create_session(session_id, options)
        # Connect while still holding the lock, before handing the client out
        await self._ensure_session_connected(client, session_id)

    metrics = client.metrics
    logger.debug(
        "session_pool_get_client_complete",
        session_id=session_id,
        client_id=client.client_id,
        session_status=client.status,
        session_age_seconds=metrics.age_seconds,
        session_message_count=metrics.message_count,
    )
    return client

interrupt_session async

interrupt_session(session_id)

Interrupt a specific session due to client disconnection.

Parameters:

Name Type Description Default
session_id str

The session ID to interrupt

required

Returns:

Type Description
bool

True if session was found and interrupted, False otherwise

Source code in ccproxy/plugins/claude_sdk/session_pool.py
async def interrupt_session(
    self, session_id: str, *, timeout: float = 30.0
) -> bool:
    """Interrupt a specific session due to client disconnection.

    Whether or not the interrupt succeeds, removal of the session from the
    pool is attempted afterwards so that it is not reused.

    Args:
        session_id: The session ID to interrupt
        timeout: Maximum number of seconds to wait for the interrupt to
            complete. The 30-second default allows for longer SDK response
            times.

    Returns:
        True if session was found and interrupted, False otherwise
    """
    # Hold the lock only for the lookup; the interrupt itself may be slow
    async with self._lock:
        session_client = self.sessions.get(session_id)

    if session_client is None:
        logger.warning("session_not_found", session_id=session_id)
        return False

    try:
        await asyncio.wait_for(session_client.interrupt(), timeout=timeout)
        logger.debug("session_interrupted", session_id=session_id)

        # Remove the session to prevent reuse
        await self._remove_session(session_id)
        return True
    except TimeoutError:
        # str() of a timeout error is typically empty, so log a readable message
        failure = f"Timeout after {timeout:g}s"
    except Exception as e:
        failure = str(e)

    logger.error(
        "session_interrupt_failed",
        session_id=session_id,
        error=failure,
    )
    # Always remove the session on failure; removal errors are ignored
    # because the failure has already been logged above.
    with contextlib.suppress(Exception):
        await self._remove_session(session_id)
    return False

interrupt_all_sessions async

interrupt_all_sessions()

Interrupt all active sessions (stops ongoing operations).

Returns:

Type Description
int

Number of sessions that were interrupted

Source code in ccproxy/plugins/claude_sdk/session_pool.py
async def interrupt_all_sessions(self) -> int:
    """Interrupt every session currently in the pool, one after another.

    A failure on one session is logged and does not stop the remaining
    sessions from being interrupted. Sessions are not removed from the pool.

    Returns:
        Number of sessions whose interrupt call completed without raising
    """
    # Copy the entries under the lock, then work on the copy without it
    async with self._lock:
        snapshot = list(self.sessions.items())

    requested = len(snapshot)
    logger.debug(
        "session_interrupt_all_requested",
        total_sessions=requested,
    )

    succeeded = 0
    for sid, client in snapshot:
        try:
            await client.interrupt()
        except asyncio.CancelledError as exc:
            # NOTE(review): cancellation is logged and swallowed here, so the
            # loop continues with the next session — confirm this is intended.
            logger.warning(
                "session_interrupt_cancelled_during_all",
                session_id=sid,
                error=str(exc),
                exc_info=exc,
            )
        except TimeoutError as exc:
            logger.error(
                "session_interrupt_timeout_during_all",
                session_id=sid,
                error=str(exc),
                exc_info=exc,
            )
        except Exception as exc:
            logger.error(
                "session_interrupt_failed_during_all",
                session_id=sid,
                error=str(exc),
                exc_info=exc,
            )
        else:
            succeeded += 1

    logger.debug(
        "session_interrupt_all_completed",
        interrupted_count=succeeded,
        total_requested=requested,
    )

    return succeeded

has_session async

has_session(session_id)

Check if a session exists in the pool.

Parameters:

Name Type Description Default
session_id str

The session ID to check

required

Returns:

Type Description
bool

True if session exists, False otherwise

Source code in ccproxy/plugins/claude_sdk/session_pool.py
async def has_session(self, session_id: str) -> bool:
    """Report whether the pool currently holds a session with this ID.

    Args:
        session_id: The session ID to look for

    Returns:
        True when the ID is present in the pool, False when it is not
    """
    # Membership is checked while holding the pool lock
    async with self._lock:
        present = session_id in self.sessions
    return present

get_stats async

get_stats()

Get session pool statistics.

Source code in ccproxy/plugins/claude_sdk/session_pool.py
async def get_stats(self) -> dict[str, Any]:
    """Build a summary of the pool's configuration and current usage.

    Returns:
        A dict with the keys ``enabled``, ``total_sessions``,
        ``active_sessions``, ``max_sessions``, ``total_messages``,
        ``session_ttl`` and ``cleanup_interval``.
    """
    # Snapshot under the lock; the aggregation below runs without it
    async with self._lock:
        snapshot = list(self.sessions.values())
        session_count = len(self.sessions)

    active_count = len(
        [client for client in snapshot if client.status == SessionStatus.ACTIVE]
    )
    message_total = sum(client.metrics.message_count for client in snapshot)

    settings = self.config
    stats: dict[str, Any] = {
        "enabled": settings.enabled,
        "total_sessions": session_count,
        "active_sessions": active_count,
        "max_sessions": settings.max_sessions,
        "total_messages": message_total,
        "session_ttl": settings.session_ttl,
        "cleanup_interval": settings.cleanup_interval,
    }
    return stats