Skip to content

ccproxy.llms.formatters.common

ccproxy.llms.formatters.common

Shared helpers used by formatter adapters.

IndexedToolCallTracker

IndexedToolCallTracker()

Registry tracking streaming tool calls keyed by integer index.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Initialize the tracker with no tool calls registered."""
    # Maps the integer index of a streaming tool call to its ToolCallState.
    self._states: dict[int, ToolCallState] = {}

ObfuscationTokenFactory

ObfuscationTokenFactory(fallback_identifier)

Utility for building deterministic obfuscation tokens.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self, fallback_identifier: Callable[[], str]) -> None:
    """Store the zero-argument callable that supplies a fallback identifier.

    The callable is only stored here; it is not invoked during construction.
    """
    self._fallback_identifier = fallback_identifier

ReasoningBuffer

ReasoningBuffer()

Utility to manage reasoning text buffers keyed by item/summary ids.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Initialize the buffer with no reasoning state recorded."""
    # Two-level mapping: a string key selects an inner dict whose values are
    # ReasoningPartState objects. Presumably the outer key is the item id and
    # the inner key the summary id (per the class description) -- TODO confirm
    # against the methods that populate it.
    self._states: dict[str, dict[Any, ReasoningPartState]] = {}

ReasoningPartState dataclass

ReasoningPartState(
    buffer=list(), signature=None, open=False
)

Mutable reasoning buffer for a specific summary segment.

ToolCallState dataclass

ToolCallState(
    id,
    index,
    call_id=None,
    item_id=None,
    name=None,
    arguments="",
    arguments_parts=list(),
    output_index=-1,
    emitted=False,
    initial_emitted=False,
    name_emitted=False,
    arguments_emitted=False,
    arguments_done_emitted=False,
    item_done_emitted=False,
    added_emitted=False,
    completed=False,
    final_arguments=None,
)

Mutable state for a single streaming tool call.

ToolCallTracker

ToolCallTracker()

Registry tracking streaming tool calls by item identifier.

Source code in ccproxy/llms/formatters/common/streams.py
def __init__(self) -> None:
    """Initialize the tracker with no tool calls registered."""
    # Tool-call state looked up by string identifier.
    self._states: dict[str, ToolCallState] = {}
    # String identifiers kept in a list; presumably the order in which calls
    # were first registered -- TODO confirm against the registering method.
    self._order: list[str] = []

ThinkingSegment dataclass

ThinkingSegment(thinking, signature=None)

Lightweight reasoning segment mirroring Anthropic's ThinkingBlock.

ensure_identifier

ensure_identifier(prefix, existing=None)

Return a stable identifier and suffix for the given prefix.

If an existing identifier already starts with the prefix followed by an underscore, it is returned unchanged along with its suffix. Existing identifiers that begin with resp_ are also understood: their suffix is reused under the requested prefix, so resp and alternate prefixes can build consistent derived identifiers.

Source code in ccproxy/llms/formatters/common/identifiers.py
def ensure_identifier(prefix: str, existing: str | None = None) -> tuple[str, str]:
    """Return a stable identifier and suffix for the given prefix.

    If an existing identifier already matches the prefix we reuse its suffix.
    Existing identifiers that begin with ``resp_`` are also understood so both
    ``resp`` and alternate prefixes can build consistent derived identifiers.
    """

    if isinstance(existing, str) and existing.startswith(f"{prefix}_"):
        return existing, normalize_suffix(existing)

    if (
        isinstance(existing, str)
        and existing
        and prefix == "resp"
        and existing.startswith("resp_")
    ):
        return existing, normalize_suffix(existing)

    if (
        isinstance(existing, str)
        and existing
        and existing.startswith("resp_")
        and prefix != "resp"
    ):
        suffix = normalize_suffix(existing)
        return f"{prefix}_{suffix}", suffix

    suffix = uuid.uuid4().hex
    return f"{prefix}_{suffix}", suffix

normalize_suffix

normalize_suffix(identifier)

Return the suffix part of an identifier split on the first underscore.

Source code in ccproxy/llms/formatters/common/identifiers.py
def normalize_suffix(identifier: str) -> str:
    """Strip everything up to and including the first underscore.

    Identifiers that contain no underscore are returned unchanged.
    """

    _, separator, remainder = identifier.partition("_")
    return remainder if separator else identifier

build_anthropic_tool_use_block

build_anthropic_tool_use_block(
    state, *, default_id=None, parser=None
)

Create an Anthropic ToolUseBlock from a tracked tool-call state.

Source code in ccproxy/llms/formatters/common/streams.py
def build_anthropic_tool_use_block(
    state: ToolCallState,
    *,
    default_id: str | None = None,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> anthropic_models.ToolUseBlock:
    """Convert tracked tool-call state into an Anthropic ``tool_use`` block.

    The block id is the first truthy value among ``state.item_id``,
    ``state.call_id`` and ``default_id``, falling back to ``call_<index>``.
    The argument text is the first truthy value among
    ``state.final_arguments`` and ``state.arguments``, otherwise the
    concatenation of ``state.arguments_parts``. Without a ``parser``,
    non-empty text is wrapped as ``{"arguments": text}`` and empty text
    becomes ``{}``.
    """

    def _wrap_raw_arguments(text: str) -> dict[str, Any]:
        # Default used when the caller supplies no parser.
        if text:
            return {"arguments": text}
        return {}

    id_candidates = (state.item_id, state.call_id, default_id)
    resolved_id = next(
        (candidate for candidate in id_candidates if candidate),
        f"call_{state.index}",
    )

    raw_arguments = state.final_arguments or state.arguments
    if not raw_arguments:
        raw_arguments = "".join(state.arguments_parts)

    to_payload = parser or _wrap_raw_arguments
    tool_name = str(state.name or "tool")

    return anthropic_models.ToolUseBlock(
        type="tool_use",
        id=str(resolved_id),
        name=tool_name,
        input=to_payload(raw_arguments),
    )

emit_anthropic_tool_use_events

emit_anthropic_tool_use_events(
    index, state, *, parser=None
)

Build start/stop events for a tool-use block at the given index.

Source code in ccproxy/llms/formatters/common/streams.py
def emit_anthropic_tool_use_events(
    index: int,
    state: ToolCallState,
    *,
    parser: Callable[[str], dict[str, Any]] | None = None,
) -> list[anthropic_models.MessageStreamEvent]:
    """Return a start event and a stop event for one tool-use content block.

    The block is built from ``state`` with ``call_<state.index>`` as the
    default id; both events carry the given content-block ``index``.
    """

    fallback_id = f"call_{state.index}"
    tool_block = build_anthropic_tool_use_block(
        state, default_id=fallback_id, parser=parser
    )

    start_event = anthropic_models.ContentBlockStartEvent(
        type="content_block_start",
        index=index,
        content_block=tool_block,
    )
    stop_event = anthropic_models.ContentBlockStopEvent(
        type="content_block_stop",
        index=index,
    )
    return [start_event, stop_event]

merge_thinking_segments

merge_thinking_segments(segments)

Collapse adjacent segments that share the same signature.

Source code in ccproxy/llms/formatters/common/thinking.py
def merge_thinking_segments(
    segments: Iterable[ThinkingSegment],
) -> list[ThinkingSegment]:
    """Join runs of consecutive segments that carry the same signature.

    Segments whose ``thinking`` is not a non-empty string are dropped, and a
    falsy signature is treated as ``None`` when comparing and when building
    the output segments.
    """

    collapsed: list[ThinkingSegment] = []
    for segment in segments:
        chunk = segment.thinking
        if not isinstance(chunk, str) or not chunk:
            continue

        signature = segment.signature or None
        previous = collapsed[-1] if collapsed else None

        if previous is None or not (previous.signature == signature):
            # Different signature (or first kept segment): start a new run.
            collapsed.append(ThinkingSegment(thinking=chunk, signature=signature))
            continue

        # Same signature as the previous run: extend it in place.
        collapsed[-1] = ThinkingSegment(
            thinking=f"{previous.thinking}{chunk}",
            signature=signature,
        )
    return collapsed

convert_anthropic_usage_to_openai_completion_usage

convert_anthropic_usage_to_openai_completion_usage(usage)

Translate Anthropic Usage values into OpenAI Completion usage.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_anthropic_usage_to_openai_completion_usage(
    usage: Any,
) -> openai_models.CompletionUsage:
    """Build an OpenAI ``CompletionUsage`` from Anthropic usage data.

    ``cached_tokens`` is the cache-read count, or the cache-creation count
    when the cache-read count is falsy. Audio, reasoning and prediction
    counters are always reported as zero. ``total_tokens`` is the sum of
    input and output tokens.
    """

    counts = anthropic_usage_snapshot(usage)

    cache_hits = counts.cache_read_tokens
    if not cache_hits:
        cache_hits = counts.cache_creation_tokens

    prompt_details = openai_models.PromptTokensDetails(
        cached_tokens=cache_hits,
        audio_tokens=0,
    )
    completion_details = openai_models.CompletionTokensDetails(
        reasoning_tokens=0,
        audio_tokens=0,
        accepted_prediction_tokens=0,
        rejected_prediction_tokens=0,
    )

    prompt_count = counts.input_tokens
    completion_count = counts.output_tokens
    return openai_models.CompletionUsage(
        prompt_tokens=prompt_count,
        completion_tokens=completion_count,
        total_tokens=prompt_count + completion_count,
        prompt_tokens_details=prompt_details,
        completion_tokens_details=completion_details,
    )

convert_anthropic_usage_to_openai_responses_usage

convert_anthropic_usage_to_openai_responses_usage(usage)

Translate Anthropic Usage values into OpenAI Responses usage.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_anthropic_usage_to_openai_responses_usage(
    usage: Any,
) -> openai_models.ResponseUsage:
    """Build an OpenAI ``ResponseUsage`` from Anthropic usage data.

    ``cached_tokens`` is the cache-read count, or the cache-creation count
    when the cache-read count is falsy. ``reasoning_tokens`` is always zero
    and ``total_tokens`` is the sum of input and output tokens.
    """

    counts = anthropic_usage_snapshot(usage)

    cache_hits = counts.cache_read_tokens
    if not cache_hits:
        cache_hits = counts.cache_creation_tokens

    input_details = openai_models.InputTokensDetails(cached_tokens=cache_hits)
    output_details = openai_models.OutputTokensDetails(reasoning_tokens=0)

    consumed = counts.input_tokens
    produced = counts.output_tokens
    return openai_models.ResponseUsage(
        input_tokens=consumed,
        input_tokens_details=input_details,
        output_tokens=produced,
        output_tokens_details=output_details,
        total_tokens=consumed + produced,
    )

convert_openai_completion_usage_to_responses_usage

convert_openai_completion_usage_to_responses_usage(usage)

Map Completion usage payloads into Responses Usage structures.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_openai_completion_usage_to_responses_usage(
    usage: Any,
) -> openai_models.ResponseUsage:
    """Build an OpenAI ``ResponseUsage`` from Completion usage data.

    Cache-read and reasoning counts are carried over from the snapshot;
    ``total_tokens`` is the sum of input and output tokens.
    """

    counts = openai_completion_usage_snapshot(usage)

    input_details = openai_models.InputTokensDetails(
        cached_tokens=counts.cache_read_tokens
    )
    output_details = openai_models.OutputTokensDetails(
        reasoning_tokens=counts.reasoning_tokens
    )

    consumed = counts.input_tokens
    produced = counts.output_tokens
    return openai_models.ResponseUsage(
        input_tokens=consumed,
        input_tokens_details=input_details,
        output_tokens=produced,
        output_tokens_details=output_details,
        total_tokens=consumed + produced,
    )

convert_openai_responses_usage_to_anthropic_usage

convert_openai_responses_usage_to_anthropic_usage(usage)

Translate OpenAI Responses usage into Anthropic Usage models.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_openai_responses_usage_to_anthropic_usage(
    usage: Any,
) -> anthropic_models.Usage:
    """Build an Anthropic ``Usage`` model from OpenAI Responses usage data.

    Input, output, cache-read and cache-creation counts are copied straight
    from the usage snapshot.
    """

    counts = openai_response_usage_snapshot(usage)

    consumed = counts.input_tokens
    produced = counts.output_tokens
    cache_reads = counts.cache_read_tokens
    cache_writes = counts.cache_creation_tokens

    return anthropic_models.Usage(
        input_tokens=consumed,
        output_tokens=produced,
        cache_read_input_tokens=cache_reads,
        cache_creation_input_tokens=cache_writes,
    )

convert_openai_responses_usage_to_completion_usage

convert_openai_responses_usage_to_completion_usage(usage)

Normalize Responses usage into the legacy CompletionUsage envelope.

Source code in ccproxy/llms/formatters/common/usage.py
def convert_openai_responses_usage_to_completion_usage(
    usage: Any,
) -> openai_models.CompletionUsage:
    """Build a legacy ``CompletionUsage`` from OpenAI Responses usage data.

    Cache-read and reasoning counts are carried over from the snapshot; audio
    and prediction counters are always reported as zero. ``total_tokens`` is
    the sum of input and output tokens.
    """

    counts = openai_response_usage_snapshot(usage)

    prompt_details = openai_models.PromptTokensDetails(
        cached_tokens=counts.cache_read_tokens,
        audio_tokens=0,
    )
    completion_details = openai_models.CompletionTokensDetails(
        reasoning_tokens=counts.reasoning_tokens,
        audio_tokens=0,
        accepted_prediction_tokens=0,
        rejected_prediction_tokens=0,
    )

    prompt_count = counts.input_tokens
    completion_count = counts.output_tokens
    return openai_models.CompletionUsage(
        prompt_tokens=prompt_count,
        completion_tokens=completion_count,
        total_tokens=prompt_count + completion_count,
        prompt_tokens_details=prompt_details,
        completion_tokens_details=completion_details,
    )