async def get_session_client(
    self, session_id: str, options: ClaudeCodeOptions
) -> SessionClient:
    """Get or create a session context for the given session_id.

    The lookup, any cleanup of stale stream state, session replacement and
    the final connection check all run while holding ``self._lock``.

    Note: this sets ``options.continue_conversation = True`` on the caller's
    ``options`` object before it is used to create any new session client.

    Args:
        session_id: Key of the session in ``self.sessions``.
        options: Options passed to ``_create_session_unlocked`` whenever a
            new session client has to be created (new id, interrupted,
            first-chunk timeout, or expired session).

    Returns:
        A session client for ``session_id`` whose ``ensure_connected()``
        call succeeded.

    Raises:
        ClaudeProxyError: If the session pool is disabled in the config.
        ServiceUnavailableError: If the pool already holds ``max_sessions``
            sessions and ``session_id`` is not one of them, or if the session
            connection could not be established.
    """
    logger.debug(
        "session_pool_get_client_start",
        session_id=session_id,
        pool_enabled=self.config.enabled,
        current_sessions=len(self.sessions),
        max_sessions=self.config.max_sessions,
        session_exists=session_id in self.sessions,
    )
    if not self.config.enabled:
        logger.error("session_pool_disabled", session_id=session_id)
        raise ClaudeProxyError(
            message="Session pool is disabled",
            error_type="configuration_error",
            status_code=500,
        )

    # Check session limit and get/create session
    async with self._lock:
        # Only brand-new ids count against the limit: every replacement path
        # below removes the old session before creating its successor.
        if (
            session_id not in self.sessions
            and len(self.sessions) >= self.config.max_sessions
        ):
            logger.error(
                "session_pool_at_capacity",
                session_id=session_id,
                current_sessions=len(self.sessions),
                max_sessions=self.config.max_sessions,
            )
            raise ServiceUnavailableError(
                f"Session pool at capacity: {self.config.max_sessions}"
            )

        # NOTE: this mutates the caller's options object in place.
        options.continue_conversation = True

        if session_id in self.sessions:
            session_client = self.sessions[session_id]
            logger.debug(
                "session_pool_existing_session_found",
                session_id=session_id,
                client_id=session_client.client_id,
                session_status=session_client.status.value,
            )

            if session_client.status.value == "interrupting":
                # Session is currently being interrupted: wait, then replace it.
                logger.warning(
                    "session_pool_interrupting_session",
                    session_id=session_id,
                    client_id=session_client.client_id,
                    message="Session is currently being interrupted, waiting for completion then creating new session",
                )
                # NOTE(review): this wait happens while holding the pool-wide
                # lock, so other callers of this method can be blocked for up
                # to the full timeout.
                interrupt_completed = await session_client.wait_for_interrupt_complete(
                    timeout=5.0
                )
                if interrupt_completed:
                    logger.debug(
                        "session_pool_interrupt_completed",
                        session_id=session_id,
                        client_id=session_client.client_id,
                        message="Interrupt completed successfully, proceeding with session replacement",
                    )
                else:
                    logger.warning(
                        "session_pool_interrupt_timeout",
                        session_id=session_id,
                        client_id=session_client.client_id,
                        message="Interrupt did not complete within 5 seconds, proceeding anyway",
                    )
                # Don't try to reuse a session that was being interrupted
                await self._remove_session_unlocked(session_id)
                session_client = await self._create_session_unlocked(
                    session_id, options
                )

            elif (
                session_client.has_active_stream
                or session_client.active_stream_handle
            ):
                # Leftover stream state from an earlier request must be
                # resolved before the session can serve a new one.
                logger.debug(
                    "session_pool_active_stream_detected",
                    session_id=session_id,
                    client_id=session_client.client_id,
                    has_stream=session_client.has_active_stream,
                    has_handle=bool(session_client.active_stream_handle),
                    idle_seconds=session_client.metrics.idle_seconds,
                    message="Session has active stream/handle, checking if cleanup needed",
                )
                # Timeout policy, evaluated on the handle:
                # - first-chunk timeout (no first chunk within the configured
                #   window) -> replace the whole session client
                # - ongoing timeout (first chunk seen, then idle too long)
                #   -> interrupt the stream but keep the session
                # - otherwise -> just clear the handle for reuse
                #
                # Snapshot the handle once and use only the snapshot below: the
                # attribute may be cleared (or replaced) concurrently, and
                # re-reading it could hit None or a different handle than the
                # one the timeout verdict was computed for.
                handle = session_client.active_stream_handle
                if handle is not None:
                    is_first_chunk_timeout = handle.is_first_chunk_timeout()
                    is_ongoing_timeout = handle.is_ongoing_timeout()
                else:
                    # Handle already cleared; nothing to time out.
                    is_first_chunk_timeout = False
                    is_ongoing_timeout = False
                timed_out = is_first_chunk_timeout or is_ongoing_timeout

                if handle is not None and is_first_chunk_timeout:
                    # First chunk timeout indicates connection issue - terminate session client
                    logger.warning(
                        "session_pool_first_chunk_timeout",
                        session_id=session_id,
                        old_handle_id=handle.handle_id,
                        idle_seconds=handle.idle_seconds,
                        message=f"No first chunk received within {self.config.stream_first_chunk_timeout} seconds, terminating session client",
                    )
                    await self._remove_session_unlocked(session_id)
                    session_client = await self._create_session_unlocked(
                        session_id, options
                    )
                elif handle is not None and is_ongoing_timeout:
                    old_handle_id = handle.handle_id
                    logger.info(
                        "session_pool_interrupting_ongoing_timeout",
                        session_id=session_id,
                        old_handle_id=old_handle_id,
                        idle_seconds=handle.idle_seconds,
                        has_first_chunk=handle.has_first_chunk,
                        is_completed=handle.is_completed,
                        message=f"Stream idle for {self.config.stream_ongoing_timeout}+ seconds, interrupting stream but keeping session",
                    )
                    try:
                        # Interrupt the old stream handle to stop its worker
                        interrupted = await handle.interrupt()
                        if interrupted:
                            logger.info(
                                "session_pool_interrupted_ongoing_timeout",
                                session_id=session_id,
                                old_handle_id=old_handle_id,
                                message="Successfully interrupted ongoing timeout stream",
                            )
                        else:
                            logger.debug(
                                "session_pool_interrupt_ongoing_not_needed",
                                session_id=session_id,
                                old_handle_id=old_handle_id,
                                message="Ongoing timeout stream was already completed",
                            )
                    except Exception as e:
                        # Best effort: a failed interrupt must not block reuse.
                        logger.warning(
                            "session_pool_interrupt_ongoing_failed",
                            session_id=session_id,
                            old_handle_id=old_handle_id,
                            error=str(e),
                            error_type=type(e).__name__,
                            message="Failed to interrupt ongoing timeout stream, clearing anyway",
                        )
                    finally:
                        # Always clear the handle after interrupt attempt
                        session_client.active_stream_handle = None
                        session_client.has_active_stream = False
                elif handle is not None:
                    # Stream is recent (no timeout): clear it without
                    # interrupting so the session can be reused immediately.
                    logger.debug(
                        "session_pool_clearing_recent_stream",
                        session_id=session_id,
                        old_handle_id=handle.handle_id,
                        idle_seconds=handle.idle_seconds,
                        has_first_chunk=handle.has_first_chunk,
                        is_completed=handle.is_completed,
                        message="Clearing recent stream handle for immediate reuse",
                    )
                    session_client.active_stream_handle = None
                    session_client.has_active_stream = False
                else:
                    # No handle but has_active_stream flag is set, just clear the flag
                    session_client.has_active_stream = False

                logger.debug(
                    "session_pool_stream_cleared",
                    session_id=session_id,
                    client_id=session_client.client_id,
                    was_interrupted=timed_out,
                    was_recent=not timed_out,
                    was_first_chunk_timeout=is_first_chunk_timeout,
                    was_ongoing_timeout=is_ongoing_timeout,
                    message="Stream state cleared, session ready for reuse",
                )

            elif session_client.is_expired():
                logger.debug("session_expired", session_id=session_id)
                await self._remove_session_unlocked(session_id)
                session_client = await self._create_session_unlocked(
                    session_id, options
                )

            else:
                healthy = await session_client.is_healthy()
                if not healthy and self.config.connection_recovery:
                    logger.debug("session_unhealthy_recovering", session_id=session_id)
                    await session_client.connect()
                elif not healthy:
                    # Previously this case was logged as a healthy reuse,
                    # which hid unhealthy sessions in diagnostics.
                    logger.debug(
                        "session_pool_reusing_unhealthy_session",
                        session_id=session_id,
                        client_id=session_client.client_id,
                        message="Session unhealthy and connection recovery disabled, relying on ensure_connected",
                    )
                else:
                    logger.debug(
                        "session_pool_reusing_healthy_session",
                        session_id=session_id,
                        client_id=session_client.client_id,
                    )
                # Existing session (recovered or not) is being reused.
                session_client.mark_as_reused()
        else:
            logger.debug("session_pool_creating_new_session", session_id=session_id)
            session_client = await self._create_session_unlocked(
                session_id, options
            )

        # Ensure session is connected before returning (inside lock to prevent race conditions)
        if not await session_client.ensure_connected():
            logger.error(
                "session_pool_connection_failed",
                session_id=session_id,
            )
            raise ServiceUnavailableError(
                f"Failed to establish session connection: {session_id}"
            )

    logger.debug(
        "session_pool_get_client_complete",
        session_id=session_id,
        client_id=session_client.client_id,
        session_status=session_client.status.value,
        session_age_seconds=session_client.metrics.age_seconds,
        session_message_count=session_client.metrics.message_count,
    )
    return session_client