Skip to content

ccproxy.llms.formatters.common.streams

ccproxy.llms.formatters.common.streams

Shared streaming helpers for formatter adapters.

ReasoningPartState dataclass

ReasoningPartState(
    buffer=list(), signature=None, open=False
)

Mutable reasoning buffer for a specific summary segment.

ReasoningBuffer

ReasoningBuffer()

Utility to manage reasoning text buffers keyed by item/summary ids.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Create the buffer with no reasoning states recorded yet.

    The backing store is a two-level mapping: a string key selects an
    inner mapping whose values are ``ReasoningPartState`` objects.
    """
    empty_states: dict[str, dict[Any, ReasoningPartState]] = {}
    self._states = empty_states

ToolCallState dataclass

ToolCallState(
    id,
    index,
    call_id=None,
    item_id=None,
    name=None,
    arguments="",
    arguments_parts=list(),
    output_index=-1,
    emitted=False,
    initial_emitted=False,
    name_emitted=False,
    arguments_emitted=False,
    arguments_done_emitted=False,
    item_done_emitted=False,
    added_emitted=False,
    completed=False,
    final_arguments=None,
)

Mutable state for a single streaming tool call.

ToolCallTracker

ToolCallTracker()

Registry tracking streaming tool calls by item identifier.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Create the tracker with no tool calls registered.

    Two containers are initialised: a mapping from string keys to
    ``ToolCallState`` objects and a list of string keys.
    """
    known_states: dict[str, ToolCallState] = {}
    key_order: list[str] = []
    self._states = known_states
    self._order = key_order

IndexedToolCallTracker

IndexedToolCallTracker()

Registry tracking streaming tool calls keyed by integer index.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Create the tracker with no tool calls registered.

    The backing store maps integer keys to ``ToolCallState`` objects.
    """
    indexed_states: dict[int, ToolCallState] = {}
    self._states = indexed_states

ObfuscationTokenFactory

ObfuscationTokenFactory(fallback_identifier)

Utility for building deterministic obfuscation tokens.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self, fallback_identifier: Callable[[], str]) -> None:
    """Remember the callable used to obtain a fallback identifier.

    Args:
        fallback_identifier: Zero-argument callable returning a string.
            It is only stored here; it is not invoked during construction.
    """
    self._fallback_identifier: Callable[[], str] = fallback_identifier

build_anthropic_tool_use_block

build_anthropic_tool_use_block(
    state, *, default_id=None, parser=None
)

Create an Anthropic ToolUseBlock from a tracked tool-call state.

Source code in ccproxy/llms/formatters/common/streams.py
def build_anthropic_tool_use_block(
    state: ToolCallState,
    *,
    default_id: str | None = None,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> anthropic_models.ToolUseBlock:
    """Turn a tracked tool-call state into an Anthropic ``ToolUseBlock``.

    Args:
        state: Tool-call state to convert.
        default_id: Identifier used when the state has neither a truthy
            ``item_id`` nor a truthy ``call_id``.
        parser: Callable converting the argument text into the block's
            ``input`` mapping. When omitted (or falsy), non-empty text is
            wrapped as ``{"arguments": text}`` and empty text gives ``{}``.

    Returns:
        A ``ToolUseBlock`` of type ``"tool_use"`` whose name falls back to
        ``"tool"`` when the state carries no name.
    """

    def _wrap_raw_text(text: str) -> dict[str, Any]:
        # Default conversion: keep the raw text under a single key.
        return {"arguments": text} if text else {}

    # Identifier precedence: item_id, call_id, default_id, then a synthetic
    # id derived from the state's index.
    identifier = state.item_id or state.call_id or default_id
    if not identifier:
        identifier = f"call_{state.index}"

    # Argument text precedence: final_arguments, arguments, then the
    # concatenation of the collected argument parts.
    raw_arguments = state.final_arguments or state.arguments
    if not raw_arguments:
        raw_arguments = "".join(state.arguments_parts)

    to_payload = parser if parser else _wrap_raw_text
    payload = to_payload(raw_arguments)

    return anthropic_models.ToolUseBlock(
        type="tool_use",
        id=str(identifier),
        name=str(state.name or "tool"),
        input=payload,
    )

emit_anthropic_tool_use_events

emit_anthropic_tool_use_events(
    index, state, *, parser=None
)

Build start/stop events for a tool-use block at the given index.

Source code in ccproxy/llms/formatters/common/streams.py
def emit_anthropic_tool_use_events(
    index: int,
    state: ToolCallState,
    *,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> list[anthropic_models.MessageStreamEvent]:
    """Produce the start/stop event pair for one tool-use content block.

    Args:
        index: Content-block index placed on both emitted events.
        state: Tool-call state from which the tool-use block is built.
        parser: Optional argument-text parser, forwarded unchanged to
            ``build_anthropic_tool_use_block``.

    Returns:
        A two-element list: a ``content_block_start`` event carrying the
        tool-use block, followed by the matching ``content_block_stop``.
    """

    tool_block = build_anthropic_tool_use_block(
        state,
        default_id=f"call_{state.index}",
        parser=parser,
    )

    start_event = anthropic_models.ContentBlockStartEvent(
        type="content_block_start",
        index=index,
        content_block=tool_block,
    )
    stop_event = anthropic_models.ContentBlockStopEvent(
        type="content_block_stop",
        index=index,
    )
    return [start_event, stop_event]