Skip to content

ccproxy.llms.formatters.common.streams

ccproxy.llms.formatters.common.streams

Shared streaming helpers for formatter adapters.

ReasoningPartState dataclass

ReasoningPartState(
    buffer=list(), signature=None, open=False
)

Mutable reasoning buffer for a specific summary segment.

ReasoningBuffer

ReasoningBuffer()

Utility to manage reasoning text buffers keyed by item/summary ids.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    self._states: dict[str, dict[Any, ReasoningPartState]] = {}

ToolCallState dataclass

ToolCallState(
    id,
    index,
    call_id=None,
    item_id=None,
    name=None,
    arguments="",
    arguments_parts=list(),
    output_index=-1,
    emitted=False,
    initial_emitted=False,
    name_emitted=False,
    arguments_emitted=False,
    arguments_done_emitted=False,
    item_done_emitted=False,
    added_emitted=False,
    completed=False,
    final_arguments=None,
)

Mutable state for a single streaming tool call.

ToolCallTracker

ToolCallTracker()

Registry tracking streaming tool calls by item identifier.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    self._states: dict[str, ToolCallState] = {}
    self._order: list[str] = []

IndexedToolCallTracker

IndexedToolCallTracker()

Registry tracking streaming tool calls keyed by integer index.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    self._states: dict[int, ToolCallState] = {}

ObfuscationTokenFactory

ObfuscationTokenFactory(fallback_identifier)

Utility for building deterministic obfuscation tokens.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self, fallback_identifier: Callable[[], str]) -> None:
    self._fallback_identifier = fallback_identifier

build_anthropic_tool_use_block

build_anthropic_tool_use_block(
    state, *, default_id=None, parser=None
)

Create an Anthropic ToolUseBlock from a tracked tool-call state.

Source code in ccproxy/llms/formatters/common/streams.py
def build_anthropic_tool_use_block(
    state: ToolCallState,
    *,
    default_id: str | None = None,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> anthropic_models.ToolUseBlock:
    """Create an Anthropic ToolUseBlock from a tracked tool-call state."""

    tool_id = state.item_id or state.call_id or default_id or f"call_{state.index}"
    arguments_text = (
        state.final_arguments or state.arguments or "".join(state.arguments_parts)
    )
    parse_input = parser or (lambda text: {"arguments": text} if text else {})
    input_payload = parse_input(arguments_text)

    return anthropic_models.ToolUseBlock(
        type="tool_use",
        id=str(tool_id),
        name=str(state.name or "tool"),
        input=input_payload,
    )

emit_anthropic_tool_use_events

emit_anthropic_tool_use_events(
    index, state, *, parser=None
)

Build start/delta/stop events for a tool-use block at the given index.

Per Anthropic streaming spec, tool_use.input starts empty in content_block_start and is filled via input_json_delta events; consumers that follow the spec (e.g. the Anthropic SDK) ignore an input attached directly to the start event.

Source code in ccproxy/llms/formatters/common/streams.py
def emit_anthropic_tool_use_events(
    index: int,
    state: ToolCallState,
    *,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> list[anthropic_models.MessageStreamEvent]:
    """Build start/delta/stop events for a tool-use block at the given index.

    Per Anthropic streaming spec, tool_use.input starts empty in
    content_block_start and is filled via input_json_delta events; consumers
    that follow the spec (e.g. the Anthropic SDK) ignore an input attached
    directly to the start event.
    """

    block = build_anthropic_tool_use_block(
        state,
        default_id=f"call_{state.index}",
        parser=parser,
    )
    input_payload = block.input or {}
    start_block = block.model_copy(update={"input": {}})
    partial_json = json.dumps(input_payload) if input_payload else ""

    events: list[anthropic_models.MessageStreamEvent] = [
        anthropic_models.ContentBlockStartEvent(
            type="content_block_start", index=index, content_block=start_block
        ),
    ]
    if partial_json:
        events.append(
            anthropic_models.ContentBlockDeltaEvent(
                type="content_block_delta",
                index=index,
                delta=anthropic_models.InputJsonDelta(
                    type="input_json_delta", partial_json=partial_json
                ),
            )
        )
    events.append(
        anthropic_models.ContentBlockStopEvent(type="content_block_stop", index=index)
    )
    return events