Skip to content

ccproxy.llms.formatters.common

ccproxy.llms.formatters.common

Shared helpers used by formatter adapters.

IndexedToolCallTracker

IndexedToolCallTracker()

Registry tracking streaming tool calls keyed by integer index.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Initialize the tracker with no tool calls recorded yet."""
    # States are keyed by the tool call's integer stream index.
    self._states: dict[int, ToolCallState] = {}

ObfuscationTokenFactory

ObfuscationTokenFactory(fallback_identifier)

Utility for building deterministic obfuscation tokens.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self, fallback_identifier: Callable[[], str]) -> None:
    """Store the callable used to derive tokens when no identifier is given.

    Args:
        fallback_identifier: Zero-argument callable returning a string id.
    """
    self._fallback_identifier = fallback_identifier

ReasoningBuffer

ReasoningBuffer()

Utility to manage reasoning text buffers keyed by item/summary ids.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Initialize the registry with no reasoning buffers."""
    # Outer key is the item/summary id; the inner mapping holds the
    # per-part mutable buffer state for that item.
    self._states: dict[str, dict[Any, ReasoningPartState]] = {}

ReasoningPartState dataclass

ReasoningPartState(
    buffer=list(), signature=None, open=False
)

Mutable reasoning buffer for a specific summary segment.

ToolCallState dataclass

ToolCallState(
    id,
    index,
    call_id=None,
    item_id=None,
    name=None,
    arguments="",
    arguments_parts=list(),
    output_index=-1,
    emitted=False,
    initial_emitted=False,
    name_emitted=False,
    arguments_emitted=False,
    arguments_done_emitted=False,
    item_done_emitted=False,
    added_emitted=False,
    completed=False,
    final_arguments=None,
)

Mutable state for a single streaming tool call.

ToolCallTracker

ToolCallTracker()

Registry tracking streaming tool calls by item identifier.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Initialize the tracker with no tool calls recorded yet."""
    # States are keyed by item identifier.
    self._states: dict[str, ToolCallState] = {}
    # Presumably records the order in which item ids were first seen —
    # TODO confirm against the full class in streams.py.
    self._order: list[str] = []

ThinkingSegment dataclass

ThinkingSegment(thinking, signature=None)

Lightweight reasoning segment mirroring Anthropic's ThinkingBlock.

ensure_identifier

ensure_identifier(prefix, existing=None)

Return a stable identifier and suffix for the given prefix.

If an existing identifier already matches the prefix we reuse its suffix. Existing identifiers that begin with resp_ are also understood so both resp and alternate prefixes can build consistent derived identifiers.

Source code in ccproxy/llms/formatters/common/identifiers.py
def ensure_identifier(prefix: str, existing: str | None = None) -> tuple[str, str]:
    """Return a stable identifier and suffix for the given prefix.

    If an existing identifier already matches the prefix we reuse its suffix.
    Existing identifiers that begin with ``resp_`` are also understood so both
    ``resp`` and alternate prefixes can build consistent derived identifiers.

    Args:
        prefix: Identifier prefix such as ``"resp"`` or ``"msg"``.
        existing: Optional identifier to reuse or derive from.

    Returns:
        A ``(identifier, suffix)`` tuple where ``identifier`` always starts
        with ``prefix + "_"`` unless ``existing`` already matched it.
    """

    # Existing identifier already carries the requested prefix — reuse it
    # verbatim. (This also covers prefix == "resp" with a resp_* identifier,
    # which made the original separate resp/resp_ branch unreachable.)
    if isinstance(existing, str) and existing.startswith(f"{prefix}_"):
        return existing, normalize_suffix(existing)

    # A resp_* identifier paired with a different prefix: derive a sibling
    # identifier that shares the same suffix so related ids stay consistent.
    if isinstance(existing, str) and existing.startswith("resp_") and prefix != "resp":
        suffix = normalize_suffix(existing)
        return f"{prefix}_{suffix}", suffix

    # No usable identifier: mint a fresh random suffix.
    suffix = uuid.uuid4().hex
    return f"{prefix}_{suffix}", suffix

normalize_suffix

normalize_suffix(identifier)

Return the suffix part of an identifier split on the first underscore.

Source code in ccproxy/llms/formatters/common/identifiers.py
def normalize_suffix(identifier: str) -> str:
    """Return the suffix part of an identifier split on the first underscore.

    Identifiers without an underscore are returned unchanged.
    """

    _head, separator, suffix = identifier.partition("_")
    return suffix if separator else identifier

build_anthropic_tool_use_block

build_anthropic_tool_use_block(
    state, *, default_id=None, parser=None
)

Create an Anthropic ToolUseBlock from a tracked tool-call state.

Source code in ccproxy/llms/formatters/common/streams.py
def build_anthropic_tool_use_block(
    state: ToolCallState,
    *,
    default_id: str | None = None,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> anthropic_models.ToolUseBlock:
    """Create an Anthropic ToolUseBlock from a tracked tool-call state.

    Args:
        state: Streaming tool-call state accumulated so far.
        default_id: Identifier to use when the state carries none.
        parser: Optional callable turning the argument text into a payload.
    """

    # Prefer the richest identifier available, falling back to a synthetic
    # call_<index> id so the block always has one.
    identifier = state.item_id or state.call_id or default_id or f"call_{state.index}"

    # Final arguments win over streamed text, which wins over buffered parts.
    raw_arguments = state.final_arguments or state.arguments
    if not raw_arguments:
        raw_arguments = "".join(state.arguments_parts)

    if parser is not None:
        payload = parser(raw_arguments)
    elif raw_arguments:
        payload = {"arguments": raw_arguments}
    else:
        payload = {}

    return anthropic_models.ToolUseBlock(
        type="tool_use",
        id=str(identifier),
        name=str(state.name or "tool"),
        input=payload,
    )

emit_anthropic_tool_use_events

emit_anthropic_tool_use_events(
    index, state, *, parser=None
)

Build start/delta/stop events for a tool-use block at the given index.

Per Anthropic streaming spec, tool_use.input starts empty in content_block_start and is filled via input_json_delta events; consumers that follow the spec (e.g. the Anthropic SDK) ignore an input attached directly to the start event.

Source code in ccproxy/llms/formatters/common/streams.py
def emit_anthropic_tool_use_events(
    index: int,
    state: ToolCallState,
    *,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> list[anthropic_models.MessageStreamEvent]:
    """Build start/delta/stop events for a tool-use block at the given index.

    Per Anthropic streaming spec, tool_use.input starts empty in
    content_block_start and is filled via input_json_delta events; consumers
    that follow the spec (e.g. the Anthropic SDK) ignore an input attached
    directly to the start event.
    """

    block = build_anthropic_tool_use_block(
        state, default_id=f"call_{state.index}", parser=parser
    )
    payload = block.input or {}

    events: list[anthropic_models.MessageStreamEvent] = []
    # Start event carries an empty input per the streaming spec; the real
    # payload is delivered below as a delta.
    events.append(
        anthropic_models.ContentBlockStartEvent(
            type="content_block_start",
            index=index,
            content_block=block.model_copy(update={"input": {}}),
        )
    )
    if payload:
        # The complete argument object is sent as a single JSON delta.
        events.append(
            anthropic_models.ContentBlockDeltaEvent(
                type="content_block_delta",
                index=index,
                delta=anthropic_models.InputJsonDelta(
                    type="input_json_delta",
                    partial_json=json.dumps(payload),
                ),
            )
        )
    events.append(
        anthropic_models.ContentBlockStopEvent(type="content_block_stop", index=index)
    )
    return events

merge_thinking_segments

merge_thinking_segments(segments)

Collapse adjacent segments that share the same signature.

Source code in ccproxy/llms/formatters/common/thinking.py
def merge_thinking_segments(
    segments: Iterable[ThinkingSegment],
) -> list[ThinkingSegment]:
    """Collapse adjacent segments that share the same signature."""

    result: list[ThinkingSegment] = []
    for item in segments:
        # Drop segments whose thinking is not a non-empty string.
        chunk = item.thinking if isinstance(item.thinking, str) else None
        if not chunk:
            continue
        sig = item.signature or None
        if result and result[-1].signature == sig:
            # Same signature as the previously kept segment: concatenate.
            previous = result.pop()
            result.append(
                ThinkingSegment(thinking=previous.thinking + chunk, signature=sig)
            )
        else:
            result.append(ThinkingSegment(thinking=chunk, signature=sig))
    return result

convert_anthropic_usage_to_openai_completion_usage

convert_anthropic_usage_to_openai_completion_usage(usage)

Translate Anthropic Usage values into OpenAI Completion usage.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_anthropic_usage_to_openai_completion_usage(
    usage: Any,
) -> openai_models.CompletionUsage:
    """Translate Anthropic Usage values into OpenAI Completion usage."""

    snapshot = anthropic_usage_snapshot(usage)
    # Anthropic reports cache reads and cache creation separately; Completion
    # usage has a single cached_tokens slot, so prefer reads, else creation.
    cached = snapshot.cache_read_tokens or snapshot.cache_creation_tokens
    total = snapshot.input_tokens + snapshot.output_tokens

    return openai_models.CompletionUsage(
        prompt_tokens=snapshot.input_tokens,
        completion_tokens=snapshot.output_tokens,
        total_tokens=total,
        prompt_tokens_details=openai_models.PromptTokensDetails(
            cached_tokens=cached,
            audio_tokens=0,
        ),
        completion_tokens_details=openai_models.CompletionTokensDetails(
            reasoning_tokens=0,
            audio_tokens=0,
            accepted_prediction_tokens=0,
            rejected_prediction_tokens=0,
        ),
    )

convert_anthropic_usage_to_openai_responses_usage

convert_anthropic_usage_to_openai_responses_usage(usage)

Translate Anthropic Usage values into OpenAI Responses usage.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_anthropic_usage_to_openai_responses_usage(
    usage: Any,
) -> openai_models.ResponseUsage:
    """Translate Anthropic Usage values into OpenAI Responses usage."""

    snapshot = anthropic_usage_snapshot(usage)
    # Responses usage has one cached_tokens slot: prefer cache reads and fall
    # back to cache-creation tokens when no reads were reported.
    cached = snapshot.cache_read_tokens or snapshot.cache_creation_tokens
    total = snapshot.input_tokens + snapshot.output_tokens

    return openai_models.ResponseUsage(
        input_tokens=snapshot.input_tokens,
        input_tokens_details=openai_models.InputTokensDetails(cached_tokens=cached),
        output_tokens=snapshot.output_tokens,
        output_tokens_details=openai_models.OutputTokensDetails(reasoning_tokens=0),
        total_tokens=total,
    )

convert_openai_completion_usage_to_responses_usage

convert_openai_completion_usage_to_responses_usage(usage)

Map Completion usage payloads into Responses Usage structures.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_openai_completion_usage_to_responses_usage(
    usage: Any,
) -> openai_models.ResponseUsage:
    """Map Completion usage payloads into Responses Usage structures."""

    snapshot = openai_completion_usage_snapshot(usage)
    total = snapshot.input_tokens + snapshot.output_tokens

    return openai_models.ResponseUsage(
        input_tokens=snapshot.input_tokens,
        input_tokens_details=openai_models.InputTokensDetails(
            cached_tokens=snapshot.cache_read_tokens
        ),
        output_tokens=snapshot.output_tokens,
        output_tokens_details=openai_models.OutputTokensDetails(
            reasoning_tokens=snapshot.reasoning_tokens
        ),
        total_tokens=total,
    )

convert_openai_responses_usage_to_anthropic_usage

convert_openai_responses_usage_to_anthropic_usage(usage)

Translate OpenAI Responses usage into Anthropic Usage models.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_openai_responses_usage_to_anthropic_usage(
    usage: Any,
) -> anthropic_models.Usage:
    """Translate OpenAI Responses usage into Anthropic Usage models."""

    snap = openai_response_usage_snapshot(usage)
    return anthropic_models.Usage(
        input_tokens=snap.input_tokens,
        output_tokens=snap.output_tokens,
        cache_read_input_tokens=snap.cache_read_tokens,
        cache_creation_input_tokens=snap.cache_creation_tokens,
    )

convert_openai_responses_usage_to_completion_usage

convert_openai_responses_usage_to_completion_usage(usage)

Normalize Responses usage into the legacy CompletionUsage envelope.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_openai_responses_usage_to_completion_usage(
    usage: Any,
) -> openai_models.CompletionUsage:
    """Normalize Responses usage into the legacy CompletionUsage envelope."""

    snap = openai_response_usage_snapshot(usage)
    total = snap.input_tokens + snap.output_tokens

    return openai_models.CompletionUsage(
        prompt_tokens=snap.input_tokens,
        completion_tokens=snap.output_tokens,
        total_tokens=total,
        prompt_tokens_details=openai_models.PromptTokensDetails(
            cached_tokens=snap.cache_read_tokens,
            audio_tokens=0,
        ),
        completion_tokens_details=openai_models.CompletionTokensDetails(
            reasoning_tokens=snap.reasoning_tokens,
            audio_tokens=0,
            accepted_prediction_tokens=0,
            rejected_prediction_tokens=0,
        ),
    )