Skip to content

ccproxy.claude_sdk.stream_worker

ccproxy.claude_sdk.stream_worker

Stream worker for consuming Claude SDK messages and distributing via queue.

WorkerStatus

Bases: str, Enum

Status of the stream worker.

StreamWorker

StreamWorker(
    worker_id,
    message_iterator,
    session_id=None,
    request_id=None,
    session_client=None,
    stream_handle=None,
)

Worker that consumes messages from Claude SDK and distributes via queue.

Parameters:

Name Type Description Default
worker_id str

Unique identifier for this worker

required
message_iterator AsyncIterator[Any]

Async iterator of SDK messages

required
session_id str | None

Optional session ID for logging

None
request_id str | None

Optional request ID for logging

None
session_client SessionClient | None

Optional session client for state management

None
stream_handle StreamHandle | None

Optional stream handle for message lifecycle tracking

None
Source code in ccproxy/claude_sdk/stream_worker.py
def __init__(
    self,
    worker_id: str,
    message_iterator: AsyncIterator[Any],
    session_id: str | None = None,
    request_id: str | None = None,
    session_client: SessionClient | None = None,
    stream_handle: StreamHandle | None = None,
):
    """Create an idle worker with a fresh message queue and zeroed counters.

    No task is created here; the worker stays in ``WorkerStatus.IDLE``
    until ``start()`` is called.

    Args:
        worker_id: Unique identifier for this worker
        message_iterator: Async iterator of SDK messages
        session_id: Optional session ID for logging
        request_id: Optional request ID for logging
        session_client: Optional session client for state management
        stream_handle: Optional stream handle for message lifecycle tracking
    """
    # Identity / correlation fields (public, used in log records)
    self.worker_id = worker_id
    self.session_id = session_id
    self.request_id = request_id

    # Collaborators handed in by the caller
    self._message_iterator = message_iterator
    self._session_client = session_client
    self._stream_handle = stream_handle

    # Lifecycle state: task handle and wall-clock timestamps stay unset
    # until the worker is started / finishes
    self.status = WorkerStatus.IDLE
    self._worker_task: asyncio.Task[None] | None = None
    self._started_at: float | None = None
    self._completed_at: float | None = None
    self._message_queue = MessageQueue()

    # Message accounting
    self._total_messages = 0
    self._messages_delivered = 0
    self._messages_discarded = 0
    self._last_message_time: float | None = None

start async

start()

Start the worker task.

Source code in ccproxy/claude_sdk/stream_worker.py
async def start(self) -> None:
    """Spawn the background task that runs the worker loop.

    Only an idle worker can be started. For any other status a warning
    is logged and the call returns without touching the existing task.
    """
    current_status = self.status
    if current_status != WorkerStatus.IDLE:
        logger.warning(
            "stream_worker_already_started",
            worker_id=self.worker_id,
            status=current_status,
        )
        return

    # Flip the status before scheduling so a second start() call is rejected
    self.status = WorkerStatus.STARTING
    self._started_at = time.time()
    self._worker_task = asyncio.create_task(self._run_worker())

    log_fields = {
        "worker_id": self.worker_id,
        "session_id": self.session_id,
        "request_id": self.request_id,
    }
    logger.debug("stream_worker_started", **log_fields)

stop async

stop(timeout=5.0)

Stop the worker gracefully.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for worker to stop

5.0
Source code in ccproxy/claude_sdk/stream_worker.py
async def stop(self, timeout: float = 5.0) -> None:
    """Cancel the worker task and wait a bounded time for it to finish.

    Does nothing when no task exists or the task has already finished.
    Failures while waiting are logged as warnings, not raised.

    Args:
        timeout: Maximum time to wait for worker to stop
    """
    task = self._worker_task
    if not task or task.done():
        return

    logger.debug(
        "stream_worker_stopping",
        worker_id=self.worker_id,
        timeout=timeout,
    )

    # Request cancellation, then give the task up to `timeout` seconds to unwind
    task.cancel()

    try:
        # asyncio.wait (rather than wait_for) returns done/pending sets
        # instead of raising when the task ends cancelled or the timeout hits
        finished, unfinished = await asyncio.wait(
            [task],
            timeout=timeout,
            return_when=asyncio.ALL_COMPLETED,
        )
    except Exception as e:
        logger.warning(
            "stream_worker_stop_error",
            worker_id=self.worker_id,
            error=str(e),
            error_type=type(e).__name__,
        )
        return

    if unfinished:
        logger.warning(
            "stream_worker_stop_timeout",
            worker_id=self.worker_id,
            timeout=timeout,
        )
    elif finished:
        # Task completed (likely with CancelledError)
        logger.debug(
            "stream_worker_stopped",
            worker_id=self.worker_id,
            task_cancelled=task.cancelled(),
        )

wait_for_completion async

wait_for_completion(timeout=None)

Wait for the worker to complete.

Parameters:

Name Type Description Default
timeout float | None

Optional timeout in seconds

None

Returns:

Type Description
bool

True if completed successfully, False if timed out

Source code in ccproxy/claude_sdk/stream_worker.py
async def wait_for_completion(self, timeout: float | None = None) -> bool:
    """Wait for the worker to complete.

    Args:
        timeout: Optional timeout in seconds. ``None`` waits without a
            limit; ``0`` is honoured as "do not wait" and returns False
            unless the task has already finished.

    Returns:
        True if the worker task finished (or was never started),
        False if the timeout expired first.

    Raises:
        Whatever the worker task itself raised, including
        ``asyncio.CancelledError`` if the task was cancelled.

    Note:
        ``asyncio.wait_for`` cancels the awaited task when the timeout
        expires, so a False result also means cancellation of the worker
        task has been requested.
    """
    if not self._worker_task:
        return True

    try:
        # Compare against None explicitly: a timeout of 0 is a valid,
        # falsy value and must not fall through to the unbounded wait.
        if timeout is not None:
            await asyncio.wait_for(self._worker_task, timeout=timeout)
        else:
            await self._worker_task
        return True
    except asyncio.TimeoutError:
        # asyncio.TimeoutError is the builtin TimeoutError on Python 3.11+
        # and a distinct class before that; this spelling covers both.
        return False

get_message_queue

get_message_queue()

Get the message queue for creating listeners.

Returns:

Type Description
MessageQueue

The worker's message queue

Source code in ccproxy/claude_sdk/stream_worker.py
def get_message_queue(self) -> MessageQueue:
    """Return the queue owned by this worker, for creating listeners.

    Returns:
        The same ``MessageQueue`` instance on every call (not a copy)
    """
    queue = self._message_queue
    return queue

drain_remaining async

drain_remaining(timeout=30.0)

Drain remaining messages without listeners.

This is useful for ensuring the stream completes properly even after all listeners have disconnected.

Parameters:

Name Type Description Default
timeout float

Maximum time to spend draining

30.0

Returns:

Type Description
int

Number of messages drained

Source code in ccproxy/claude_sdk/stream_worker.py
async def drain_remaining(self, timeout: float = 30.0) -> int:
    """Drain remaining messages without listeners.

    This is useful for ensuring the stream completes properly
    even after all listeners have disconnected. Messages are consumed
    from the underlying iterator and discarded (counted, not broadcast)
    until a ``ResultMessage`` arrives, the iterator is exhausted, an
    error occurs, or ``timeout`` seconds have elapsed.

    The timeout bounds the whole drain, including time spent waiting for
    the next message, so a stalled stream cannot block this call
    indefinitely.

    Args:
        timeout: Maximum time to spend draining

    Returns:
        Number of messages drained (0 if the worker is not in the
        RUNNING or STARTING state)
    """
    if self.status not in (WorkerStatus.RUNNING, WorkerStatus.STARTING):
        return 0

    self.status = WorkerStatus.DRAINING
    drained_count = 0

    logger.debug(
        "stream_worker_draining",
        worker_id=self.worker_id,
        timeout=timeout,
    )

    async def _consume() -> None:
        """Pull and discard messages until the result message or exhaustion."""
        nonlocal drained_count
        try:
            # Continue consuming but without broadcasting
            async for message in self._message_iterator:
                drained_count += 1
                self._total_messages += 1
                self._messages_discarded += 1

                logger.debug(
                    "stream_worker_draining_message",
                    worker_id=self.worker_id,
                    message_type=type(message).__name__,
                    drained_count=drained_count,
                )

                # A result message marks the end of the stream
                if isinstance(message, sdk_models.ResultMessage):
                    logger.debug(
                        "stream_worker_drain_complete",
                        worker_id=self.worker_id,
                        drained_count=drained_count,
                    )
                    break
        except Exception as e:
            # Iterator failures are logged here, inside the consumer, so
            # they are never mistaken for the drain deadline expiring.
            logger.error(
                "stream_worker_drain_error",
                worker_id=self.worker_id,
                error=str(e),
                error_type=type(e).__name__,
                drained_count=drained_count,
            )

    try:
        # wait_for cancels the consumer when the deadline passes, which
        # interrupts a pending read from the iterator instead of waiting
        # for a message that may never arrive.
        await asyncio.wait_for(_consume(), timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning(
            "stream_worker_drain_timeout",
            worker_id=self.worker_id,
            drained_count=drained_count,
            timeout=timeout,
        )

    return drained_count

get_stats

get_stats()

Get worker statistics.

Returns:

Type Description
dict[str, Any]

Dictionary of worker statistics

Source code in ccproxy/claude_sdk/stream_worker.py
def get_stats(self) -> dict[str, Any]:
    """Build a snapshot of the worker's identity, counters and queue state.

    ``runtime_seconds`` is None until the worker has a start timestamp;
    after that it is measured up to the completion timestamp, or up to
    now while no completion timestamp is set.

    Returns:
        Dictionary of worker statistics
    """
    started = self._started_at
    elapsed = (
        (self._completed_at or time.time()) - started if started else None
    )

    snapshot: dict[str, Any] = {
        "worker_id": self.worker_id,
        "status": self.status.value,
        "session_id": self.session_id,
        "request_id": self.request_id,
    }
    snapshot["total_messages"] = self._total_messages
    snapshot["messages_delivered"] = self._messages_delivered
    snapshot["messages_discarded"] = self._messages_discarded
    snapshot["runtime_seconds"] = elapsed
    snapshot["queue_stats"] = self._message_queue.get_stats()
    return snapshot