Skip to content

ccproxy.claude_sdk.stream_handle

ccproxy.claude_sdk.stream_handle

Stream handle for managing worker lifecycle and providing listeners.

StreamHandle

StreamHandle(
    message_iterator,
    session_id=None,
    request_id=None,
    session_client=None,
    session_config=None,
)

Handle for a streaming response that manages worker and listeners.

Parameters:

Name Type Description Default
message_iterator AsyncIterator[Any]

The SDK message iterator

required
session_id str | None

Optional session ID

None
request_id str | None

Optional request ID

None
session_client SessionClient | None

Optional session client

None
session_config SessionPoolSettings | None

Optional session pool configuration

None
Source code in ccproxy/claude_sdk/stream_handle.py
def __init__(
    self,
    message_iterator: AsyncIterator[Any],
    session_id: str | None = None,
    request_id: str | None = None,
    session_client: SessionClient | None = None,
    session_config: SessionPoolSettings | None = None,
):
    """Set up identity, timeout settings, and bookkeeping for a stream.

    Args:
        message_iterator: The SDK message iterator
        session_id: Optional session ID
        request_id: Optional request ID
        session_client: Optional session client
        session_config: Optional session pool configuration; when it is
            missing (or falsy) the fallback timeouts 3.0 / 60.0 / 10.0
            are used for first-chunk / ongoing / interrupt respectively
    """
    # Identity and collaborators
    self.handle_id = str(uuid.uuid4())
    self._message_iterator = message_iterator
    self.session_id = session_id
    self.request_id = request_id
    self._session_client = session_client
    self._session_config = session_config

    # Timeouts: taken from the session configuration when one is supplied,
    # otherwise the hard-coded fallbacks below apply.
    if session_config:
        self._first_chunk_timeout = session_config.stream_first_chunk_timeout
        self._ongoing_timeout = session_config.stream_ongoing_timeout
        self._interrupt_timeout = session_config.stream_interrupt_timeout
    else:
        self._first_chunk_timeout = 3.0
        self._ongoing_timeout = 60.0
        self._interrupt_timeout = 10.0

    # Worker and listener bookkeeping; the worker starts out unset
    self._worker: StreamWorker | None = None
    self._worker_lock = asyncio.Lock()
    self._listeners: dict[str, QueueListener] = {}
    self._created_at = time.time()
    self._first_listener_at: float | None = None

    # Lifecycle timestamps consulted by the staleness checks
    self._first_chunk_received_at: float | None = None
    self._completed_at: float | None = None
    self._has_result_message = False
    self._last_activity_at = time.time()

has_active_listeners property

has_active_listeners

Check if there are any active listeners.

worker_status property

worker_status

Get the worker status if worker exists.

is_completed property

is_completed

Check if stream has completed (received ResultMessage).

has_first_chunk property

has_first_chunk

Check if stream has received first chunk (SystemMessage init).

idle_seconds property

idle_seconds

Get seconds since last activity.

create_listener async

create_listener()

Create a new listener for this stream.

This method starts the worker when the first listener is created and returns an async iterator for consuming messages.

Yields:

Type Description
AsyncIterator[Any]

Messages from the stream

Source code in ccproxy/claude_sdk/stream_handle.py
async def create_listener(self) -> AsyncIterator[Any]:
    """Attach a new listener to this stream and relay its messages.

    The worker is started first if needed. The listener stays registered
    on this handle while the generator is being consumed; when iteration
    ends for any reason it is removed and a cleanup check is run.

    Yields:
        Messages from the stream

    Raises:
        RuntimeError: If no worker is available after the start attempt
    """
    await self._ensure_worker_started()
    if not self._worker:
        raise RuntimeError("Failed to start stream worker")

    # Register a fresh listener on the worker's message queue
    message_queue = self._worker.get_message_queue()
    subscriber = await message_queue.create_listener()
    subscriber_id = subscriber.listener_id
    self._listeners[subscriber_id] = subscriber

    # Only the very first listener stamps the attach time
    if self._first_listener_at is None:
        self._first_listener_at = time.time()

    logger.debug(
        "stream_handle_listener_created",
        handle_id=self.handle_id,
        listener_id=subscriber_id,
        total_listeners=len(self._listeners),
        worker_status=self._worker.status.value,
    )

    try:
        # Relay everything the listener produces to the consumer
        async for message in subscriber:
            yield message

    except GeneratorExit:
        # The consumer closed the generator (client disconnected)
        logger.debug(
            "stream_handle_listener_disconnected",
            handle_id=self.handle_id,
            listener_id=subscriber_id,
        )

        # Removal happens in `finally`, so a count of exactly one here
        # means no listener will remain once this one is removed.
        is_last_listener = len(self._listeners) == 1
        if is_last_listener and self._session_client:
            logger.debug(
                "stream_handle_last_listener_disconnected",
                handle_id=self.handle_id,
                listener_id=subscriber_id,
                message="Last listener disconnected, will trigger SDK interrupt in cleanup",
            )

        raise

    finally:
        # Always unregister, then let the handle decide whether to clean up
        await self._remove_listener(subscriber_id)
        await self._check_cleanup()

interrupt async

interrupt()

Interrupt the stream.

Returns:

Type Description
bool

True if interrupted successfully

Source code in ccproxy/claude_sdk/stream_handle.py
async def interrupt(self) -> bool:
    """Stop the worker and close every listener attached to this handle.

    Returns:
        True if interrupted successfully. False when no worker exists, or
        when stopping the worker / closing listeners raised an exception
        (the error is logged rather than propagated).
    """
    worker = self._worker
    if not worker:
        logger.warning(
            "stream_handle_interrupt_no_worker",
            handle_id=self.handle_id,
        )
        return False

    logger.debug(
        "stream_handle_interrupting",
        handle_id=self.handle_id,
        worker_status=worker.status.value,
        active_listeners=len(self._listeners),
    )

    try:
        # The worker is stopped before any listener is closed
        await worker.stop(timeout=self._interrupt_timeout)

        for attached in self._listeners.values():
            attached.close()
        self._listeners.clear()

        logger.info(
            "stream_handle_interrupted",
            handle_id=self.handle_id,
        )
    except Exception as exc:
        logger.error(
            "stream_handle_interrupt_error",
            handle_id=self.handle_id,
            error=str(exc),
        )
        return False

    return True

wait_for_completion async

wait_for_completion(timeout=None)

Wait for the stream to complete.

Parameters:

Name Type Description Default
timeout float | None

Optional timeout in seconds

None

Returns:

Type Description
bool

True if completed, False if timed out

Source code in ccproxy/claude_sdk/stream_handle.py
async def wait_for_completion(self, timeout: float | None = None) -> bool:
    """Wait for the stream to complete.

    Args:
        timeout: Optional timeout in seconds

    Returns:
        True if completed, False if timed out
    """
    if not self._worker:
        return True

    return await self._worker.wait_for_completion(timeout)

get_stats

get_stats()

Get stream handle statistics.

Returns:

Type Description
dict[str, Any]

Dictionary of statistics

Source code in ccproxy/claude_sdk/stream_handle.py
def get_stats(self) -> dict[str, Any]:
    """Get stream handle statistics.

    Returns:
        Dictionary with the handle/session/request IDs, the number of
        active listeners, the handle's lifetime in seconds, the delay
        between creation and the first listener (None if no listener has
        attached yet), and the worker's own stats (None if no worker).
    """
    first_listener_at = self._first_listener_at

    # Annotated as dict[str, Any] so the heterogeneous values (str, int,
    # float, None, nested worker stats) type-check without an ignore.
    stats: dict[str, Any] = {
        "handle_id": self.handle_id,
        "session_id": self.session_id,
        "request_id": self.request_id,
        "active_listeners": len(self._listeners),
        "lifetime_seconds": time.time() - self._created_at,
        # Compare against None explicitly: a timestamp of 0.0 is a valid
        # value and must not be mistaken for "no listener yet".
        "time_to_first_listener": (
            first_listener_at - self._created_at
            if first_listener_at is not None
            else None
        ),
        "worker_stats": self._worker.get_stats() if self._worker else None,
    }
    return stats

on_first_chunk_received

on_first_chunk_received()

Called when SystemMessage(init) is received - first chunk.

Source code in ccproxy/claude_sdk/stream_handle.py
def on_first_chunk_received(self) -> None:
    """Record the arrival of the first chunk (SystemMessage init).

    Only the first call has any effect; later calls leave the recorded
    timestamp and the activity clock untouched.
    """
    if self._first_chunk_received_at is not None:
        return

    arrived_at = time.time()
    self._first_chunk_received_at = arrived_at
    # The first chunk also counts as stream activity
    self._last_activity_at = arrived_at
    logger.debug(
        "stream_handle_first_chunk_received",
        handle_id=self.handle_id,
        session_id=self.session_id,
    )

on_message_received

on_message_received(message)

Called when any message is received to update activity.

Source code in ccproxy/claude_sdk/stream_handle.py
def on_message_received(self, message: Any) -> None:
    """Refresh the last-activity timestamp whenever a message arrives.

    The message itself is not inspected; only the time of the call matters.
    """
    now = time.time()
    self._last_activity_at = now

on_completion

on_completion()

Called when ResultMessage is received - stream completed.

Source code in ccproxy/claude_sdk/stream_handle.py
def on_completion(self) -> None:
    """Mark the stream as completed when a ResultMessage is received.

    Only the first call has any effect; once the result flag is set,
    later calls change nothing.
    """
    if self._has_result_message:
        return

    self._has_result_message = True
    finished_at = time.time()
    self._completed_at = finished_at
    # Completion also counts as stream activity
    self._last_activity_at = finished_at
    logger.debug(
        "stream_handle_completed",
        handle_id=self.handle_id,
        session_id=self.session_id,
    )

is_stale

is_stale()

Check if stream is stale based on configurable timeout logic.

Returns:

Type Description
bool

True if stream should be considered stale

Source code in ccproxy/claude_sdk/stream_handle.py
def is_stale(self) -> bool:
    """Decide whether the stream has been idle longer than allowed.

    Returns:
        True if stream should be considered stale: the idle time exceeds
        the first-chunk timeout while no first chunk has arrived, or the
        ongoing timeout afterwards. Completed streams are never stale.
    """
    if self.is_completed:
        return False

    # Which limit applies depends on whether the first chunk has arrived
    if self.has_first_chunk:
        allowed_idle = self._ongoing_timeout
    else:
        allowed_idle = self._first_chunk_timeout

    return self.idle_seconds > allowed_idle

is_first_chunk_timeout

is_first_chunk_timeout()

Check if this is specifically a first chunk timeout.

Returns:

Type Description
bool

True if no first chunk received and timeout exceeded

Source code in ccproxy/claude_sdk/stream_handle.py
def is_first_chunk_timeout(self) -> bool:
    """Tell whether the stream timed out while waiting for its first chunk.

    Returns:
        True if no first chunk received and timeout exceeded
    """
    if self.has_first_chunk:
        # Once the first chunk is in, this kind of timeout no longer applies
        return False

    return self.idle_seconds > self._first_chunk_timeout

is_ongoing_timeout

is_ongoing_timeout()

Check if this is an ongoing stream timeout.

Returns:

Type Description
bool

True if first chunk received but ongoing timeout exceeded

Source code in ccproxy/claude_sdk/stream_handle.py
def is_ongoing_timeout(self) -> bool:
    """Tell whether a started, unfinished stream has gone quiet for too long.

    Returns:
        True if first chunk received but ongoing timeout exceeded
    """
    # "In flight" means the first chunk arrived and no completion was seen
    in_flight = self.has_first_chunk and not self.is_completed
    return in_flight and self.idle_seconds > self._ongoing_timeout