Skip to content

ccproxy.claude_sdk.message_queue

ccproxy.claude_sdk.message_queue

Message queue system for broadcasting SDK messages to multiple listeners.

MessageType

Bases: str, Enum

Types of messages that can be sent through the queue.

QueueMessage dataclass

QueueMessage(type, data=None, error=None, timestamp=time())

Message wrapper for queue communication.

QueueListener

QueueListener(listener_id=None)

Individual listener that consumes messages from the queue.

Parameters:

Name Type Description Default
listener_id str | None

Optional ID for the listener, generated if not provided

None
Source code in ccproxy/claude_sdk/message_queue.py
def __init__(self, listener_id: str | None = None):
    """Initialize a queue listener.

    Args:
        listener_id: Optional ID for the listener, generated if not provided
    """
    self.listener_id = listener_id or str(uuid.uuid4())
    self._queue: asyncio.Queue[QueueMessage] = asyncio.Queue()
    self._closed = False
    self._created_at = time.time()

is_closed property

is_closed

Check if the listener is closed.

queue_size property

queue_size

Get the current queue size.

get_message async

get_message()

Get the next message from the queue.

Returns:

Type Description
QueueMessage

The next queued message

Raises:

Type Description
QueueEmpty

If queue is empty and closed

Source code in ccproxy/claude_sdk/message_queue.py
async def get_message(self) -> QueueMessage:
    """Get the next message from the queue.

    Returns:
        The next queued message

    Raises:
        asyncio.QueueEmpty: If queue is empty and closed
    """
    if self._closed and self._queue.empty():
        raise asyncio.QueueEmpty("Listener is closed")

    return await self._queue.get()

put_message async

put_message(message)

Put a message into this listener's queue.

Parameters:

Name Type Description Default
message QueueMessage

Message to queue

required
Source code in ccproxy/claude_sdk/message_queue.py
async def put_message(self, message: QueueMessage) -> None:
    """Put a message into this listener's queue.

    Args:
        message: Message to queue
    """
    if not self._closed:
        await self._queue.put(message)

close

close()

Close the listener, preventing new messages.

Source code in ccproxy/claude_sdk/message_queue.py
def close(self) -> None:
    """Close the listener, preventing new messages."""
    self._closed = True
    # Put a shutdown message to unblock any waiting consumers
    with contextlib.suppress(asyncio.QueueFull):
        self._queue.put_nowait(QueueMessage(type=MessageType.SHUTDOWN))

MessageQueue

MessageQueue(max_listeners=100)

Message queue that broadcasts to multiple listeners with discard logic.

Parameters:

Name Type Description Default
max_listeners int

Maximum number of concurrent listeners

100
Source code in ccproxy/claude_sdk/message_queue.py
def __init__(self, max_listeners: int = 100):
    """Initialize the message queue.

    Args:
        max_listeners: Maximum number of concurrent listeners
    """
    self._listeners: dict[str, QueueListener] = {}
    self._lock = asyncio.Lock()
    self._max_listeners = max_listeners
    self._total_messages_received = 0
    self._total_messages_delivered = 0
    self._total_messages_discarded = 0
    self._created_at = time.time()

create_listener async

create_listener(listener_id=None)

Create a new listener for this queue.

Parameters:

Name Type Description Default
listener_id str | None

Optional ID for the listener

None

Returns:

Type Description
QueueListener

A new QueueListener instance

Raises:

Type Description
RuntimeError

If max listeners exceeded

Source code in ccproxy/claude_sdk/message_queue.py
async def create_listener(self, listener_id: str | None = None) -> QueueListener:
    """Create a new listener for this queue.

    Args:
        listener_id: Optional ID for the listener

    Returns:
        A new QueueListener instance

    Raises:
        RuntimeError: If max listeners exceeded
    """
    async with self._lock:
        if len(self._listeners) >= self._max_listeners:
            raise RuntimeError(
                f"Maximum listeners ({self._max_listeners}) exceeded"
            )

        listener = QueueListener(listener_id)
        self._listeners[listener.listener_id] = listener

        logger.debug(
            "message_queue_listener_added",
            listener_id=listener.listener_id,
            active_listeners=len(self._listeners),
        )

        return listener

remove_listener async

remove_listener(listener_id)

Remove a listener from the queue.

Parameters:

Name Type Description Default
listener_id str

ID of the listener to remove

required
Source code in ccproxy/claude_sdk/message_queue.py
async def remove_listener(self, listener_id: str) -> None:
    """Remove a listener from the queue.

    Args:
        listener_id: ID of the listener to remove
    """
    async with self._lock:
        if listener_id in self._listeners:
            listener = self._listeners.pop(listener_id)
            listener.close()

            logger.debug(
                "message_queue_listener_removed",
                listener_id=listener_id,
                active_listeners=len(self._listeners),
                listener_queue_size=listener.queue_size,
            )

has_listeners async

has_listeners()

Check if any active listeners exist.

Returns:

Type Description
bool

True if at least one listener is registered

Source code in ccproxy/claude_sdk/message_queue.py
async def has_listeners(self) -> bool:
    """Check if any active listeners exist.

    Returns:
        True if at least one listener is registered
    """
    async with self._lock:
        return len(self._listeners) > 0

get_listener_count async

get_listener_count()

Get the current number of active listeners.

Returns:

Type Description
int

Number of active listeners

Source code in ccproxy/claude_sdk/message_queue.py
async def get_listener_count(self) -> int:
    """Get the current number of active listeners.

    Returns:
        Number of active listeners
    """
    async with self._lock:
        return len(self._listeners)

broadcast async

broadcast(message)

Broadcast a message to all active listeners.

Parameters:

Name Type Description Default
message Any

The message to broadcast

required

Returns:

Type Description
int

Number of listeners that received the message

Source code in ccproxy/claude_sdk/message_queue.py
async def broadcast(self, message: Any) -> int:
    """Broadcast a message to all active listeners.

    Args:
        message: The message to broadcast

    Returns:
        Number of listeners that received the message
    """
    self._total_messages_received += 1

    async with self._lock:
        if not self._listeners:
            self._total_messages_discarded += 1
            logger.debug(
                "message_queue_discard",
                reason="no_listeners",
                message_type=type(message).__name__,
            )
            return 0

        # Create queue message
        queue_msg = QueueMessage(type=MessageType.DATA, data=message)

        # Broadcast to all listeners
        delivered_count = 0
        for listener_id, listener in list(self._listeners.items()):
            if listener.is_closed:
                # Remove closed listeners
                self._listeners.pop(listener_id, None)
                continue

            try:
                # Use put_nowait to avoid blocking
                listener._queue.put_nowait(queue_msg)
                delivered_count += 1
            except asyncio.QueueFull:
                logger.warning(
                    "message_queue_listener_full",
                    listener_id=listener_id,
                    queue_size=listener.queue_size,
                )

        self._total_messages_delivered += delivered_count

        if delivered_count == 0:
            self._total_messages_discarded += 1

        logger.debug(
            "message_queue_broadcast",
            listeners_count=len(self._listeners),
            delivered_count=delivered_count,
            message_type=type(message).__name__,
        )

        return delivered_count

broadcast_error async

broadcast_error(error)

Broadcast an error to all listeners.

Parameters:

Name Type Description Default
error Exception

The error to broadcast

required
Source code in ccproxy/claude_sdk/message_queue.py
async def broadcast_error(self, error: Exception) -> None:
    """Broadcast an error to all listeners.

    Args:
        error: The error to broadcast
    """
    async with self._lock:
        queue_msg = QueueMessage(type=MessageType.ERROR, error=error)

        for listener in self._listeners.values():
            if not listener.is_closed:
                with contextlib.suppress(asyncio.QueueFull):
                    listener._queue.put_nowait(queue_msg)

        logger.debug(
            "message_queue_broadcast_error",
            error_type=type(error).__name__,
            listeners_count=len(self._listeners),
        )

broadcast_complete async

broadcast_complete()

Broadcast completion signal to all listeners.

Source code in ccproxy/claude_sdk/message_queue.py
async def broadcast_complete(self) -> None:
    """Broadcast completion signal to all listeners."""
    async with self._lock:
        queue_msg = QueueMessage(type=MessageType.COMPLETE)

        for listener in self._listeners.values():
            if not listener.is_closed:
                with contextlib.suppress(asyncio.QueueFull):
                    listener._queue.put_nowait(queue_msg)

        logger.debug(
            "message_queue_broadcast_complete",
            listeners_count=len(self._listeners),
        )

broadcast_shutdown async

broadcast_shutdown()

Broadcast shutdown signal to all listeners (for interrupts).

Source code in ccproxy/claude_sdk/message_queue.py
async def broadcast_shutdown(self) -> None:
    """Broadcast shutdown signal to all listeners (for interrupts)."""
    async with self._lock:
        queue_msg = QueueMessage(type=MessageType.SHUTDOWN)

        for listener in self._listeners.values():
            if not listener.is_closed:
                with contextlib.suppress(asyncio.QueueFull):
                    listener._queue.put_nowait(queue_msg)

        logger.debug(
            "message_queue_broadcast_shutdown",
            listeners_count=len(self._listeners),
            message="Shutdown signal sent to all listeners due to interrupt",
        )

close async

close()

Close the message queue and all listeners.

Source code in ccproxy/claude_sdk/message_queue.py
async def close(self) -> None:
    """Close the message queue and all listeners."""
    async with self._lock:
        # Send shutdown to all listeners
        queue_msg = QueueMessage(type=MessageType.SHUTDOWN)

        for listener in self._listeners.values():
            listener.close()

        self._listeners.clear()

        logger.debug(
            "message_queue_closed",
            total_messages_received=self._total_messages_received,
            total_messages_delivered=self._total_messages_delivered,
            total_messages_discarded=self._total_messages_discarded,
            lifetime_seconds=time.time() - self._created_at,
        )

get_stats

get_stats()

Get queue statistics.

Returns:

Type Description
dict[str, Any]

Dictionary of queue statistics

Source code in ccproxy/claude_sdk/message_queue.py
def get_stats(self) -> dict[str, Any]:
    """Get queue statistics.

    Returns:
        Dictionary of queue statistics
    """
    return {
        "active_listeners": len(self._listeners),
        "max_listeners": self._max_listeners,
        "total_messages_received": self._total_messages_received,
        "total_messages_delivered": self._total_messages_delivered,
        "total_messages_discarded": self._total_messages_discarded,
        "lifetime_seconds": time.time() - self._created_at,
        "delivery_rate": (
            self._total_messages_delivered / self._total_messages_received
            if self._total_messages_received > 0
            else 0.0
        ),
    }