ccproxy.llms.streaming

Streaming utilities for LLM response formatting.

This module provides Server-Sent Events (SSE) formatting for LLM streaming responses, including the OpenAI-compatible and Anthropic formats.

ClaudeAccumulator

ClaudeAccumulator()

Bases: StreamAccumulator

Accumulate Anthropic/Claude streaming events.

Source code in ccproxy/llms/streaming/accumulators.py
def __init__(self) -> None:
    super().__init__()
    self._index_to_key: dict[str, str] = {}
    self.content_blocks: list[dict[str, Any]] = []
    self.content_block_map: dict[str, dict[str, Any]] = {}  # Maps block_id to block
    self.message_metadata: dict[str, Any] = {
        "id": None,
        "type": "message",
        "role": "assistant",
        "model": None,
    }
    self._usage: dict[str, int] = {}
    self.stop_reason: str | None = None

accumulate

accumulate(event_name, event_data)

Accumulate Claude streaming events.

Processes Claude-specific event types like:

- content_block_start
- content_block_delta
- content_block_stop

Parameters:

    event_name (str): Name of the event. Required.
    event_data (dict[str, Any]): Data associated with the event. Required.

Source code in ccproxy/llms/streaming/accumulators.py
def accumulate(self, event_name: str, event_data: dict[str, Any]) -> None:
    """Accumulate Claude streaming events.

    Processes Claude-specific event types like:
    - content_block_start
    - content_block_delta
    - content_block_stop

    Args:
        event_name: Name of the event
        event_data: Data associated with the event
    """
    if event_name == "message_start":
        if (
            isinstance(event_data, dict)
            and event_data.get("type") == "message_start"
        ):
            message = event_data.get("message", {})
            if isinstance(message, dict):
                self.message_metadata["id"] = (
                    message.get("id") or self.message_metadata["id"]
                )
                self.message_metadata["type"] = message.get("type", "message")
                self.message_metadata["role"] = message.get("role", "assistant")
                self.message_metadata["model"] = (
                    message.get("model") or self.message_metadata["model"]
                )

                usage = message.get("usage")
                if isinstance(usage, dict):
                    self._merge_usage(usage)

    elif event_name == "message_delta":
        if (
            isinstance(event_data, dict)
            and event_data.get("type") == "message_delta"
        ):
            delta = event_data.get("delta")
            if isinstance(delta, dict):
                stop_reason = delta.get("stop_reason")
                if isinstance(stop_reason, str):
                    self.stop_reason = stop_reason

            usage = event_data.get("usage")
            if isinstance(usage, dict):
                self._merge_usage(usage)

    elif event_name == "message_stop":
        if (
            isinstance(event_data, dict)
            and event_data.get("type") == "message_stop"
        ):
            # No additional fields required, but keep hook for completeness.
            pass

    if event_name == "content_block_start":
        if (
            isinstance(event_data, dict)
            and event_data.get("type") == "content_block_start"
        ):
            block = event_data.get("content_block", {})
            if not isinstance(block, dict):
                return

            index_value = str(event_data.get("index", 0))
            block_id = block.get("id") or f"block_{index_value}_{len(self.tools)}"
            self._index_to_key[index_value] = block_id

            # Store block based on its type
            block_type = block.get("type", "")

            if block_type == "tool_use":
                input_payload = block.get("input")
                order = len(self.tools)
                self.tools[block_id] = {
                    "id": block.get("id"),
                    "name": block.get("name"),
                    "input": input_payload
                    if isinstance(input_payload, dict)
                    else {},
                    "partial_json": "",
                    "index": order,
                    "order": order,
                    "type": "tool_use",
                }

            # Save all content blocks for rebuilding the full response
            self.content_block_map[block_id] = {
                "id": block.get("id", block_id),
                "type": block_type,
                "index": int(index_value),
            }

            # Add type-specific fields
            if block_type == "text":
                self.content_block_map[block_id]["text"] = ""
            elif block_type == "tool_use":
                self.content_block_map[block_id]["name"] = block.get("name")
                self.content_block_map[block_id]["input"] = block.get("input", {})
            elif block_type == "thinking":
                self.content_block_map[block_id]["thinking"] = ""
                signature = block.get("signature")
                if isinstance(signature, str) and signature:
                    self.content_block_map[block_id]["signature"] = signature

            # Set current content block for delta updates
            self.current_content_block = (
                str(block_id) if block_id is not None else None
            )

    elif event_name == "content_block_delta":
        if (
            isinstance(event_data, dict)
            and event_data.get("type") == "content_block_delta"
        ):
            index_value = str(event_data.get("index", 0))
            block_id = self._index_to_key.get(index_value)
            delta = event_data.get("delta", {})

            if block_id and isinstance(delta, dict):
                # For tool use blocks
                if (
                    delta.get("type") == "input_json_delta"
                    and block_id in self.tools
                ):
                    self.tools[block_id]["partial_json"] += delta.get(
                        "partial_json", ""
                    )

                # For text blocks
                elif (
                    delta.get("type") in {"text_delta", "text"}
                    and block_id in self.content_block_map
                ):
                    block = self.content_block_map[block_id]
                    if block.get("type") == "text":
                        block["text"] = block.get("text", "") + delta.get(
                            "text", ""
                        )
                        self.text_content += delta.get("text", "")

                # For thinking blocks
                elif (
                    delta.get("type") in {"thinking_delta", "thinking"}
                    and block_id in self.content_block_map
                ):
                    block = self.content_block_map[block_id]
                    if block.get("type") == "thinking":
                        block["thinking"] = block.get("thinking", "") + delta.get(
                            "thinking", ""
                        )

    elif event_name == "content_block_stop":
        if (
            isinstance(event_data, dict)
            and event_data.get("type") == "content_block_stop"
        ):
            index_value = str(event_data.get("index", 0))
            block_id = self._index_to_key.get(index_value)

            # Finalize tool use blocks by parsing JSON
            if block_id in self.tools and self.tools[block_id]["partial_json"]:
                try:
                    payload = self.tools[block_id]["partial_json"]
                    self.tools[block_id]["input"] = json.loads(payload)

                    # Also update in content block map
                    if block_id in self.content_block_map:
                        self.content_block_map[block_id]["input"] = json.loads(
                            payload
                        )
                except json.JSONDecodeError as exc:
                    logger.warning(
                        "claude_tool_json_decode_failed",
                        error=str(exc),
                        raw=self.tools[block_id]["partial_json"],
                    )

            # Finalize the current content block and add to ordered list
            if block_id in self.content_block_map:
                block = self.content_block_map[block_id]
                if block not in self.content_blocks:
                    self.content_blocks.append(block)
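
A minimal usage sketch, not taken from the library's own docs: the import path follows the "Source code in ccproxy/llms/streaming/accumulators.py" note above, and the event payloads are hand-written to mirror the Anthropic Messages stream shape.

from ccproxy.llms.streaming.accumulators import ClaudeAccumulator

acc = ClaudeAccumulator()

# Hypothetical, hand-written events mirroring the Anthropic Messages stream.
acc.accumulate("message_start", {
    "type": "message_start",
    "message": {"id": "msg_1", "model": "claude-3-5-sonnet-20241022",
                "usage": {"input_tokens": 12}},
})
acc.accumulate("content_block_start", {
    "type": "content_block_start", "index": 0, "content_block": {"type": "text"},
})
acc.accumulate("content_block_delta", {
    "type": "content_block_delta", "index": 0,
    "delta": {"type": "text_delta", "text": "Hello"},
})
acc.accumulate("content_block_stop", {"type": "content_block_stop", "index": 0})
acc.accumulate("message_delta", {
    "type": "message_delta", "delta": {"stop_reason": "end_turn"},
    "usage": {"output_tokens": 3},
})

rebuilt = acc.rebuild_response_object({})
# rebuilt["content"] -> [{"type": "text", "text": "Hello"}]
# rebuilt["stop_reason"] -> "end_turn"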

get_complete_tool_calls

get_complete_tool_calls()

Get complete tool calls accumulated so far.

Returns:

    list[dict[str, Any]]: List of complete tool calls.

Source code in ccproxy/llms/streaming/accumulators.py
def get_complete_tool_calls(self) -> list[dict[str, Any]]:
    """Get complete tool calls accumulated so far.

    Returns:
        List of complete tool calls
    """
    complete: list[dict[str, Any]] = []

    for tool_data in self.tools.values():
        if tool_data.get("input") is None:
            continue

        complete.append(
            {
                "id": tool_data.get("id"),
                "type": "function",
                "name": tool_data.get("name"),
                "input": tool_data.get("input"),
                "function": {
                    "name": tool_data.get("name"),
                    "arguments": json.dumps(
                        tool_data.get("input", {}), ensure_ascii=False
                    ),
                },
                "index": tool_data.get("index"),
                "order": tool_data.get("order"),
            }
        )

    return complete

rebuild_response_object

rebuild_response_object(response)

Rebuild the complete Claude response with all accumulated content.

Parameters:

    response (dict[str, Any]): Original Claude response. Required.

Returns:

    dict[str, Any]: Rebuilt response with complete content.

Source code in ccproxy/llms/streaming/accumulators.py
def rebuild_response_object(self, response: dict[str, Any]) -> dict[str, Any]:
    """Rebuild the complete Claude response with all accumulated content.

    Args:
        response: Original Claude response

    Returns:
        Rebuilt response with complete content
    """
    content_blocks: list[dict[str, Any]] = []
    if self.content_blocks:
        sorted_blocks = sorted(self.content_blocks, key=lambda x: x.get("index", 0))
        for block in sorted_blocks:
            block_type = block.get("type")
            if block_type == "text":
                content_blocks.append(
                    {
                        "type": "text",
                        "text": block.get("text", ""),
                    }
                )
            elif block_type == "tool_use":
                entry = {
                    "type": "tool_use",
                    "id": block.get("id"),
                    "name": block.get("name"),
                    "input": block.get("input", {}),
                }
                content_blocks.append(
                    {k: v for k, v in entry.items() if v not in (None, "")}
                )
            elif block_type == "thinking":
                content_blocks.append(
                    {
                        "type": "thinking",
                        "thinking": block.get("thinking", ""),
                        "signature": block.get("signature", ""),
                    }
                )

    usage_payload = {
        "input_tokens": int(self._usage.get("input_tokens", 0)),
        "output_tokens": int(self._usage.get("output_tokens", 0)),
    }
    if "cache_read_input_tokens" in self._usage:
        usage_payload["cache_read_input_tokens"] = int(
            self._usage.get("cache_read_input_tokens", 0)
        )
    else:
        usage_payload["cache_read_input_tokens"] = 0

    rebuilt: dict[str, Any] = {
        "id": self.message_metadata.get("id") or response.get("id"),
        "type": self.message_metadata.get("type", "message"),
        "role": self.message_metadata.get("role", "assistant"),
        "content": content_blocks,
        "model": self.message_metadata.get("model") or response.get("model"),
        "stop_reason": self.stop_reason or response.get("stop_reason"),
        "usage": usage_payload,
    }

    if self.text_content:
        rebuilt["text"] = self.text_content

    return rebuilt

get_block_info

get_block_info(index)

Return (block_id, block_data) for a content block index.

Source code in ccproxy/llms/streaming/accumulators.py
def get_block_info(self, index: int) -> tuple[str, dict[str, Any]] | None:
    """Return (block_id, block_data) for a content block index."""

    if index < 0:
        return None

    block_id = self._index_to_key.get(str(index))
    if not block_id:
        return None

    block = self.content_block_map.get(block_id)
    if block is None:
        return None

    return block_id, block

get_tool_entry

get_tool_entry(identifier)

Fetch the tool metadata tracked by the accumulator.

Parameters:

    identifier (int | str): Either the integer index from the stream event or the underlying block identifier tracked by the accumulator. Required.

Returns:

    dict[str, Any] | None: The tracked tool entry if present.

Source code in ccproxy/llms/streaming/accumulators.py
def get_tool_entry(
    self,
    identifier: int | str,
) -> dict[str, Any] | None:
    """Fetch the tool metadata tracked by the accumulator.

    Args:
        identifier: Either the integer index from the stream event or the
            underlying block identifier tracked by the accumulator.

    Returns:
        The tracked tool entry if present.
    """

    block_id: str | None
    if isinstance(identifier, int):
        info = self.get_block_info(identifier)
        block_id = info[0] if info else None
    else:
        block_id = identifier

    if not block_id:
        return None

    return self.tools.get(block_id)

OpenAIAccumulator

OpenAIAccumulator()

Bases: StreamAccumulator

Accumulate tool calls emitted via OpenAI chat/completion deltas.

Source code in ccproxy/llms/streaming/accumulators.py
def __init__(self) -> None:
    super().__init__()
    # Track the most recent entry key per choice index so anonymous deltas
    # append to the correct in-flight tool call instead of creating a new slot.
    self._index_to_key: dict[str, str] = {}
    self.choices: dict[int, dict[str, Any]] = {}
    self.message_content: dict[int, str] = {}

accumulate

accumulate(event_name, event_data)

Accumulate OpenAI streaming events.

Parameters:

    event_name (str): Name of the event. Required.
    event_data (dict[str, Any]): Data associated with the event. Required.

Source code in ccproxy/llms/streaming/accumulators.py
def accumulate(self, event_name: str, event_data: dict[str, Any]) -> None:
    """Accumulate OpenAI streaming events.

    Args:
        event_name: Name of the event
        event_data: Data associated with the event
    """
    if not isinstance(event_data, dict) or "choices" not in event_data:
        return

    for choice in event_data.get("choices", []):
        if not isinstance(choice, dict):
            continue

        # Track choice index
        choice_index = choice.get("index", 0)

        # Initialize choice if not already tracked
        if choice_index not in self.choices:
            self.choices[choice_index] = {
                "index": choice_index,
                "message": {"role": "assistant", "content": ""},
                "finish_reason": None,
            }
            self.message_content[choice_index] = ""

        # Update finish reason if provided
        if "finish_reason" in choice:
            self.choices[choice_index]["finish_reason"] = choice["finish_reason"]

        # Update message content if provided
        delta = choice.get("delta", {})
        if not isinstance(delta, dict):
            continue

        # Update message role if provided
        if "role" in delta:
            self.choices[choice_index]["message"]["role"] = delta["role"]

        # Update message content if provided
        if "content" in delta and delta["content"] is not None:
            content = delta["content"]
            self.message_content[choice_index] += content
            self.choices[choice_index]["message"]["content"] += content
            self.text_content += content

        # Process tool calls
        if "tool_calls" not in delta:
            continue

        for tool_call in delta.get("tool_calls", []) or []:
            if not isinstance(tool_call, dict):
                continue

            index = int(tool_call.get("index", 0))
            index_key = str(index)

            previous_key = self._index_to_key.get(index_key)
            tool_id = tool_call.get("id")
            if isinstance(tool_id, str) and tool_id:
                key = tool_id
            else:
                key = previous_key or f"call_{index}"

            self._index_to_key[index_key] = key

            migrated_entry = None
            if previous_key and previous_key != key:
                migrated_entry = self.tools.pop(previous_key, None)

            entry = self.tools.get(key)
            if entry is None:
                if migrated_entry is not None:
                    entry = migrated_entry
                else:
                    entry = {
                        "id": None,
                        "type": None,
                        "function": {"name": None, "arguments": ""},
                        "index": index,
                        "order": len(self.tools),
                    }
                self.tools[key] = entry

            entry.setdefault("function", {"name": None, "arguments": ""})
            entry.setdefault("order", len(self.tools))
            entry["index"] = index

            if isinstance(tool_id, str) and tool_id:
                entry["id"] = tool_id
            elif not entry.get("id"):
                entry["id"] = key

            if "type" in tool_call:
                entry["type"] = tool_call["type"]

            function = tool_call.get("function", {})
            if isinstance(function, dict):
                if "name" in function:
                    name_value = function["name"]
                    if name_value:
                        entry["function"]["name"] = name_value
                if "arguments" in function:
                    entry["function"]["arguments"] += function["arguments"]
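
A minimal usage sketch with hand-written chat.completion.chunk payloads (only the fields the accumulator actually reads are included); the import path is assumed from the source note above.

from ccproxy.llms.streaming.accumulators import OpenAIAccumulator

acc = OpenAIAccumulator()

# First delta names the tool call and begins its arguments.
acc.accumulate("chat.completion.chunk", {
    "choices": [{"index": 0, "delta": {"role": "assistant", "tool_calls": [
        {"index": 0, "id": "call_abc", "type": "function",
         "function": {"name": "get_weather", "arguments": "{\"city\":"}},
    ]}}],
})
# Second delta carries no id; it is appended to the same in-flight call.
acc.accumulate("chat.completion.chunk", {
    "choices": [{"index": 0, "delta": {"tool_calls": [
        {"index": 0, "function": {"arguments": " \"Paris\"}"}},
    ]}, "finish_reason": "tool_calls"}],
})

print(acc.get_complete_tool_calls())
# [{"id": "call_abc", "type": "function", "index": 0, "order": 0,
#   "function": {"name": "get_weather", "arguments": "{\"city\": \"Paris\"}"}}]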

get_complete_tool_calls

get_complete_tool_calls()

Get complete tool calls accumulated so far.

Returns:

    list[dict[str, Any]]: List of complete tool calls.

Source code in ccproxy/llms/streaming/accumulators.py
def get_complete_tool_calls(self) -> list[dict[str, Any]]:
    """Get complete tool calls accumulated so far.

    Returns:
        List of complete tool calls
    """
    complete: list[dict[str, Any]] = []

    for call_data in self.tools.values():
        arguments = call_data["function"].get("arguments")
        if not arguments:
            continue

        complete.append(
            {
                "id": call_data.get("id"),
                "type": call_data.get("type"),
                "index": call_data.get("index"),
                "order": call_data.get("order"),
                "function": {
                    "name": call_data["function"].get("name"),
                    "arguments": arguments,
                },
            }
        )

    return complete

rebuild_response_object

rebuild_response_object(response)

Rebuild the complete OpenAI response with all accumulated content.

Parameters:

    response (dict[str, Any]): Original OpenAI response. Required.

Returns:

    dict[str, Any]: Rebuilt response with complete content.

Source code in ccproxy/llms/streaming/accumulators.py
def rebuild_response_object(self, response: dict[str, Any]) -> dict[str, Any]:
    """Rebuild the complete OpenAI response with all accumulated content.

    Args:
        response: Original OpenAI response

    Returns:
        Rebuilt response with complete content
    """
    # Create a copy of the original response
    rebuilt = dict(response)

    # Rebuild choices with accumulated data
    if self.choices:
        # Convert choices dict to list and sort by index
        choice_list = list(self.choices.values())
        choice_list.sort(key=lambda x: x.get("index", 0))

        # Update choices in the response
        rebuilt["choices"] = choice_list

        # Update messages with tool calls
        tool_calls = self.get_complete_tool_calls()
        if tool_calls:
            # Add tool calls to each choice's message
            for choice in rebuilt["choices"]:
                if "message" in choice:
                    choice["message"]["tool_calls"] = tool_calls

    return rebuilt

ResponsesAccumulator

ResponsesAccumulator()

Bases: StreamAccumulator

Accumulate events emitted by the OpenAI Responses API using typed models.

Source code in ccproxy/llms/streaming/accumulators.py
def __init__(self) -> None:
    super().__init__()
    self._items: dict[str, openai_models.OutputItem] = {}
    self._items_by_index: dict[int, str] = {}
    self._text_fragments: dict[tuple[str, int], list[str]] = {}
    self._reasoning_summary: dict[
        str, dict[int, openai_models.ReasoningSummaryPart]
    ] = {}
    self._reasoning_text: dict[tuple[str, int], list[str]] = {}
    self._function_arguments: dict[str, list[str]] = {}
    self._latest_response: openai_models.ResponseObject | None = None
    self.completed_response: openai_models.ResponseObject | None = None
    self._sequence_counter = 0

accumulate

accumulate(event_name, event_data)

Accumulate Responses API streaming events.

Source code in ccproxy/llms/streaming/accumulators.py
def accumulate(
    self,
    event_name: str,
    event_data: dict[str, Any] | openai_models.BaseStreamEvent,
) -> None:
    """Accumulate Responses API streaming events."""

    event = self._coerce_stream_event(event_name, event_data)
    if event is None:
        return

    if isinstance(event, openai_models.ResponseCreatedEvent):
        self._latest_response = event.response
        return

    if isinstance(event, openai_models.ResponseInProgressEvent):
        self._latest_response = event.response
        return

    if isinstance(event, openai_models.ResponseCompletedEvent):
        self.completed_response = event.response
        return

    if isinstance(event, openai_models.ResponseOutputItemAddedEvent):
        self._record_output_item(event.output_index, event.item)
        return

    if isinstance(event, openai_models.ResponseOutputItemDoneEvent):
        self._merge_output_item(event.output_index, event.item)
        return

    if isinstance(event, openai_models.ResponseOutputTextDeltaEvent):
        self._accumulate_text_delta(
            item_id=event.item_id,
            content_index=event.content_index,
            delta=event.delta,
        )
        return

    if isinstance(event, openai_models.ResponseOutputTextDoneEvent):
        self._finalize_text(
            item_id=event.item_id,
            content_index=event.content_index,
            text=event.text,
        )
        return

    if isinstance(event, openai_models.ResponseFunctionCallArgumentsDeltaEvent):
        self._accumulate_function_arguments(event.item_id, event.delta)
        return

    if isinstance(event, openai_models.ResponseFunctionCallArgumentsDoneEvent):
        self._finalize_function_arguments(event.item_id, event.arguments)
        return

    if isinstance(event, openai_models.ReasoningSummaryPartAddedEvent):
        self._store_reasoning_summary_part(
            item_id=event.item_id,
            summary_index=event.summary_index,
            part=event.part,
        )
        return

    if isinstance(event, openai_models.ReasoningSummaryPartDoneEvent):
        self._store_reasoning_summary_part(
            item_id=event.item_id,
            summary_index=event.summary_index,
            part=event.part,
        )
        return

    if isinstance(event, openai_models.ReasoningSummaryTextDeltaEvent):
        self._accumulate_reasoning_text(
            item_id=event.item_id,
            summary_index=event.summary_index,
            delta=event.delta,
        )
        return

    if isinstance(event, openai_models.ReasoningSummaryTextDoneEvent):
        self._finalize_reasoning_text(
            item_id=event.item_id,
            summary_index=event.summary_index,
            text=event.text,
        )
        return

get_complete_tool_calls

get_complete_tool_calls()

Get complete tool calls accumulated so far.

Source code in ccproxy/llms/streaming/accumulators.py
def get_complete_tool_calls(self) -> list[dict[str, Any]]:
    """Get complete tool calls accumulated so far."""

    complete: list[dict[str, Any]] = []
    for item in self._items.values():
        if item.type != "function_call":
            continue
        arguments = self._get_function_arguments(item.id)
        if not (item.name and arguments):
            continue
        if item.status and item.status != "completed":
            continue

        complete.append(
            {
                "id": item.id,
                "type": "function_call",
                "call_id": item.call_id,
                "function": {
                    "name": item.name,
                    "arguments": arguments,
                },
            }
        )

    return complete

rebuild_response_object

rebuild_response_object(response)

Rebuild a complete Responses API payload with accumulated data.

Source code in ccproxy/llms/streaming/accumulators.py
def rebuild_response_object(self, response: dict[str, Any]) -> dict[str, Any]:
    """Rebuild a complete Responses API payload with accumulated data."""

    base_response = self.completed_response or self._latest_response
    response_model = self._coerce_response_object(base_response or response)
    if response_model is None:
        response_model = openai_models.ResponseObject(
            id=str(response.get("id", "response")),
            created_at=int(response.get("created_at", 0)),
            status=str(response.get("status", "completed")),
            model=str(response.get("model", "")),
            output=[],
            parallel_tool_calls=bool(response.get("parallel_tool_calls", False)),
        )

    outputs = self._build_outputs()
    if outputs:
        response_model = response_model.model_copy(update={"output": outputs})

    function_calls = self.get_complete_tool_calls()
    reasoning_summary = self._build_reasoning_summary()

    payload = response_model.model_dump()

    if function_calls:
        payload["tool_calls"] = function_calls

    if not reasoning_summary:
        fallback_summary: list[dict[str, Any]] = []
        for output_entry in payload.get("output", []):
            if not isinstance(output_entry, dict):
                continue
            if output_entry.get("type") != "reasoning":
                continue
            summary_list = output_entry.get("summary")
            if isinstance(summary_list, list):
                for part in summary_list:
                    if isinstance(part, dict):
                        fallback_summary.append(part)
        if fallback_summary:
            reasoning_summary = fallback_summary

    if reasoning_summary:
        reasoning_obj = payload.get("reasoning") or {}
        reasoning_obj["summary"] = reasoning_summary
        payload["reasoning"] = reasoning_obj

    if self.text_content:
        payload["text"] = self.text_content

    return payload

get_completed_response

get_completed_response()

Return the final response payload captured from the stream, if any.

Source code in ccproxy/llms/streaming/accumulators.py
def get_completed_response(self) -> dict[str, Any] | None:
    """Return the final response payload captured from the stream, if any."""

    if isinstance(self.completed_response, openai_models.ResponseObject):
        return self.completed_response.model_dump()
    return None

StreamAccumulator

StreamAccumulator()

Base class for accumulating streaming response chunks.

Source code in ccproxy/llms/streaming/accumulators.py
def __init__(self) -> None:
    self.tools: dict[str, dict[str, Any]] = {}
    self.content_blocks: list[dict[str, Any]] = []
    self.current_content_block: str | None = None
    self.text_content: str = ""

accumulate

accumulate(event_name, event_data)

Accumulate streaming events.

Parameters:

    event_name (str): Name of the event (e.g., 'content_block_start'). Required.
    event_data (dict[str, Any]): Data associated with the event. Required.

Source code in ccproxy/llms/streaming/accumulators.py
def accumulate(self, event_name: str, event_data: dict[str, Any]) -> None:
    """Accumulate streaming events.

    Args:
        event_name: Name of the event (e.g., 'content_block_start')
        event_data: Data associated with the event
    """
    raise NotImplementedError

get_complete_tool_calls

get_complete_tool_calls()

Get complete tool calls accumulated so far.

Returns:

    list[dict[str, Any]]: List of complete tool calls.

Source code in ccproxy/llms/streaming/accumulators.py
def get_complete_tool_calls(self) -> list[dict[str, Any]]:
    """Get complete tool calls accumulated so far.

    Returns:
        List of complete tool calls
    """
    raise NotImplementedError

rebuild_response_object

rebuild_response_object(response)

Rebuild the complete response object with accumulated content.

This method takes a response object and rebuilds it to include all accumulated content like tool calls, content blocks, thinking/reasoning, etc.

Parameters:

    response (dict[str, Any]): The original response object. Required.

Returns:

    dict[str, Any]: The updated response with all accumulated content.

Source code in ccproxy/llms/streaming/accumulators.py
def rebuild_response_object(self, response: dict[str, Any]) -> dict[str, Any]:
    """Rebuild the complete response object with accumulated content.

    This method takes a response object and rebuilds it to include all accumulated
    content like tool calls, content blocks, thinking/reasoning, etc.

    Args:
        response: The original response object

    Returns:
        The updated response with all accumulated content
    """
    raise NotImplementedError
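
A minimal subclass sketch showing the contract these three methods impose; the toy accumulator and its "text_delta" event shape are illustrative and are not one of the shipped accumulators.

from typing import Any

from ccproxy.llms.streaming.accumulators import StreamAccumulator


class TextOnlyAccumulator(StreamAccumulator):
    """Toy accumulator that only tracks plain text deltas (illustrative)."""

    def accumulate(self, event_name: str, event_data: dict[str, Any]) -> None:
        # Assumes a hypothetical {"text": "..."} delta payload.
        if event_name == "text_delta":
            self.text_content += event_data.get("text", "")

    def get_complete_tool_calls(self) -> list[dict[str, Any]]:
        return []  # this toy subclass never tracks tool calls

    def rebuild_response_object(self, response: dict[str, Any]) -> dict[str, Any]:
        rebuilt = dict(response)
        rebuilt["text"] = self.text_content
        return rebuilt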

AnthropicSSEFormatter

Formats streaming responses to match Anthropic's Messages API SSE format.

format_event staticmethod

format_event(event_type, data)

Format an event for Anthropic Messages API Server-Sent Events.

Parameters:

    event_type (str): Event type (e.g., 'message_start', 'content_block_delta'). Required.
    data (dict[str, Any]): Event data dictionary. Required.

Returns:

    str: Formatted SSE string with event and data lines.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_event(event_type: str, data: dict[str, Any]) -> str:
    """Format an event for Anthropic Messages API Server-Sent Events.

    Args:
        event_type: Event type (e.g., 'message_start', 'content_block_delta')
        data: Event data dictionary

    Returns:
        Formatted SSE string with event and data lines
    """
    json_data = json.dumps(data, separators=(",", ":"))
    return f"event: {event_type}\ndata: {json_data}\n\n"
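
For example, with an illustrative delta payload:

chunk = AnthropicSSEFormatter.format_event(
    "content_block_delta",
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hi"}},
)
# chunk == ('event: content_block_delta\n'
#           'data: {"type":"content_block_delta","index":0,'
#           '"delta":{"type":"text_delta","text":"Hi"}}\n\n')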

format_ping staticmethod

format_ping()

Format a ping event.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_ping() -> str:
    """Format a ping event."""
    return 'event: ping\ndata: {"type": "ping"}\n\n'

format_done staticmethod

format_done()

Format the final [DONE] event.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_done() -> str:
    """Format the final [DONE] event."""
    return "data: [DONE]\n\n"

OpenAISSEFormatter

Formats streaming responses to match OpenAI's SSE format.

format_data_event staticmethod

format_data_event(data)

Format a data event for OpenAI-compatible Server-Sent Events.

Parameters:

    data (dict[str, Any]): Event data dictionary. Required.

Returns:

    str: Formatted SSE string.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_data_event(data: dict[str, Any]) -> str:
    """Format a data event for OpenAI-compatible Server-Sent Events.

    Args:
        data: Event data dictionary

    Returns:
        Formatted SSE string
    """
    json_data = json.dumps(data, separators=(",", ":"))
    return f"data: {json_data}\n\n"

format_first_chunk staticmethod

format_first_chunk(
    message_id, model, created, role="assistant"
)

Format the first chunk with role and basic metadata.

Parameters:

    message_id (str): Unique identifier for the completion. Required.
    model (str): Model name being used. Required.
    created (int): Unix timestamp when the completion was created. Required.
    role (str): Role of the assistant. Defaults to 'assistant'.

Returns:

    str: Formatted SSE string.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_first_chunk(
    message_id: str, model: str, created: int, role: str = "assistant"
) -> str:
    """Format the first chunk with role and basic metadata.

    Args:
        message_id: Unique identifier for the completion
        model: Model name being used
        created: Unix timestamp when the completion was created
        role: Role of the assistant

    Returns:
        Formatted SSE string
    """
    data = {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": {"role": role},
                "logprobs": None,
                "finish_reason": None,
            }
        ],
    }
    return OpenAISSEFormatter.format_data_event(data)

format_content_chunk staticmethod

format_content_chunk(
    message_id, model, created, content, choice_index=0
)

Format a content chunk with text delta.

Parameters:

    message_id (str): Unique identifier for the completion. Required.
    model (str): Model name being used. Required.
    created (int): Unix timestamp when the completion was created. Required.
    content (str): Text content to include in the delta. Required.
    choice_index (int): Index of the choice (usually 0). Defaults to 0.

Returns:

    str: Formatted SSE string.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_content_chunk(
    message_id: str, model: str, created: int, content: str, choice_index: int = 0
) -> str:
    """Format a content chunk with text delta.

    Args:
        message_id: Unique identifier for the completion
        model: Model name being used
        created: Unix timestamp when the completion was created
        content: Text content to include in the delta
        choice_index: Index of the choice (usually 0)

    Returns:
        Formatted SSE string
    """
    data = {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [
            {
                "index": choice_index,
                "delta": {"content": content},
                "logprobs": None,
                "finish_reason": None,
            }
        ],
    }
    return OpenAISSEFormatter.format_data_event(data)

format_tool_call_chunk staticmethod

format_tool_call_chunk(
    message_id,
    model,
    created,
    tool_call_id,
    function_name=None,
    function_arguments=None,
    tool_call_index=0,
    choice_index=0,
)

Format a tool call chunk.

Parameters:

    message_id (str): Unique identifier for the completion. Required.
    model (str): Model name being used. Required.
    created (int): Unix timestamp when the completion was created. Required.
    tool_call_id (str): ID of the tool call. Required.
    function_name (str | None): Name of the function being called. Defaults to None.
    function_arguments (str | None): Arguments for the function. Defaults to None.
    tool_call_index (int): Index of the tool call. Defaults to 0.
    choice_index (int): Index of the choice (usually 0). Defaults to 0.

Returns:

    str: Formatted SSE string.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_tool_call_chunk(
    message_id: str,
    model: str,
    created: int,
    tool_call_id: str,
    function_name: str | None = None,
    function_arguments: str | None = None,
    tool_call_index: int = 0,
    choice_index: int = 0,
) -> str:
    """Format a tool call chunk.

    Args:
        message_id: Unique identifier for the completion
        model: Model name being used
        created: Unix timestamp when the completion was created
        tool_call_id: ID of the tool call
        function_name: Name of the function being called
        function_arguments: Arguments for the function
        tool_call_index: Index of the tool call
        choice_index: Index of the choice (usually 0)

    Returns:
        Formatted SSE string
    """
    tool_call: dict[str, Any] = {
        "index": tool_call_index,
        "id": tool_call_id,
        "type": "function",
        "function": {},
    }

    if function_name is not None:
        tool_call["function"]["name"] = function_name

    if function_arguments is not None:
        tool_call["function"]["arguments"] = function_arguments

    data = {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [
            {
                "index": choice_index,
                "delta": {"tool_calls": [tool_call]},
                "logprobs": None,
                "finish_reason": None,
            }
        ],
    }
    return OpenAISSEFormatter.format_data_event(data)

format_final_chunk staticmethod

format_final_chunk(
    message_id,
    model,
    created,
    finish_reason="stop",
    choice_index=0,
    usage=None,
)

Format the final chunk with finish_reason.

Parameters:

    message_id (str): Unique identifier for the completion. Required.
    model (str): Model name being used. Required.
    created (int): Unix timestamp when the completion was created. Required.
    finish_reason (str): Reason for completion (stop, length, tool_calls, etc.). Defaults to 'stop'.
    choice_index (int): Index of the choice (usually 0). Defaults to 0.
    usage (dict[str, int] | None): Optional usage information to include. Defaults to None.

Returns:

    str: Formatted SSE string.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_final_chunk(
    message_id: str,
    model: str,
    created: int,
    finish_reason: str = "stop",
    choice_index: int = 0,
    usage: dict[str, int] | None = None,
) -> str:
    """Format the final chunk with finish_reason.

    Args:
        message_id: Unique identifier for the completion
        model: Model name being used
        created: Unix timestamp when the completion was created
        finish_reason: Reason for completion (stop, length, tool_calls, etc.)
        choice_index: Index of the choice (usually 0)
        usage: Optional usage information to include

    Returns:
        Formatted SSE string
    """
    data = {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [
            {
                "index": choice_index,
                "delta": {},
                "logprobs": None,
                "finish_reason": finish_reason,
            }
        ],
    }

    # Add usage if provided
    if usage:
        data["usage"] = usage

    return OpenAISSEFormatter.format_data_event(data)

format_error_chunk staticmethod

format_error_chunk(
    message_id, model, created, error_type, error_message
)

Format an error chunk.

Parameters:

    message_id (str): Unique identifier for the completion. Required.
    model (str): Model name being used. Required.
    created (int): Unix timestamp when the completion was created. Required.
    error_type (str): Type of error. Required.
    error_message (str): Error message. Required.

Returns:

    str: Formatted SSE string.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_error_chunk(
    message_id: str, model: str, created: int, error_type: str, error_message: str
) -> str:
    """Format an error chunk.

    Args:
        message_id: Unique identifier for the completion
        model: Model name being used
        created: Unix timestamp when the completion was created
        error_type: Type of error
        error_message: Error message

    Returns:
        Formatted SSE string
    """
    data = {
        "id": message_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [
            {"index": 0, "delta": {}, "logprobs": None, "finish_reason": "error"}
        ],
        "error": {"type": error_type, "message": error_message},
    }
    return OpenAISSEFormatter.format_data_event(data)

format_done staticmethod

format_done()

Format the final DONE event.

Returns:

    str: Formatted SSE termination string.

Source code in ccproxy/llms/streaming/formatters.py
@staticmethod
def format_done() -> str:
    """Format the final DONE event.

    Returns:
        Formatted SSE termination string
    """
    return "data: [DONE]\n\n"
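
A short sketch that assembles an OpenAI-style SSE body by hand from the static methods above; the id, model, and token counts are placeholders.

import time

msg_id = "chatcmpl-demo"  # placeholder id
model = "claude-3-5-sonnet-20241022"
created = int(time.time())

frames = [
    OpenAISSEFormatter.format_first_chunk(msg_id, model, created),
    OpenAISSEFormatter.format_content_chunk(msg_id, model, created, "Hello"),
    OpenAISSEFormatter.format_final_chunk(
        msg_id, model, created, finish_reason="stop",
        usage={"prompt_tokens": 12, "completion_tokens": 3, "total_tokens": 15},
    ),
    OpenAISSEFormatter.format_done(),
]

# Each entry is a ready-to-send "data: {...}\n\n" SSE frame;
# the last one is the literal "data: [DONE]\n\n" terminator.
body = "".join(frames)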

AnthropicStreamProcessor

AnthropicStreamProcessor(
    model="claude-3-5-sonnet-20241022",
)

Processes OpenAI streaming data into Anthropic SSE format.

Parameters:

    model (str): Model name for responses. Defaults to 'claude-3-5-sonnet-20241022'.

Source code in ccproxy/llms/streaming/processors.py
def __init__(self, model: str = "claude-3-5-sonnet-20241022"):
    """Initialize the stream processor.

    Args:
        model: Model name for responses
    """
    self.model = model
    self.formatter = AnthropicSSEFormatter()

process_stream async

process_stream(stream)

Process OpenAI-format streaming data into Anthropic SSE format.

Parameters:

    stream (AsyncIterator[dict[str, Any]]): Async iterator of OpenAI-style response chunks. Required.

Yields:

    AsyncIterator[str]: Anthropic-formatted SSE strings with proper event: lines.

Source code in ccproxy/llms/streaming/processors.py
async def process_stream(
    self, stream: AsyncIterator[dict[str, Any]]
) -> AsyncIterator[str]:
    """Process OpenAI-format streaming data into Anthropic SSE format.

    Args:
        stream: Async iterator of OpenAI-style response chunks

    Yields:
        Anthropic-formatted SSE strings with proper event: lines
    """
    message_started = False
    content_block_started = False

    async for chunk in stream:
        if not isinstance(chunk, dict):
            continue

        chunk_type = chunk.get("type")

        if chunk_type == "message_start":
            if not message_started:
                yield self.formatter.format_event("message_start", chunk)
                message_started = True

        elif chunk_type == "content_block_start":
            if not content_block_started:
                yield self.formatter.format_event("content_block_start", chunk)
                content_block_started = True

        elif chunk_type == "content_block_delta":
            yield self.formatter.format_event("content_block_delta", chunk)

        elif chunk_type == "ping":
            yield self.formatter.format_ping()

        elif chunk_type == "content_block_stop":
            yield self.formatter.format_event("content_block_stop", chunk)

        elif chunk_type == "message_delta":
            yield self.formatter.format_event("message_delta", chunk)

        elif chunk_type == "message_stop":
            yield self.formatter.format_event("message_stop", chunk)
            break
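
A minimal driving sketch. Per the source above, the chunks consumed here are dicts carrying Anthropic-style "type" fields; the payloads below are hand-written illustrations, and the import path follows the "Source code in ccproxy/llms/streaming/processors.py" note.

import asyncio
from collections.abc import AsyncIterator
from typing import Any

from ccproxy.llms.streaming.processors import AnthropicStreamProcessor


async def fake_stream() -> AsyncIterator[dict[str, Any]]:
    # Hand-written chunks with Anthropic-style "type" fields.
    yield {"type": "message_start", "message": {"id": "msg_1", "role": "assistant"}}
    yield {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}}
    yield {"type": "content_block_delta", "index": 0,
           "delta": {"type": "text_delta", "text": "Hi"}}
    yield {"type": "content_block_stop", "index": 0}
    yield {"type": "message_stop"}


async def main() -> None:
    processor = AnthropicStreamProcessor(model="claude-3-5-sonnet-20241022")
    async for sse in processor.process_stream(fake_stream()):
        print(sse, end="")  # each item is an "event: ...\ndata: {...}\n\n" frame


asyncio.run(main())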

OpenAIStreamProcessor

OpenAIStreamProcessor(
    message_id=None,
    model="claude-3-5-sonnet-20241022",
    created=None,
    enable_usage=True,
    enable_tool_calls=True,
    enable_thinking_serialization=None,
    output_format="sse",
)

Processes Anthropic/Claude streaming responses into OpenAI format.

Parameters:

    message_id (str | None): Response ID, generated if not provided. Defaults to None.
    model (str): Model name for responses. Defaults to 'claude-3-5-sonnet-20241022'.
    created (int | None): Creation timestamp, current time if not provided. Defaults to None.
    enable_usage (bool): Whether to include usage information. Defaults to True.
    enable_tool_calls (bool): Whether to process tool calls. Defaults to True.
    enable_thinking_serialization (bool | None): Whether to serialize thinking blocks; when None, the value is resolved from Settings.llm.openai_thinking_xml or the environment toggles shown in the source below. Defaults to None.
    output_format (Literal['sse', 'dict']): Output format - "sse" for Server-Sent Events strings, "dict" for dict objects. Defaults to 'sse'.

Source code in ccproxy/llms/streaming/processors.py
def __init__(
    self,
    message_id: str | None = None,
    model: str = "claude-3-5-sonnet-20241022",
    created: int | None = None,
    enable_usage: bool = True,
    enable_tool_calls: bool = True,
    enable_thinking_serialization: bool | None = None,
    output_format: Literal["sse", "dict"] = "sse",
):
    """Initialize the stream processor.

    Args:
        message_id: Response ID, generated if not provided
        model: Model name for responses
        created: Creation timestamp, current time if not provided
        enable_usage: Whether to include usage information
        enable_tool_calls: Whether to process tool calls
        output_format: Output format - "sse" for Server-Sent Events strings, "dict" for dict objects
    """
    # Import here to avoid circular imports
    from ccproxy.llms.models.openai import generate_responses_id

    self.message_id = message_id or generate_responses_id()
    self.model = model
    self.created = created or int(time.time())
    self.enable_usage = enable_usage
    self.enable_tool_calls = enable_tool_calls
    self.output_format = output_format
    if enable_thinking_serialization is None:
        # Prefer service Settings.llm.openai_thinking_xml if available
        setting_val: bool | None = None
        try:
            from ccproxy.config.settings import Settings

            cfg = Settings.from_config()
            setting_val = bool(
                getattr(getattr(cfg, "llm", {}), "openai_thinking_xml", True)
            )
        except Exception:
            setting_val = None

        if setting_val is not None:
            self.enable_thinking_serialization = setting_val
        else:
            # Fallback to env-based toggle
            env_val = (
                os.getenv("LLM__OPENAI_THINKING_XML")
                or os.getenv("OPENAI_STREAM_ENABLE_THINKING_SERIALIZATION")
                or "true"
            ).lower()
            self.enable_thinking_serialization = env_val not in (
                "0",
                "false",
                "no",
                "off",
            )
    else:
        self.enable_thinking_serialization = enable_thinking_serialization
    self.formatter = OpenAISSEFormatter()

    # State tracking
    self.role_sent = False
    self.accumulated_content = ""
    self.tool_calls: dict[str, dict[str, Any]] = {}
    self.usage_info: dict[str, int] | None = None
    # Thinking block tracking
    self.current_thinking_text = ""
    self.current_thinking_signature: str | None = None
    self.thinking_block_active = False

process_stream async

process_stream(claude_stream)

Process a Claude/Anthropic stream into OpenAI format.

Parameters:

    claude_stream (AsyncIterator[dict[str, Any]]): Async iterator of Claude response chunks. Required.

Yields:

    AsyncIterator[str | dict[str, Any]]: OpenAI-formatted SSE strings or dict objects based on output_format.

Source code in ccproxy/llms/streaming/processors.py
async def process_stream(
    self, claude_stream: AsyncIterator[dict[str, Any]]
) -> AsyncIterator[str | dict[str, Any]]:
    """Process a Claude/Anthropic stream into OpenAI format.

    Args:
        claude_stream: Async iterator of Claude response chunks

    Yields:
        OpenAI-formatted SSE strings or dict objects based on output_format
    """
    # Get logger with request context at the start of the function
    logger = get_logger(__name__)

    try:
        chunk_count = 0
        processed_count = 0
        logger.debug(
            "openai_stream_processor_start",
            message_id=self.message_id,
            model=self.model,
            output_format=self.output_format,
            enable_usage=self.enable_usage,
            enable_tool_calls=self.enable_tool_calls,
            category="streaming_conversion",
            enable_thinking_serialization=self.enable_thinking_serialization,
        )

        async for chunk in claude_stream:
            chunk_count += 1
            chunk_type = chunk.get("type", "unknown")

            logger.trace(
                "openai_processor_input_chunk",
                chunk_number=chunk_count,
                chunk_type=chunk_type,
                category="format_detection",
            )

            async for sse_chunk in self._process_chunk(chunk):
                processed_count += 1
                yield sse_chunk

        logger.debug(
            "openai_stream_complete",
            total_chunks=chunk_count,
            processed_chunks=processed_count,
            message_id=self.message_id,
            category="streaming_conversion",
        )

        # Send final chunk
        if self.usage_info and self.enable_usage:
            yield self._format_chunk_output(
                finish_reason="stop",
                usage=self.usage_info,
            )
        else:
            yield self._format_chunk_output(finish_reason="stop")

        # Send DONE event (only for SSE format)
        if self.output_format == "sse":
            yield self.formatter.format_done()

    except (OSError, PermissionError) as e:
        logger.error("stream_processing_io_error", error=str(e), exc_info=e)
        # Send error chunk for IO errors
        if self.output_format == "sse":
            yield self.formatter.format_error_chunk(
                self.message_id,
                self.model,
                self.created,
                "error",
                f"IO error: {str(e)}",
            )
            yield self.formatter.format_done()
        else:
            # Dict format error
            yield self._create_chunk_dict(finish_reason="error")
    except Exception as e:
        logger.error("stream_processing_error", error=str(e), exc_info=e)
        # Send error chunk
        if self.output_format == "sse":
            yield self.formatter.format_error_chunk(
                self.message_id, self.model, self.created, "error", str(e)
            )
            yield self.formatter.format_done()
        else:
            # Dict format error
            yield self._create_chunk_dict(finish_reason="error")
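
A hedged end-to-end sketch: the import path is assumed from the source notes above, and the Claude-style chunks are hand-written illustrations rather than captured traffic (the per-chunk translation happens in _process_chunk, which is not shown here).

import asyncio
from collections.abc import AsyncIterator
from typing import Any

from ccproxy.llms.streaming.processors import OpenAIStreamProcessor


async def claude_chunks() -> AsyncIterator[dict[str, Any]]:
    # Hand-written Claude-style chunks for illustration.
    yield {"type": "message_start", "message": {"id": "msg_1", "role": "assistant"}}
    yield {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}}
    yield {"type": "content_block_delta", "index": 0,
           "delta": {"type": "text_delta", "text": "Hello"}}
    yield {"type": "content_block_stop", "index": 0}
    yield {"type": "message_stop"}


async def main() -> None:
    processor = OpenAIStreamProcessor(output_format="sse")
    async for frame in processor.process_stream(claude_chunks()):
        # With output_format="sse" each frame is a "data: {...}\n\n" string,
        # ending with the "data: [DONE]\n\n" terminator.
        print(frame, end="")


asyncio.run(main())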